1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Grpc server injection (#2208)

* plugin: update grpc server readme

* plugin: refactor gprc server test

This is a no-op change to enable test logic reuse for different
combinations.

* plugin: grpc server test Init after New

* plugin: allow grpc.Server to be injected
This commit is contained in:
Qiu Yu 2021-08-12 10:26:26 -07:00 committed by GitHub
parent ffb0a2f896
commit c7195aae98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 29 deletions

View File

@ -1,6 +1,6 @@
# GRPC Server # GRPC Server
The grpc server is a [micro.Server](https://godoc.org/github.com/micro/go-micro/server#Server) compatible server. The grpc server is a [micro.Server](https://pkg.go.dev/github.com/asim/go-micro/server#Server) compatible server.
## Overview ## Overview
@ -13,8 +13,8 @@ Specify the server to your micro service
```go ```go
import ( import (
"github.com/micro/go-micro" "github.com/asim/go-micro/v3"
"github.com/micro/go-plugins/server/grpc" "github.com/asim/go-micro/plugins/server/grpc/v3"
) )
func main() { func main() {

View File

@ -119,10 +119,23 @@ func (g *grpcServer) configure(opts ...server.Option) {
return return
} }
// Optionally use injected grpc.Server if there's a one
var srv *grpc.Server
if srv = g.getGrpcServer(); srv != nil {
g.srv = srv
}
for _, o := range opts { for _, o := range opts {
o(&g.opts) o(&g.opts)
} }
g.rsvc = nil
// NOTE: injected grpc.Server doesn't have g.handler registered
if srv != nil {
return
}
maxMsgSize := g.getMaxMsgSize() maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{ gopts := []grpc.ServerOption{
@ -139,7 +152,6 @@ func (g *grpcServer) configure(opts ...server.Option) {
gopts = append(gopts, opts...) gopts = append(gopts, opts...)
} }
g.rsvc = nil
g.srv = grpc.NewServer(gopts...) g.srv = grpc.NewServer(gopts...)
} }
@ -188,6 +200,18 @@ func (g *grpcServer) getListener() net.Listener {
return nil return nil
} }
func (g *grpcServer) getGrpcServer() *grpc.Server {
if g.opts.Context == nil {
return nil
}
if srv, ok := g.opts.Context.Value(grpcServerKey{}).(*grpc.Server); ok && srv != nil {
return srv
}
return nil
}
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
if g.wg != nil { if g.wg != nil {
g.wg.Add(1) g.wg.Add(1)

View File

@ -5,19 +5,23 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/asim/go-micro/v3"
bmemory "github.com/asim/go-micro/plugins/broker/memory/v3"
"github.com/asim/go-micro/v3/client"
gcli "github.com/asim/go-micro/plugins/client/grpc/v3"
"github.com/asim/go-micro/v3/errors"
rmemory "github.com/asim/go-micro/plugins/registry/memory/v3"
"github.com/asim/go-micro/v3/server"
gsrv "github.com/asim/go-micro/plugins/server/grpc/v3"
tgrpc "github.com/asim/go-micro/plugins/transport/grpc/v3"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/asim/go-micro/v3"
"github.com/asim/go-micro/v3/broker"
"github.com/asim/go-micro/v3/client"
"github.com/asim/go-micro/v3/errors"
"github.com/asim/go-micro/v3/registry"
"github.com/asim/go-micro/v3/server"
"github.com/asim/go-micro/v3/transport"
bmemory "github.com/asim/go-micro/plugins/broker/memory/v3"
gcli "github.com/asim/go-micro/plugins/client/grpc/v3"
rmemory "github.com/asim/go-micro/plugins/registry/memory/v3"
gsrv "github.com/asim/go-micro/plugins/server/grpc/v3"
pb "github.com/asim/go-micro/plugins/server/grpc/v3/proto" pb "github.com/asim/go-micro/plugins/server/grpc/v3/proto"
tgrpc "github.com/asim/go-micro/plugins/transport/grpc/v3"
) )
// server is used to implement helloworld.GreeterServer. // server is used to implement helloworld.GreeterServer.
@ -106,21 +110,7 @@ func BenchmarkServer(b *testing.B) {
} }
*/ */
func TestGRPCServer(t *testing.T) { func testGRPCServer(t *testing.T, s server.Server, c client.Client, r registry.Registry, testRPC bool) {
r := rmemory.NewRegistry()
b := bmemory.NewBroker()
tr := tgrpc.NewTransport()
s := gsrv.NewServer(
server.Broker(b),
server.Name("foo"),
server.Registry(r),
server.Transport(tr),
)
c := gcli.NewClient(
client.Registry(r),
client.Broker(b),
client.Transport(tr),
)
ctx := context.TODO() ctx := context.TODO()
h := &testServer{} h := &testServer{}
@ -165,6 +155,10 @@ func TestGRPCServer(t *testing.T) {
t.Fatal("this must return error, as we return error from handler") t.Fatal("this must return error, as we return error from handler")
} }
if !testRPC {
return
}
cc, err := grpc.Dial(s.Options().Address, grpc.WithInsecure()) cc, err := grpc.Dial(s.Options().Address, grpc.WithInsecure())
if err != nil { if err != nil {
t.Fatalf("failed to dial server: %v", err) t.Fatalf("failed to dial server: %v", err)
@ -201,3 +195,63 @@ func TestGRPCServer(t *testing.T) {
} }
} }
} }
func getTestHarness() (registry.Registry, broker.Broker, transport.Transport) {
r := rmemory.NewRegistry()
b := bmemory.NewBroker()
tr := tgrpc.NewTransport()
return r, b, tr
}
func TestGRPCServer(t *testing.T) {
r, b, tr := getTestHarness()
s := gsrv.NewServer(
server.Broker(b),
server.Name("foo"),
server.Registry(r),
server.Transport(tr),
)
c := gcli.NewClient(
client.Registry(r),
client.Broker(b),
client.Transport(tr),
)
testGRPCServer(t, s, c, r, true)
}
func TestGRPCServerInitAfterNew(t *testing.T) {
r, b, tr := getTestHarness()
s := gsrv.NewServer()
s.Init(
server.Broker(b),
server.Name("foo"),
server.Registry(r),
server.Transport(tr),
)
c := gcli.NewClient(
client.Registry(r),
client.Broker(b),
client.Transport(tr),
)
testGRPCServer(t, s, c, r, true)
}
func TestGRPCServerInjectedServer(t *testing.T) {
r, b, tr := getTestHarness()
srv := grpc.NewServer()
s := gsrv.NewServer(
gsrv.Server(srv),
)
s.Init(
server.Broker(b),
server.Name("foo"),
server.Registry(r),
server.Transport(tr),
)
c := gcli.NewClient(
client.Registry(r),
client.Broker(b),
client.Transport(tr),
)
testGRPCServer(t, s, c, r, false)
}

View File

@ -20,6 +20,7 @@ type netListener struct{}
type maxMsgSizeKey struct{} type maxMsgSizeKey struct{}
type maxConnKey struct{} type maxConnKey struct{}
type tlsAuth struct{} type tlsAuth struct{}
type grpcServerKey struct{}
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) server.Option { func Codec(contentType string, c encoding.Codec) server.Option {
@ -51,6 +52,14 @@ func Listener(l net.Listener) server.Option {
return setServerOption(netListener{}, l) return setServerOption(netListener{}, l)
} }
// Server specifies a *grpc.Server to use instead of the default
// This is for rare use case where user need to expose grpc.Server for
// customization. Please NOTE however user injected grpcServer doesn't support
// server Handler abstraction
func Server(srv *grpc.Server) server.Option {
return setServerOption(grpcServerKey{}, srv)
}
// Options to be used to configure gRPC options // Options to be used to configure gRPC options
func Options(opts ...grpc.ServerOption) server.Option { func Options(opts ...grpc.ServerOption) server.Option {
return setServerOption(grpcOptions{}, opts) return setServerOption(grpcOptions{}, opts)