mirror of
https://github.com/go-micro/go-micro.git
synced 2024-11-24 08:02:32 +02:00
api: add static router and improve path parser in rpc handler (#1437)
* api: add static router and improve path parser in rpc handler Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * expose metadata context key to be able to get unmodified map keys Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * server/grpc: fix jsonpb codec for protobuf msg Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * api/handler/rpc: write 204 status code when rsp is nil Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * api/handler/rpc: add check for nil response for non javascript Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
8282e781e4
commit
3d7d5ce6b4
16
api/api.go
16
api/api.go
@ -9,6 +9,20 @@ import (
|
|||||||
"github.com/micro/go-micro/v2/server"
|
"github.com/micro/go-micro/v2/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Api interface {
|
||||||
|
// Register a http handler
|
||||||
|
Register(*Endpoint) error
|
||||||
|
// Register a route
|
||||||
|
Deregister(*Endpoint) error
|
||||||
|
// Init initialises the command line.
|
||||||
|
// It also parses further options.
|
||||||
|
//Init(...Option) error
|
||||||
|
// Options
|
||||||
|
//Options() Options
|
||||||
|
// String
|
||||||
|
String() string
|
||||||
|
}
|
||||||
|
|
||||||
// Endpoint is a mapping between an RPC method and HTTP endpoint
|
// Endpoint is a mapping between an RPC method and HTTP endpoint
|
||||||
type Endpoint struct {
|
type Endpoint struct {
|
||||||
// RPC Method e.g. Greeter.Hello
|
// RPC Method e.g. Greeter.Hello
|
||||||
@ -23,6 +37,8 @@ type Endpoint struct {
|
|||||||
Method []string
|
Method []string
|
||||||
// HTTP Path e.g /greeter. Expect POSIX regex
|
// HTTP Path e.g /greeter. Expect POSIX regex
|
||||||
Path []string
|
Path []string
|
||||||
|
// Stream flag
|
||||||
|
Stream bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service represents an API service
|
// Service represents an API service
|
||||||
|
132
api/grpc_test.go
Normal file
132
api/grpc_test.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
package api_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2"
|
||||||
|
"github.com/micro/go-micro/v2/api"
|
||||||
|
ahandler "github.com/micro/go-micro/v2/api/handler"
|
||||||
|
apirpc "github.com/micro/go-micro/v2/api/handler/rpc"
|
||||||
|
"github.com/micro/go-micro/v2/api/router"
|
||||||
|
rstatic "github.com/micro/go-micro/v2/api/router/static"
|
||||||
|
bmemory "github.com/micro/go-micro/v2/broker/memory"
|
||||||
|
"github.com/micro/go-micro/v2/client"
|
||||||
|
gcli "github.com/micro/go-micro/v2/client/grpc"
|
||||||
|
rmemory "github.com/micro/go-micro/v2/registry/memory"
|
||||||
|
"github.com/micro/go-micro/v2/server"
|
||||||
|
gsrv "github.com/micro/go-micro/v2/server/grpc"
|
||||||
|
tgrpc "github.com/micro/go-micro/v2/transport/grpc"
|
||||||
|
|
||||||
|
pb "github.com/micro/go-micro/v2/server/grpc/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// server is used to implement helloworld.GreeterServer.
|
||||||
|
type testServer struct {
|
||||||
|
msgCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHello implements helloworld.GreeterServer
|
||||||
|
func (s *testServer) Call(ctx context.Context, req *pb.Request, rsp *pb.Response) error {
|
||||||
|
rsp.Msg = "Hello " + req.Name
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApiAndGRPC(t *testing.T) {
|
||||||
|
r := rmemory.NewRegistry()
|
||||||
|
b := bmemory.NewBroker()
|
||||||
|
tr := tgrpc.NewTransport()
|
||||||
|
s := gsrv.NewServer(
|
||||||
|
server.Broker(b),
|
||||||
|
server.Name("foo"),
|
||||||
|
server.Registry(r),
|
||||||
|
server.Transport(tr),
|
||||||
|
)
|
||||||
|
c := gcli.NewClient(
|
||||||
|
client.Registry(r),
|
||||||
|
client.Broker(b),
|
||||||
|
client.Transport(tr),
|
||||||
|
)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
svc := micro.NewService(
|
||||||
|
micro.Server(s),
|
||||||
|
micro.Client(c),
|
||||||
|
micro.Broker(b),
|
||||||
|
micro.Registry(r),
|
||||||
|
micro.Transport(tr),
|
||||||
|
micro.Context(ctx))
|
||||||
|
h := &testServer{}
|
||||||
|
pb.RegisterTestHandler(s, h)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := svc.Run(); err != nil {
|
||||||
|
t.Fatalf("failed to start: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
// check registration
|
||||||
|
services, err := r.GetService("foo")
|
||||||
|
if err != nil || len(services) == 0 {
|
||||||
|
t.Fatalf("failed to get service: %v # %d", err, len(services))
|
||||||
|
}
|
||||||
|
|
||||||
|
router := rstatic.NewRouter(
|
||||||
|
router.WithHandler(apirpc.Handler),
|
||||||
|
router.WithRegistry(svc.Server().Options().Registry),
|
||||||
|
)
|
||||||
|
err = router.Register(&api.Endpoint{
|
||||||
|
Name: "foo.Test.Call",
|
||||||
|
Method: []string{"GET"},
|
||||||
|
Path: []string{"/api/v0/test/call/{name}"},
|
||||||
|
Handler: "rpc",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
hrpc := apirpc.NewHandler(
|
||||||
|
ahandler.WithService(svc),
|
||||||
|
ahandler.WithRouter(router),
|
||||||
|
)
|
||||||
|
|
||||||
|
hsrv := &http.Server{
|
||||||
|
Handler: hrpc,
|
||||||
|
Addr: "127.0.0.1:6543",
|
||||||
|
WriteTimeout: 15 * time.Second,
|
||||||
|
ReadTimeout: 15 * time.Second,
|
||||||
|
IdleTimeout: 20 * time.Second,
|
||||||
|
MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
log.Println(hsrv.ListenAndServe())
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
rsp, err := http.Get(fmt.Sprintf("http://%s/api/v0/test/call/TEST", hsrv.Addr))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to created http.Request: %v", err)
|
||||||
|
}
|
||||||
|
defer rsp.Body.Close()
|
||||||
|
buf, err := ioutil.ReadAll(rsp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonMsg := `{"msg":"Hello TEST"}`
|
||||||
|
if string(buf) != jsonMsg {
|
||||||
|
t.Fatalf("invalid message received, parsing error %s != %s", buf, jsonMsg)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -12,7 +12,7 @@ import (
|
|||||||
"github.com/joncalhoun/qson"
|
"github.com/joncalhoun/qson"
|
||||||
"github.com/micro/go-micro/v2/api"
|
"github.com/micro/go-micro/v2/api"
|
||||||
"github.com/micro/go-micro/v2/api/handler"
|
"github.com/micro/go-micro/v2/api/handler"
|
||||||
proto "github.com/micro/go-micro/v2/api/internal/proto"
|
"github.com/micro/go-micro/v2/api/internal/proto"
|
||||||
"github.com/micro/go-micro/v2/client"
|
"github.com/micro/go-micro/v2/client"
|
||||||
"github.com/micro/go-micro/v2/client/selector"
|
"github.com/micro/go-micro/v2/client/selector"
|
||||||
"github.com/micro/go-micro/v2/codec"
|
"github.com/micro/go-micro/v2/codec"
|
||||||
@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/micro/go-micro/v2/codec/protorpc"
|
"github.com/micro/go-micro/v2/codec/protorpc"
|
||||||
"github.com/micro/go-micro/v2/errors"
|
"github.com/micro/go-micro/v2/errors"
|
||||||
"github.com/micro/go-micro/v2/logger"
|
"github.com/micro/go-micro/v2/logger"
|
||||||
|
"github.com/micro/go-micro/v2/metadata"
|
||||||
"github.com/micro/go-micro/v2/registry"
|
"github.com/micro/go-micro/v2/registry"
|
||||||
"github.com/micro/go-micro/v2/util/ctx"
|
"github.com/micro/go-micro/v2/util/ctx"
|
||||||
"github.com/oxtoacart/bpool"
|
"github.com/oxtoacart/bpool"
|
||||||
@ -128,7 +129,6 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
so := selector.WithStrategy(strategy(service.Services))
|
so := selector.WithStrategy(strategy(service.Services))
|
||||||
|
|
||||||
// walk the standard call path
|
// walk the standard call path
|
||||||
|
|
||||||
// get payload
|
// get payload
|
||||||
br, err := requestPayload(r)
|
br, err := requestPayload(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -164,7 +164,12 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// marshall response
|
// marshall response
|
||||||
rsp, _ = response.Marshal()
|
rsp, err = response.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// if json codec is not present set to json
|
// if json codec is not present set to json
|
||||||
if !hasCodec(ct, jsonCodecs) {
|
if !hasCodec(ct, jsonCodecs) {
|
||||||
@ -195,7 +200,11 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// marshall response
|
// marshall response
|
||||||
rsp, _ = response.MarshalJSON()
|
rsp, err = response.MarshalJSON()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// write the response
|
// write the response
|
||||||
@ -219,8 +228,11 @@ func hasCodec(ct string, codecs []string) bool {
|
|||||||
// If the request is a GET the query string parameters are extracted and marshaled to JSON and the raw bytes are returned.
|
// If the request is a GET the query string parameters are extracted and marshaled to JSON and the raw bytes are returned.
|
||||||
// If the request method is a POST the request body is read and returned
|
// If the request method is a POST the request body is read and returned
|
||||||
func requestPayload(r *http.Request) ([]byte, error) {
|
func requestPayload(r *http.Request) ([]byte, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
// we have to decode json-rpc and proto-rpc because we suck
|
// we have to decode json-rpc and proto-rpc because we suck
|
||||||
// well actually because there's no proxy codec right now
|
// well actually because there's no proxy codec right now
|
||||||
|
|
||||||
ct := r.Header.Get("Content-Type")
|
ct := r.Header.Get("Content-Type")
|
||||||
switch {
|
switch {
|
||||||
case strings.Contains(ct, "application/json-rpc"):
|
case strings.Contains(ct, "application/json-rpc"):
|
||||||
@ -229,11 +241,11 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
Header: make(map[string]string),
|
Header: make(map[string]string),
|
||||||
}
|
}
|
||||||
c := jsonrpc.NewCodec(&buffer{r.Body})
|
c := jsonrpc.NewCodec(&buffer{r.Body})
|
||||||
if err := c.ReadHeader(&msg, codec.Request); err != nil {
|
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var raw json.RawMessage
|
var raw json.RawMessage
|
||||||
if err := c.ReadBody(&raw); err != nil {
|
if err = c.ReadBody(&raw); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return ([]byte)(raw), nil
|
return ([]byte)(raw), nil
|
||||||
@ -243,15 +255,14 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
Header: make(map[string]string),
|
Header: make(map[string]string),
|
||||||
}
|
}
|
||||||
c := protorpc.NewCodec(&buffer{r.Body})
|
c := protorpc.NewCodec(&buffer{r.Body})
|
||||||
if err := c.ReadHeader(&msg, codec.Request); err != nil {
|
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var raw proto.Message
|
var raw proto.Message
|
||||||
if err := c.ReadBody(&raw); err != nil {
|
if err = c.ReadBody(&raw); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
b, err := raw.Marshal()
|
return raw.Marshal()
|
||||||
return b, err
|
|
||||||
case strings.Contains(ct, "application/www-x-form-urlencoded"):
|
case strings.Contains(ct, "application/www-x-form-urlencoded"):
|
||||||
r.ParseForm()
|
r.ParseForm()
|
||||||
|
|
||||||
@ -262,43 +273,94 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// marshal
|
// marshal
|
||||||
b, err := json.Marshal(vals)
|
return json.Marshal(vals)
|
||||||
return b, err
|
|
||||||
// TODO: application/grpc
|
// TODO: application/grpc
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise as per usual
|
// otherwise as per usual
|
||||||
|
ctx := r.Context()
|
||||||
switch r.Method {
|
// dont user meadata.FromContext as it mangles names
|
||||||
case "GET":
|
md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata)
|
||||||
if len(r.URL.RawQuery) > 0 {
|
if !ok {
|
||||||
return qson.ToJSON(r.URL.RawQuery)
|
md = make(map[string]string)
|
||||||
}
|
}
|
||||||
case "PATCH", "POST", "PUT", "DELETE":
|
// allocate maximum
|
||||||
urlParams := []byte("{}")
|
matches := make(map[string]string, len(md))
|
||||||
bodyParams := []byte("{}")
|
for k, v := range md {
|
||||||
var err error
|
if strings.HasPrefix(k, "x-api-field-") {
|
||||||
|
matches[strings.TrimPrefix(k, "x-api-field-")] = v
|
||||||
|
}
|
||||||
|
delete(md, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore context without fields
|
||||||
|
ctx = metadata.NewContext(ctx, md)
|
||||||
|
*r = *r.WithContext(ctx)
|
||||||
|
req := make(map[string]interface{}, len(md))
|
||||||
|
for k, v := range matches {
|
||||||
|
ps := strings.Split(k, ".")
|
||||||
|
if len(ps) == 1 {
|
||||||
|
req[k] = v
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
em := make(map[string]interface{})
|
||||||
|
em[ps[len(ps)-1]] = v
|
||||||
|
for i := len(ps) - 2; i > 0; i-- {
|
||||||
|
nm := make(map[string]interface{})
|
||||||
|
nm[ps[i]] = em
|
||||||
|
em = nm
|
||||||
|
}
|
||||||
|
req[ps[0]] = em
|
||||||
|
}
|
||||||
|
pathbuf := []byte("{}")
|
||||||
|
if len(req) > 0 {
|
||||||
|
pathbuf, err = json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
urlbuf := []byte("{}")
|
||||||
if len(r.URL.RawQuery) > 0 {
|
if len(r.URL.RawQuery) > 0 {
|
||||||
if urlParams, err = qson.ToJSON(r.URL.RawQuery); err != nil {
|
urlbuf, err = qson.ToJSON(r.URL.RawQuery)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch r.Method {
|
||||||
|
case "GET":
|
||||||
|
// empty response
|
||||||
|
if strings.Contains(ct, "application/json") && string(out) == "{}" {
|
||||||
|
return out, nil
|
||||||
|
} else if string(out) == "{}" && !strings.Contains(ct, "application/json") {
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
case "PATCH", "POST", "PUT", "DELETE":
|
||||||
|
bodybuf := []byte("{}")
|
||||||
buf := bufferPool.Get()
|
buf := bufferPool.Get()
|
||||||
defer bufferPool.Put(buf)
|
defer bufferPool.Put(buf)
|
||||||
if _, err := buf.ReadFrom(r.Body); err != nil {
|
if _, err := buf.ReadFrom(r.Body); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if b := buf.Bytes(); len(b) > 0 {
|
if b := buf.Bytes(); len(b) > 0 {
|
||||||
bodyParams = b
|
bodybuf = b
|
||||||
|
} else {
|
||||||
|
return []byte{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if out, err := jsonpatch.MergeMergePatches(urlParams, bodyParams); err == nil {
|
if out, err = jsonpatch.MergeMergePatches(out, bodybuf); err == nil {
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//fallback to previous unknown behaviour
|
//fallback to previous unknown behaviour
|
||||||
return buf.Bytes(), nil
|
return bodybuf, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -332,7 +394,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_, werr := w.Write([]byte(ce.Error()))
|
_, werr := w.Write([]byte(ce.Error()))
|
||||||
if err != nil {
|
if werr != nil {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error(werr)
|
logger.Error(werr)
|
||||||
}
|
}
|
||||||
@ -351,6 +413,11 @@ func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) {
|
|||||||
w.Header().Set("grpc-message", "")
|
w.Header().Set("grpc-message", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write 204 status if rsp is nil
|
||||||
|
if len(rsp) == 0 {
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
// write response
|
// write response
|
||||||
_, err := w.Write(rsp)
|
_, err := w.Write(rsp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -250,6 +250,14 @@ func (r *registryRouter) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *registryRouter) Register(ep *api.Endpoint) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registryRouter) Deregister(ep *api.Endpoint) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
|
func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
|
||||||
if r.isClosed() {
|
if r.isClosed() {
|
||||||
return nil, errors.New("router closed")
|
return nil, errors.New("router closed")
|
||||||
|
@ -15,6 +15,10 @@ type Router interface {
|
|||||||
Close() error
|
Close() error
|
||||||
// Endpoint returns an api.Service endpoint or an error if it does not exist
|
// Endpoint returns an api.Service endpoint or an error if it does not exist
|
||||||
Endpoint(r *http.Request) (*api.Service, error)
|
Endpoint(r *http.Request) (*api.Service, error)
|
||||||
|
// Register endpoint in router
|
||||||
|
Register(ep *api.Endpoint) error
|
||||||
|
// Deregister endpoint from router
|
||||||
|
Deregister(ep *api.Endpoint) error
|
||||||
// Route returns an api.Service route
|
// Route returns an api.Service route
|
||||||
Route(r *http.Request) (*api.Service, error)
|
Route(r *http.Request) (*api.Service, error)
|
||||||
}
|
}
|
||||||
|
304
api/router/static/static.go
Normal file
304
api/router/static/static.go
Normal file
@ -0,0 +1,304 @@
|
|||||||
|
package static
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway/httprule"
|
||||||
|
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||||
|
"github.com/micro/go-micro/v2/api"
|
||||||
|
"github.com/micro/go-micro/v2/api/router"
|
||||||
|
"github.com/micro/go-micro/v2/logger"
|
||||||
|
"github.com/micro/go-micro/v2/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
type endpoint struct {
|
||||||
|
apiep *api.Endpoint
|
||||||
|
hostregs []*regexp.Regexp
|
||||||
|
pathregs []runtime.Pattern
|
||||||
|
}
|
||||||
|
|
||||||
|
// router is the default router
|
||||||
|
type staticRouter struct {
|
||||||
|
exit chan bool
|
||||||
|
opts router.Options
|
||||||
|
sync.RWMutex
|
||||||
|
eps map[string]*endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) isClosed() bool {
|
||||||
|
select {
|
||||||
|
case <-r.exit:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// watch for endpoint changes
|
||||||
|
func (r *staticRouter) watch() {
|
||||||
|
var attempts int
|
||||||
|
|
||||||
|
for {
|
||||||
|
if r.isClosed() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// watch for changes
|
||||||
|
w, err := r.opts.Registry.Watch()
|
||||||
|
if err != nil {
|
||||||
|
attempts++
|
||||||
|
log.Println("Error watching endpoints", err)
|
||||||
|
time.Sleep(time.Duration(attempts) * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan bool)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
w.Stop()
|
||||||
|
case <-r.exit:
|
||||||
|
w.Stop()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// reset if we get here
|
||||||
|
attempts = 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
// process next event
|
||||||
|
res, err := w.Next()
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error getting next endpoint", err)
|
||||||
|
close(ch)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.process(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
func (r *staticRouter) Register(ep *api.Endpoint) error {
|
||||||
|
if err := api.Validate(ep); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var pathregs []runtime.Pattern
|
||||||
|
var hostregs []*regexp.Regexp
|
||||||
|
|
||||||
|
for _, h := range ep.Host {
|
||||||
|
if h == "" || h == "*" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
hostreg, err := regexp.CompilePOSIX(h)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
hostregs = append(hostregs, hostreg)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range ep.Path {
|
||||||
|
rule, err := httprule.Parse(p)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tpl := rule.Compile()
|
||||||
|
pathreg, err := runtime.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pathregs = append(pathregs, pathreg)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
r.eps[ep.Name] = &endpoint{apiep: ep, pathregs: pathregs, hostregs: hostregs}
|
||||||
|
r.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) Deregister(ep *api.Endpoint) error {
|
||||||
|
if err := api.Validate(ep); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Lock()
|
||||||
|
delete(r.eps, ep.Name)
|
||||||
|
r.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) Options() router.Options {
|
||||||
|
return r.opts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) Close() error {
|
||||||
|
select {
|
||||||
|
case <-r.exit:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
close(r.exit)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) {
|
||||||
|
ep, err := r.endpoint(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
epf := strings.Split(ep.apiep.Name, ".")
|
||||||
|
services, err := r.opts.Registry.GetService(epf[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// hack for stream endpoint
|
||||||
|
if ep.apiep.Stream {
|
||||||
|
for _, svc := range services {
|
||||||
|
for _, e := range svc.Endpoints {
|
||||||
|
e.Name = strings.Join(epf[1:], ".")
|
||||||
|
e.Metadata = make(map[string]string)
|
||||||
|
e.Metadata["stream"] = "true"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
svc := &api.Service{
|
||||||
|
Name: epf[0],
|
||||||
|
Endpoint: &api.Endpoint{
|
||||||
|
Name: strings.Join(epf[1:], "."),
|
||||||
|
Handler: "rpc",
|
||||||
|
Host: ep.apiep.Host,
|
||||||
|
Method: ep.apiep.Method,
|
||||||
|
Path: ep.apiep.Path,
|
||||||
|
},
|
||||||
|
Services: services,
|
||||||
|
}
|
||||||
|
|
||||||
|
return svc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
|
||||||
|
if r.isClosed() {
|
||||||
|
return nil, errors.New("router closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
r.RLock()
|
||||||
|
defer r.RUnlock()
|
||||||
|
|
||||||
|
var idx int
|
||||||
|
if len(req.URL.Path) > 0 && req.URL.Path != "/" {
|
||||||
|
idx = 1
|
||||||
|
}
|
||||||
|
path := strings.Split(req.URL.Path[idx:], "/")
|
||||||
|
// use the first match
|
||||||
|
// TODO: weighted matching
|
||||||
|
|
||||||
|
for _, ep := range r.eps {
|
||||||
|
var mMatch, hMatch, pMatch bool
|
||||||
|
|
||||||
|
// 1. try method
|
||||||
|
methodLoop:
|
||||||
|
for _, m := range ep.apiep.Method {
|
||||||
|
if m == req.Method {
|
||||||
|
mMatch = true
|
||||||
|
break methodLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !mMatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||||
|
logger.Debugf("api method match %s", req.Method)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. try host
|
||||||
|
if len(ep.apiep.Host) == 0 {
|
||||||
|
hMatch = true
|
||||||
|
} else {
|
||||||
|
hostLoop:
|
||||||
|
for idx, h := range ep.apiep.Host {
|
||||||
|
if h == "" || h == "*" {
|
||||||
|
hMatch = true
|
||||||
|
break hostLoop
|
||||||
|
} else {
|
||||||
|
if ep.hostregs[idx].MatchString(req.URL.Host) {
|
||||||
|
hMatch = true
|
||||||
|
break hostLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hMatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||||
|
logger.Debugf("api host match %s", req.URL.Host)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. try path
|
||||||
|
pathLoop:
|
||||||
|
for _, pathreg := range ep.pathregs {
|
||||||
|
matches, err := pathreg.Match(path, "")
|
||||||
|
if err != nil {
|
||||||
|
// TODO: log error
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pMatch = true
|
||||||
|
ctx := req.Context()
|
||||||
|
md, ok := metadata.FromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
md = make(metadata.Metadata)
|
||||||
|
}
|
||||||
|
for k, v := range matches {
|
||||||
|
md[fmt.Sprintf("x-api-field-%s", k)] = v
|
||||||
|
}
|
||||||
|
*req = *req.WithContext(context.WithValue(ctx, metadata.MetadataKey{}, md))
|
||||||
|
break pathLoop
|
||||||
|
}
|
||||||
|
if !pMatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// TODO: Percentage traffic
|
||||||
|
|
||||||
|
// we got here, so its a match
|
||||||
|
return ep, nil
|
||||||
|
}
|
||||||
|
// no match
|
||||||
|
return nil, fmt.Errorf("endpoint not found for %v", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *staticRouter) Route(req *http.Request) (*api.Service, error) {
|
||||||
|
if r.isClosed() {
|
||||||
|
return nil, errors.New("router closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// try get an endpoint
|
||||||
|
ep, err := r.Endpoint(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ep, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRouter(opts ...router.Option) *staticRouter {
|
||||||
|
options := router.NewOptions(opts...)
|
||||||
|
r := &staticRouter{
|
||||||
|
exit: make(chan bool),
|
||||||
|
opts: options,
|
||||||
|
eps: make(map[string]*endpoint),
|
||||||
|
}
|
||||||
|
//go r.watch()
|
||||||
|
//go r.refresh()
|
||||||
|
return r
|
||||||
|
}
|
149
api/service/proto/api.pb.go
Normal file
149
api/service/proto/api.pb.go
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||||
|
// source: api.proto
|
||||||
|
|
||||||
|
package go_micro_api
|
||||||
|
|
||||||
|
import (
|
||||||
|
fmt "fmt"
|
||||||
|
proto "github.com/golang/protobuf/proto"
|
||||||
|
math "math"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Reference imports to suppress errors if they are not otherwise used.
|
||||||
|
var _ = proto.Marshal
|
||||||
|
var _ = fmt.Errorf
|
||||||
|
var _ = math.Inf
|
||||||
|
|
||||||
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
|
// is compatible with the proto package it is being compiled against.
|
||||||
|
// A compilation error at this line likely means your copy of the
|
||||||
|
// proto package needs to be updated.
|
||||||
|
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||||
|
|
||||||
|
type Endpoint struct {
|
||||||
|
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
|
||||||
|
Host []string `protobuf:"bytes,2,rep,name=host,proto3" json:"host,omitempty"`
|
||||||
|
Path []string `protobuf:"bytes,3,rep,name=path,proto3" json:"path,omitempty"`
|
||||||
|
Method []string `protobuf:"bytes,4,rep,name=method,proto3" json:"method,omitempty"`
|
||||||
|
Stream bool `protobuf:"varint,5,opt,name=stream,proto3" json:"stream,omitempty"`
|
||||||
|
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||||
|
XXX_unrecognized []byte `json:"-"`
|
||||||
|
XXX_sizecache int32 `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Endpoint) Reset() { *m = Endpoint{} }
|
||||||
|
func (m *Endpoint) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*Endpoint) ProtoMessage() {}
|
||||||
|
func (*Endpoint) Descriptor() ([]byte, []int) {
|
||||||
|
return fileDescriptor_00212fb1f9d3bf1c, []int{0}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Endpoint) XXX_Unmarshal(b []byte) error {
|
||||||
|
return xxx_messageInfo_Endpoint.Unmarshal(m, b)
|
||||||
|
}
|
||||||
|
func (m *Endpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||||
|
return xxx_messageInfo_Endpoint.Marshal(b, m, deterministic)
|
||||||
|
}
|
||||||
|
func (m *Endpoint) XXX_Merge(src proto.Message) {
|
||||||
|
xxx_messageInfo_Endpoint.Merge(m, src)
|
||||||
|
}
|
||||||
|
func (m *Endpoint) XXX_Size() int {
|
||||||
|
return xxx_messageInfo_Endpoint.Size(m)
|
||||||
|
}
|
||||||
|
func (m *Endpoint) XXX_DiscardUnknown() {
|
||||||
|
xxx_messageInfo_Endpoint.DiscardUnknown(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
var xxx_messageInfo_Endpoint proto.InternalMessageInfo
|
||||||
|
|
||||||
|
func (m *Endpoint) GetName() string {
|
||||||
|
if m != nil {
|
||||||
|
return m.Name
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Endpoint) GetHost() []string {
|
||||||
|
if m != nil {
|
||||||
|
return m.Host
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Endpoint) GetPath() []string {
|
||||||
|
if m != nil {
|
||||||
|
return m.Path
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Endpoint) GetMethod() []string {
|
||||||
|
if m != nil {
|
||||||
|
return m.Method
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Endpoint) GetStream() bool {
|
||||||
|
if m != nil {
|
||||||
|
return m.Stream
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type EmptyResponse struct {
|
||||||
|
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||||
|
XXX_unrecognized []byte `json:"-"`
|
||||||
|
XXX_sizecache int32 `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *EmptyResponse) Reset() { *m = EmptyResponse{} }
|
||||||
|
func (m *EmptyResponse) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*EmptyResponse) ProtoMessage() {}
|
||||||
|
func (*EmptyResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return fileDescriptor_00212fb1f9d3bf1c, []int{1}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *EmptyResponse) XXX_Unmarshal(b []byte) error {
|
||||||
|
return xxx_messageInfo_EmptyResponse.Unmarshal(m, b)
|
||||||
|
}
|
||||||
|
func (m *EmptyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||||
|
return xxx_messageInfo_EmptyResponse.Marshal(b, m, deterministic)
|
||||||
|
}
|
||||||
|
func (m *EmptyResponse) XXX_Merge(src proto.Message) {
|
||||||
|
xxx_messageInfo_EmptyResponse.Merge(m, src)
|
||||||
|
}
|
||||||
|
func (m *EmptyResponse) XXX_Size() int {
|
||||||
|
return xxx_messageInfo_EmptyResponse.Size(m)
|
||||||
|
}
|
||||||
|
func (m *EmptyResponse) XXX_DiscardUnknown() {
|
||||||
|
xxx_messageInfo_EmptyResponse.DiscardUnknown(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
var xxx_messageInfo_EmptyResponse proto.InternalMessageInfo
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
proto.RegisterType((*Endpoint)(nil), "go.micro.api.Endpoint")
|
||||||
|
proto.RegisterType((*EmptyResponse)(nil), "go.micro.api.EmptyResponse")
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c)
|
||||||
|
}
|
||||||
|
|
||||||
|
var fileDescriptor_00212fb1f9d3bf1c = []byte{
|
||||||
|
// 201 bytes of a gzipped FileDescriptorProto
|
||||||
|
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0xd0, 0x41, 0x4a, 0xc5, 0x30,
|
||||||
|
0x10, 0x06, 0x60, 0x63, 0x9f, 0x8f, 0xbe, 0x41, 0x11, 0xb2, 0x78, 0x04, 0xdd, 0x94, 0xae, 0xde,
|
||||||
|
0x2a, 0x0b, 0x3d, 0x41, 0xd1, 0x5e, 0x20, 0x37, 0x88, 0x76, 0x68, 0xb3, 0x48, 0x66, 0x48, 0x06,
|
||||||
|
0xc1, 0x43, 0x78, 0x67, 0x49, 0xad, 0x50, 0xdc, 0xba, 0xfb, 0xe7, 0x5b, 0xfc, 0xfc, 0x0c, 0x9c,
|
||||||
|
0x3c, 0x07, 0xcb, 0x99, 0x84, 0xf4, 0xed, 0x4c, 0x36, 0x86, 0xf7, 0x4c, 0xd6, 0x73, 0xe8, 0x3f,
|
||||||
|
0xa0, 0x1d, 0xd3, 0xc4, 0x14, 0x92, 0x68, 0x0d, 0x87, 0xe4, 0x23, 0x1a, 0xd5, 0xa9, 0xcb, 0xc9,
|
||||||
|
0xad, 0xb9, 0xda, 0x42, 0x45, 0xcc, 0x75, 0xd7, 0x54, 0xab, 0xb9, 0x1a, 0x7b, 0x59, 0x4c, 0xf3,
|
||||||
|
0x63, 0x35, 0xeb, 0x33, 0x1c, 0x23, 0xca, 0x42, 0x93, 0x39, 0xac, 0xba, 0x5d, 0xd5, 0x8b, 0x64,
|
||||||
|
0xf4, 0xd1, 0xdc, 0x74, 0xea, 0xd2, 0xba, 0xed, 0xea, 0xef, 0xe1, 0x6e, 0x8c, 0x2c, 0x9f, 0x0e,
|
||||||
|
0x0b, 0x53, 0x2a, 0xf8, 0xf4, 0xa5, 0xa0, 0x19, 0x38, 0xe8, 0x01, 0x5a, 0x87, 0x73, 0x28, 0x82,
|
||||||
|
0x59, 0x9f, 0xed, 0x7e, 0xab, 0xfd, 0x1d, 0xfa, 0xf0, 0xf8, 0xc7, 0xf7, 0x45, 0xfd, 0x95, 0x7e,
|
||||||
|
0x01, 0x78, 0xc5, 0xfc, 0xbf, 0x92, 0xb7, 0xe3, 0xfa, 0xad, 0xe7, 0xef, 0x00, 0x00, 0x00, 0xff,
|
||||||
|
0xff, 0x1f, 0xf0, 0xd9, 0x19, 0x3a, 0x01, 0x00, 0x00,
|
||||||
|
}
|
102
api/service/proto/api.pb.micro.go
Normal file
102
api/service/proto/api.pb.micro.go
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
// Code generated by protoc-gen-micro. DO NOT EDIT.
|
||||||
|
// source: api.proto
|
||||||
|
|
||||||
|
package go_micro_api
|
||||||
|
|
||||||
|
import (
|
||||||
|
fmt "fmt"
|
||||||
|
proto "github.com/golang/protobuf/proto"
|
||||||
|
math "math"
|
||||||
|
)
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
client "github.com/micro/go-micro/v2/client"
|
||||||
|
server "github.com/micro/go-micro/v2/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Reference imports to suppress errors if they are not otherwise used.
|
||||||
|
var _ = proto.Marshal
|
||||||
|
var _ = fmt.Errorf
|
||||||
|
var _ = math.Inf
|
||||||
|
|
||||||
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
|
// is compatible with the proto package it is being compiled against.
|
||||||
|
// A compilation error at this line likely means your copy of the
|
||||||
|
// proto package needs to be updated.
|
||||||
|
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||||
|
|
||||||
|
// Reference imports to suppress errors if they are not otherwise used.
|
||||||
|
var _ context.Context
|
||||||
|
var _ client.Option
|
||||||
|
var _ server.Option
|
||||||
|
|
||||||
|
// Client API for Api service
|
||||||
|
|
||||||
|
type ApiService interface {
|
||||||
|
Register(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error)
|
||||||
|
Deregister(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type apiService struct {
|
||||||
|
c client.Client
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewApiService(name string, c client.Client) ApiService {
|
||||||
|
return &apiService{
|
||||||
|
c: c,
|
||||||
|
name: name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *apiService) Register(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error) {
|
||||||
|
req := c.c.NewRequest(c.name, "Api.Register", in)
|
||||||
|
out := new(EmptyResponse)
|
||||||
|
err := c.c.Call(ctx, req, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *apiService) Deregister(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error) {
|
||||||
|
req := c.c.NewRequest(c.name, "Api.Deregister", in)
|
||||||
|
out := new(EmptyResponse)
|
||||||
|
err := c.c.Call(ctx, req, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server API for Api service
|
||||||
|
|
||||||
|
type ApiHandler interface {
|
||||||
|
Register(context.Context, *Endpoint, *EmptyResponse) error
|
||||||
|
Deregister(context.Context, *Endpoint, *EmptyResponse) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterApiHandler(s server.Server, hdlr ApiHandler, opts ...server.HandlerOption) error {
|
||||||
|
type api interface {
|
||||||
|
Register(ctx context.Context, in *Endpoint, out *EmptyResponse) error
|
||||||
|
Deregister(ctx context.Context, in *Endpoint, out *EmptyResponse) error
|
||||||
|
}
|
||||||
|
type Api struct {
|
||||||
|
api
|
||||||
|
}
|
||||||
|
h := &apiHandler{hdlr}
|
||||||
|
return s.Handle(s.NewHandler(&Api{h}, opts...))
|
||||||
|
}
|
||||||
|
|
||||||
|
type apiHandler struct {
|
||||||
|
ApiHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *apiHandler) Register(ctx context.Context, in *Endpoint, out *EmptyResponse) error {
|
||||||
|
return h.ApiHandler.Register(ctx, in, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *apiHandler) Deregister(ctx context.Context, in *Endpoint, out *EmptyResponse) error {
|
||||||
|
return h.ApiHandler.Deregister(ctx, in, out)
|
||||||
|
}
|
18
api/service/proto/api.proto
Normal file
18
api/service/proto/api.proto
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package go.micro.api;
|
||||||
|
|
||||||
|
service Api {
|
||||||
|
rpc Register(Endpoint) returns (EmptyResponse) {};
|
||||||
|
rpc Deregister(Endpoint) returns (EmptyResponse) {};
|
||||||
|
}
|
||||||
|
|
||||||
|
message Endpoint {
|
||||||
|
string name = 1;
|
||||||
|
repeated string host = 2;
|
||||||
|
repeated string path = 3;
|
||||||
|
repeated string method = 4;
|
||||||
|
bool stream = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EmptyResponse {}
|
2
go.mod
2
go.mod
@ -31,7 +31,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.4.1
|
github.com/gorilla/websocket v1.4.1
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
|
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
|
github.com/grpc-ecosystem/grpc-gateway v1.9.5
|
||||||
github.com/hashicorp/hcl v1.0.0
|
github.com/hashicorp/hcl v1.0.0
|
||||||
github.com/imdario/mergo v0.3.8
|
github.com/imdario/mergo v0.3.8
|
||||||
github.com/jonboulle/clockwork v0.1.0 // indirect
|
github.com/jonboulle/clockwork v0.1.0 // indirect
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type metaKey struct{}
|
type MetadataKey struct{}
|
||||||
|
|
||||||
// Metadata is our way of representing request headers internally.
|
// Metadata is our way of representing request headers internally.
|
||||||
// They're used at the RPC level and translate back and forth
|
// They're used at the RPC level and translate back and forth
|
||||||
@ -41,7 +41,7 @@ func Set(ctx context.Context, k, v string) context.Context {
|
|||||||
md = make(Metadata)
|
md = make(Metadata)
|
||||||
}
|
}
|
||||||
md[k] = v
|
md[k] = v
|
||||||
return context.WithValue(ctx, metaKey{}, md)
|
return context.WithValue(ctx, MetadataKey{}, md)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a single value from metadata in the context
|
// Get returns a single value from metadata in the context
|
||||||
@ -64,7 +64,7 @@ func Get(ctx context.Context, key string) (string, bool) {
|
|||||||
|
|
||||||
// FromContext returns metadata from the given context
|
// FromContext returns metadata from the given context
|
||||||
func FromContext(ctx context.Context) (Metadata, bool) {
|
func FromContext(ctx context.Context) (Metadata, bool) {
|
||||||
md, ok := ctx.Value(metaKey{}).(Metadata)
|
md, ok := ctx.Value(MetadataKey{}).(Metadata)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ok
|
return nil, ok
|
||||||
}
|
}
|
||||||
@ -80,7 +80,7 @@ func FromContext(ctx context.Context) (Metadata, bool) {
|
|||||||
|
|
||||||
// NewContext creates a new context with the given metadata
|
// NewContext creates a new context with the given metadata
|
||||||
func NewContext(ctx context.Context, md Metadata) context.Context {
|
func NewContext(ctx context.Context, md Metadata) context.Context {
|
||||||
return context.WithValue(ctx, metaKey{}, md)
|
return context.WithValue(ctx, MetadataKey{}, md)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MergeContext merges metadata to existing metadata, overwriting if specified
|
// MergeContext merges metadata to existing metadata, overwriting if specified
|
||||||
@ -88,7 +88,7 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
md, _ := ctx.Value(metaKey{}).(Metadata)
|
md, _ := ctx.Value(MetadataKey{}).(Metadata)
|
||||||
cmd := make(Metadata)
|
cmd := make(Metadata)
|
||||||
for k, v := range md {
|
for k, v := range md {
|
||||||
cmd[k] = v
|
cmd[k] = v
|
||||||
@ -100,5 +100,5 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context
|
|||||||
cmd[k] = v
|
cmd[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return context.WithValue(ctx, metaKey{}, cmd)
|
return context.WithValue(ctx, MetadataKey{}, cmd)
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,11 @@ type bytesCodec struct{}
|
|||||||
type protoCodec struct{}
|
type protoCodec struct{}
|
||||||
type wrapCodec struct{ encoding.Codec }
|
type wrapCodec struct{ encoding.Codec }
|
||||||
|
|
||||||
var jsonpbMarshaler = &jsonpb.Marshaler{}
|
var jsonpbMarshaler = &jsonpb.Marshaler{
|
||||||
|
EnumsAsInts: false,
|
||||||
|
EmitDefaults: false,
|
||||||
|
OrigName: true,
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
defaultGRPCCodecs = map[string]encoding.Codec{
|
defaultGRPCCodecs = map[string]encoding.Codec{
|
||||||
|
Loading…
Reference in New Issue
Block a user