mirror of
https://github.com/go-micro/go-micro.git
synced 2025-07-12 22:41:07 +02:00
[feature] stream CloseSend (#2323)
* support stream CloseSend * move CloseSend into Closer
This commit is contained in:
@ -1,19 +1,17 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
b "bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
b "bytes"
|
||||
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go-micro.dev/v4/codec"
|
||||
"go-micro.dev/v4/codec/bytes"
|
||||
"github.com/oxtoacart/bpool"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/encoding"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type jsonCodec struct{}
|
||||
@ -21,12 +19,8 @@ type protoCodec struct{}
|
||||
type bytesCodec struct{}
|
||||
type wrapCodec struct{ encoding.Codec }
|
||||
|
||||
var jsonpbMarshaler = &jsonpb.Marshaler{}
|
||||
var useNumber bool
|
||||
|
||||
// create buffer pool with 16 instances each preallocated with 256 bytes
|
||||
var bufferPool = bpool.NewSizedBufferPool(16, 256)
|
||||
|
||||
var (
|
||||
defaultGRPCCodecs = map[string]encoding.Codec{
|
||||
"application/json": jsonCodec{},
|
||||
@ -115,12 +109,11 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
}
|
||||
|
||||
if pb, ok := v.(proto.Message); ok {
|
||||
buf := bufferPool.Get()
|
||||
defer bufferPool.Put(buf)
|
||||
if err := jsonpbMarshaler.Marshal(buf, pb); err != nil {
|
||||
bytes, err := protojson.Marshal(pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
return bytes, nil
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
@ -135,7 +128,7 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
|
||||
return nil
|
||||
}
|
||||
if pb, ok := v.(proto.Message); ok {
|
||||
return jsonpb.Unmarshal(b.NewReader(data), pb)
|
||||
return protojson.Unmarshal(data, pb)
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(b.NewReader(data))
|
||||
|
@ -3,11 +3,10 @@ module github.com/asim/go-micro/plugins/client/grpc/v4
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
|
||||
go-micro.dev/v4 v4.2.1
|
||||
google.golang.org/grpc v1.41.0
|
||||
google.golang.org/grpc/examples v0.0.0-20211020220737-f00baa6c3c84
|
||||
google.golang.org/protobuf v1.26.0
|
||||
)
|
||||
|
||||
replace go-micro.dev/v4 => ../../../../go-micro
|
||||
|
@ -12,15 +12,14 @@ import (
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/broker"
|
||||
"go-micro.dev/v4/cmd"
|
||||
"go-micro.dev/v4/client"
|
||||
"go-micro.dev/v4/cmd"
|
||||
raw "go-micro.dev/v4/codec/bytes"
|
||||
"go-micro.dev/v4/errors"
|
||||
"go-micro.dev/v4/metadata"
|
||||
"go-micro.dev/v4/registry"
|
||||
"go-micro.dev/v4/selector"
|
||||
pnet "go-micro.dev/v4/util/net"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/encoding"
|
||||
@ -104,7 +103,6 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
||||
|
||||
address := node.Address
|
||||
|
||||
header = make(map[string]string)
|
||||
if md, ok := metadata.FromContext(ctx); ok {
|
||||
header = make(map[string]string, len(md))
|
||||
for k, v := range md {
|
||||
@ -132,8 +130,16 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
||||
|
||||
var grr error
|
||||
|
||||
var dialCtx context.Context
|
||||
var cancel context.CancelFunc
|
||||
if opts.DialTimeout > 0 {
|
||||
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
|
||||
} else {
|
||||
dialCtx, cancel = context.WithCancel(ctx)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
grpcDialOptions := []grpc.DialOption{
|
||||
grpc.WithTimeout(opts.DialTimeout),
|
||||
g.secure(address),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
||||
@ -145,7 +151,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||
}
|
||||
|
||||
cc, err := g.pool.getConn(address, grpcDialOptions...)
|
||||
cc, err := g.pool.getConn(dialCtx, address, grpcDialOptions...)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||
}
|
||||
@ -208,7 +214,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
||||
|
||||
var dialCtx context.Context
|
||||
var cancel context.CancelFunc
|
||||
if opts.DialTimeout >= 0 {
|
||||
if opts.DialTimeout > 0 {
|
||||
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
|
||||
} else {
|
||||
dialCtx, cancel = context.WithCancel(ctx)
|
||||
@ -218,7 +224,6 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
||||
wc := wrapCodec{cf}
|
||||
|
||||
grpcDialOptions := []grpc.DialOption{
|
||||
grpc.WithTimeout(opts.DialTimeout),
|
||||
g.secure(address),
|
||||
}
|
||||
|
||||
@ -226,7 +231,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||
}
|
||||
|
||||
cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...)
|
||||
cc, err := g.pool.getConn(dialCtx, address, grpcDialOptions...)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||
}
|
||||
@ -274,14 +279,16 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
||||
context: ctx,
|
||||
request: req,
|
||||
response: &response{
|
||||
conn: cc,
|
||||
conn: cc.ClientConn,
|
||||
stream: st,
|
||||
codec: cf,
|
||||
gcodec: codec,
|
||||
},
|
||||
stream: st,
|
||||
conn: cc,
|
||||
cancel: cancel,
|
||||
release: func(err error) {
|
||||
g.pool.release(address, cc, err)
|
||||
},
|
||||
}
|
||||
|
||||
// set the stream as the response
|
||||
@ -347,7 +354,7 @@ func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
|
||||
if c, ok := defaultGRPCCodecs[contentType]; ok {
|
||||
return wrapCodec{c}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
||||
return nil, fmt.Errorf("unsupported Content-Type: %s", contentType)
|
||||
}
|
||||
|
||||
func (g *grpcClient) Init(opts ...client.Option) error {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -66,7 +67,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
||||
func (p *pool) getConn(dialCtx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
||||
now := time.Now().Unix()
|
||||
p.Lock()
|
||||
sp, ok := p.conns[addr]
|
||||
@ -135,7 +136,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error)
|
||||
p.Unlock()
|
||||
|
||||
// create new conn
|
||||
cc, err := grpc.Dial(addr, opts...)
|
||||
cc, err := grpc.DialContext(dialCtx, addr, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -184,7 +185,6 @@ func (p *pool) release(addr string, conn *poolConn, err error) {
|
||||
sp.idle++
|
||||
}
|
||||
p.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (conn *poolConn) Close() {
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
pgrpc "google.golang.org/grpc"
|
||||
pb "google.golang.org/grpc/examples/helloworld/helloworld"
|
||||
)
|
||||
|
||||
@ -19,7 +18,7 @@ func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) {
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
s := pgrpc.NewServer()
|
||||
s := grpc.NewServer()
|
||||
pb.RegisterGreeterServer(s, &greeterServer{})
|
||||
|
||||
go s.Serve(l)
|
||||
@ -30,7 +29,7 @@ func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) {
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
// get a conn
|
||||
cc, err := p.getConn(l.Addr().String(), grpc.WithInsecure())
|
||||
cc, err := p.getConn(context.TODO(), l.Addr().String(), grpc.WithInsecure())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -14,12 +14,12 @@ type grpcStream struct {
|
||||
sync.RWMutex
|
||||
closed bool
|
||||
err error
|
||||
conn *grpc.ClientConn
|
||||
stream grpc.ClientStream
|
||||
request client.Request
|
||||
response client.Response
|
||||
context context.Context
|
||||
cancel func()
|
||||
release func(error)
|
||||
}
|
||||
|
||||
func (g *grpcStream) Context() context.Context {
|
||||
@ -43,15 +43,11 @@ func (g *grpcStream) Send(msg interface{}) error {
|
||||
}
|
||||
|
||||
func (g *grpcStream) Recv(msg interface{}) (err error) {
|
||||
defer g.setError(err)
|
||||
if err = g.stream.RecvMsg(msg); err != nil {
|
||||
// #202 - inconsistent gRPC stream behavior
|
||||
// the only way to tell if the stream is done is when we get a EOF on the Recv
|
||||
// here we should close the underlying gRPC ClientConn
|
||||
closeErr := g.Close()
|
||||
if err == io.EOF && closeErr != nil {
|
||||
err = closeErr
|
||||
if err != io.EOF {
|
||||
g.setError(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -68,11 +64,10 @@ func (g *grpcStream) setError(e error) {
|
||||
g.Unlock()
|
||||
}
|
||||
|
||||
// Close the gRPC send stream
|
||||
// #202 - inconsistent gRPC stream behavior
|
||||
// The underlying gRPC stream should not be closed here since the
|
||||
// stream should still be able to receive after this function call
|
||||
// TODO: should the conn be closed in another way?
|
||||
func (g *grpcStream) CloseSend() error {
|
||||
return g.stream.CloseSend()
|
||||
}
|
||||
|
||||
func (g *grpcStream) Close() error {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
@ -83,6 +78,7 @@ func (g *grpcStream) Close() error {
|
||||
// cancel the context
|
||||
g.cancel()
|
||||
g.closed = true
|
||||
g.stream.CloseSend()
|
||||
return g.conn.Close()
|
||||
// release back to pool
|
||||
g.release(g.err)
|
||||
return nil
|
||||
}
|
||||
|
@ -120,6 +120,10 @@ func (h *httpStream) Error() error {
|
||||
return h.err
|
||||
}
|
||||
|
||||
func (h *httpStream) CloseSend() error {
|
||||
return errors.New("streamer not implemented")
|
||||
}
|
||||
|
||||
func (h *httpStream) Close() error {
|
||||
select {
|
||||
case <-h.closed:
|
||||
|
Reference in New Issue
Block a user