1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

add cruft

This commit is contained in:
Asim Aslam 2019-06-17 20:05:58 +01:00
parent 1a571b8c82
commit f65694670e
4 changed files with 126 additions and 3 deletions

View File

@ -2,13 +2,16 @@ package grpc
import (
"fmt"
"strings"
"github.com/golang/protobuf/proto"
"github.com/json-iterator/go"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc"
)
type jsonCodec struct{}
@ -52,7 +55,28 @@ func (w wrapCodec) String() string {
return w.Codec.Name()
}
func (w wrapCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*bytes.Frame)
if ok {
return b.Data, nil
}
return w.Codec.Marshal(v)
}
func (w wrapCodec) Unmarshal(data []byte, v interface{}) error {
b, ok := v.(*bytes.Frame)
if ok {
b.Data = data
return nil
}
return w.Codec.Unmarshal(data, v)
}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*bytes.Frame)
if ok {
return b.Data, nil
}
return proto.Marshal(v.(proto.Message))
}
@ -96,3 +120,68 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
func (jsonCodec) Name() string {
return "json"
}
type grpcCodec struct {
// headers
id string
target string
method string
endpoint string
s grpc.ClientStream
c encoding.Codec
}
func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
md, err := g.s.Header()
if err != nil {
return err
}
if m == nil {
m = new(codec.Message)
}
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
return nil
}
func (g *grpcCodec) ReadBody(v interface{}) error {
frame := &bytes.Frame{}
if err := g.s.RecvMsg(frame); err != nil {
return err
}
return g.c.Unmarshal(frame.Data, v)
}
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
// if we don't have a body
if len(m.Body) == 0 {
b, err := g.c.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// create an encoded frame
frame := &bytes.Frame{m.Body}
// write the body using the framing codec
return g.s.SendMsg(frame)
}
func (g *grpcCodec) Close() error {
return g.s.CloseSend()
}
func (g *grpcCodec) String() string {
return g.c.Name()
}

View File

@ -177,7 +177,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
dialCtx, cancel = context.WithCancel(ctx)
}
defer cancel()
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure())
wc := wrapCodec{cf}
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
@ -193,6 +196,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
}
// set request codec
if r, ok := req.(*grpcRequest); ok {
r.codec = &grpcCodec{
s: st,
c: wc,
}
}
rsp := &response{
conn: cc,
stream: st,

View File

@ -6,6 +6,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
"google.golang.org/grpc/encoding"
@ -14,6 +15,7 @@ import (
type jsonCodec struct{}
type bytesCodec struct{}
type protoCodec struct{}
type wrapCodec struct { encoding.Codec }
var (
defaultGRPCCodecs = map[string]encoding.Codec{
@ -36,6 +38,27 @@ var (
}
)
func (w wrapCodec) String() string {
return w.Codec.Name()
}
func (w wrapCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*bytes.Frame)
if ok {
return b.Data, nil
}
return w.Codec.Marshal(v)
}
func (w wrapCodec) Unmarshal(data []byte, v interface{}) error {
b, ok := v.(*bytes.Frame)
if ok {
b.Data = data
return nil
}
return w.Codec.Unmarshal(data, v)
}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}

View File

@ -56,8 +56,8 @@ type grpcServer struct {
}
func init() {
encoding.RegisterCodec(jsonCodec{})
encoding.RegisterCodec(bytesCodec{})
encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(wrapCodec{bytesCodec{}})
}
func newGRPCServer(opts ...server.Option) server.Server {