diff --git a/examples/addsvc/cmd/addsvc/main.go b/examples/addsvc/cmd/addsvc/main.go index 9da3118..2273a4c 100644 --- a/examples/addsvc/cmd/addsvc/main.go +++ b/examples/addsvc/cmd/addsvc/main.go @@ -47,7 +47,7 @@ func main() { ) flag.Parse() - // Logging domain + // Logging domain. var logger log.Logger { logger = log.NewLogfmtLogger(os.Stdout) @@ -57,10 +57,10 @@ func main() { logger.Log("msg", "hello") defer logger.Log("msg", "goodbye") - // Metrics domain + // Metrics domain. var ints, chars metrics.Counter { - // Business level metrics + // Business level metrics. ints = prometheus.NewCounter(stdprometheus.CounterOpts{ Namespace: "addsvc", Name: "integers_summed", @@ -74,7 +74,7 @@ func main() { } var duration metrics.TimeHistogram { - // Transport level metrics + // Transport level metrics. duration = metrics.NewTimeHistogram(time.Nanosecond, prometheus.NewSummary(stdprometheus.SummaryOpts{ Namespace: "addsvc", Name: "request_duration_ns", @@ -82,7 +82,7 @@ func main() { }, []string{"method", "success"})) } - // Tracing domain + // Tracing domain. var tracer stdopentracing.Tracer { if *zipkinAddr != "" { @@ -121,7 +121,7 @@ func main() { } } - // Business domain + // Business domain. var service addsvc.Service { service = addsvc.NewBasicService() @@ -129,7 +129,7 @@ func main() { service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service) } - // Endpoint domain + // Endpoint domain. var sumEndpoint endpoint.Endpoint { sumDuration := duration.With(metrics.Field{Key: "method", Value: "Sum"}) @@ -155,18 +155,18 @@ func main() { ConcatEndpoint: concatEndpoint, } - // Mechanical domain + // Mechanical domain. errc := make(chan error) ctx := context.Background() - // Interrupt handler + // Interrupt handler. go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) errc <- fmt.Errorf("%s", <-c) }() - // Debug listener + // Debug listener. go func() { logger := log.NewContext(logger).With("transport", "debug") @@ -182,7 +182,7 @@ func main() { errc <- http.ListenAndServe(*debugAddr, m) }() - // HTTP transport + // HTTP transport. go func() { logger := log.NewContext(logger).With("transport", "HTTP") h := addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger) @@ -190,7 +190,7 @@ func main() { errc <- http.ListenAndServe(*httpAddr, h) }() - // gRPC transport + // gRPC transport. go func() { logger := log.NewContext(logger).With("transport", "gRPC") @@ -208,7 +208,7 @@ func main() { errc <- s.Serve(ln) }() - // Thrift transport + // Thrift transport. go func() { logger := log.NewContext(logger).With("transport", "Thrift") @@ -252,6 +252,6 @@ func main() { ).Serve() }() - // Run + // Run! logger.Log("exit", <-errc) } diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index b01b71f..01367ec 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -1,12 +1,12 @@ package main import ( + "bytes" "encoding/json" "flag" "fmt" "io" "io/ioutil" - stdlog "log" "net/http" "net/url" "os" @@ -17,16 +17,18 @@ import ( "github.com/gorilla/mux" "github.com/hashicorp/consul/api" - "github.com/opentracing/opentracing-go" + stdopentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/examples/addsvc/client/grpc" - "github.com/go-kit/kit/examples/addsvc/server" - "github.com/go-kit/kit/loadbalancer" - "github.com/go-kit/kit/loadbalancer/consul" + "github.com/go-kit/kit/examples/addsvc" + addsvcgrpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + consulsd "github.com/go-kit/kit/sd/consul" + "github.com/go-kit/kit/sd/lb" httptransport "github.com/go-kit/kit/transport/http" + "google.golang.org/grpc" ) func main() { @@ -38,153 +40,243 @@ func main() { ) flag.Parse() - // Log domain - logger := log.NewLogfmtLogger(os.Stderr) - logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC).With("caller", log.DefaultCaller) - stdlog.SetFlags(0) // flags are handled by Go kit's logger - stdlog.SetOutput(log.NewStdlibAdapter(logger)) // redirect anything using stdlib log to us + // Logging domain. + var logger log.Logger + { + logger = log.NewLogfmtLogger(os.Stderr) + logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC) + logger = log.NewContext(logger).With("caller", log.DefaultCaller) + } // Service discovery domain. In this example we use Consul. - consulConfig := api.DefaultConfig() - if len(*consulAddr) > 0 { - consulConfig.Address = *consulAddr - } - consulClient, err := api.NewClient(consulConfig) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } - discoveryClient := consul.NewClient(consulClient) - - // Context domain. - ctx := context.Background() - - // Set up our routes. - // - // Each Consul service name maps to multiple instances of that service. We - // connect to each instance according to its pre-determined transport: in this - // case, we choose to access addsvc via its gRPC client, and stringsvc over - // plain transport/http (it has no client package). - // - // Each service instance implements multiple methods, and we want to map each - // method to a unique path on the API gateway. So, we define that path and its - // corresponding factory function, which takes an instance string and returns an - // endpoint.Endpoint for the specific method. - // - // Finally, we mount that path + endpoint handler into the router. - r := mux.NewRouter() - for consulName, methods := range map[string][]struct { - path string - factory loadbalancer.Factory - }{ - "addsvc": { - {path: "/api/addsvc/concat", factory: grpc.MakeConcatEndpointFactory(opentracing.GlobalTracer(), nil)}, - {path: "/api/addsvc/sum", factory: grpc.MakeSumEndpointFactory(opentracing.GlobalTracer(), nil)}, - }, - "stringsvc": { - {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")}, - {path: "/api/stringsvc/concat", factory: httpFactory(ctx, "GET", "concat/")}, - }, - } { - for _, method := range methods { - publisher, err := consul.NewPublisher(discoveryClient, method.factory, logger, consulName) - if err != nil { - logger.Log("service", consulName, "path", method.path, "err", err) - continue - } - lb := loadbalancer.NewRoundRobin(publisher) - e := loadbalancer.Retry(*retryMax, *retryTimeout, lb) - h := makeHandler(ctx, e, logger) - r.HandleFunc(method.path, h) + var client consulsd.Client + { + consulConfig := api.DefaultConfig() + if len(*consulAddr) > 0 { + consulConfig.Address = *consulAddr } + consulClient, err := api.NewClient(consulConfig) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } + client = consulsd.NewClient(consulClient) } - // Mechanical stuff. + // Transport domain. + tracer := stdopentracing.GlobalTracer() // no-op + ctx := context.Background() + r := mux.NewRouter() + + // Now we begin installing the routes. Each route corresponds to a single + // method: sum, concat, uppercase, and count. + + // addsvc routes. + { + // Each method gets constructed with a factory. Factories take an + // instance string, and return a specific endpoint. In the factory we + // dial the instance string we get from Consul, and then leverage an + // addsvc client package to construct a complete service. We can then + // leverage the addsvc.Make{Sum,Concat}Endpoint constructors to convert + // the complete service to specific endpoint. + + var ( + tags = []string{} + passingOnly = true + endpoints = addsvc.Endpoints{} + ) + { + factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger) + subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly) + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(*retryMax, *retryTimeout, balancer) + endpoints.SumEndpoint = retry + } + { + factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger) + subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly) + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(*retryMax, *retryTimeout, balancer) + endpoints.ConcatEndpoint = retry + } + + // Here we leverage the fact that addsvc comes with a constructor for an + // HTTP handler, and just install it under a particular path prefix in + // our router. + + r.PathPrefix("addsvc/").Handler(addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger)) + } + + // stringsvc routes. + { + // addsvc had lots of nice importable Go packages we could leverage. + // With stringsvc we are not so fortunate, it just has some endpoints + // that we assume will exist. So we have to write that logic here. This + // is by design, so you can see two totally different methods of + // proxying to a remote service. + + var ( + tags = []string{} + passingOnly = true + uppercase endpoint.Endpoint + count endpoint.Endpoint + ) + { + factory := stringsvcFactory(ctx, "GET", "/uppercase") + subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly) + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(*retryMax, *retryTimeout, balancer) + uppercase = retry + } + { + factory := stringsvcFactory(ctx, "GET", "/count") + subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly) + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(*retryMax, *retryTimeout, balancer) + count = retry + } + + // We can use the transport/http.Server to act as our handler, all we + // have to do provide it with the encode and decode functions for our + // stringsvc methods. + + r.Handle("/stringsvc/uppercase", httptransport.NewServer(ctx, uppercase, decodeUppercaseRequest, encodeJSONResponse)) + r.Handle("/stringsvc/count", httptransport.NewServer(ctx, count, decodeCountRequest, encodeJSONResponse)) + } + + // Interrupt handler. errc := make(chan error) go func() { - errc <- interrupt() + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + errc <- fmt.Errorf("%s", <-c) }() + + // HTTP transport. go func() { - logger.Log("transport", "http", "addr", *httpAddr) + logger.Log("transport", "HTTP", "addr", *httpAddr) errc <- http.ListenAndServe(*httpAddr, r) }() - logger.Log("err", <-errc) + + // Run! + logger.Log("exit", <-errc) } -func makeHandler(ctx context.Context, e endpoint.Endpoint, logger log.Logger) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - resp, err := e(ctx, r.Body) - if err != nil { - logger.Log("err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - b, ok := resp.([]byte) - if !ok { - logger.Log("err", "endpoint response is not of type []byte") - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - _, err = w.Write(b) - if err != nil { - logger.Log("err", err) - return - } - } -} - -func makeSumEndpoint(svc server.AddService) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - r := request.(io.Reader) - var req server.SumRequest - if err := json.NewDecoder(r).Decode(&req); err != nil { - return nil, err - } - v := svc.Sum(req.A, req.B) - return json.Marshal(v) - } -} - -func makeConcatEndpoint(svc server.AddService) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - r := request.(io.Reader) - var req server.ConcatRequest - if err := json.NewDecoder(r).Decode(&req); err != nil { - return nil, err - } - v := svc.Concat(req.A, req.B) - return json.Marshal(v) - } -} - -func httpFactory(ctx context.Context, method, path string) loadbalancer.Factory { +func addsvcFactory(makeEndpoint func(addsvc.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory { return func(instance string) (endpoint.Endpoint, io.Closer, error) { - var e endpoint.Endpoint - if !strings.HasPrefix(instance, "http") { - instance = "http://" + instance - } - u, err := url.Parse(instance) + // We could just as easily use the HTTP or Thrift client package to make + // the connection to addsvc. We've chosen gRPC arbitrarily. Note that + // the transport is an implementation detail: it doesn't leak out of + // this function. Nice! + + conn, err := grpc.Dial(instance, grpc.WithInsecure()) if err != nil { return nil, nil, err } - u.Path = path + service := addsvcgrpcclient.New(conn, tracer, logger) + endpoint := makeEndpoint(service) - e = httptransport.NewClient(method, u, passEncode, passDecode).Endpoint() - return e, nil, nil + // Notice that the addsvc gRPC client converts the connection to a + // complete addsvc, and we just throw away everything except the method + // we're interested in. A smarter factory would mux multiple methods + // over the same connection. But that would require more work to manage + // the returned io.Closer, e.g. reference counting. Since this is for + // the purposes of demonstration, we'll just keep it simple. + + return endpoint, conn, nil } } -func passEncode(_ context.Context, r *http.Request, request interface{}) error { - r.Body = request.(io.ReadCloser) +func stringsvcFactory(ctx context.Context, method, path string) sd.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { + if !strings.HasPrefix(instance, "http") { + instance = "http://" + instance + } + tgt, err := url.Parse(instance) + if err != nil { + return nil, nil, err + } + tgt.Path = path + + // Since stringsvc doesn't have any kind of package we can import, or + // any formal spec, we are forced to just assert where the endpoints + // live, and write our own code to encode and decode requests and + // responses. Ideally, if you write the service, you will want to + // provide stronger guarantees to your clients. + + var ( + enc httptransport.EncodeRequestFunc + dec httptransport.DecodeResponseFunc + ) + switch path { + case "/uppercase": + enc, dec = encodeJSONRequest, decodeUppercaseResponse + case "/count": + enc, dec = encodeJSONRequest, decodeCountResponse + default: + return nil, nil, fmt.Errorf("unknown stringsvc path %q", path) + } + + return httptransport.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil + } +} + +func encodeJSONRequest(_ context.Context, req *http.Request, request interface{}) error { + // Both uppercase and count requests are encoded in the same way: + // simple JSON serialization to the request body. + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(request); err != nil { + return err + } + req.Body = ioutil.NopCloser(&buf) return nil } -func passDecode(_ context.Context, r *http.Response) (interface{}, error) { - return ioutil.ReadAll(r.Body) +func encodeJSONResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + return json.NewEncoder(w).Encode(response) } -func interrupt() error { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - return fmt.Errorf("%s", <-c) +// I've just copied these functions from stringsvc3/transport.go, inlining the +// struct definitions. + +func decodeUppercaseResponse(ctx context.Context, resp *http.Response) (interface{}, error) { + var response struct { + V string `json:"v"` + Err string `json:"err,omitempty"` + } + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, err + } + return response, nil +} + +func decodeCountResponse(ctx context.Context, resp *http.Response) (interface{}, error) { + var response struct { + V int `json:"v"` + } + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, err + } + return response, nil +} + +func decodeUppercaseRequest(ctx context.Context, req *http.Request) (interface{}, error) { + var request struct { + S string `json:"s"` + } + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return nil, err + } + return request, nil +} + +func decodeCountRequest(ctx context.Context, req *http.Request) (interface{}, error) { + var request struct { + S string `json:"s"` + } + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return nil, err + } + return request, nil } diff --git a/sd/consul/subscriber.go b/sd/consul/subscriber.go index a2840dd..ee3ae34 100644 --- a/sd/consul/subscriber.go +++ b/sd/consul/subscriber.go @@ -32,7 +32,7 @@ var _ sd.Subscriber = &Subscriber{} // NewSubscriber returns a Consul subscriber which returns endpoints for the // requested service. It only returns instances for which all of the passed tags // are present. -func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) (*Subscriber, error) { +func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) *Subscriber { s := &Subscriber{ cache: cache.New(factory, logger), client: client, @@ -52,7 +52,7 @@ func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service s.cache.Update(instances) go s.loop(index) - return s, nil + return s } // Endpoints implements the Subscriber interface. diff --git a/sd/consul/subscriber_test.go b/sd/consul/subscriber_test.go index 9be92bb..f581216 100644 --- a/sd/consul/subscriber_test.go +++ b/sd/consul/subscriber_test.go @@ -63,10 +63,7 @@ func TestSubscriber(t *testing.T) { client = newTestClient(consulState) ) - s, err := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true) - if err != nil { - t.Fatal(err) - } + s := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true) defer s.Stop() endpoints, err := s.Endpoints() @@ -85,10 +82,7 @@ func TestSubscriberNoService(t *testing.T) { client = newTestClient(consulState) ) - s, err := NewSubscriber(client, testFactory, logger, "feed", []string{}, true) - if err != nil { - t.Fatal(err) - } + s := NewSubscriber(client, testFactory, logger, "feed", []string{}, true) defer s.Stop() endpoints, err := s.Endpoints() @@ -107,10 +101,7 @@ func TestSubscriberWithTags(t *testing.T) { client = newTestClient(consulState) ) - s, err := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true) - if err != nil { - t.Fatal(err) - } + s := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true) defer s.Stop() endpoints, err := s.Endpoints() @@ -124,10 +115,7 @@ func TestSubscriberWithTags(t *testing.T) { } func TestSubscriberAddressOverride(t *testing.T) { - s, err := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true) - if err != nil { - t.Fatal(err) - } + s := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true) defer s.Stop() endpoints, err := s.Endpoints() diff --git a/sd/factory.go b/sd/factory.go index 0e3e196..af99817 100644 --- a/sd/factory.go +++ b/sd/factory.go @@ -9,7 +9,8 @@ import ( // Factory is a function that converts an instance string (e.g. host:port) to a // specific endpoint. Instances that provide multiple endpoints require multiple // factories. A factory also returns an io.Closer that's invoked when the -// instance goes away and needs to be cleaned up. +// instance goes away and needs to be cleaned up. Factories may return nil +// closers. // // Users are expected to provide their own factory functions that assume // specific transports, or can deduce transports by parsing the instance string.