1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-05-19 21:23:04 +02:00

Fix stream eos error (#2716)

* [fix] etcd config source prefix issue (#2389)

* http transport data race issue (#2436)

* [fix] #2431 http transport data race issue

* [feature] Ability to close connection while receiving.
Ability to send messages while receiving.
Icreased r channel limit to 100 to more fluently communication.
Do not dropp sent request if r channel is full.

* [fix] Do not send error when stream send eos header, just close the connection

* [fix] Do not overwrite the error in http client

---------

Co-authored-by: Johnson C <chengqiaosheng@gmail.com>
This commit is contained in:
Ak-Army 2024-07-02 14:06:14 +02:00 committed by GitHub
parent 4d5b3b084f
commit 1515db5240
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 41 additions and 22 deletions

View File

@ -205,7 +205,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// If we don't have a socket and its a stream // If we don't have a socket and its a stream
// Check if its a last stream EOS error // Check if its a last stream EOS error
if !ok && stream && msg.Header[headers.Error] == errLastStreamResponse.Error() { if !ok && stream && msg.Header[headers.Error] == errLastStreamResponse.Error() {
closeConn = true
pool.Release(psock) pool.Release(psock)
continue continue
} }
@ -512,7 +514,11 @@ func (s *rpcServer) Deregister() error {
logger.Logf(log.InfoLevel, "Unsubscribing %s from topic: %s", node.Id, sub.Topic()) logger.Logf(log.InfoLevel, "Unsubscribing %s from topic: %s", node.Id, sub.Topic())
if err := sub.Unsubscribe(); err != nil { if err := sub.Unsubscribe(); err != nil {
logger.Logf(log.ErrorLevel, "Failed to unsubscribe subscriber nr. %d from topic %s: %v", i+1, sub.Topic(), err) logger.Logf(log.ErrorLevel,
"Failed to unsubscribe subscriber nr. %d from topic %s: %v",
i+1,
sub.Topic(),
err)
} }
} }
@ -759,7 +765,11 @@ Loop:
rerr := s.opts.RegisterCheck(s.opts.Context) rerr := s.opts.RegisterCheck(s.opts.Context)
if rerr != nil && registered { if rerr != nil && registered {
logger.Logf(log.ErrorLevel, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) logger.Logf(log.ErrorLevel,
"Server %s-%s register check error: %s, deregister it",
config.Name,
config.Id,
rerr)
// deregister self in case of error // deregister self in case of error
if err := s.Deregister(); err != nil { if err := s.Deregister(); err != nil {
logger.Logf(log.ErrorLevel, "Server %s-%s deregister error: %s", config.Name, config.Id, err) logger.Logf(log.ErrorLevel, "Server %s-%s deregister error: %s", config.Name, config.Id, err)
@ -809,7 +819,11 @@ Loop:
s.setOptsAddr(addr) s.setOptsAddr(addr)
} }
func (s *rpcServer) serveReq(ctx context.Context, msg transport.Message, req *rpcRequest, resp *rpcResponse, rcodec codec.Codec) { func (s *rpcServer) serveReq(ctx context.Context,
msg transport.Message,
req *rpcRequest,
resp *rpcResponse,
rcodec codec.Codec) {
logger := s.opts.Logger logger := s.opts.Logger
router := s.getRouter() router := s.getRouter()

View File

@ -147,8 +147,8 @@ func (h *httpTransportClient) Recv(msg *Message) (err error) {
} }
defer func() { defer func() {
if err = rsp.Body.Close(); err != nil { if err2 := rsp.Body.Close(); err2 != nil {
err = errors.Wrap(err, "failed to close body") err = errors.Wrap(err2, "failed to close body")
} }
}() }()
@ -180,25 +180,29 @@ func (h *httpTransportClient) Recv(msg *Message) (err error) {
func (h *httpTransportClient) Close() error { func (h *httpTransportClient) Close() error {
if !h.dialOpts.Stream { if !h.dialOpts.Stream {
h.once.Do(func() { h.once.Do(
h.Lock() func() {
h.buff.Reset(nil) h.Lock()
h.closed = true h.buff.Reset(nil)
h.Unlock() h.closed = true
close(h.req) h.Unlock()
}) close(h.req)
},
)
return h.conn.Close() return h.conn.Close()
} }
err := h.conn.Close() err := h.conn.Close()
h.once.Do(func() { h.once.Do(
h.Lock() func() {
h.buff.Reset(nil) h.Lock()
h.closed = true h.buff.Reset(nil)
h.Unlock() h.closed = true
close(h.req) h.Unlock()
}) close(h.req)
},
)
return err return err
} }

View File

@ -1,6 +1,7 @@
package transport package transport
import ( import (
"errors"
"io" "io"
"net" "net"
"sync" "sync"
@ -126,7 +127,7 @@ func TestHTTPTransportError(t *testing.T) {
for { for {
var m Message var m Message
if err := sock.Recv(&m); err != nil { if err := sock.Recv(&m); err != nil {
if err == io.EOF { if errors.Is(err, io.EOF) {
return return
} }
t.Fatal(err) t.Fatal(err)
@ -335,7 +336,7 @@ func TestHTTPTransportMultipleSendWhenRecv(t *testing.T) {
Body: []byte(`{"message": "Hello World"}`), Body: []byte(`{"message": "Hello World"}`),
} }
wgSend := sync.WaitGroup{} var wgSend sync.WaitGroup
fn := func(sock Socket) { fn := func(sock Socket) {
defer sock.Close() defer sock.Close()
@ -344,7 +345,6 @@ func TestHTTPTransportMultipleSendWhenRecv(t *testing.T) {
if err := sock.Recv(&mr); err != nil { if err := sock.Recv(&mr); err != nil {
return return
} }
wgSend.Add(1)
go func() { go func() {
defer wgSend.Done() defer wgSend.Done()
<-readyToSend <-readyToSend
@ -388,6 +388,7 @@ func TestHTTPTransportMultipleSendWhenRecv(t *testing.T) {
} }
} }
}() }()
wgSend.Add(3)
<-readyForRecv <-readyForRecv
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
if err := c.Send(&m); err != nil { if err := c.Send(&m); err != nil {