1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Add json codec

This commit is contained in:
Asim 2015-11-28 18:40:32 +00:00
parent 609a59d3ab
commit 80f53ab176
5 changed files with 302 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/transport"
rpc "github.com/youtube/vitess/go/rpcplus"
@ -26,8 +27,8 @@ var (
defaultContentType = "application/octet-stream"
defaultCodecs = map[string]codec.NewCodec{
// "application/json": jsonrpc.NewClientCodec,
// "application/json-rpc": jsonrpc.NewClientCodec,
"application/json": json.NewCodec,
"application/json-rpc": json.NewCodec,
"application/protobuf": proto.NewCodec,
"application/proto-rpc": proto.NewCodec,
"application/octet-stream": proto.NewCodec,

97
codec/json/client.go Normal file
View File

@ -0,0 +1,97 @@
package json
import (
"encoding/json"
"fmt"
"io"
"sync"
"github.com/micro/go-micro/codec"
)
type clientCodec struct {
dec *json.Decoder // for reading JSON values
enc *json.Encoder // for writing JSON values
c io.Closer
// temporary work space
req clientRequest
resp clientResponse
sync.Mutex
pending map[uint64]string
}
type clientRequest struct {
Method string `json:"method"`
Params [1]interface{} `json:"params"`
ID uint64 `json:"id"`
}
type clientResponse struct {
ID uint64 `json:"id"`
Result *json.RawMessage `json:"result"`
Error interface{} `json:"error"`
}
func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
return &clientCodec{
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]string),
}
}
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
c.Lock()
c.pending[m.Id] = m.Method
c.Unlock()
c.req.Method = m.Method
c.req.Params[0] = b
c.req.ID = m.Id
return c.enc.Encode(&c.req)
}
func (r *clientResponse) reset() {
r.ID = 0
r.Result = nil
r.Error = nil
}
func (c *clientCodec) ReadHeader(m *codec.Message) error {
c.resp.reset()
if err := c.dec.Decode(&c.resp); err != nil {
return err
}
c.Lock()
m.Method = c.pending[c.resp.ID]
delete(c.pending, c.resp.ID)
c.Unlock()
m.Error = ""
m.Id = c.resp.ID
if c.resp.Error != nil {
x, ok := c.resp.Error.(string)
if !ok {
return fmt.Errorf("invalid error %v", c.resp.Error)
}
if x == "" {
x = "unspecified error"
}
m.Error = x
}
return nil
}
func (c *clientCodec) ReadBody(x interface{}) error {
if x == nil {
return nil
}
return json.Unmarshal(*c.resp.Result, x)
}
func (c *clientCodec) Close() error {
return c.c.Close()
}

88
codec/json/json.go Normal file
View File

@ -0,0 +1,88 @@
package json
import (
"bytes"
"encoding/json"
"fmt"
"io"
"github.com/micro/go-micro/codec"
)
type jsonCodec struct {
buf *bytes.Buffer
mt codec.MessageType
rwc io.ReadWriteCloser
c *clientCodec
s *serverCodec
}
func (j *jsonCodec) Close() error {
j.buf.Reset()
return j.rwc.Close()
}
func (j *jsonCodec) String() string {
return "json"
}
func (j *jsonCodec) Write(m *codec.Message, b interface{}) error {
switch m.Type {
case codec.Request:
return j.c.Write(m, b)
case codec.Response:
return j.s.Write(m, b)
case codec.Publication:
data, err := json.Marshal(b)
if err != nil {
return err
}
_, err = j.rwc.Write(data)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", m.Type)
}
return nil
}
func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
j.buf.Reset()
j.mt = mt
switch mt {
case codec.Request:
return j.s.ReadHeader(m)
case codec.Response:
return j.c.ReadHeader(m)
case codec.Publication:
io.Copy(j.buf, j.rwc)
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}
return nil
}
func (j *jsonCodec) ReadBody(b interface{}) error {
switch j.mt {
case codec.Request:
return j.s.ReadBody(b)
case codec.Response:
return j.c.ReadBody(b)
case codec.Publication:
if b != nil {
return json.Unmarshal(j.buf.Bytes(), b)
}
default:
return fmt.Errorf("Unrecognised message type: %v", j.mt)
}
return nil
}
func NewCodec(rwc io.ReadWriteCloser) codec.Codec {
return &jsonCodec{
buf: bytes.NewBuffer(nil),
rwc: rwc,
c: newClientCodec(rwc),
s: newServerCodec(rwc),
}
}

111
codec/json/server.go Normal file
View File

@ -0,0 +1,111 @@
package json
import (
"encoding/json"
"errors"
"io"
"sync"
"github.com/micro/go-micro/codec"
)
type serverCodec struct {
dec *json.Decoder // for reading JSON values
enc *json.Encoder // for writing JSON values
c io.Closer
// temporary work space
req serverRequest
resp serverResponse
sync.Mutex
seq uint64
pending map[uint64]*json.RawMessage
}
type serverRequest struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
ID *json.RawMessage `json:"id"`
}
type serverResponse struct {
ID *json.RawMessage `json:"id"`
Result interface{} `json:"result"`
Error interface{} `json:"error"`
}
func newServerCodec(conn io.ReadWriteCloser) *serverCodec {
return &serverCodec{
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]*json.RawMessage),
}
}
func (r *serverRequest) reset() {
r.Method = ""
if r.Params != nil {
*r.Params = (*r.Params)[0:0]
}
if r.ID != nil {
*r.ID = (*r.ID)[0:0]
}
}
func (c *serverCodec) ReadHeader(m *codec.Message) error {
c.req.reset()
if err := c.dec.Decode(&c.req); err != nil {
return err
}
m.Method = c.req.Method
c.Lock()
c.seq++
c.pending[c.seq] = c.req.ID
c.req.ID = nil
m.Id = c.seq
c.Unlock()
return nil
}
func (c *serverCodec) ReadBody(x interface{}) error {
if x == nil {
return nil
}
var params [1]interface{}
params[0] = x
return json.Unmarshal(*c.req.Params, &params)
}
var null = json.RawMessage([]byte("null"))
func (c *serverCodec) Write(m *codec.Message, x interface{}) error {
var resp serverResponse
c.Lock()
b, ok := c.pending[m.Id]
if !ok {
c.Unlock()
return errors.New("invalid sequence number in response")
}
c.Unlock()
if b == nil {
// Invalid request so no id. Use JSON null.
b = &null
}
resp.ID = b
resp.Result = x
if m.Error == "" {
resp.Error = nil
} else {
resp.Error = m.Error
}
return c.enc.Encode(resp)
}
func (c *serverCodec) Close() error {
return c.c.Close()
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/transport"
rpc "github.com/youtube/vitess/go/rpcplus"
@ -24,8 +25,8 @@ type readWriteCloser struct {
var (
defaultCodecs = map[string]codec.NewCodec{
// "application/json": jsonrpc.NewServerCodec,
// "application/json-rpc": jsonrpc.NewServerCodec,
"application/json": json.NewCodec,
"application/json-rpc": json.NewCodec,
"application/protobuf": proto.NewCodec,
"application/proto-rpc": proto.NewCodec,
"application/octet-stream": proto.NewCodec,