mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
fix(transport/memory): Improve the memory transport, 4x speed (#2581)
This commit is contained in:
parent
065f9714e9
commit
24dfcef49a
@ -2,8 +2,10 @@ package transport
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/gob"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
@ -14,8 +16,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type memorySocket struct {
|
type memorySocket struct {
|
||||||
recv chan *Message
|
// True server mode, False client mode
|
||||||
send chan *Message
|
server bool
|
||||||
|
// Client receiver of io.Pipe with gob
|
||||||
|
crecv *gob.Decoder
|
||||||
|
// Client sender of the io.Pipe with gob
|
||||||
|
csend *gob.Encoder
|
||||||
|
// Server receiver of the io.Pip with gob
|
||||||
|
srecv *gob.Decoder
|
||||||
|
// Server sender of the io.Pip with gob
|
||||||
|
ssend *gob.Encoder
|
||||||
// sock exit
|
// sock exit
|
||||||
exit chan bool
|
exit chan bool
|
||||||
// listener exit
|
// listener exit
|
||||||
@ -27,7 +37,6 @@ type memorySocket struct {
|
|||||||
// for send/recv Timeout
|
// for send/recv Timeout
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type memoryClient struct {
|
type memoryClient struct {
|
||||||
@ -52,9 +61,6 @@ type memoryTransport struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *memorySocket) Recv(m *Message) error {
|
func (ms *memorySocket) Recv(m *Message) error {
|
||||||
ms.RLock()
|
|
||||||
defer ms.RUnlock()
|
|
||||||
|
|
||||||
ctx := ms.ctx
|
ctx := ms.ctx
|
||||||
if ms.timeout > 0 {
|
if ms.timeout > 0 {
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
@ -66,12 +72,23 @@ func (ms *memorySocket) Recv(m *Message) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-ms.exit:
|
case <-ms.exit:
|
||||||
return errors.New("connection closed")
|
// connection closed
|
||||||
|
return io.EOF
|
||||||
case <-ms.lexit:
|
case <-ms.lexit:
|
||||||
return errors.New("server connection closed")
|
// Server connection closed
|
||||||
case cm := <-ms.recv:
|
return io.EOF
|
||||||
*m = *cm
|
default:
|
||||||
|
if ms.server {
|
||||||
|
if err := ms.srecv.Decode(m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := ms.crecv.Decode(m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,9 +101,6 @@ func (ms *memorySocket) Remote() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *memorySocket) Send(m *Message) error {
|
func (ms *memorySocket) Send(m *Message) error {
|
||||||
ms.RLock()
|
|
||||||
defer ms.RUnlock()
|
|
||||||
|
|
||||||
ctx := ms.ctx
|
ctx := ms.ctx
|
||||||
if ms.timeout > 0 {
|
if ms.timeout > 0 {
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
@ -98,17 +112,27 @@ func (ms *memorySocket) Send(m *Message) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-ms.exit:
|
case <-ms.exit:
|
||||||
return errors.New("connection closed")
|
// connection closed
|
||||||
|
return io.EOF
|
||||||
case <-ms.lexit:
|
case <-ms.lexit:
|
||||||
return errors.New("server connection closed")
|
// Server connection closed
|
||||||
case ms.send <- m:
|
return io.EOF
|
||||||
|
default:
|
||||||
|
if ms.server {
|
||||||
|
if err := ms.ssend.Encode(m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := ms.csend.Encode(m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *memorySocket) Close() error {
|
func (ms *memorySocket) Close() error {
|
||||||
ms.Lock()
|
|
||||||
defer ms.Unlock()
|
|
||||||
select {
|
select {
|
||||||
case <-ms.exit:
|
case <-ms.exit:
|
||||||
return nil
|
return nil
|
||||||
@ -141,10 +165,11 @@ func (m *memoryListener) Accept(fn func(Socket)) error {
|
|||||||
return nil
|
return nil
|
||||||
case c := <-m.conn:
|
case c := <-m.conn:
|
||||||
go fn(&memorySocket{
|
go fn(&memorySocket{
|
||||||
|
server: true,
|
||||||
lexit: c.lexit,
|
lexit: c.lexit,
|
||||||
exit: c.exit,
|
exit: c.exit,
|
||||||
send: c.recv,
|
ssend: c.ssend,
|
||||||
recv: c.send,
|
srecv: c.srecv,
|
||||||
local: c.Remote(),
|
local: c.Remote(),
|
||||||
remote: c.Local(),
|
remote: c.Local(),
|
||||||
timeout: m.topts.Timeout,
|
timeout: m.topts.Timeout,
|
||||||
@ -168,11 +193,16 @@ func (m *memoryTransport) Dial(addr string, opts ...DialOption) (Client, error)
|
|||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
creader, swriter := io.Pipe()
|
||||||
|
sreader, cwriter := io.Pipe()
|
||||||
|
|
||||||
client := &memoryClient{
|
client := &memoryClient{
|
||||||
&memorySocket{
|
&memorySocket{
|
||||||
send: make(chan *Message),
|
server: false,
|
||||||
recv: make(chan *Message),
|
csend: gob.NewEncoder(cwriter),
|
||||||
exit: make(chan bool),
|
crecv: gob.NewDecoder(creader),
|
||||||
|
ssend: gob.NewEncoder(swriter),
|
||||||
|
srecv: gob.NewDecoder(sreader), exit: make(chan bool),
|
||||||
lexit: listener.exit,
|
lexit: listener.exit,
|
||||||
local: addr,
|
local: addr,
|
||||||
remote: addr,
|
remote: addr,
|
||||||
|
Loading…
Reference in New Issue
Block a user