You've already forked golang-saas-starter-kit
mirror of
https://github.com/raseels-repos/golang-saas-starter-kit.git
synced 2025-07-15 01:34:32 +02:00
Imported github.com/ardanlabs/service as base example project
This commit is contained in:
23
example-project/cmd/sidecar/tracer/handlers/health.go
Normal file
23
example-project/cmd/sidecar/tracer/handlers/health.go
Normal file
@ -0,0 +1,23 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
)
|
||||
|
||||
// Health provides support for orchestration health checks.
|
||||
type Health struct{}
|
||||
|
||||
// Check validates the service is ready and healthy to accept requests.
|
||||
func (h *Health) Check(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
status := struct {
|
||||
Status string `json:"status"`
|
||||
}{
|
||||
Status: "ok",
|
||||
}
|
||||
|
||||
web.Respond(ctx, w, status, http.StatusOK)
|
||||
return nil
|
||||
}
|
25
example-project/cmd/sidecar/tracer/handlers/routes.go
Normal file
25
example-project/cmd/sidecar/tracer/handlers/routes.go
Normal file
@ -0,0 +1,25 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/mid"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
)
|
||||
|
||||
// API returns a handler for a set of routes.
|
||||
func API(shutdown chan os.Signal, log *log.Logger, zipkinHost string, apiHost string) http.Handler {
|
||||
|
||||
app := web.NewApp(shutdown, log, mid.Logger(log), mid.Errors(log))
|
||||
|
||||
z := NewZipkin(zipkinHost, apiHost, time.Second)
|
||||
app.Handle("POST", "/v1/publish", z.Publish)
|
||||
|
||||
h := Health{}
|
||||
app.Handle("GET", "/v1/health", h.Check)
|
||||
|
||||
return app
|
||||
}
|
326
example-project/cmd/sidecar/tracer/handlers/zipkin.go
Normal file
326
example-project/cmd/sidecar/tracer/handlers/zipkin.go
Normal file
@ -0,0 +1,326 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"github.com/openzipkin/zipkin-go/model"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// Zipkin represents the API to collect span data and send to zipkin.
|
||||
type Zipkin struct {
|
||||
zipkinHost string // IP:port of the zipkin service.
|
||||
localHost string // IP:port of the sidecare consuming the trace data.
|
||||
sendTimeout time.Duration // Time to wait for the sidecar to respond on send.
|
||||
client http.Client // Provides APIs for performing the http send.
|
||||
}
|
||||
|
||||
// NewZipkin provides support for publishing traces to zipkin.
|
||||
func NewZipkin(zipkinHost string, localHost string, sendTimeout time.Duration) *Zipkin {
|
||||
tr := http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 2,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
}
|
||||
|
||||
z := Zipkin{
|
||||
zipkinHost: zipkinHost,
|
||||
localHost: localHost,
|
||||
sendTimeout: sendTimeout,
|
||||
client: http.Client{
|
||||
Transport: &tr,
|
||||
},
|
||||
}
|
||||
|
||||
return &z
|
||||
}
|
||||
|
||||
// Publish takes a batch and publishes that to a host system.
|
||||
func (z *Zipkin) Publish(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
var sd []trace.SpanData
|
||||
if err := json.NewDecoder(r.Body).Decode(&sd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := z.send(sd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
web.Respond(ctx, w, nil, http.StatusNoContent)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// send uses HTTP to send the data to the tracing sidecar for processing.
|
||||
func (z *Zipkin) send(sendBatch []trace.SpanData) error {
|
||||
le, err := newEndpoint("sales-api", z.localHost)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sm := convertForZipkin(sendBatch, le)
|
||||
data, err := json.Marshal(sm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", z.zipkinHost, bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(req.Context(), z.sendTimeout)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
resp, err := z.client.Do(req)
|
||||
if err != nil {
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on call : status[%s]", resp.Status)
|
||||
return
|
||||
}
|
||||
ch <- fmt.Errorf("error on call : status[%s] : %s", resp.Status, string(data))
|
||||
return
|
||||
}
|
||||
|
||||
ch <- nil
|
||||
}()
|
||||
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
||||
const (
|
||||
statusCodeTagKey = "error"
|
||||
statusDescriptionTagKey = "opencensus.status_description"
|
||||
)
|
||||
|
||||
var (
|
||||
sampledTrue = true
|
||||
canonicalCodes = [...]string{
|
||||
"OK",
|
||||
"CANCELLED",
|
||||
"UNKNOWN",
|
||||
"INVALID_ARGUMENT",
|
||||
"DEADLINE_EXCEEDED",
|
||||
"NOT_FOUND",
|
||||
"ALREADY_EXISTS",
|
||||
"PERMISSION_DENIED",
|
||||
"RESOURCE_EXHAUSTED",
|
||||
"FAILED_PRECONDITION",
|
||||
"ABORTED",
|
||||
"OUT_OF_RANGE",
|
||||
"UNIMPLEMENTED",
|
||||
"INTERNAL",
|
||||
"UNAVAILABLE",
|
||||
"DATA_LOSS",
|
||||
"UNAUTHENTICATED",
|
||||
}
|
||||
)
|
||||
|
||||
func convertForZipkin(spanData []trace.SpanData, localEndpoint *model.Endpoint) []model.SpanModel {
|
||||
sm := make([]model.SpanModel, len(spanData))
|
||||
for i := range spanData {
|
||||
sm[i] = zipkinSpan(&spanData[i], localEndpoint)
|
||||
}
|
||||
return sm
|
||||
}
|
||||
|
||||
func newEndpoint(serviceName string, hostPort string) (*model.Endpoint, error) {
|
||||
e := &model.Endpoint{
|
||||
ServiceName: serviceName,
|
||||
}
|
||||
|
||||
if hostPort == "" || hostPort == ":0" {
|
||||
if serviceName == "" {
|
||||
// if all properties are empty we should not have an Endpoint object.
|
||||
return nil, nil
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
if strings.IndexByte(hostPort, ':') < 0 {
|
||||
hostPort += ":0"
|
||||
}
|
||||
|
||||
host, port, err := net.SplitHostPort(hostPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p, err := strconv.ParseUint(port, 10, 16)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.Port = uint16(p)
|
||||
|
||||
addrs, err := net.LookupIP(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range addrs {
|
||||
addr := addrs[i].To4()
|
||||
if addr == nil {
|
||||
// IPv6 - 16 bytes
|
||||
if e.IPv6 == nil {
|
||||
e.IPv6 = addrs[i].To16()
|
||||
}
|
||||
} else {
|
||||
// IPv4 - 4 bytes
|
||||
if e.IPv4 == nil {
|
||||
e.IPv4 = addr
|
||||
}
|
||||
}
|
||||
if e.IPv4 != nil && e.IPv6 != nil {
|
||||
// Both IPv4 & IPv6 have been set, done...
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// default to 0 filled 4 byte array for IPv4 if IPv6 only host was found
|
||||
if e.IPv4 == nil {
|
||||
e.IPv4 = make([]byte, 4)
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func canonicalCodeString(code int32) string {
|
||||
if code < 0 || int(code) >= len(canonicalCodes) {
|
||||
return "error code " + strconv.FormatInt(int64(code), 10)
|
||||
}
|
||||
return canonicalCodes[code]
|
||||
}
|
||||
|
||||
func convertTraceID(t trace.TraceID) model.TraceID {
|
||||
return model.TraceID{
|
||||
High: binary.BigEndian.Uint64(t[:8]),
|
||||
Low: binary.BigEndian.Uint64(t[8:]),
|
||||
}
|
||||
}
|
||||
|
||||
func convertSpanID(s trace.SpanID) model.ID {
|
||||
return model.ID(binary.BigEndian.Uint64(s[:]))
|
||||
}
|
||||
|
||||
func spanKind(s *trace.SpanData) model.Kind {
|
||||
switch s.SpanKind {
|
||||
case trace.SpanKindClient:
|
||||
return model.Client
|
||||
case trace.SpanKindServer:
|
||||
return model.Server
|
||||
}
|
||||
return model.Undetermined
|
||||
}
|
||||
|
||||
func zipkinSpan(s *trace.SpanData, localEndpoint *model.Endpoint) model.SpanModel {
|
||||
sc := s.SpanContext
|
||||
z := model.SpanModel{
|
||||
SpanContext: model.SpanContext{
|
||||
TraceID: convertTraceID(sc.TraceID),
|
||||
ID: convertSpanID(sc.SpanID),
|
||||
Sampled: &sampledTrue,
|
||||
},
|
||||
Kind: spanKind(s),
|
||||
Name: s.Name,
|
||||
Timestamp: s.StartTime,
|
||||
Shared: false,
|
||||
LocalEndpoint: localEndpoint,
|
||||
}
|
||||
|
||||
if s.ParentSpanID != (trace.SpanID{}) {
|
||||
id := convertSpanID(s.ParentSpanID)
|
||||
z.ParentID = &id
|
||||
}
|
||||
|
||||
if s, e := s.StartTime, s.EndTime; !s.IsZero() && !e.IsZero() {
|
||||
z.Duration = e.Sub(s)
|
||||
}
|
||||
|
||||
// construct Tags from s.Attributes and s.Status.
|
||||
if len(s.Attributes) != 0 {
|
||||
m := make(map[string]string, len(s.Attributes)+2)
|
||||
for key, value := range s.Attributes {
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
m[key] = v
|
||||
case bool:
|
||||
if v {
|
||||
m[key] = "true"
|
||||
} else {
|
||||
m[key] = "false"
|
||||
}
|
||||
case int64:
|
||||
m[key] = strconv.FormatInt(v, 10)
|
||||
}
|
||||
}
|
||||
z.Tags = m
|
||||
}
|
||||
if s.Status.Code != 0 || s.Status.Message != "" {
|
||||
if z.Tags == nil {
|
||||
z.Tags = make(map[string]string, 2)
|
||||
}
|
||||
if s.Status.Code != 0 {
|
||||
z.Tags[statusCodeTagKey] = canonicalCodeString(s.Status.Code)
|
||||
}
|
||||
if s.Status.Message != "" {
|
||||
z.Tags[statusDescriptionTagKey] = s.Status.Message
|
||||
}
|
||||
}
|
||||
|
||||
// construct Annotations from s.Annotations and s.MessageEvents.
|
||||
if len(s.Annotations) != 0 || len(s.MessageEvents) != 0 {
|
||||
z.Annotations = make([]model.Annotation, 0, len(s.Annotations)+len(s.MessageEvents))
|
||||
for _, a := range s.Annotations {
|
||||
z.Annotations = append(z.Annotations, model.Annotation{
|
||||
Timestamp: a.Time,
|
||||
Value: a.Message,
|
||||
})
|
||||
}
|
||||
for _, m := range s.MessageEvents {
|
||||
a := model.Annotation{
|
||||
Timestamp: m.Time,
|
||||
}
|
||||
switch m.EventType {
|
||||
case trace.MessageEventTypeSent:
|
||||
a.Value = "SENT"
|
||||
case trace.MessageEventTypeRecv:
|
||||
a.Value = "RECV"
|
||||
default:
|
||||
a.Value = "<?>"
|
||||
}
|
||||
z.Annotations = append(z.Annotations, a)
|
||||
}
|
||||
}
|
||||
|
||||
return z
|
||||
}
|
118
example-project/cmd/sidecar/tracer/main.go
Normal file
118
example-project/cmd/sidecar/tracer/main.go
Normal file
@ -0,0 +1,118 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/cmd/sidecar/tracer/handlers"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// =========================================================================
|
||||
// Logging
|
||||
|
||||
log := log.New(os.Stdout, "TRACER : ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
|
||||
defer log.Println("main : Completed")
|
||||
|
||||
// =========================================================================
|
||||
// Configuration
|
||||
|
||||
var cfg struct {
|
||||
Web struct {
|
||||
APIHost string `default:"0.0.0.0:3002" envconfig:"API_HOST"`
|
||||
DebugHost string `default:"0.0.0.0:4002" envconfig:"DEBUG_HOST"`
|
||||
ReadTimeout time.Duration `default:"5s" envconfig:"READ_TIMEOUT"`
|
||||
WriteTimeout time.Duration `default:"5s" envconfig:"WRITE_TIMEOUT"`
|
||||
ShutdownTimeout time.Duration `default:"5s" envconfig:"SHUTDOWN_TIMEOUT"`
|
||||
}
|
||||
Zipkin struct {
|
||||
Host string `default:"http://zipkin:9411/api/v2/spans" envconfig:"HOST"`
|
||||
}
|
||||
}
|
||||
|
||||
if err := envconfig.Process("TRACER", &cfg); err != nil {
|
||||
log.Fatalf("main : Parsing Config : %v", err)
|
||||
}
|
||||
|
||||
cfgJSON, err := json.MarshalIndent(cfg, "", " ")
|
||||
if err != nil {
|
||||
log.Fatalf("main : Marshalling Config to JSON : %v", err)
|
||||
}
|
||||
log.Printf("config : %v\n", string(cfgJSON))
|
||||
|
||||
// =========================================================================
|
||||
// Start Debug Service. Not concerned with shutting this down when the
|
||||
// application is being shutdown.
|
||||
//
|
||||
// /debug/pprof - Added to the default mux by the net/http/pprof package.
|
||||
go func() {
|
||||
log.Printf("main : Debug Listening %s", cfg.Web.DebugHost)
|
||||
log.Printf("main : Debug Listener closed : %v", http.ListenAndServe(cfg.Web.DebugHost, http.DefaultServeMux))
|
||||
}()
|
||||
|
||||
// =========================================================================
|
||||
// Start API Service
|
||||
|
||||
// Make a channel to listen for an interrupt or terminate signal from the OS.
|
||||
// Use a buffered channel because the signal package requires it.
|
||||
shutdown := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
api := http.Server{
|
||||
Addr: cfg.Web.APIHost,
|
||||
Handler: handlers.API(shutdown, log, cfg.Zipkin.Host, cfg.Web.APIHost),
|
||||
ReadTimeout: cfg.Web.ReadTimeout,
|
||||
WriteTimeout: cfg.Web.WriteTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
|
||||
// Make a channel to listen for errors coming from the listener. Use a
|
||||
// buffered channel so the goroutine can exit if we don't collect this error.
|
||||
serverErrors := make(chan error, 1)
|
||||
|
||||
// Start the service listening for requests.
|
||||
go func() {
|
||||
log.Printf("main : API Listening %s", cfg.Web.APIHost)
|
||||
serverErrors <- api.ListenAndServe()
|
||||
}()
|
||||
|
||||
// =========================================================================
|
||||
// Shutdown
|
||||
|
||||
// Blocking main and waiting for shutdown.
|
||||
select {
|
||||
case err := <-serverErrors:
|
||||
log.Fatalf("main : Error starting server: %v", err)
|
||||
|
||||
case sig := <-shutdown:
|
||||
log.Printf("main : %v : Start shutdown..", sig)
|
||||
|
||||
// Create context for Shutdown call.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.Web.ShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Asking listener to shutdown and load shed.
|
||||
err := api.Shutdown(ctx)
|
||||
if err != nil {
|
||||
log.Printf("main : Graceful shutdown did not complete in %v : %v", cfg.Web.ShutdownTimeout, err)
|
||||
err = api.Close()
|
||||
}
|
||||
|
||||
// Log the status of this shutdown.
|
||||
switch {
|
||||
case sig == syscall.SIGSTOP:
|
||||
log.Fatal("main : Integrity issue caused shutdown")
|
||||
case err != nil:
|
||||
log.Fatalf("main : Could not stop server gracefully : %v", err)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user