From 5a2f37c7189591c90b0b39bb89760bf3a527b716 Mon Sep 17 00:00:00 2001 From: Johnson C Date: Wed, 23 Feb 2022 11:55:18 +0800 Subject: [PATCH] http transport data race issue (#2436) * [fix] #2431 http transport data race issue --- client/rpc_stream.go | 5 ++- transport/http_transport.go | 13 ++++-- transport/http_transport_test.go | 76 ++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/client/rpc_stream.go b/client/rpc_stream.go index a5b617c4..16f0fa53 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -74,10 +74,10 @@ func (r *rpcStream) Send(msg interface{}) error { func (r *rpcStream) Recv(msg interface{}) error { r.Lock() - defer r.Unlock() if r.isClosed() { r.err = errShutdown + r.Unlock() return errShutdown } @@ -89,9 +89,11 @@ func (r *rpcStream) Recv(msg interface{}) error { if err != nil { if err == io.EOF && !r.isClosed() { r.err = io.ErrUnexpectedEOF + r.Unlock() return io.ErrUnexpectedEOF } r.err = err + r.Unlock() return err } @@ -120,6 +122,7 @@ func (r *rpcStream) Recv(msg interface{}) error { } } + r.Unlock() return r.err } diff --git a/transport/http_transport.go b/transport/http_transport.go index 1063769c..449443f0 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -35,9 +35,10 @@ type httpTransportClient struct { sync.RWMutex // request must be stored for response processing - r chan *http.Request - bl []*http.Request - buff *bufio.Reader + r chan *http.Request + bl []*http.Request + buff *bufio.Reader + closed bool // local/remote ip local string @@ -138,7 +139,12 @@ func (h *httpTransportClient) Recv(m *Message) error { h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) } + h.Lock() + if h.closed { + return io.EOF + } rsp, err := http.ReadResponse(h.buff, r) + h.Unlock() if err != nil { return err } @@ -174,6 +180,7 @@ func (h *httpTransportClient) Close() error { h.once.Do(func() { h.Lock() h.buff.Reset(nil) + h.closed = true h.Unlock() close(h.r) }) diff --git a/transport/http_transport_test.go b/transport/http_transport_test.go index 85bea2f6..eb922167 100644 --- a/transport/http_transport_test.go +++ b/transport/http_transport_test.go @@ -1,8 +1,10 @@ package transport import ( + "fmt" "io" "net" + "sync" "testing" "time" ) @@ -244,3 +246,77 @@ func TestHTTPTransportTimeout(t *testing.T) { <-done } + +func TestHTTPTransportCloseWhenRecv(t *testing.T) { + tr := NewHTTPTransport() + + l, err := tr.Listen("127.0.0.1:0") + if err != nil { + t.Errorf("Unexpected listen err: %v", err) + } + defer l.Close() + + fn := func(sock Socket) { + defer sock.Close() + + for { + var m Message + if err := sock.Recv(&m); err != nil { + return + } + if err := sock.Send(&m); err != nil { + return + } + } + } + + done := make(chan bool) + + go func() { + if err := l.Accept(fn); err != nil { + select { + case <-done: + default: + t.Errorf("Unexpected accept err: %v", err) + } + } + }() + + c, err := tr.Dial(l.Addr()) + if err != nil { + t.Errorf("Unexpected dial err: %v", err) + } + defer c.Close() + + m := Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + var rm Message + + if err := c.Recv(&rm); err != nil { + if err == io.EOF { + return + } + t.Errorf("Unexpected recv err: %v", err) + } + fmt.Println("aa") + } + }() + for i := 1; i < 3; i++ { + if err := c.Send(&m); err != nil { + t.Errorf("Unexpected send err: %v", err) + } + } + close(done) + + c.Close() + wg.Wait() +}