mirror of
https://github.com/go-micro/go-micro.git
synced 2025-06-24 22:26:54 +02:00
Move pool to util
This commit is contained in:
@ -1,114 +0,0 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
type pool struct {
|
||||
size int
|
||||
ttl time.Duration
|
||||
tr transport.Transport
|
||||
|
||||
sync.Mutex
|
||||
conns map[string][]*poolConn
|
||||
}
|
||||
|
||||
type poolConn struct {
|
||||
transport.Client
|
||||
id string
|
||||
created time.Time
|
||||
}
|
||||
|
||||
func newPool(options Options) *pool {
|
||||
return &pool{
|
||||
size: options.Size,
|
||||
tr: options.Transport,
|
||||
ttl: options.TTL,
|
||||
conns: make(map[string][]*poolConn),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pool) Close() error {
|
||||
p.Lock()
|
||||
for k, c := range p.conns {
|
||||
for _, conn := range c {
|
||||
conn.Client.Close()
|
||||
}
|
||||
delete(p.conns, k)
|
||||
}
|
||||
p.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// NoOp the Close since we manage it
|
||||
func (p *poolConn) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *poolConn) Id() string {
|
||||
return p.id
|
||||
}
|
||||
|
||||
func (p *poolConn) Created() time.Time {
|
||||
return p.created
|
||||
}
|
||||
|
||||
func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {
|
||||
p.Lock()
|
||||
conns := p.conns[addr]
|
||||
|
||||
// while we have conns check age and then return one
|
||||
// otherwise we'll create a new conn
|
||||
for len(conns) > 0 {
|
||||
conn := conns[len(conns)-1]
|
||||
conns = conns[:len(conns)-1]
|
||||
p.conns[addr] = conns
|
||||
|
||||
// if conn is old kill it and move on
|
||||
if d := time.Since(conn.Created()); d > p.ttl {
|
||||
conn.Client.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
// we got a good conn, lets unlock and return it
|
||||
p.Unlock()
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
p.Unlock()
|
||||
|
||||
// create new conn
|
||||
c, err := p.tr.Dial(addr, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &poolConn{
|
||||
Client: c,
|
||||
id: uuid.New().String(),
|
||||
created: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *pool) Release(conn Conn, err error) error {
|
||||
// don't store the conn if it has errored
|
||||
if err != nil {
|
||||
return conn.(*poolConn).Client.Close()
|
||||
}
|
||||
|
||||
// otherwise put it back for reuse
|
||||
p.Lock()
|
||||
conns := p.conns[conn.Remote()]
|
||||
if len(conns) >= p.size {
|
||||
p.Unlock()
|
||||
return conn.(*poolConn).Client.Close()
|
||||
}
|
||||
p.conns[conn.Remote()] = append(conns, conn.(*poolConn))
|
||||
p.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
@ -1,89 +0,0 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/transport"
|
||||
"github.com/micro/go-micro/transport/memory"
|
||||
)
|
||||
|
||||
func testPool(t *testing.T, size int, ttl time.Duration) {
|
||||
// mock transport
|
||||
tr := memory.NewTransport()
|
||||
|
||||
options := Options{
|
||||
TTL: ttl,
|
||||
Size: size,
|
||||
Transport: tr,
|
||||
}
|
||||
// zero pool
|
||||
p := newPool(options)
|
||||
|
||||
// listen
|
||||
l, err := tr.Listen(":0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
// accept loop
|
||||
go func() {
|
||||
for {
|
||||
if err := l.Accept(func(s transport.Socket) {
|
||||
for {
|
||||
var msg transport.Message
|
||||
if err := s.Recv(&msg); err != nil {
|
||||
return
|
||||
}
|
||||
if err := s.Send(&msg); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
// get a conn
|
||||
c, err := p.Get(l.Addr())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := &transport.Message{
|
||||
Body: []byte(`hello world`),
|
||||
}
|
||||
|
||||
if err := c.Send(msg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var rcv transport.Message
|
||||
|
||||
if err := c.Recv(&rcv); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if string(rcv.Body) != string(msg.Body) {
|
||||
t.Fatalf("got %v, expected %v", rcv.Body, msg.Body)
|
||||
}
|
||||
|
||||
// release the conn
|
||||
p.Release(c, nil)
|
||||
|
||||
p.Lock()
|
||||
if i := len(p.conns[l.Addr()]); i > size {
|
||||
p.Unlock()
|
||||
t.Fatalf("pool size %d is greater than expected %d", i, size)
|
||||
}
|
||||
p.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientPool(t *testing.T) {
|
||||
testPool(t, 0, time.Minute)
|
||||
testPool(t, 2, time.Minute)
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Transport transport.Transport
|
||||
TTL time.Duration
|
||||
Size int
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func Size(i int) Option {
|
||||
return func(o *Options) {
|
||||
o.Size = i
|
||||
}
|
||||
}
|
||||
|
||||
func Transport(t transport.Transport) Option {
|
||||
return func(o *Options) {
|
||||
o.Transport = t
|
||||
}
|
||||
}
|
||||
|
||||
func TTL(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
// Package pool is a connection pool
|
||||
package pool
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
// Pool is an interface for connection pooling
|
||||
type Pool interface {
|
||||
// Close the pool
|
||||
Close() error
|
||||
// Get a connection
|
||||
Get(addr string, opts ...transport.DialOption) (Conn, error)
|
||||
// Releaes the connection
|
||||
Release(c Conn, status error) error
|
||||
}
|
||||
|
||||
type Conn interface {
|
||||
// unique id of connection
|
||||
Id() string
|
||||
// time it was created
|
||||
Created() time.Time
|
||||
// embedded connection
|
||||
transport.Client
|
||||
}
|
||||
|
||||
func NewPool(opts ...Option) Pool {
|
||||
var options Options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return newPool(options)
|
||||
}
|
@ -10,7 +10,7 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/client/pool"
|
||||
"github.com/micro/go-micro/util/pool"
|
||||
"github.com/micro/go-micro/client/selector"
|
||||
"github.com/micro/go-micro/codec"
|
||||
raw "github.com/micro/go-micro/codec/bytes"
|
||||
|
Reference in New Issue
Block a user