1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

api package (#2522)

* api gateway

* add comment
This commit is contained in:
Asim Aslam 2022-07-02 12:11:59 +01:00 committed by GitHub
parent 28298a30e4
commit b6b866c0b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 320 additions and 138 deletions

View File

@ -1,6 +1,8 @@
// Package api is for building api gateways
package api
import (
"context"
"errors"
"regexp"
"strings"
@ -9,6 +11,8 @@ import (
"go-micro.dev/v4/server"
)
// The Api interface provides a way to
// create composable API gateways
type Api interface {
// Initialise options
Init(...Option) error
@ -18,11 +22,16 @@ type Api interface {
Register(*Endpoint) error
// Register a route
Deregister(*Endpoint) error
// Run the api
Run(context.Context) error
// Implemenation of api
String() string
}
type Options struct{}
type Options struct {
// Address of the server
Address string
}
type Option func(*Options) error
@ -185,3 +194,8 @@ func NewGateway() Gateway {
func WithEndpoint(e *Endpoint) server.HandlerOption {
return server.EndpointMetadata(e.Name, Encode(e))
}
// NewApi returns a new api gateway
func NewApi(opts ...Option) Api {
return newApi(opts...)
}

84
api/default.go Normal file
View File

@ -0,0 +1,84 @@
package api
import (
"context"
"go-micro.dev/v4/api/handler"
"go-micro.dev/v4/api/handler/rpc"
"go-micro.dev/v4/api/router/registry"
"go-micro.dev/v4/api/server"
"go-micro.dev/v4/api/server/http"
)
type api struct {
options Options
server server.Server
}
func newApi(opts ...Option) Api {
options := NewOptions(opts...)
// TODO: make configurable
rtr := registry.NewRouter()
// TODO: make configurable
hdlr := rpc.NewHandler(
handler.WithRouter(rtr),
)
// TODO: make configurable
// create a new server
srv := http.NewServer(options.Address)
// TODO: allow multiple handlers
// define the handler
srv.Handle("/", hdlr)
return &api{
options: options,
server: srv,
}
}
// Initialise options
func (a *api) Init(opts ...Option) error {
for _, o := range opts {
o(&a.options)
}
return nil
}
// Get the options
func (a *api) Options() Options {
return a.options
}
// Register a http handler
func (a *api) Register(*Endpoint) error {
return nil
}
// Register a route
func (a *api) Deregister(*Endpoint) error {
return nil
}
func (a *api) Run(ctx context.Context) error {
if err := a.server.Start(); err != nil {
return err
}
// wait to finish
<-ctx.Done()
if err := a.server.Stop(); err != nil {
return err
}
return nil
}
func (a *api) String() string {
return "http"
}

View File

@ -4,9 +4,9 @@ package api
import (
"net/http"
goapi "go-micro.dev/v4/api"
"go-micro.dev/v4/api/handler"
api "go-micro.dev/v4/api/proto"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/client"
"go-micro.dev/v4/errors"
"go-micro.dev/v4/selector"
@ -15,7 +15,6 @@ import (
type apiHandler struct {
opts handler.Options
s *goapi.Service
}
const (
@ -39,12 +38,9 @@ func (a *apiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
var service *goapi.Service
var service *router.Route
if a.s != nil {
// we were given the service
service = a.s
} else if a.opts.Router != nil {
if a.opts.Router != nil {
// try get service from router
s, err := a.opts.Router.Route(r)
if err != nil {
@ -66,13 +62,13 @@ func (a *apiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// create request and response
c := a.opts.Client
req := c.NewRequest(service.Name, service.Endpoint.Name, request)
req := c.NewRequest(service.Service, service.Endpoint.Name, request)
rsp := &api.Response{}
// create the context from headers
cx := ctx.FromRequest(r)
// create strategy
so := selector.WithStrategy(strategy(service.Services))
// create strategy:
so := selector.WithStrategy(strategy(service.Versions))
if err := c.Call(cx, req, rsp, client.WithSelectOption(so)); err != nil {
w.Header().Set("Content-Type", "application/json")
@ -113,11 +109,3 @@ func NewHandler(opts ...handler.Option) handler.Handler {
opts: options,
}
}
func WithService(s *goapi.Service, opts ...handler.Option) handler.Handler {
options := handler.NewOptions(opts...)
return &apiHandler{
opts: options,
s: s,
}
}

View File

@ -8,8 +8,8 @@ import (
"net/http/httputil"
"net/url"
"go-micro.dev/v4/api"
"go-micro.dev/v4/api/handler"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/selector"
)
@ -19,9 +19,6 @@ const (
type httpHandler struct {
options handler.Options
// set with different initialiser
s *api.Service
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -47,12 +44,9 @@ func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// getService returns the service for this request from the selector
func (h *httpHandler) getService(r *http.Request) (string, error) {
var service *api.Service
var service *router.Route
if h.s != nil {
// we were given the service
service = h.s
} else if h.options.Router != nil {
if h.options.Router != nil {
// try get service from router
s, err := h.options.Router.Route(r)
if err != nil {
@ -65,7 +59,7 @@ func (h *httpHandler) getService(r *http.Request) (string, error) {
}
// create a random selector
next := selector.Random(service.Services)
next := selector.Random(service.Versions)
// get the next node
s, err := next()
@ -88,13 +82,3 @@ func NewHandler(opts ...handler.Option) handler.Handler {
options: options,
}
}
// WithService creates a handler with a service
func WithService(s *api.Service, opts ...handler.Option) handler.Handler {
options := handler.NewOptions(opts...)
return &httpHandler{
options: options,
s: s,
}
}

View File

@ -26,12 +26,7 @@ func NewOptions(opts ...Option) Options {
}
if options.Client == nil {
WithClient(client.NewClient())(&options)
}
// set namespace if blank
if len(options.Namespace) == 0 {
WithNamespace("go.micro.api")(&options)
WithClient(client.DefaultClient)(&options)
}
if options.MaxRecvSize == 0 {

View File

@ -11,9 +11,9 @@ import (
jsonpatch "github.com/evanphx/json-patch/v5"
"github.com/oxtoacart/bpool"
"go-micro.dev/v4/api"
"go-micro.dev/v4/api/handler"
"go-micro.dev/v4/api/internal/proto"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/client"
"go-micro.dev/v4/codec"
"go-micro.dev/v4/codec/jsonrpc"
@ -54,7 +54,6 @@ var (
type rpcHandler struct {
opts handler.Options
s *api.Service
}
type buffer struct {
@ -82,12 +81,9 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, bsize)
defer r.Body.Close()
var service *api.Service
var service *router.Route
if h.s != nil {
// we were given the service
service = h.s
} else if h.opts.Router != nil {
if h.opts.Router != nil {
// try get service from router
s, err := h.opts.Router.Route(r)
if err != nil {
@ -142,7 +138,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// create strategy
so := selector.WithStrategy(strategy(service.Services))
so := selector.WithStrategy(strategy(service.Versions))
// walk the standard call path
// get payload
@ -167,7 +163,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
response := &proto.Message{}
req := c.NewRequest(
service.Name,
service.Service,
service.Endpoint.Name,
request,
client.WithContentType(ct),
@ -203,7 +199,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var response json.RawMessage
req := c.NewRequest(
service.Name,
service.Service,
service.Endpoint.Name,
&request,
client.WithContentType(ct),
@ -512,11 +508,3 @@ func NewHandler(opts ...handler.Option) handler.Handler {
opts: options,
}
}
func WithService(s *api.Service, opts ...handler.Option) handler.Handler {
options := handler.NewOptions(opts...)
return &rpcHandler{
opts: options,
s: s,
}
}

View File

@ -12,7 +12,7 @@ import (
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go-micro.dev/v4/api"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/client"
raw "go-micro.dev/v4/codec/bytes"
"go-micro.dev/v4/logger"
@ -20,7 +20,7 @@ import (
)
// serveWebsocket will stream rpc back over websockets assuming json
func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *api.Service, c client.Client) {
func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *router.Route, c client.Client) {
var op ws.OpCode
ct := r.Header.Get("Content-Type")
@ -103,14 +103,14 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
ct = "application/json"
}
req := c.NewRequest(
service.Name,
service.Service,
service.Endpoint.Name,
request,
client.WithContentType(ct),
client.StreamingRequest(),
)
so := selector.WithStrategy(strategy(service.Services))
so := selector.WithStrategy(strategy(service.Versions))
// create a new stream
stream, err := c.Stream(ctx, req, client.WithSelectOption(so))
if err != nil {
@ -219,13 +219,13 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
}
}
func isStream(r *http.Request, srv *api.Service) bool {
func isStream(r *http.Request, srv *router.Route) bool {
// check if it's a web socket
if !isWebSocket(r) {
return false
}
// check if the endpoint supports streaming
for _, service := range srv.Services {
for _, service := range srv.Versions {
for _, ep := range service.Endpoints {
// skip if it doesn't match the name
if ep.Name != srv.Endpoint.Name {

View File

@ -11,8 +11,8 @@ import (
"net/url"
"strings"
"go-micro.dev/v4/api"
"go-micro.dev/v4/api/handler"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/selector"
)
@ -22,7 +22,6 @@ const (
type webHandler struct {
opts handler.Options
s *api.Service
}
func (wh *webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -53,12 +52,9 @@ func (wh *webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// getService returns the service for this request from the selector
func (wh *webHandler) getService(r *http.Request) (string, error) {
var service *api.Service
var service *router.Route
if wh.s != nil {
// we were given the service
service = wh.s
} else if wh.opts.Router != nil {
if wh.opts.Router != nil {
// try get service from router
s, err := wh.opts.Router.Route(r)
if err != nil {
@ -71,7 +67,7 @@ func (wh *webHandler) getService(r *http.Request) (string, error) {
}
// create a random selector
next := selector.Random(service.Services)
next := selector.Random(service.Versions)
// get the next node
s, err := next()
@ -166,12 +162,3 @@ func NewHandler(opts ...handler.Option) handler.Handler {
opts: handler.NewOptions(opts...),
}
}
func WithService(s *api.Service, opts ...handler.Option) handler.Handler {
options := handler.NewOptions(opts...)
return &webHandler{
opts: options,
s: s,
}
}

13
api/options.go Normal file
View File

@ -0,0 +1,13 @@
package api
func NewOptions(opts ...Option) Options {
options := Options{
Address: ":8080",
}
for _, o := range opts {
o(&options)
}
return options
}

99
api/router/endpoint.go Normal file
View File

@ -0,0 +1,99 @@
package router
import (
"errors"
"regexp"
"strings"
)
func strip(s string) string {
return strings.TrimSpace(s)
}
func slice(s string) []string {
var sl []string
for _, p := range strings.Split(s, ",") {
if str := strip(p); len(str) > 0 {
sl = append(sl, strip(p))
}
}
return sl
}
// Encode encodes an endpoint to endpoint metadata
func Encode(e *Endpoint) map[string]string {
if e == nil {
return nil
}
// endpoint map
ep := make(map[string]string)
// set vals only if they exist
set := func(k, v string) {
if len(v) == 0 {
return
}
ep[k] = v
}
set("endpoint", e.Name)
set("description", e.Description)
set("handler", e.Handler)
set("method", strings.Join(e.Method, ","))
set("path", strings.Join(e.Path, ","))
set("host", strings.Join(e.Host, ","))
return ep
}
// Decode decodes endpoint metadata into an endpoint
func Decode(e map[string]string) *Endpoint {
if e == nil {
return nil
}
return &Endpoint{
Name: e["endpoint"],
Description: e["description"],
Method: slice(e["method"]),
Path: slice(e["path"]),
Host: slice(e["host"]),
Handler: e["handler"],
}
}
// Validate validates an endpoint to guarantee it won't blow up when being served
func Validate(e *Endpoint) error {
if e == nil {
return errors.New("endpoint is nil")
}
if len(e.Name) == 0 {
return errors.New("name required")
}
for _, p := range e.Path {
ps := p[0]
pe := p[len(p)-1]
if ps == '^' && pe == '$' {
_, err := regexp.CompilePOSIX(p)
if err != nil {
return err
}
} else if ps == '^' && pe != '$' {
return errors.New("invalid path")
} else if ps != '^' && pe == '$' {
return errors.New("invalid path")
}
}
if len(e.Handler) == 0 {
return errors.New("invalid handler")
}
return nil
}

View File

@ -10,7 +10,6 @@ import (
"sync"
"time"
"go-micro.dev/v4/api"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/api/router/util"
"go-micro.dev/v4/logger"
@ -35,12 +34,12 @@ type registryRouter struct {
rc cache.Cache
sync.RWMutex
eps map[string]*api.Service
eps map[string]*router.Route
// compiled regexp for host and path
ceps map[string]*endpoint
}
func (r *registryRouter) isClosed() bool {
func (r *registryRouter) isStopd() bool {
select {
case <-r.exit:
return true
@ -111,7 +110,7 @@ func (r *registryRouter) process(res *registry.Result) {
// store local endpoint cache
func (r *registryRouter) store(services []*registry.Service) {
// endpoints
eps := map[string]*api.Service{}
eps := map[string]*router.Route{}
// services
names := map[string]bool{}
@ -126,10 +125,10 @@ func (r *registryRouter) store(services []*registry.Service) {
// create a key service:endpoint_name
key := fmt.Sprintf("%s.%s", service.Name, sep.Name)
// decode endpoint
end := api.Decode(sep.Metadata)
end := router.Decode(sep.Metadata)
// if we got nothing skip
if err := api.Validate(end); err != nil {
if err := router.Validate(end); err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("endpoint validation failed: %v", err)
}
@ -139,13 +138,13 @@ func (r *registryRouter) store(services []*registry.Service) {
// try get endpoint
ep, ok := eps[key]
if !ok {
ep = &api.Service{Name: service.Name}
ep = &router.Route{Service: service.Name}
}
// overwrite the endpoint
ep.Endpoint = end
// append services
ep.Services = append(ep.Services, service)
ep.Versions = append(ep.Versions, service)
// store it
eps[key] = ep
}
@ -155,9 +154,9 @@ func (r *registryRouter) store(services []*registry.Service) {
defer r.Unlock()
// delete any existing eps for services we know
for key, service := range r.eps {
for key, route := range r.eps {
// skip what we don't care about
if !names[service.Name] {
if !names[route.Service] {
continue
}
@ -226,7 +225,7 @@ func (r *registryRouter) watch() {
var attempts int
for {
if r.isClosed() {
if r.isStopd() {
return
}
@ -274,7 +273,7 @@ func (r *registryRouter) Options() router.Options {
return r.opts
}
func (r *registryRouter) Close() error {
func (r *registryRouter) Stop() error {
select {
case <-r.exit:
return nil
@ -285,16 +284,16 @@ func (r *registryRouter) Close() error {
return nil
}
func (r *registryRouter) Register(ep *api.Endpoint) error {
func (r *registryRouter) Register(ep *router.Route) error {
return nil
}
func (r *registryRouter) Deregister(ep *api.Endpoint) error {
func (r *registryRouter) Deregister(ep *router.Route) error {
return nil
}
func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
if r.isClosed() {
func (r *registryRouter) Endpoint(req *http.Request) (*router.Route, error) {
if r.isStopd() {
return nil, errors.New("router closed")
}
@ -409,8 +408,8 @@ func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
return nil, errors.New("not found")
}
func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
if r.isClosed() {
func (r *registryRouter) Route(req *http.Request) (*router.Route, error) {
if r.isStopd() {
return nil, errors.New("router closed")
}
@ -451,27 +450,27 @@ func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
}
// construct api service
return &api.Service{
Name: name,
Endpoint: &api.Endpoint{
return &router.Route{
Service: name,
Endpoint: &router.Endpoint{
Name: rp.Method,
Handler: handler,
},
Services: services,
Versions: services,
}, nil
// http handler
case "http", "proxy", "web":
// construct api service
return &api.Service{
Name: name,
Endpoint: &api.Endpoint{
return &router.Route{
Service: name,
Endpoint: &router.Endpoint{
Name: req.URL.String(),
Handler: r.opts.Handler,
Host: []string{req.Host},
Method: []string{req.Method},
Path: []string{req.URL.Path},
},
Services: services,
Versions: services,
}, nil
}
@ -484,7 +483,7 @@ func newRouter(opts ...router.Option) *registryRouter {
exit: make(chan bool),
opts: options,
rc: cache.New(options.Registry),
eps: make(map[string]*api.Service),
eps: make(map[string]*router.Route),
ceps: make(map[string]*endpoint),
}
go r.watch()

View File

@ -4,21 +4,50 @@ package router
import (
"net/http"
"go-micro.dev/v4/api"
"go-micro.dev/v4/registry"
)
// Router is used to determine an endpoint for a request
type Router interface {
// Returns options
Options() Options
// Stop the router
Close() error
// Endpoint returns an api.Service endpoint or an error if it does not exist
Endpoint(r *http.Request) (*api.Service, error)
// Register endpoint in router
Register(ep *api.Endpoint) error
Register(r *Route) error
// Deregister endpoint from router
Deregister(ep *api.Endpoint) error
Deregister(r *Route) error
// Route returns an api.Service route
Route(r *http.Request) (*api.Service, error)
Route(r *http.Request) (*Route, error)
// Stop the router
Stop() error
}
type Route struct {
// Name of service
Service string
// The endpoint for this service
Endpoint *Endpoint
// Versions of this service
Versions []*registry.Service
}
// Endpoint is a mapping between an RPC method and HTTP endpoint
type Endpoint struct {
// RPC Method e.g. Greeter.Hello
Name string
// Description e.g what's this endpoint for
Description string
// API Handler e.g rpc, proxy
Handler string
// HTTP Host e.g example.com
Host []string
// HTTP Methods e.g GET, POST
Method []string
// HTTP Path e.g /greeter. Expect POSIX regex
Path []string
// Body destination
// "*" or "" - top level message value
// "string" - inner message value
Body string
// Stream flag
Stream bool
}

View File

@ -8,7 +8,6 @@ import (
"strings"
"sync"
"go-micro.dev/v4/api"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/api/router/util"
"go-micro.dev/v4/logger"
@ -18,7 +17,7 @@ import (
)
type endpoint struct {
apiep *api.Endpoint
apiep *router.Endpoint
hostregs []*regexp.Regexp
pathregs []util.Pattern
pcreregs []*regexp.Regexp
@ -32,7 +31,7 @@ type staticRouter struct {
eps map[string]*endpoint
}
func (r *staticRouter) isClosed() bool {
func (r *staticRouter) isStopd() bool {
select {
case <-r.exit:
return true
@ -47,7 +46,7 @@ func (r *staticRouter) watch() {
var attempts int
for {
if r.isClosed() {
if r.isStopd() {
return
}
@ -88,8 +87,10 @@ func (r *staticRouter) watch() {
}
*/
func (r *staticRouter) Register(ep *api.Endpoint) error {
if err := api.Validate(ep); err != nil {
func (r *staticRouter) Register(route *router.Route) error {
ep := route.Endpoint
if err := router.Validate(ep); err != nil {
return err
}
@ -146,8 +147,9 @@ func (r *staticRouter) Register(ep *api.Endpoint) error {
return nil
}
func (r *staticRouter) Deregister(ep *api.Endpoint) error {
if err := api.Validate(ep); err != nil {
func (r *staticRouter) Deregister(route *router.Route) error {
ep := route.Endpoint
if err := router.Validate(ep); err != nil {
return err
}
r.Lock()
@ -160,7 +162,7 @@ func (r *staticRouter) Options() router.Options {
return r.opts
}
func (r *staticRouter) Close() error {
func (r *staticRouter) Stop() error {
select {
case <-r.exit:
return nil
@ -170,7 +172,7 @@ func (r *staticRouter) Close() error {
return nil
}
func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) {
func (r *staticRouter) Endpoint(req *http.Request) (*router.Route, error) {
ep, err := r.endpoint(req)
if err != nil {
return nil, err
@ -203,9 +205,9 @@ func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) {
services = svcs
}
svc := &api.Service{
Name: epf[0],
Endpoint: &api.Endpoint{
svc := &router.Route{
Service: epf[0],
Endpoint: &router.Endpoint{
Name: strings.Join(epf[1:], "."),
Handler: "rpc",
Host: ep.apiep.Host,
@ -214,14 +216,14 @@ func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) {
Body: ep.apiep.Body,
Stream: ep.apiep.Stream,
},
Services: services,
Versions: services,
}
return svc, nil
}
func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
if r.isClosed() {
if r.isStopd() {
return nil, errors.New("router closed")
}
@ -329,8 +331,8 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
return nil, fmt.Errorf("endpoint not found for %v", req.URL)
}
func (r *staticRouter) Route(req *http.Request) (*api.Service, error) {
if r.isClosed() {
func (r *staticRouter) Route(req *http.Request) (*router.Route, error) {
if r.isStopd() {
return nil, errors.New("router closed")
}