You've already forked golang-saas-starter-kit
mirror of
https://github.com/raseels-repos/golang-saas-starter-kit.git
synced 2025-06-15 00:15:15 +02:00
go fmt
This commit is contained in:
@ -198,6 +198,8 @@ additianal info required here in readme
|
||||
|
||||
need to copy sample.env_docker_compose to .env_docker_compose and defined your aws configs for docker-compose
|
||||
|
||||
need to add mid tracer for all requests
|
||||
|
||||
|
||||
/*
|
||||
ZipKin: http://localhost:9411
|
||||
|
@ -1,73 +0,0 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Expvar provides the ability to receive metrics
|
||||
// from internal services using expvar.
|
||||
type Expvar struct {
|
||||
host string
|
||||
tr *http.Transport
|
||||
client http.Client
|
||||
}
|
||||
|
||||
// New creates a Expvar for collection metrics.
|
||||
func New(host string) (*Expvar, error) {
|
||||
tr := http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 2,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
}
|
||||
|
||||
exp := Expvar{
|
||||
host: host,
|
||||
tr: &tr,
|
||||
client: http.Client{
|
||||
Transport: &tr,
|
||||
Timeout: 1 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
return &exp, nil
|
||||
}
|
||||
|
||||
func (exp *Expvar) Collect() (map[string]interface{}, error) {
|
||||
req, err := http.NewRequest("GET", exp.host, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := exp.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
msg, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, errors.New(string(msg))
|
||||
}
|
||||
|
||||
data := make(map[string]interface{})
|
||||
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
@ -1,109 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/cmd/sidecar/metrics/collector"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/cmd/sidecar/metrics/publisher"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/cmd/sidecar/metrics/publisher/expvar"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// =========================================================================
|
||||
// Logging
|
||||
|
||||
log := log.New(os.Stdout, "TRACER : ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
|
||||
defer log.Println("main : Completed")
|
||||
|
||||
// =========================================================================
|
||||
// Configuration
|
||||
|
||||
var cfg struct {
|
||||
Web struct {
|
||||
DebugHost string `default:"0.0.0.0:4001" envconfig:"DEBUG_HOST"`
|
||||
ReadTimeout time.Duration `default:"5s" envconfig:"READ_TIMEOUT"`
|
||||
WriteTimeout time.Duration `default:"5s" envconfig:"WRITE_TIMEOUT"`
|
||||
ShutdownTimeout time.Duration `default:"5s" envconfig:"SHUTDOWN_TIMEOUT"`
|
||||
}
|
||||
Expvar struct {
|
||||
Host string `default:"0.0.0.0:3001" envconfig:"HOST"`
|
||||
Route string `default:"/metrics" envconfig:"ROUTE"`
|
||||
ReadTimeout time.Duration `default:"5s" envconfig:"READ_TIMEOUT"`
|
||||
WriteTimeout time.Duration `default:"5s" envconfig:"WRITE_TIMEOUT"`
|
||||
ShutdownTimeout time.Duration `default:"5s" envconfig:"SHUTDOWN_TIMEOUT"`
|
||||
}
|
||||
Collect struct {
|
||||
From string `default:"http://web-api:4000/debug/vars" envconfig:"FROM"`
|
||||
}
|
||||
Publish struct {
|
||||
To string `default:"console" envconfig:"TO"`
|
||||
Interval time.Duration `default:"5s" envconfig:"INTERVAL"`
|
||||
}
|
||||
}
|
||||
|
||||
if err := envconfig.Process("METRICS", &cfg); err != nil {
|
||||
log.Fatalf("main : Parsing Config : %v", err)
|
||||
}
|
||||
|
||||
cfgJSON, err := json.MarshalIndent(cfg, "", " ")
|
||||
if err != nil {
|
||||
log.Fatalf("main : Marshalling Config to JSON : %v", err)
|
||||
}
|
||||
log.Printf("config : %v\n", string(cfgJSON))
|
||||
|
||||
// =========================================================================
|
||||
// Start Debug Service. Not concerned with shutting this down when the
|
||||
// application is being shutdown.
|
||||
//
|
||||
// /debug/pprof - Added to the default mux by the net/http/pprof package.
|
||||
go func() {
|
||||
log.Printf("main : Debug Listening %s", cfg.Web.DebugHost)
|
||||
log.Printf("main : Debug Listener closed : %v", http.ListenAndServe(cfg.Web.DebugHost, http.DefaultServeMux))
|
||||
}()
|
||||
|
||||
// =========================================================================
|
||||
// Start expvar Service
|
||||
|
||||
exp := expvar.New(log, cfg.Expvar.Host, cfg.Expvar.Route, cfg.Expvar.ReadTimeout, cfg.Expvar.WriteTimeout)
|
||||
defer exp.Stop(cfg.Expvar.ShutdownTimeout)
|
||||
|
||||
// =========================================================================
|
||||
// Start collectors and publishers
|
||||
|
||||
// Initialize to allow for the collection of metrics.
|
||||
collector, err := collector.New(cfg.Collect.From)
|
||||
if err != nil {
|
||||
log.Fatalf("main : Starting collector : %v", err)
|
||||
}
|
||||
|
||||
// Create a stdout publisher.
|
||||
// TODO: Respect the cfg.publish.to config option.
|
||||
stdout := publisher.NewStdout(log)
|
||||
|
||||
// Start the publisher to collect/publish metrics.
|
||||
publish, err := publisher.New(log, collector, cfg.Publish.Interval, exp.Publish, stdout.Publish)
|
||||
if err != nil {
|
||||
log.Fatalf("main : Starting publisher : %v", err)
|
||||
}
|
||||
defer publish.Stop()
|
||||
|
||||
// =========================================================================
|
||||
// Shutdown
|
||||
|
||||
// Make a channel to listen for an interrupt or terminate signal from the OS.
|
||||
// Use a buffered channel because the signal package requires it.
|
||||
shutdown := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
|
||||
<-shutdown
|
||||
|
||||
log.Println("main : Start shutdown...")
|
||||
}
|
@ -1,164 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Datadog provides the ability to publish metrics to Datadog.
|
||||
type Datadog struct {
|
||||
log *log.Logger
|
||||
apiKey string
|
||||
host string
|
||||
tr *http.Transport
|
||||
client http.Client
|
||||
}
|
||||
|
||||
// New initializes Datadog access for publishing metrics.
|
||||
func New(log *log.Logger, apiKey string, host string) *Datadog {
|
||||
tr := http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 2,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
}
|
||||
|
||||
d := Datadog{
|
||||
log: log,
|
||||
apiKey: apiKey,
|
||||
host: host,
|
||||
tr: &tr,
|
||||
client: http.Client{
|
||||
Transport: &tr,
|
||||
Timeout: 1 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
return &d
|
||||
}
|
||||
|
||||
// Publish handles the processing of metrics for deliver
|
||||
// to the DataDog.
|
||||
func (d *Datadog) Publish(data map[string]interface{}) {
|
||||
doc, err := marshalDatadog(d.log, data)
|
||||
if err != nil {
|
||||
d.log.Println("datadog.publish :", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := sendDatadog(d, doc); err != nil {
|
||||
d.log.Println("datadog.publish :", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("datadog.publish : published :", string(doc))
|
||||
}
|
||||
|
||||
// marshalDatadog converts the data map to datadog JSON document.
|
||||
func marshalDatadog(log *log.Logger, data map[string]interface{}) ([]byte, error) {
|
||||
/*
|
||||
{ "series" : [
|
||||
{
|
||||
"metric":"test.metric",
|
||||
"points": [
|
||||
[
|
||||
$currenttime,
|
||||
20
|
||||
]
|
||||
],
|
||||
"type":"gauge",
|
||||
"host":"test.example.com",
|
||||
"tags": [
|
||||
"environment:test"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
*/
|
||||
|
||||
// Extract the base keys/values.
|
||||
mType := "gauge"
|
||||
host, ok := data["host"].(string)
|
||||
if !ok {
|
||||
host = "unknown"
|
||||
}
|
||||
env := "dev"
|
||||
if host != "localhost" {
|
||||
env = "prod"
|
||||
}
|
||||
envTag := "environment:" + env
|
||||
|
||||
// Define the Datadog data format.
|
||||
type series struct {
|
||||
Metric string `json:"metric"`
|
||||
Points [][]interface{} `json:"points"`
|
||||
Type string `json:"type"`
|
||||
Host string `json:"host"`
|
||||
Tags []string `json:"tags"`
|
||||
}
|
||||
|
||||
// Populate the data into the data structure.
|
||||
var doc struct {
|
||||
Series []series `json:"series"`
|
||||
}
|
||||
for key, value := range data {
|
||||
switch value.(type) {
|
||||
case int, float64:
|
||||
doc.Series = append(doc.Series, series{
|
||||
Metric: env + "." + key,
|
||||
Points: [][]interface{}{{"$currenttime", value}},
|
||||
Type: mType,
|
||||
Host: host,
|
||||
Tags: []string{envTag},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Convert the data into JSON.
|
||||
out, err := json.MarshalIndent(doc, "", " ")
|
||||
if err != nil {
|
||||
log.Println("datadog.publish : marshaling :", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// sendDatadog sends data to the datadog servers.
|
||||
func sendDatadog(d *Datadog, data []byte) error {
|
||||
url := fmt.Sprintf("%s?api_key=%s", d.host, d.apiKey)
|
||||
b := bytes.NewBuffer(data)
|
||||
|
||||
r, err := http.NewRequest("POST", url, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := d.client.Do(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
out, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("status[%d] : %s", resp.StatusCode, out)
|
||||
}
|
||||
return fmt.Errorf("status[%d]", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,96 +0,0 @@
|
||||
package expvar
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dimfeld/httptreemux"
|
||||
)
|
||||
|
||||
// Expvar provide our basic publishing.
|
||||
type Expvar struct {
|
||||
log *log.Logger
|
||||
server http.Server
|
||||
data map[string]interface{}
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// New starts a service for consuming the raw expvar stats.
|
||||
func New(log *log.Logger, host string, route string, readTimeout, writeTimeout time.Duration) *Expvar {
|
||||
mux := httptreemux.New()
|
||||
exp := Expvar{
|
||||
log: log,
|
||||
server: http.Server{
|
||||
Addr: host,
|
||||
Handler: mux,
|
||||
ReadTimeout: readTimeout,
|
||||
WriteTimeout: writeTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
},
|
||||
}
|
||||
|
||||
mux.Handle("GET", route, exp.handler)
|
||||
|
||||
go func() {
|
||||
log.Println("expvar : API Listening", host)
|
||||
if err := exp.server.ListenAndServe(); err != nil {
|
||||
log.Println("expvar : ERROR :", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return &exp
|
||||
}
|
||||
|
||||
// Stop shuts down the service.
|
||||
func (exp *Expvar) Stop(shutdownTimeout time.Duration) {
|
||||
exp.log.Println("expvar : Start shutdown...")
|
||||
defer exp.log.Println("expvar : Completed")
|
||||
|
||||
// Create context for Shutdown call.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Asking listener to shutdown and load shed.
|
||||
if err := exp.server.Shutdown(ctx); err != nil {
|
||||
exp.log.Printf("expvar : Graceful shutdown did not complete in %v : %v", shutdownTimeout, err)
|
||||
if err := exp.server.Close(); err != nil {
|
||||
exp.log.Fatalf("expvar : Could not stop http server: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Publish is called by the publisher goroutine and saves the raw stats.
|
||||
func (exp *Expvar) Publish(data map[string]interface{}) {
|
||||
exp.mu.Lock()
|
||||
{
|
||||
exp.data = data
|
||||
}
|
||||
exp.mu.Unlock()
|
||||
}
|
||||
|
||||
// handler is what consumers call to get the raw stats.
|
||||
func (exp *Expvar) handler(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
var data map[string]interface{}
|
||||
exp.mu.Lock()
|
||||
{
|
||||
data = exp.data
|
||||
}
|
||||
exp.mu.Unlock()
|
||||
|
||||
if err := json.NewEncoder(w).Encode(data); err != nil {
|
||||
exp.log.Println("expvar : ERROR :", err)
|
||||
}
|
||||
|
||||
log.Printf("expvar : (%d) : %s %s -> %s",
|
||||
http.StatusOK,
|
||||
r.Method, r.URL.Path,
|
||||
r.RemoteAddr,
|
||||
)
|
||||
}
|
@ -1,128 +0,0 @@
|
||||
package publisher
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Set of possible publisher types.
|
||||
const (
|
||||
TypeStdout = "stdout"
|
||||
TypeDatadog = "datadog"
|
||||
)
|
||||
|
||||
// =============================================================================
|
||||
|
||||
// Collector defines a contract a collector must support
|
||||
// so a consumer can retrieve metrics.
|
||||
type Collector interface {
|
||||
Collect() (map[string]interface{}, error)
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
||||
// Publisher defines a handler function that will be called
|
||||
// on each interval.
|
||||
type Publisher func(map[string]interface{})
|
||||
|
||||
// Publish provides the ability to receive metrics
|
||||
// on an interval.
|
||||
type Publish struct {
|
||||
log *log.Logger
|
||||
collector Collector
|
||||
publisher []Publisher
|
||||
wg sync.WaitGroup
|
||||
timer *time.Timer
|
||||
shutdown chan struct{}
|
||||
}
|
||||
|
||||
// New creates a Publish for consuming and publishing metrics.
|
||||
func New(log *log.Logger, collector Collector, interval time.Duration, publisher ...Publisher) (*Publish, error) {
|
||||
p := Publish{
|
||||
log: log,
|
||||
collector: collector,
|
||||
publisher: publisher,
|
||||
timer: time.NewTimer(interval),
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
for {
|
||||
p.timer.Reset(interval)
|
||||
select {
|
||||
case <-p.timer.C:
|
||||
p.update()
|
||||
case <-p.shutdown:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// Stop is used to shutdown the goroutine collecting metrics.
|
||||
func (p *Publish) Stop() {
|
||||
close(p.shutdown)
|
||||
p.wg.Wait()
|
||||
}
|
||||
|
||||
// update pulls the metrics and publishes them to the specified system.
|
||||
func (p *Publish) update() {
|
||||
data, err := p.collector.Collect()
|
||||
if err != nil {
|
||||
p.log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pub := range p.publisher {
|
||||
pub(data)
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
||||
// Stdout provide our basic publishing.
|
||||
type Stdout struct {
|
||||
log *log.Logger
|
||||
}
|
||||
|
||||
// NewStdout initializes stdout for publishing metrics.
|
||||
func NewStdout(log *log.Logger) *Stdout {
|
||||
return &Stdout{log}
|
||||
}
|
||||
|
||||
// Publish publishers for writing to stdout.
|
||||
func (s *Stdout) Publish(data map[string]interface{}) {
|
||||
rawJSON, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
s.log.Println("Stdout : Marshal ERROR :", err)
|
||||
return
|
||||
}
|
||||
|
||||
var d map[string]interface{}
|
||||
if err := json.Unmarshal(rawJSON, &d); err != nil {
|
||||
s.log.Println("Stdout : Unmarshal ERROR :", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Add heap value into the data set.
|
||||
memStats, ok := (d["memstats"]).(map[string]interface{})
|
||||
if ok {
|
||||
d["heap"] = memStats["Alloc"]
|
||||
}
|
||||
|
||||
// Remove unnecessary keys.
|
||||
delete(d, "memstats")
|
||||
delete(d, "cmdline")
|
||||
|
||||
out, err := json.MarshalIndent(d, "", " ")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.log.Println("Stdout :\n", string(out))
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
)
|
||||
|
||||
// Health provides support for orchestration health checks.
|
||||
type Health struct{}
|
||||
|
||||
// Check validates the service is ready and healthy to accept requests.
|
||||
func (h *Health) Check(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
status := struct {
|
||||
Status string `json:"status"`
|
||||
}{
|
||||
Status: "ok",
|
||||
}
|
||||
|
||||
web.Respond(ctx, w, status, http.StatusOK)
|
||||
return nil
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/mid"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
)
|
||||
|
||||
// API returns a handler for a set of routes.
|
||||
func API(shutdown chan os.Signal, log *log.Logger, zipkinHost string, apiHost string) http.Handler {
|
||||
|
||||
app := web.NewApp(shutdown, log, mid.Logger(log), mid.Errors(log))
|
||||
|
||||
z := NewZipkin(zipkinHost, apiHost, time.Second)
|
||||
app.Handle("POST", "/v1/publish", z.Publish)
|
||||
|
||||
h := Health{}
|
||||
app.Handle("GET", "/v1/health", h.Check)
|
||||
|
||||
return app
|
||||
}
|
@ -1,326 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"github.com/openzipkin/zipkin-go/model"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// Zipkin represents the API to collect span data and send to zipkin.
|
||||
type Zipkin struct {
|
||||
zipkinHost string // IP:port of the zipkin service.
|
||||
localHost string // IP:port of the sidecare consuming the trace data.
|
||||
sendTimeout time.Duration // Time to wait for the sidecar to respond on send.
|
||||
client http.Client // Provides APIs for performing the http send.
|
||||
}
|
||||
|
||||
// NewZipkin provides support for publishing traces to zipkin.
|
||||
func NewZipkin(zipkinHost string, localHost string, sendTimeout time.Duration) *Zipkin {
|
||||
tr := http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 2,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
}
|
||||
|
||||
z := Zipkin{
|
||||
zipkinHost: zipkinHost,
|
||||
localHost: localHost,
|
||||
sendTimeout: sendTimeout,
|
||||
client: http.Client{
|
||||
Transport: &tr,
|
||||
},
|
||||
}
|
||||
|
||||
return &z
|
||||
}
|
||||
|
||||
// Publish takes a batch and publishes that to a host system.
|
||||
func (z *Zipkin) Publish(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
var sd []trace.SpanData
|
||||
if err := json.NewDecoder(r.Body).Decode(&sd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := z.send(sd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
web.Respond(ctx, w, nil, http.StatusNoContent)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// send uses HTTP to send the data to the tracing sidecar for processing.
|
||||
func (z *Zipkin) send(sendBatch []trace.SpanData) error {
|
||||
le, err := newEndpoint("web-api", z.localHost)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sm := convertForZipkin(sendBatch, le)
|
||||
data, err := json.Marshal(sm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", z.zipkinHost, bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(req.Context(), z.sendTimeout)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
resp, err := z.client.Do(req)
|
||||
if err != nil {
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on call : status[%s]", resp.Status)
|
||||
return
|
||||
}
|
||||
ch <- fmt.Errorf("error on call : status[%s] : %s", resp.Status, string(data))
|
||||
return
|
||||
}
|
||||
|
||||
ch <- nil
|
||||
}()
|
||||
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
||||
const (
|
||||
statusCodeTagKey = "error"
|
||||
statusDescriptionTagKey = "opencensus.status_description"
|
||||
)
|
||||
|
||||
var (
|
||||
sampledTrue = true
|
||||
canonicalCodes = [...]string{
|
||||
"OK",
|
||||
"CANCELLED",
|
||||
"UNKNOWN",
|
||||
"INVALID_ARGUMENT",
|
||||
"DEADLINE_EXCEEDED",
|
||||
"NOT_FOUND",
|
||||
"ALREADY_EXISTS",
|
||||
"PERMISSION_DENIED",
|
||||
"RESOURCE_EXHAUSTED",
|
||||
"FAILED_PRECONDITION",
|
||||
"ABORTED",
|
||||
"OUT_OF_RANGE",
|
||||
"UNIMPLEMENTED",
|
||||
"INTERNAL",
|
||||
"UNAVAILABLE",
|
||||
"DATA_LOSS",
|
||||
"UNAUTHENTICATED",
|
||||
}
|
||||
)
|
||||
|
||||
func convertForZipkin(spanData []trace.SpanData, localEndpoint *model.Endpoint) []model.SpanModel {
|
||||
sm := make([]model.SpanModel, len(spanData))
|
||||
for i := range spanData {
|
||||
sm[i] = zipkinSpan(&spanData[i], localEndpoint)
|
||||
}
|
||||
return sm
|
||||
}
|
||||
|
||||
func newEndpoint(serviceName string, hostPort string) (*model.Endpoint, error) {
|
||||
e := &model.Endpoint{
|
||||
ServiceName: serviceName,
|
||||
}
|
||||
|
||||
if hostPort == "" || hostPort == ":0" {
|
||||
if serviceName == "" {
|
||||
// if all properties are empty we should not have an Endpoint object.
|
||||
return nil, nil
|
||||
}
|
||||
return e, nil
|
||||
}
|
||||
|
||||
if strings.IndexByte(hostPort, ':') < 0 {
|
||||
hostPort += ":0"
|
||||
}
|
||||
|
||||
host, port, err := net.SplitHostPort(hostPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p, err := strconv.ParseUint(port, 10, 16)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.Port = uint16(p)
|
||||
|
||||
addrs, err := net.LookupIP(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range addrs {
|
||||
addr := addrs[i].To4()
|
||||
if addr == nil {
|
||||
// IPv6 - 16 bytes
|
||||
if e.IPv6 == nil {
|
||||
e.IPv6 = addrs[i].To16()
|
||||
}
|
||||
} else {
|
||||
// IPv4 - 4 bytes
|
||||
if e.IPv4 == nil {
|
||||
e.IPv4 = addr
|
||||
}
|
||||
}
|
||||
if e.IPv4 != nil && e.IPv6 != nil {
|
||||
// Both IPv4 & IPv6 have been set, done...
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// default to 0 filled 4 byte array for IPv4 if IPv6 only host was found
|
||||
if e.IPv4 == nil {
|
||||
e.IPv4 = make([]byte, 4)
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func canonicalCodeString(code int32) string {
|
||||
if code < 0 || int(code) >= len(canonicalCodes) {
|
||||
return "error code " + strconv.FormatInt(int64(code), 10)
|
||||
}
|
||||
return canonicalCodes[code]
|
||||
}
|
||||
|
||||
func convertTraceID(t trace.TraceID) model.TraceID {
|
||||
return model.TraceID{
|
||||
High: binary.BigEndian.Uint64(t[:8]),
|
||||
Low: binary.BigEndian.Uint64(t[8:]),
|
||||
}
|
||||
}
|
||||
|
||||
func convertSpanID(s trace.SpanID) model.ID {
|
||||
return model.ID(binary.BigEndian.Uint64(s[:]))
|
||||
}
|
||||
|
||||
func spanKind(s *trace.SpanData) model.Kind {
|
||||
switch s.SpanKind {
|
||||
case trace.SpanKindClient:
|
||||
return model.Client
|
||||
case trace.SpanKindServer:
|
||||
return model.Server
|
||||
}
|
||||
return model.Undetermined
|
||||
}
|
||||
|
||||
func zipkinSpan(s *trace.SpanData, localEndpoint *model.Endpoint) model.SpanModel {
|
||||
sc := s.SpanContext
|
||||
z := model.SpanModel{
|
||||
SpanContext: model.SpanContext{
|
||||
TraceID: convertTraceID(sc.TraceID),
|
||||
ID: convertSpanID(sc.SpanID),
|
||||
Sampled: &sampledTrue,
|
||||
},
|
||||
Kind: spanKind(s),
|
||||
Name: s.Name,
|
||||
Timestamp: s.StartTime,
|
||||
Shared: false,
|
||||
LocalEndpoint: localEndpoint,
|
||||
}
|
||||
|
||||
if s.ParentSpanID != (trace.SpanID{}) {
|
||||
id := convertSpanID(s.ParentSpanID)
|
||||
z.ParentID = &id
|
||||
}
|
||||
|
||||
if s, e := s.StartTime, s.EndTime; !s.IsZero() && !e.IsZero() {
|
||||
z.Duration = e.Sub(s)
|
||||
}
|
||||
|
||||
// construct Tags from s.Attributes and s.Status.
|
||||
if len(s.Attributes) != 0 {
|
||||
m := make(map[string]string, len(s.Attributes)+2)
|
||||
for key, value := range s.Attributes {
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
m[key] = v
|
||||
case bool:
|
||||
if v {
|
||||
m[key] = "true"
|
||||
} else {
|
||||
m[key] = "false"
|
||||
}
|
||||
case int64:
|
||||
m[key] = strconv.FormatInt(v, 10)
|
||||
}
|
||||
}
|
||||
z.Tags = m
|
||||
}
|
||||
if s.Status.Code != 0 || s.Status.Message != "" {
|
||||
if z.Tags == nil {
|
||||
z.Tags = make(map[string]string, 2)
|
||||
}
|
||||
if s.Status.Code != 0 {
|
||||
z.Tags[statusCodeTagKey] = canonicalCodeString(s.Status.Code)
|
||||
}
|
||||
if s.Status.Message != "" {
|
||||
z.Tags[statusDescriptionTagKey] = s.Status.Message
|
||||
}
|
||||
}
|
||||
|
||||
// construct Annotations from s.Annotations and s.MessageEvents.
|
||||
if len(s.Annotations) != 0 || len(s.MessageEvents) != 0 {
|
||||
z.Annotations = make([]model.Annotation, 0, len(s.Annotations)+len(s.MessageEvents))
|
||||
for _, a := range s.Annotations {
|
||||
z.Annotations = append(z.Annotations, model.Annotation{
|
||||
Timestamp: a.Time,
|
||||
Value: a.Message,
|
||||
})
|
||||
}
|
||||
for _, m := range s.MessageEvents {
|
||||
a := model.Annotation{
|
||||
Timestamp: m.Time,
|
||||
}
|
||||
switch m.EventType {
|
||||
case trace.MessageEventTypeSent:
|
||||
a.Value = "SENT"
|
||||
case trace.MessageEventTypeRecv:
|
||||
a.Value = "RECV"
|
||||
default:
|
||||
a.Value = "<?>"
|
||||
}
|
||||
z.Annotations = append(z.Annotations, a)
|
||||
}
|
||||
}
|
||||
|
||||
return z
|
||||
}
|
@ -1,118 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/cmd/sidecar/tracer/handlers"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
// =========================================================================
|
||||
// Logging
|
||||
|
||||
log := log.New(os.Stdout, "TRACER : ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
|
||||
defer log.Println("main : Completed")
|
||||
|
||||
// =========================================================================
|
||||
// Configuration
|
||||
|
||||
var cfg struct {
|
||||
Web struct {
|
||||
APIHost string `default:"0.0.0.0:3002" envconfig:"API_HOST"`
|
||||
DebugHost string `default:"0.0.0.0:4002" envconfig:"DEBUG_HOST"`
|
||||
ReadTimeout time.Duration `default:"5s" envconfig:"READ_TIMEOUT"`
|
||||
WriteTimeout time.Duration `default:"5s" envconfig:"WRITE_TIMEOUT"`
|
||||
ShutdownTimeout time.Duration `default:"5s" envconfig:"SHUTDOWN_TIMEOUT"`
|
||||
}
|
||||
Zipkin struct {
|
||||
Host string `default:"http://zipkin:9411/api/v2/spans" envconfig:"HOST"`
|
||||
}
|
||||
}
|
||||
|
||||
if err := envconfig.Process("TRACER", &cfg); err != nil {
|
||||
log.Fatalf("main : Parsing Config : %v", err)
|
||||
}
|
||||
|
||||
cfgJSON, err := json.MarshalIndent(cfg, "", " ")
|
||||
if err != nil {
|
||||
log.Fatalf("main : Marshalling Config to JSON : %v", err)
|
||||
}
|
||||
log.Printf("config : %v\n", string(cfgJSON))
|
||||
|
||||
// =========================================================================
|
||||
// Start Debug Service. Not concerned with shutting this down when the
|
||||
// application is being shutdown.
|
||||
//
|
||||
// /debug/pprof - Added to the default mux by the net/http/pprof package.
|
||||
go func() {
|
||||
log.Printf("main : Debug Listening %s", cfg.Web.DebugHost)
|
||||
log.Printf("main : Debug Listener closed : %v", http.ListenAndServe(cfg.Web.DebugHost, http.DefaultServeMux))
|
||||
}()
|
||||
|
||||
// =========================================================================
|
||||
// Start API Service
|
||||
|
||||
// Make a channel to listen for an interrupt or terminate signal from the OS.
|
||||
// Use a buffered channel because the signal package requires it.
|
||||
shutdown := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
api := http.Server{
|
||||
Addr: cfg.Web.APIHost,
|
||||
Handler: handlers.API(shutdown, log, cfg.Zipkin.Host, cfg.Web.APIHost),
|
||||
ReadTimeout: cfg.Web.ReadTimeout,
|
||||
WriteTimeout: cfg.Web.WriteTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
|
||||
// Make a channel to listen for errors coming from the listener. Use a
|
||||
// buffered channel so the goroutine can exit if we don't collect this error.
|
||||
serverErrors := make(chan error, 1)
|
||||
|
||||
// Start the service listening for requests.
|
||||
go func() {
|
||||
log.Printf("main : API Listening %s", cfg.Web.APIHost)
|
||||
serverErrors <- api.ListenAndServe()
|
||||
}()
|
||||
|
||||
// =========================================================================
|
||||
// Shutdown
|
||||
|
||||
// Blocking main and waiting for shutdown.
|
||||
select {
|
||||
case err := <-serverErrors:
|
||||
log.Fatalf("main : Error starting server: %v", err)
|
||||
|
||||
case sig := <-shutdown:
|
||||
log.Printf("main : %v : Start shutdown..", sig)
|
||||
|
||||
// Create context for Shutdown call.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.Web.ShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Asking listener to shutdown and load shed.
|
||||
err := api.Shutdown(ctx)
|
||||
if err != nil {
|
||||
log.Printf("main : Graceful shutdown did not complete in %v : %v", cfg.Web.ShutdownTimeout, err)
|
||||
err = api.Close()
|
||||
}
|
||||
|
||||
// Log the status of this shutdown.
|
||||
switch {
|
||||
case sig == syscall.SIGSTOP:
|
||||
log.Fatal("main : Integrity issue caused shutdown")
|
||||
case err != nil:
|
||||
log.Fatalf("main : Could not stop server gracefully : %v", err)
|
||||
}
|
||||
}
|
||||
}
|
@ -4,27 +4,21 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"go.opencensus.io/trace"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
// Check provides support for orchestration health checks.
|
||||
type Check struct {
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
|
||||
// ADD OTHER STATE LIKE THE LOGGER IF NEEDED.
|
||||
}
|
||||
|
||||
// Health validates the service is healthy and ready to accept requests.
|
||||
func (c *Check) Health(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Check.Health")
|
||||
defer span.End()
|
||||
|
||||
dbConn := c.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
if err := dbConn.StatusCheck(ctx); err != nil {
|
||||
_, err := c.MasterDB.Exec("SELECT 1")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -34,5 +28,5 @@ func (c *Check) Health(ctx context.Context, w http.ResponseWriter, r *http.Reque
|
||||
Status: "ok",
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, status, http.StatusOK)
|
||||
return web.RespondJson(ctx, w, status, http.StatusOK)
|
||||
}
|
||||
|
@ -4,45 +4,32 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/project"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// Project represents the Project API method handler set.
|
||||
type Project struct {
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
|
||||
// ADD OTHER STATE LIKE THE LOGGER IF NEEDED.
|
||||
}
|
||||
|
||||
// List returns all the existing projects in the system.
|
||||
func (p *Project) List(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Project.List")
|
||||
defer span.End()
|
||||
|
||||
dbConn := p.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
projects, err := project.List(ctx, dbConn)
|
||||
projects, err := project.List(ctx, p.MasterDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, projects, http.StatusOK)
|
||||
return web.RespondJson(ctx, w, projects, http.StatusOK)
|
||||
}
|
||||
|
||||
// Retrieve returns the specified project from the system.
|
||||
func (p *Project) Retrieve(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Project.Retrieve")
|
||||
defer span.End()
|
||||
|
||||
dbConn := p.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
prod, err := project.Retrieve(ctx, dbConn, params["id"])
|
||||
prod, err := project.Retrieve(ctx, p.MasterDB, params["id"])
|
||||
if err != nil {
|
||||
switch err {
|
||||
case project.ErrInvalidID:
|
||||
@ -54,17 +41,11 @@ func (p *Project) Retrieve(ctx context.Context, w http.ResponseWriter, r *http.R
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, prod, http.StatusOK)
|
||||
return web.RespondJson(ctx, w, prod, http.StatusOK)
|
||||
}
|
||||
|
||||
// Create inserts a new project into the system.
|
||||
func (p *Project) Create(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Project.Create")
|
||||
defer span.End()
|
||||
|
||||
dbConn := p.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
v, ok := ctx.Value(web.KeyValues).(*web.Values)
|
||||
if !ok {
|
||||
return web.NewShutdownError("web value missing from context")
|
||||
@ -75,22 +56,16 @@ func (p *Project) Create(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
return errors.Wrap(err, "")
|
||||
}
|
||||
|
||||
nUsr, err := project.Create(ctx, dbConn, &np, v.Now)
|
||||
nUsr, err := project.Create(ctx, p.MasterDB, &np, v.Now)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Project: %+v", &np)
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, nUsr, http.StatusCreated)
|
||||
return web.RespondJson(ctx, w, nUsr, http.StatusCreated)
|
||||
}
|
||||
|
||||
// Update updates the specified project in the system.
|
||||
func (p *Project) Update(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Project.Update")
|
||||
defer span.End()
|
||||
|
||||
dbConn := p.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
v, ok := ctx.Value(web.KeyValues).(*web.Values)
|
||||
if !ok {
|
||||
return web.NewShutdownError("web value missing from context")
|
||||
@ -101,7 +76,7 @@ func (p *Project) Update(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
return errors.Wrap(err, "")
|
||||
}
|
||||
|
||||
err := project.Update(ctx, dbConn, params["id"], up, v.Now)
|
||||
err := project.Update(ctx, p.MasterDB, params["id"], up, v.Now)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case project.ErrInvalidID:
|
||||
@ -113,18 +88,12 @@ func (p *Project) Update(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, nil, http.StatusNoContent)
|
||||
return web.RespondJson(ctx, w, nil, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Delete removes the specified project from the system.
|
||||
func (p *Project) Delete(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Project.Delete")
|
||||
defer span.End()
|
||||
|
||||
dbConn := p.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
err := project.Delete(ctx, dbConn, params["id"])
|
||||
err := project.Delete(ctx, p.MasterDB, params["id"])
|
||||
if err != nil {
|
||||
switch err {
|
||||
case project.ErrInvalidID:
|
||||
@ -136,5 +105,5 @@ func (p *Project) Delete(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, nil, http.StatusNoContent)
|
||||
return web.RespondJson(ctx, w, nil, http.StatusNoContent)
|
||||
}
|
||||
|
@ -7,12 +7,12 @@ import (
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/mid"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/auth"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
// API returns a handler for a set of routes.
|
||||
func API(shutdown chan os.Signal, log *log.Logger, masterDB *db.DB, authenticator *auth.Authenticator) http.Handler {
|
||||
func API(shutdown chan os.Signal, log *log.Logger, masterDB *sqlx.DB, authenticator *auth.Authenticator) http.Handler {
|
||||
|
||||
// Construct the web.App which holds all routes as well as common Middleware.
|
||||
app := web.NewApp(shutdown, log, mid.Logger(log), mid.Errors(log), mid.Metrics(), mid.Panics())
|
||||
|
@ -5,16 +5,15 @@ import (
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/auth"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/user"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// User represents the User API method handler set.
|
||||
type User struct {
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
TokenGenerator user.TokenGenerator
|
||||
|
||||
// ADD OTHER STATE LIKE THE LOGGER AND CONFIG HERE.
|
||||
@ -22,34 +21,22 @@ type User struct {
|
||||
|
||||
// List returns all the existing users in the system.
|
||||
func (u *User) List(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.List")
|
||||
defer span.End()
|
||||
|
||||
dbConn := u.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
usrs, err := user.List(ctx, dbConn)
|
||||
usrs, err := user.List(ctx, u.MasterDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, usrs, http.StatusOK)
|
||||
return web.RespondJson(ctx, w, usrs, http.StatusOK)
|
||||
}
|
||||
|
||||
// Retrieve returns the specified user from the system.
|
||||
func (u *User) Retrieve(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.Retrieve")
|
||||
defer span.End()
|
||||
|
||||
dbConn := u.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
claims, ok := ctx.Value(auth.Key).(auth.Claims)
|
||||
if !ok {
|
||||
return errors.New("claims missing from context")
|
||||
}
|
||||
|
||||
usr, err := user.Retrieve(ctx, claims, dbConn, params["id"])
|
||||
usr, err := user.Retrieve(ctx, claims, u.MasterDB, params["id"])
|
||||
if err != nil {
|
||||
switch err {
|
||||
case user.ErrInvalidID:
|
||||
@ -63,17 +50,11 @@ func (u *User) Retrieve(ctx context.Context, w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, usr, http.StatusOK)
|
||||
return web.RespondJson(ctx, w, usr, http.StatusOK)
|
||||
}
|
||||
|
||||
// Create inserts a new user into the system.
|
||||
func (u *User) Create(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.Create")
|
||||
defer span.End()
|
||||
|
||||
dbConn := u.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
v, ok := ctx.Value(web.KeyValues).(*web.Values)
|
||||
if !ok {
|
||||
return web.NewShutdownError("web value missing from context")
|
||||
@ -84,22 +65,16 @@ func (u *User) Create(ctx context.Context, w http.ResponseWriter, r *http.Reques
|
||||
return errors.Wrap(err, "")
|
||||
}
|
||||
|
||||
usr, err := user.Create(ctx, dbConn, &newU, v.Now)
|
||||
usr, err := user.Create(ctx, u.MasterDB, &newU, v.Now)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "User: %+v", &usr)
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, usr, http.StatusCreated)
|
||||
return web.RespondJson(ctx, w, usr, http.StatusCreated)
|
||||
}
|
||||
|
||||
// Update updates the specified user in the system.
|
||||
func (u *User) Update(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.Update")
|
||||
defer span.End()
|
||||
|
||||
dbConn := u.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
v, ok := ctx.Value(web.KeyValues).(*web.Values)
|
||||
if !ok {
|
||||
return web.NewShutdownError("web value missing from context")
|
||||
@ -110,7 +85,7 @@ func (u *User) Update(ctx context.Context, w http.ResponseWriter, r *http.Reques
|
||||
return errors.Wrap(err, "")
|
||||
}
|
||||
|
||||
err := user.Update(ctx, dbConn, params["id"], &upd, v.Now)
|
||||
err := user.Update(ctx, u.MasterDB, params["id"], &upd, v.Now)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case user.ErrInvalidID:
|
||||
@ -124,18 +99,12 @@ func (u *User) Update(ctx context.Context, w http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, nil, http.StatusNoContent)
|
||||
return web.RespondJson(ctx, w, nil, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Delete removes the specified user from the system.
|
||||
func (u *User) Delete(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.Delete")
|
||||
defer span.End()
|
||||
|
||||
dbConn := u.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
err := user.Delete(ctx, dbConn, params["id"])
|
||||
err := user.Delete(ctx, u.MasterDB, params["id"])
|
||||
if err != nil {
|
||||
switch err {
|
||||
case user.ErrInvalidID:
|
||||
@ -149,18 +118,12 @@ func (u *User) Delete(ctx context.Context, w http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, nil, http.StatusNoContent)
|
||||
return web.RespondJson(ctx, w, nil, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Token handles a request to authenticate a user. It expects a request using
|
||||
// Basic Auth with a user's email and password. It responds with a JWT.
|
||||
func (u *User) Token(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.Token")
|
||||
defer span.End()
|
||||
|
||||
dbConn := u.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
v, ok := ctx.Value(web.KeyValues).(*web.Values)
|
||||
if !ok {
|
||||
return web.NewShutdownError("web value missing from context")
|
||||
@ -172,7 +135,7 @@ func (u *User) Token(ctx context.Context, w http.ResponseWriter, r *http.Request
|
||||
return web.NewRequestError(err, http.StatusUnauthorized)
|
||||
}
|
||||
|
||||
tkn, err := user.Authenticate(ctx, dbConn, u.TokenGenerator, v.Now, email, pass)
|
||||
tkn, err := user.Authenticate(ctx, u.MasterDB, u.TokenGenerator, v.Now, email, pass)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case user.ErrAuthenticationFailure:
|
||||
@ -182,5 +145,5 @@ func (u *User) Token(ctx context.Context, w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
}
|
||||
|
||||
return web.Respond(ctx, w, tkn, http.StatusOK)
|
||||
return web.RespondJson(ctx, w, tkn, http.StatusOK)
|
||||
}
|
||||
|
@ -14,8 +14,6 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/lib/pq"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/cmd/web-api/handlers"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/auth"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/flag"
|
||||
@ -23,7 +21,9 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
"github.com/lib/pq"
|
||||
"go.opencensus.io/trace"
|
||||
awstrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/aws-sdk-go/aws"
|
||||
sqltrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"
|
||||
@ -305,7 +305,7 @@ func main() {
|
||||
|
||||
api := http.Server{
|
||||
Addr: cfg.HTTP.Host,
|
||||
Handler: handlers.API(shutdown, log, masterDB, authenticator),
|
||||
Handler: handlers.API(shutdown, log, masterDb, authenticator),
|
||||
ReadTimeout: cfg.HTTP.ReadTimeout,
|
||||
WriteTimeout: cfg.HTTP.WriteTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
@ -333,13 +333,13 @@ func main() {
|
||||
log.Printf("main : %v : Start shutdown..", sig)
|
||||
|
||||
// Create context for Shutdown call.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.HTTP.ShutdownTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.App.ShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Asking listener to shutdown and load shed.
|
||||
err := api.Shutdown(ctx)
|
||||
if err != nil {
|
||||
log.Printf("main : Graceful shutdown did not complete in %v : %v", cfg.HTTP.ShutdownTimeout, err)
|
||||
log.Printf("main : Graceful shutdown did not complete in %v : %v", cfg.App.ShutdownTimeout, err)
|
||||
err = api.Close()
|
||||
}
|
||||
|
||||
|
@ -2,16 +2,15 @@ package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// Check provides support for orchestration health checks.
|
||||
type Check struct {
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
Renderer web.Renderer
|
||||
|
||||
// ADD OTHER STATE LIKE THE LOGGER IF NEEDED.
|
||||
@ -19,13 +18,10 @@ type Check struct {
|
||||
|
||||
// Health validates the service is healthy and ready to accept requests.
|
||||
func (c *Check) Health(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Check.Health")
|
||||
defer span.End()
|
||||
|
||||
dbConn := c.MasterDB.Copy()
|
||||
defer dbConn.Close()
|
||||
|
||||
if err := dbConn.StatusCheck(ctx); err != nil {
|
||||
// check postgres
|
||||
_, err := c.MasterDB.Exec("SELECT 1")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -4,23 +4,19 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"go.opencensus.io/trace"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
// User represents the User API method handler set.
|
||||
type Root struct {
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
Renderer web.Renderer
|
||||
// ADD OTHER STATE LIKE THE LOGGER AND CONFIG HERE.
|
||||
}
|
||||
|
||||
// List returns all the existing users in the system.
|
||||
func (u *Root) Index(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.Root.Index")
|
||||
defer span.End()
|
||||
|
||||
data := map[string]interface{}{
|
||||
"imgSizes": []int{100, 200, 300, 400, 500},
|
||||
}
|
||||
|
@ -7,14 +7,14 @@ import (
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/mid"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/auth"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
const baseLayoutTmpl = "base.tmpl"
|
||||
|
||||
// API returns a handler for a set of routes.
|
||||
func APP(shutdown chan os.Signal, log *log.Logger, staticDir, templateDir string, masterDB *db.DB, authenticator *auth.Authenticator, renderer web.Renderer) http.Handler {
|
||||
func APP(shutdown chan os.Signal, log *log.Logger, staticDir, templateDir string, masterDB *sqlx.DB, authenticator *auth.Authenticator, renderer web.Renderer) http.Handler {
|
||||
|
||||
// Construct the web.App which holds all routes as well as common Middleware.
|
||||
app := web.NewApp(shutdown, log, mid.Logger(log), mid.Errors(log), mid.Metrics(), mid.Panics())
|
||||
|
@ -4,25 +4,19 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"go.opencensus.io/trace"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
// User represents the User API method handler set.
|
||||
type User struct {
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
Renderer web.Renderer
|
||||
// ADD OTHER STATE LIKE THE LOGGER AND CONFIG HERE.
|
||||
}
|
||||
|
||||
// List returns all the existing users in the system.
|
||||
func (u *User) Login(ctx context.Context, w http.ResponseWriter, r *http.Request, params map[string]string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "handlers.User.Login")
|
||||
defer span.End()
|
||||
|
||||
//dbConn := u.MasterDB.Copy()
|
||||
//defer dbConn.Close()
|
||||
|
||||
return u.Renderer.Render(ctx, w, r, baseLayoutTmpl, "user-login.tmpl", web.MIMETextHTMLCharsetUTF8, http.StatusOK, nil)
|
||||
}
|
||||
|
@ -181,10 +181,6 @@ func main() {
|
||||
log.Printf("main : Config : %v\n", string(cfgJSON))
|
||||
}
|
||||
|
||||
// TODO: Validate what is being written to the logs. We don't
|
||||
// want to leak credentials or anything that can be a security risk.
|
||||
log.Printf("main : Config : %v\n", string(cfgJSON))
|
||||
|
||||
// =========================================================================
|
||||
// Init AWS Session
|
||||
var awsSession *session.Session
|
||||
|
@ -3,16 +3,16 @@ package tests
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"log"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/docker"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/web"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pborman/uuid"
|
||||
)
|
||||
|
||||
@ -25,7 +25,7 @@ const (
|
||||
// Test owns state for running/shutting down tests.
|
||||
type Test struct {
|
||||
Log *log.Logger
|
||||
MasterDB *db.DB
|
||||
MasterDB *sqlx.DB
|
||||
container *docker.Container
|
||||
AwsSession *session.Session
|
||||
}
|
||||
|
@ -5,10 +5,10 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"go.opencensus.io/trace"
|
||||
mgo "gopkg.in/mgo.v2"
|
||||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
|
||||
"gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
@ -23,16 +23,17 @@ var (
|
||||
)
|
||||
|
||||
// List retrieves a list of existing projects from the database.
|
||||
func List(ctx context.Context, dbConn *db.DB) ([]Project, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.project.List")
|
||||
defer span.End()
|
||||
func List(ctx context.Context, dbConn *sqlx.DB) ([]Project, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.project.List")
|
||||
defer span.Finish()
|
||||
|
||||
p := []Project{}
|
||||
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Find(nil).All(&p)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, projectsCollection, f); err != nil {
|
||||
|
||||
if _, err := dbConn.ExecContext(ctx, projectsCollection, f); err != nil {
|
||||
return nil, errors.Wrap(err, "db.projects.find()")
|
||||
}
|
||||
|
||||
@ -40,9 +41,9 @@ func List(ctx context.Context, dbConn *db.DB) ([]Project, error) {
|
||||
}
|
||||
|
||||
// Retrieve gets the specified project from the database.
|
||||
func Retrieve(ctx context.Context, dbConn *db.DB, id string) (*Project, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.project.Retrieve")
|
||||
defer span.End()
|
||||
func Retrieve(ctx context.Context, dbConn *sqlx.DB, id string) (*Project, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.project.Retrieve")
|
||||
defer span.Finish()
|
||||
|
||||
if !bson.IsObjectIdHex(id) {
|
||||
return nil, ErrInvalidID
|
||||
@ -54,20 +55,20 @@ func Retrieve(ctx context.Context, dbConn *db.DB, id string) (*Project, error) {
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Find(q).One(&p)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, projectsCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, projectsCollection, f); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.projects.find(%s)", db.Query(q)))
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.projects.find(%s)", q))
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Create inserts a new project into the database.
|
||||
func Create(ctx context.Context, dbConn *db.DB, cp *NewProject, now time.Time) (*Project, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.project.Create")
|
||||
defer span.End()
|
||||
func Create(ctx context.Context, dbConn *sqlx.DB, cp *NewProject, now time.Time) (*Project, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.project.Create")
|
||||
defer span.Finish()
|
||||
|
||||
// Mongo truncates times to milliseconds when storing. We and do the same
|
||||
// here so the value we return is consistent with what we store.
|
||||
@ -85,17 +86,17 @@ func Create(ctx context.Context, dbConn *db.DB, cp *NewProject, now time.Time) (
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Insert(&p)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, projectsCollection, f); err != nil {
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.projects.insert(%s)", db.Query(&p)))
|
||||
if _, err := dbConn.ExecContext(ctx, projectsCollection, f); err != nil {
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.projects.insert(%v)", &p))
|
||||
}
|
||||
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// Update replaces a project document in the database.
|
||||
func Update(ctx context.Context, dbConn *db.DB, id string, upd UpdateProject, now time.Time) error {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.project.Update")
|
||||
defer span.End()
|
||||
func Update(ctx context.Context, dbConn *sqlx.DB, id string, upd UpdateProject, now time.Time) error {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.project.Update")
|
||||
defer span.Finish()
|
||||
|
||||
if !bson.IsObjectIdHex(id) {
|
||||
return ErrInvalidID
|
||||
@ -126,20 +127,20 @@ func Update(ctx context.Context, dbConn *db.DB, id string, upd UpdateProject, no
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Update(q, m)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, projectsCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, projectsCollection, f); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return ErrNotFound
|
||||
}
|
||||
return errors.Wrap(err, fmt.Sprintf("db.customers.update(%s, %s)", db.Query(q), db.Query(m)))
|
||||
return errors.Wrap(err, fmt.Sprintf("db.customers.update(%s, %s)", q, m))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes a project from the database.
|
||||
func Delete(ctx context.Context, dbConn *db.DB, id string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.project.Delete")
|
||||
defer span.End()
|
||||
func Delete(ctx context.Context, dbConn *sqlx.DB, id string) error {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.project.Delete")
|
||||
defer span.Finish()
|
||||
|
||||
if !bson.IsObjectIdHex(id) {
|
||||
return ErrInvalidID
|
||||
@ -150,7 +151,7 @@ func Delete(ctx context.Context, dbConn *db.DB, id string) error {
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Remove(q)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, projectsCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, projectsCollection, f); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
@ -6,10 +6,10 @@ import (
|
||||
"time"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/auth"
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/db"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
|
||||
mgo "gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
@ -32,16 +32,16 @@ var (
|
||||
)
|
||||
|
||||
// List retrieves a list of existing users from the database.
|
||||
func List(ctx context.Context, dbConn *db.DB) ([]User, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.user.List")
|
||||
defer span.End()
|
||||
func List(ctx context.Context, dbConn *sqlx.DB) ([]User, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.user.List")
|
||||
defer span.Finish()
|
||||
|
||||
u := []User{}
|
||||
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Find(nil).All(&u)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, usersCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, usersCollection, f); err != nil {
|
||||
return nil, errors.Wrap(err, "db.users.find()")
|
||||
}
|
||||
|
||||
@ -49,9 +49,9 @@ func List(ctx context.Context, dbConn *db.DB) ([]User, error) {
|
||||
}
|
||||
|
||||
// Retrieve gets the specified user from the database.
|
||||
func Retrieve(ctx context.Context, claims auth.Claims, dbConn *db.DB, id string) (*User, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.user.Retrieve")
|
||||
defer span.End()
|
||||
func Retrieve(ctx context.Context, claims auth.Claims, dbConn *sqlx.DB, id string) (*User, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.user.Retrieve")
|
||||
defer span.Finish()
|
||||
|
||||
if !bson.IsObjectIdHex(id) {
|
||||
return nil, ErrInvalidID
|
||||
@ -68,20 +68,20 @@ func Retrieve(ctx context.Context, claims auth.Claims, dbConn *db.DB, id string)
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Find(q).One(&u)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, usersCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, usersCollection, f); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.users.find(%s)", db.Query(q)))
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.users.find(%s)", q))
|
||||
}
|
||||
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// Create inserts a new user into the database.
|
||||
func Create(ctx context.Context, dbConn *db.DB, nu *NewUser, now time.Time) (*User, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.user.Create")
|
||||
defer span.End()
|
||||
func Create(ctx context.Context, dbConn *sqlx.DB, nu *NewUser, now time.Time) (*User, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.user.Create")
|
||||
defer span.Finish()
|
||||
|
||||
// Mongo truncates times to milliseconds when storing. We and do the same
|
||||
// here so the value we return is consistent with what we store.
|
||||
@ -105,17 +105,17 @@ func Create(ctx context.Context, dbConn *db.DB, nu *NewUser, now time.Time) (*Us
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Insert(&u)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, usersCollection, f); err != nil {
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.users.insert(%s)", db.Query(&u)))
|
||||
if _, err := dbConn.ExecContext(ctx, usersCollection, f); err != nil {
|
||||
return nil, errors.Wrap(err, fmt.Sprintf("db.users.insert(%s)", &u))
|
||||
}
|
||||
|
||||
return &u, nil
|
||||
}
|
||||
|
||||
// Update replaces a user document in the database.
|
||||
func Update(ctx context.Context, dbConn *db.DB, id string, upd *UpdateUser, now time.Time) error {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.user.Update")
|
||||
defer span.End()
|
||||
func Update(ctx context.Context, dbConn *sqlx.DB, id string, upd *UpdateUser, now time.Time) error {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.user.Update")
|
||||
defer span.Finish()
|
||||
|
||||
if !bson.IsObjectIdHex(id) {
|
||||
return ErrInvalidID
|
||||
@ -153,20 +153,20 @@ func Update(ctx context.Context, dbConn *db.DB, id string, upd *UpdateUser, now
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Update(q, m)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, usersCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, usersCollection, f); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return ErrNotFound
|
||||
}
|
||||
return errors.Wrap(err, fmt.Sprintf("db.customers.update(%s, %s)", db.Query(q), db.Query(m)))
|
||||
return errors.Wrap(err, fmt.Sprintf("db.customers.update(%s, %s)", q, m))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes a user from the database.
|
||||
func Delete(ctx context.Context, dbConn *db.DB, id string) error {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.user.Delete")
|
||||
defer span.End()
|
||||
func Delete(ctx context.Context, dbConn *sqlx.DB, id string) error {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.user.Delete")
|
||||
defer span.Finish()
|
||||
|
||||
if !bson.IsObjectIdHex(id) {
|
||||
return ErrInvalidID
|
||||
@ -177,11 +177,11 @@ func Delete(ctx context.Context, dbConn *db.DB, id string) error {
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Remove(q)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, usersCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, usersCollection, f); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
return ErrNotFound
|
||||
}
|
||||
return errors.Wrap(err, fmt.Sprintf("db.users.remove(%s)", db.Query(q)))
|
||||
return errors.Wrap(err, fmt.Sprintf("db.users.remove(%s)", q))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -195,9 +195,9 @@ type TokenGenerator interface {
|
||||
|
||||
// Authenticate finds a user by their email and verifies their password. On
|
||||
// success it returns a Token that can be used to authenticate in the future.
|
||||
func Authenticate(ctx context.Context, dbConn *db.DB, tknGen TokenGenerator, now time.Time, email, password string) (Token, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "internal.user.Authenticate")
|
||||
defer span.End()
|
||||
func Authenticate(ctx context.Context, dbConn *sqlx.DB, tknGen TokenGenerator, now time.Time, email, password string) (Token, error) {
|
||||
span, ctx := tracer.StartSpanFromContext(ctx, "internal.user.Authenticate")
|
||||
defer span.Finish()
|
||||
|
||||
q := bson.M{"email": email}
|
||||
|
||||
@ -205,14 +205,14 @@ func Authenticate(ctx context.Context, dbConn *db.DB, tknGen TokenGenerator, now
|
||||
f := func(collection *mgo.Collection) error {
|
||||
return collection.Find(q).One(&u)
|
||||
}
|
||||
if err := dbConn.Execute(ctx, usersCollection, f); err != nil {
|
||||
if _, err := dbConn.ExecContext(ctx, usersCollection, f); err != nil {
|
||||
|
||||
// Normally we would return ErrNotFound in this scenario but we do not want
|
||||
// to leak to an unauthenticated user which emails are in the system.
|
||||
if err == mgo.ErrNotFound {
|
||||
return Token{}, ErrAuthenticationFailure
|
||||
}
|
||||
return Token{}, errors.Wrap(err, fmt.Sprintf("db.users.find(%s)", db.Query(q)))
|
||||
return Token{}, errors.Wrap(err, fmt.Sprintf("db.users.find(%s)", q))
|
||||
}
|
||||
|
||||
// Compare the provided password with the saved hash. Use the bcrypt
|
||||
|
@ -9,11 +9,11 @@ import (
|
||||
"os"
|
||||
|
||||
"geeks-accelerator/oss/saas-starter-kit/example-project/internal/platform/flag"
|
||||
"github.com/gitwak/sqlxmigrate"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
_ "github.com/lib/pq"
|
||||
sqltrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"
|
||||
sqlxtrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/jmoiron/sqlx"
|
||||
"github.com/gitwak/sqlxmigrate"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
// build is the git version of this program. It is set using build flags in the makefile.
|
||||
|
@ -4,10 +4,10 @@ import (
|
||||
"database/sql"
|
||||
"log"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/gitwak/sqlxmigrate"
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// migrationList returns a list of migrations to be executed. If the id of the
|
||||
|
Reference in New Issue
Block a user