1
0
mirror of https://github.com/imgproxy/imgproxy.git synced 2025-01-18 11:12:10 +02:00

412 lines
10 KiB
Go

package otel
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"time"
"github.com/felixge/httpsnoop"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/detectors/aws/ec2"
"go.opentelemetry.io/contrib/detectors/aws/ecs"
"go.opentelemetry.io/contrib/detectors/aws/eks"
"go.opentelemetry.io/contrib/propagators/autoprop"
"go.opentelemetry.io/contrib/propagators/aws/xray"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/metrics/errformat"
"github.com/imgproxy/imgproxy/v3/metrics/stats"
)
type hasSpanCtxKey struct{}
type GaugeFunc func() float64
var (
enabled bool
enabledMetrics bool
tracerProvider *sdktrace.TracerProvider
tracer trace.Tracer
meterProvider *sdkmetric.MeterProvider
meter metric.Meter
propagator propagation.TextMapPropagator
)
func Init() error {
if len(config.OpenTelemetryEndpoint) == 0 {
return nil
}
otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
var (
traceExporter *otlptrace.Exporter
metricExporter sdkmetric.Exporter
err error
)
switch config.OpenTelemetryProtocol {
case "grpc":
traceExporter, metricExporter, err = buildGRPCExporters()
case "https":
traceExporter, metricExporter, err = buildHTTPExporters(false)
case "http":
traceExporter, metricExporter, err = buildHTTPExporters(true)
default:
return fmt.Errorf("Unknown OpenTelemetry protocol: %s", config.OpenTelemetryProtocol)
}
if err != nil {
return err
}
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(config.OpenTelemetryServiceName),
)
awsRes, _ := resource.Detect(
context.Background(),
ec2.NewResourceDetector(),
ecs.NewResourceDetector(),
eks.NewResourceDetector(),
)
if awsRes != nil {
res, _ = resource.Merge(res, awsRes)
}
idg := xray.NewIDGenerator()
tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(traceExporter),
sdktrace.WithIDGenerator(idg),
)
tracer = tracerProvider.Tracer("imgproxy")
if len(config.OpenTelemetryPropagators) > 0 {
propagator, err = autoprop.TextMapPropagator(config.OpenTelemetryPropagators...)
if err != nil {
return err
}
}
enabled = true
if metricExporter == nil {
return nil
}
metricReader := sdkmetric.NewPeriodicReader(
metricExporter,
sdkmetric.WithInterval(5*time.Second),
)
meterProvider = sdkmetric.NewMeterProvider(
sdkmetric.WithResource(res),
sdkmetric.WithReader(metricReader),
)
meter = meterProvider.Meter("imgproxy")
enabledMetrics = true
AddGaugeFunc(
"requests_in_progress",
"A gauge of the number of requests currently being in progress.",
"1",
stats.RequestsInProgress,
)
AddGaugeFunc(
"images_in_progress",
"A gauge of the number of images currently being in progress.",
"1",
stats.ImagesInProgress,
)
return nil
}
func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
tracerOpts := []otlptracegrpc.Option{
otlptracegrpc.WithEndpoint(config.OpenTelemetryEndpoint),
otlptracegrpc.WithDialOption(grpc.WithBlock()),
}
meterOpts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithEndpoint(config.OpenTelemetryEndpoint),
otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
}
tlsConf, err := buildTLSConfig()
if err != nil {
return nil, nil, err
}
if tlsConf != nil {
creds := credentials.NewTLS(tlsConf)
tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
} else if config.OpenTelemetryGRPCInsecure {
tracerOpts = append(tracerOpts, otlptracegrpc.WithInsecure())
meterOpts = append(meterOpts, otlpmetricgrpc.WithInsecure())
}
trctx, trcancel := context.WithTimeout(
context.Background(),
time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
)
defer trcancel()
traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
if err != nil {
err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
}
if !config.OpenTelemetryEnableMetrics {
return traceExporter, nil, err
}
mtctx, mtcancel := context.WithTimeout(
context.Background(),
time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
)
defer mtcancel()
metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
if err != nil {
err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
}
return traceExporter, metricExporter, err
}
func buildHTTPExporters(insecure bool) (*otlptrace.Exporter, sdkmetric.Exporter, error) {
tracerOpts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(config.OpenTelemetryEndpoint),
}
meterOpts := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(config.OpenTelemetryEndpoint),
}
if insecure {
tracerOpts = append(tracerOpts, otlptracehttp.WithInsecure())
meterOpts = append(meterOpts, otlpmetrichttp.WithInsecure())
} else {
tlsConf, err := buildTLSConfig()
if err != nil {
return nil, nil, err
}
if tlsConf != nil {
tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
}
}
trctx, trcancel := context.WithTimeout(
context.Background(),
time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
)
defer trcancel()
traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
if err != nil {
err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
}
if !config.OpenTelemetryEnableMetrics {
return traceExporter, nil, err
}
mtctx, mtcancel := context.WithTimeout(
context.Background(),
time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
)
defer mtcancel()
metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
if err != nil {
err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
}
return traceExporter, metricExporter, err
}
func buildTLSConfig() (*tls.Config, error) {
if len(config.OpenTelemetryServerCert) == 0 {
return nil, nil
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM([]byte(config.OpenTelemetryServerCert)) {
return nil, fmt.Errorf("Can't load OpenTelemetry server cert")
}
tlsConf := tls.Config{RootCAs: certPool}
if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
cert, err := tls.X509KeyPair(
[]byte(config.OpenTelemetryClientCert),
[]byte(config.OpenTelemetryClientKey),
)
if err != nil {
return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
}
tlsConf.Certificates = []tls.Certificate{cert}
}
return &tlsConf, nil
}
func Stop() {
if enabled {
trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
defer trcancel()
tracerProvider.Shutdown(trctx)
if meterProvider != nil {
mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
defer mtcancel()
meterProvider.Shutdown(mtctx)
}
}
}
func Enabled() bool {
return enabled
}
func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
if !enabled {
return ctx, func() {}, rw
}
if propagator != nil {
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
}
ctx, span := tracer.Start(
ctx, "/request",
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(semconv.NetAttributesFromHTTPRequest("tcp", r)...),
trace.WithAttributes(semconv.EndUserAttributesFromHTTPRequest(r)...),
trace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest("imgproxy", "/", r)...),
)
ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
return func(statusCode int) {
attrs := semconv.HTTPAttributesFromHTTPStatusCode(statusCode)
spanStatus, spanMessage := semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(statusCode, trace.SpanKindServer)
span.SetAttributes(attrs...)
span.SetStatus(spanStatus, spanMessage)
next(statusCode)
}
},
})
cancel := func() { span.End() }
return ctx, cancel, newRw
}
func StartSpan(ctx context.Context, name string) context.CancelFunc {
if !enabled {
return func() {}
}
if ctx.Value(hasSpanCtxKey{}) != nil {
_, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
return func() { span.End() }
}
return func() {}
}
func SendError(ctx context.Context, errType string, err error) {
if !enabled {
return
}
span := trace.SpanFromContext(ctx)
attributes := []attribute.KeyValue{
semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
semconv.ExceptionMessageKey.String(err.Error()),
}
if ierr, ok := err.(*ierrors.Error); ok {
if stack := ierr.FormatStack(); len(stack) != 0 {
attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
}
}
span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
}
func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
if !enabledMetrics {
return
}
gauge, _ := meter.AsyncFloat64().Gauge(
name,
instrument.WithUnit(unit.Unit(u)),
instrument.WithDescription(desc),
)
if err := meter.RegisterCallback(
[]instrument.Asynchronous{
gauge,
},
func(ctx context.Context) {
gauge.Observe(ctx, f())
},
); err != nil {
logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
}
}
type errorHandler struct {
entry *logrus.Entry
}
func (h *errorHandler) Handle(err error) {
h.entry.Warn(err.Error())
}