1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-05 10:20:53 +02:00

Fix double close in stream client (#2693)

* [fix] etcd config source prefix issue (#2389)

* http transport data race issue (#2436)

* [fix] #2431 http transport data race issue

* [feature] Ability to close connection while receiving.
Ability to send messages while receiving.
Icreased r channel limit to 100 to more fluently communication.
Do not dropp sent request if r channel is full.

* [fix] Do not close the transport client twice in stream connection , the transport client is closed in the rpc codec

---------

Co-authored-by: Johnson C <chengqiaosheng@gmail.com>
Co-authored-by: Hunyadvári Péter <peter.hunyadvari@vcc.live>
This commit is contained in:
Ak-Army 2024-02-15 21:26:36 +01:00 committed by GitHub
parent 6e55bb1a0e
commit f1a8b39309
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -77,7 +77,13 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("unsupported Content-Type: %s", contentType) return nil, fmt.Errorf("unsupported Content-Type: %s", contentType)
} }
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error { func (r *rpcClient) call(
ctx context.Context,
node *registry.Node,
req Request,
resp interface{},
opts CallOptions,
) error {
address := node.Address address := node.Address
logger := r.Options().Logger logger := r.Options().Logger
@ -292,12 +298,6 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
r.codec = codec r.codec = codec
} }
releaseFunc := func(_ error) {
if err = c.Close(); err != nil {
logger.Log(log.ErrorLevel, err)
}
}
stream := &rpcStream{ stream := &rpcStream{
id: id, id: id,
context: ctx, context: ctx,
@ -308,7 +308,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
closed: make(chan bool), closed: make(chan bool),
// signal the end of stream, // signal the end of stream,
sendEOS: true, sendEOS: true,
release: releaseFunc, release: func(_ error) {},
} }
// wait for error response // wait for error response
@ -490,7 +490,10 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) return merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
} }
return merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) return merrors.InternalServerError("go.micro.client",
"error getting next %s node: %s",
service,
err.Error())
} }
// make the call // make the call
@ -586,7 +589,10 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
} }
return nil, merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) return nil, merrors.InternalServerError("go.micro.client",
"error getting next %s node: %s",
service,
err.Error())
} }
stream, err := r.stream(ctx, node, request, callOpts) stream, err := r.stream(ctx, node, request, callOpts)