diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 2fea1b83..2fbddc0e 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "sync" "github.com/micro/go-micro/codec" raw "github.com/micro/go-micro/codec/bytes" @@ -11,20 +12,25 @@ import ( "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" + "github.com/oxtoacart/bpool" "github.com/pkg/errors" ) type rpcCodec struct { socket transport.Socket codec codec.Codec - first bool protocol string req *transport.Message buf *readWriteCloser + + // check if we're the first + sync.RWMutex + first chan bool } type readWriteCloser struct { + sync.RWMutex wbuf *bytes.Buffer rbuf *bytes.Buffer } @@ -51,19 +57,24 @@ var ( "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec, } + + // the local buffer pool + bufferPool = bpool.NewSizedBufferPool(32, 1) ) func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { + rwc.RLock() + defer rwc.RUnlock() return rwc.rbuf.Read(p) } func (rwc *readWriteCloser) Write(p []byte) (n int, err error) { + rwc.Lock() + defer rwc.Unlock() return rwc.wbuf.Write(p) } func (rwc *readWriteCloser) Close() error { - rwc.rbuf.Reset() - rwc.wbuf.Reset() return nil } @@ -155,8 +166,8 @@ func setupProtocol(msg *transport.Message) codec.NewCodec { func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ - rbuf: bytes.NewBuffer(nil), - wbuf: bytes.NewBuffer(nil), + rbuf: bufferPool.Get(), + wbuf: bufferPool.Get(), } r := &rpcCodec{ @@ -165,18 +176,20 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod req: req, socket: socket, protocol: "mucp", + first: make(chan bool), } // if grpc pre-load the buffer // TODO: remove this terrible hack switch r.codec.String() { case "grpc": - // set as first - r.first = true // write the body rwc.rbuf.Write(req.Body) // set the protocol r.protocol = "grpc" + default: + // first is not preloaded + close(r.first) } return r @@ -190,7 +203,9 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { } // first message could be pre-loaded - if !c.first { + select { + case <-c.first: + // not the first var tm transport.Message // read off the socket @@ -212,11 +227,26 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { // set req c.req = &tm + default: + // we need to lock here to prevent race conditions + // and we make use of a channel otherwise because + // this does not result in a context switch + // locking to check c.first on every call to ReadHeader + // would otherwise drastically slow the code execution + c.Lock() + // recheck before closing because the select statement + // above is not thread safe, so thread safety here is + // mandatory + select { + case <-c.first: + default: + // disable first + close(c.first) + } + // now unlock and we never need this again + c.Unlock() } - // disable first - c.first = false - // set some internal things getHeaders(&m) @@ -309,9 +339,15 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { } func (c *rpcCodec) Close() error { - c.buf.Close() + // close the codec c.codec.Close() - return c.socket.Close() + // close the socket + err := c.socket.Close() + // put back the buffers + bufferPool.Put(c.buf.rbuf) + bufferPool.Put(c.buf.wbuf) + // return the error + return err } func (c *rpcCodec) String() string {