mirror of
https://github.com/go-micro/go-micro.git
synced 2025-11-23 21:44:41 +02:00
Fix http transport client blocking recv (#2744)
* reproduce blocking recv * fix blocking recv call on httpTransportClient * prevent race condition * refactoring
This commit is contained in:
@@ -111,9 +111,17 @@ func (h *httpTransportClient) Recv(msg *Message) (err error) {
|
|||||||
var req *http.Request
|
var req *http.Request
|
||||||
|
|
||||||
if !h.dialOpts.Stream {
|
if !h.dialOpts.Stream {
|
||||||
rc, ok := <-h.req
|
|
||||||
if !ok {
|
var rc *http.Request
|
||||||
|
var ok bool
|
||||||
|
|
||||||
h.Lock()
|
h.Lock()
|
||||||
|
select {
|
||||||
|
case rc, ok = <-h.req:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
if len(h.reqList) == 0 {
|
if len(h.reqList) == 0 {
|
||||||
h.Unlock()
|
h.Unlock()
|
||||||
return io.EOF
|
return io.EOF
|
||||||
@@ -121,8 +129,8 @@ func (h *httpTransportClient) Recv(msg *Message) (err error) {
|
|||||||
|
|
||||||
rc = h.reqList[0]
|
rc = h.reqList[0]
|
||||||
h.reqList = h.reqList[1:]
|
h.reqList = h.reqList[1:]
|
||||||
h.Unlock()
|
|
||||||
}
|
}
|
||||||
|
h.Unlock()
|
||||||
|
|
||||||
req = rc
|
req = rc
|
||||||
}
|
}
|
||||||
|
|||||||
77
transport/http_client_test.go
Normal file
77
transport/http_client_test.go
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHttpTransportClient(t *testing.T) {
|
||||||
|
// arrange
|
||||||
|
l, c, err := echoHttpTransportClient("127.0.0.1:")
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
defer l.Close()
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
// act + assert
|
||||||
|
N := cap(c.req)
|
||||||
|
// Send N+1 messages to overflow the buffered channel and place the extra message in the internal buffer
|
||||||
|
for i := 0; i < N+1; i++ {
|
||||||
|
body := fmt.Sprintf("msg-%d", i)
|
||||||
|
if err := c.Send(&Message{Body: []byte(body)}); err != nil {
|
||||||
|
t.Errorf("Unexpected send err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// consume all requests from the buffered channel
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
msg := Message{}
|
||||||
|
if err := c.Recv(&msg); err != nil {
|
||||||
|
t.Errorf("Unexpected recv err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(c.reqList) != 1 {
|
||||||
|
t.Error("Unexpected reqList")
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := Message{}
|
||||||
|
if err := c.Recv(&msg); err != nil {
|
||||||
|
t.Errorf("Unexpected recv err: %v", err)
|
||||||
|
}
|
||||||
|
want := fmt.Sprintf("msg-%d", N)
|
||||||
|
got := string(msg.Body)
|
||||||
|
if want != got {
|
||||||
|
t.Errorf("Unexpected message: got %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func echoHttpTransportClient(addr string) (*httpTransportListener, *httpTransportClient, error) {
|
||||||
|
tr := NewHTTPTransport()
|
||||||
|
l, err := tr.Listen(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, errors.Errorf("Unexpected listen err: %v", err)
|
||||||
|
}
|
||||||
|
c, err := tr.Dial(l.Addr())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, errors.Errorf("Unexpected dial err: %v", err)
|
||||||
|
}
|
||||||
|
go l.Accept(echoHandler)
|
||||||
|
return l.(*httpTransportListener), c.(*httpTransportClient), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func echoHandler(sock Socket) {
|
||||||
|
defer sock.Close()
|
||||||
|
for {
|
||||||
|
var msg Message
|
||||||
|
if err := sock.Recv(&msg); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := sock.Send(&msg); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user