1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

support use of listen options (#2536)

* feat: support use of listen options

* style: Adjust the import order of packages.
This commit is contained in:
Arvin 2022-07-30 17:56:14 +08:00 committed by GitHub
parent adcc1761d0
commit 63a9b94820
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 246 additions and 16 deletions

View File

@ -304,6 +304,13 @@ func WrapSubscriber(w ...server.SubscriberWrapper) Option {
}
}
// Add opt to server option
func AddListenOption(option server.Option) Option {
return func(o *Options) {
o.Server.Init(option)
}
}
// Before and Afters
// BeforeStart run funcs before service starts

View File

@ -14,19 +14,20 @@ import (
)
type Options struct {
Codecs map[string]codec.NewCodec
Broker broker.Broker
Registry registry.Registry
Tracer trace.Tracer
Transport transport.Transport
Metadata map[string]string
Name string
Address string
Advertise string
Id string
Version string
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
Codecs map[string]codec.NewCodec
Broker broker.Broker
Registry registry.Registry
Tracer trace.Tracer
Transport transport.Transport
Metadata map[string]string
Name string
Address string
Advertise string
Id string
Version string
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
ListenOptions []transport.ListenOption
// RegisterCheck runs a check function before registering the service
RegisterCheck func(context.Context) error
@ -256,3 +257,11 @@ func WrapSubscriber(w SubscriberWrapper) Option {
o.SubWrappers = append(o.SubWrappers, w)
}
}
// Add transport.ListenOption to the ListenOptions list, when using it, it will be passed to the
// httpTransport.Listen() method
func ListenOption(option transport.ListenOption) Option {
return func(o *Options) {
o.ListenOptions = append(o.ListenOptions, option)
}
}

View File

@ -811,7 +811,7 @@ func (s *rpcServer) Start() error {
config := s.Options()
// start listening on the transport
ts, err := config.Transport.Listen(config.Address)
ts, err := config.Transport.Listen(config.Address, config.ListenOptions...)
if err != nil {
return err
}

View File

@ -3,6 +3,7 @@ package micro
import (
"context"
"errors"
"net"
"sync"
"testing"
@ -10,6 +11,8 @@ import (
"go-micro.dev/v4/debug/handler"
proto "go-micro.dev/v4/debug/proto"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/server"
"go-micro.dev/v4/transport"
"go-micro.dev/v4/util/test"
)
@ -48,6 +51,34 @@ func testService(ctx context.Context, wg *sync.WaitGroup, name string) Service {
return srv
}
func testCustomListenService(ctx context.Context, customListener net.Listener, wg *sync.WaitGroup, name string) Service {
// add self
wg.Add(1)
r := registry.NewMemoryRegistry(registry.Services(test.Data))
// create service
srv := NewService(
Name(name),
Context(ctx),
Registry(r),
// injection customListener
AddListenOption(server.ListenOption(transport.NetListener(customListener))),
AfterStart(func() error {
wg.Done()
return nil
}),
AfterStop(func() error {
wg.Done()
return nil
}),
)
RegisterHandler(srv.Server(), handler.NewHandler(srv.Client()))
return srv
}
func testRequest(ctx context.Context, c client.Client, name string) error {
// test call debug
req := c.NewRequest(
@ -100,6 +131,72 @@ func TestService(t *testing.T) {
}
}
func benchmarkCustomListenService(b *testing.B, n int, name string) {
// create custom listen
customListen, err := net.Listen("tcp", server.DefaultAddress)
if err != nil {
b.Fatal(err)
}
// stop the timer
b.StopTimer()
// waitgroup for server start
var wg sync.WaitGroup
// cancellation context
ctx, cancel := context.WithCancel(context.Background())
// create test server
service := testCustomListenService(ctx, customListen, &wg, name)
// start the server
go func() {
if err := service.Run(); err != nil {
b.Fatal(err)
}
}()
// wait for service to start
wg.Wait()
// make a test call to warm the cache
for i := 0; i < 10; i++ {
if err := testRequest(ctx, service.Client(), name); err != nil {
b.Fatal(err)
}
}
// start the timer
b.StartTimer()
// number of iterations
for i := 0; i < b.N; i++ {
// for concurrency
for j := 0; j < n; j++ {
wg.Add(1)
go func() {
err := testRequest(ctx, service.Client(), name)
wg.Done()
if err != nil {
b.Fatal(err)
}
}()
}
// wait for test completion
wg.Wait()
}
// stop the timer
b.StopTimer()
// shutdown service
testShutdown(&wg, cancel)
}
func benchmarkService(b *testing.B, n int, name string) {
// stop the timer
b.StopTimer()
@ -178,3 +275,7 @@ func BenchmarkService32(b *testing.B) {
func BenchmarkService64(b *testing.B) {
benchmarkService(b, 64, "test.service.64")
}
func BenchmarkCustomListenService1(b *testing.B) {
benchmarkCustomListenService(b, 1, "test.service.1")
}

20
transport/context.go Normal file
View File

@ -0,0 +1,20 @@
package transport
import (
"net"
)
type netListener struct{}
// getNetListener Get net.Listener from ListenOptions
func getNetListener(o *ListenOptions) net.Listener {
if o.Context == nil {
return nil
}
if l, ok := o.Context.Value(netListener{}).(net.Listener); ok && l != nil {
return l
}
return nil
}

View File

@ -562,8 +562,14 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
var l net.Listener
var err error
// TODO: support use of listen options
if h.opts.Secure || h.opts.TLSConfig != nil {
if listener := getNetListener(&options); listener != nil {
fn := func(addr string) (net.Listener, error) {
return listener, nil
}
l, err = mnet.Listen(addr, fn)
} else if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {

View File

@ -401,3 +401,76 @@ func TestHTTPTransportMultipleSendWhenRecv(t *testing.T) {
c.Close()
wg.Wait()
}
func TestHttpTransportListenerNetListener(t *testing.T) {
address := "127.0.0.1:0"
customListener, err := net.Listen("tcp", address)
if err != nil {
return
}
tr := NewHTTPTransport(Timeout(time.Millisecond * 100))
// injection
l, err := tr.Listen(address, NetListener(customListener))
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
done := make(chan bool)
fn := func(sock Socket) {
defer func() {
sock.Close()
close(done)
}()
go func() {
select {
case <-done:
return
case <-time.After(time.Second):
t.Fatal("deadline not executed")
}
}()
for {
var m Message
if err := sock.Recv(&m); err != nil {
return
}
}
}
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
<-done
}

View File

@ -3,6 +3,7 @@ package transport
import (
"context"
"crypto/tls"
"net"
"time"
"go-micro.dev/v4/codec"
@ -102,3 +103,16 @@ func WithTimeout(d time.Duration) DialOption {
o.Timeout = d
}
}
// NetListener Set net.Listener for httpTransport
func NetListener(customListener net.Listener) ListenOption {
return func(o *ListenOptions) {
if customListener != nil {
return
}
if o.Context == nil {
o.Context = context.TODO()
}
o.Context = context.WithValue(o.Context, netListener{}, customListener)
}
}