From f1a8b3930927e071a030e80c595a2295fc2e258b Mon Sep 17 00:00:00 2001 From: Ak-Army Date: Thu, 15 Feb 2024 21:26:36 +0100 Subject: [PATCH] Fix double close in stream client (#2693) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [fix] etcd config source prefix issue (#2389) * http transport data race issue (#2436) * [fix] #2431 http transport data race issue * [feature] Ability to close connection while receiving. Ability to send messages while receiving. Icreased r channel limit to 100 to more fluently communication. Do not dropp sent request if r channel is full. * [fix] Do not close the transport client twice in stream connection , the transport client is closed in the rpc codec --------- Co-authored-by: Johnson C Co-authored-by: Hunyadvári Péter --- client/rpc_client.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index ac179fb0..634ad34e 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -77,7 +77,13 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { return nil, fmt.Errorf("unsupported Content-Type: %s", contentType) } -func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error { +func (r *rpcClient) call( + ctx context.Context, + node *registry.Node, + req Request, + resp interface{}, + opts CallOptions, +) error { address := node.Address logger := r.Options().Logger @@ -292,12 +298,6 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request r.codec = codec } - releaseFunc := func(_ error) { - if err = c.Close(); err != nil { - logger.Log(log.ErrorLevel, err) - } - } - stream := &rpcStream{ id: id, context: ctx, @@ -308,7 +308,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request closed: make(chan bool), // signal the end of stream, sendEOS: true, - release: releaseFunc, + release: func(_ error) {}, } // wait for error response @@ -490,7 +490,10 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) } - return merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) + return merrors.InternalServerError("go.micro.client", + "error getting next %s node: %s", + service, + err.Error()) } // make the call @@ -586,7 +589,10 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) } - return nil, merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) + return nil, merrors.InternalServerError("go.micro.client", + "error getting next %s node: %s", + service, + err.Error()) } stream, err := r.stream(ctx, node, request, callOpts)