diff --git a/config/config.go b/config/config.go
index 38946318..ebb40631 100644
--- a/config/config.go
+++ b/config/config.go
@@ -156,6 +156,10 @@ var (
 	OpenTelemetryTraceIDGenerator  string
 	OpenTelemetryConnectionTimeout int
 
+	CloudWatchServiceName string
+	CloudWatchNamespace   string
+	CloudWatchRegion      string
+
 	BugsnagKey   string
 	BugsnagStage string
 
@@ -334,6 +338,10 @@ func Reset() {
 	OpenTelemetryTraceIDGenerator = "xray"
 	OpenTelemetryConnectionTimeout = 5
 
+	CloudWatchServiceName = ""
+	CloudWatchNamespace = "imgproxy"
+	CloudWatchRegion = ""
+
 	BugsnagKey = ""
 	BugsnagStage = "production"
 
@@ -527,6 +535,10 @@ func Configure() error {
 	configurators.String(&OpenTelemetryTraceIDGenerator, "IMGPROXY_OPEN_TELEMETRY_TRACE_ID_GENERATOR")
 	configurators.Int(&OpenTelemetryConnectionTimeout, "IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT")
 
+	configurators.String(&CloudWatchServiceName, "IMGPROXY_CLOUD_WATCH_SERVICE_NAME")
+	configurators.String(&CloudWatchNamespace, "IMGPROXY_CLOUD_WATCH_NAMESPACE")
+	configurators.String(&CloudWatchRegion, "IMGPROXY_CLOUD_WATCH_REGION")
+
 	configurators.String(&BugsnagKey, "IMGPROXY_BUGSNAG_KEY")
 	configurators.String(&BugsnagStage, "IMGPROXY_BUGSNAG_STAGE")
 	configurators.String(&HoneybadgerKey, "IMGPROXY_HONEYBADGER_KEY")
diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go
new file mode 100644
index 00000000..b0b1fac0
--- /dev/null
+++ b/metrics/cloudwatch/cloudwatch.go
@@ -0,0 +1,279 @@
+// Package cloudwatch publishes imgproxy metrics to Amazon CloudWatch.
+package cloudwatch
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/cloudwatch"
+	"github.com/imgproxy/imgproxy/v3/config"
+	"github.com/imgproxy/imgproxy/v3/imath"
+	"github.com/imgproxy/imgproxy/v3/metrics/stats"
+	"github.com/sirupsen/logrus"
+)
+
+// GaugeFunc returns the current value of a gauge.
+type GaugeFunc func() float64
+
+// gauge pairs a gauge callback with the CloudWatch unit it is reported in.
+type gauge struct {
+	unit string
+	f    GaugeFunc
+}
+
+// bufferStats aggregates the buffer sizes observed since the last flush.
+type bufferStats struct {
+	count         int
+	sum, min, max int
+}
+
+var (
+	enabled bool
+
+	client *cloudwatch.CloudWatch
+
+	gauges      = make(map[string]gauge)
+	gaugesMutex sync.RWMutex
+
+	collectorCtx       context.Context
+	collectorCtxCancel context.CancelFunc
+
+	// The three buffer maps are guarded by bufferStatsMutex.
+	bufferDefaultSizes = make(map[string]int)
+	bufferMaxSizes     = make(map[string]int)
+	bufferSizeStats    = make(map[string]*bufferStats)
+	bufferStatsMutex   sync.Mutex
+)
+
+// Init creates the CloudWatch client and starts the background collector.
+// It does nothing when config.CloudWatchServiceName is empty.
+func Init() error {
+	if len(config.CloudWatchServiceName) == 0 {
+		return nil
+	}
+
+	conf := aws.NewConfig()
+
+	if len(config.CloudWatchRegion) > 0 {
+		conf = conf.WithRegion(config.CloudWatchRegion)
+	}
+
+	sess, err := session.NewSession()
+	if err != nil {
+		return fmt.Errorf("Can't create CloudWatch session: %w", err)
+	}
+
+	// Fall back to a fixed region when the session has none configured.
+	if sess.Config.Region == nil || len(*sess.Config.Region) == 0 {
+		sess.Config.Region = aws.String("us-west-1")
+	}
+
+	client = cloudwatch.New(sess, conf)
+
+	collectorCtx, collectorCtxCancel = context.WithCancel(context.Background())
+
+	go runMetricsCollector()
+
+	enabled = true
+
+	return nil
+}
+
+// Stop cancels the collector context if the reporter was enabled by Init.
+func Stop() {
+	if enabled {
+		collectorCtxCancel()
+	}
+}
+
+// Enabled reports whether Init enabled CloudWatch reporting.
+func Enabled() bool {
+	return enabled
+}
+
+// AddGaugeFunc registers f as the gauge called name, reported in unit.
+// A later registration under the same name replaces the earlier one.
+func AddGaugeFunc(name, unit string, f GaugeFunc) {
+	gaugesMutex.Lock()
+	defer gaugesMutex.Unlock()
+
+	gauges[name] = gauge{unit: unit, f: f}
+}
+
+// ObserveBufferSize adds size to the pending statistics of buffer type t.
+// It does nothing while reporting is disabled.
+func ObserveBufferSize(t string, size int) {
+	if !enabled {
+		return
+	}
+
+	bufferStatsMutex.Lock()
+	defer bufferStatsMutex.Unlock()
+
+	st, ok := bufferSizeStats[t]
+	if !ok {
+		bufferSizeStats[t] = &bufferStats{count: 1, sum: size, min: size, max: size}
+		return
+	}
+
+	st.count++
+	st.sum += size
+	st.min = imath.Min(st.min, size)
+	st.max = imath.Max(st.max, size)
+}
+
+// SetBufferDefaultSize records the default size of buffer type t.
+// It does nothing while reporting is disabled.
+func SetBufferDefaultSize(t string, size int) {
+	if !enabled {
+		return
+	}
+
+	bufferStatsMutex.Lock()
+	defer bufferStatsMutex.Unlock()
+
+	bufferDefaultSizes[t] = size
+}
+
+// SetBufferMaxSize records the maximum size of buffer type t.
+// It does nothing while reporting is disabled.
+func SetBufferMaxSize(t string, size int) {
+	if !enabled {
+		return
+	}
+
+	bufferStatsMutex.Lock()
+	defer bufferStatsMutex.Unlock()
+
+	bufferMaxSizes[t] = size
+}
+
+// runMetricsCollector sends all metrics to CloudWatch every 10 seconds
+// until collectorCtx is cancelled.
+func runMetricsCollector() {
+	tick := time.NewTicker(10 * time.Second)
+	defer tick.Stop()
+
+	dimension := &cloudwatch.Dimension{
+		Name:  aws.String("ServiceName"),
+		Value: aws.String(config.CloudWatchServiceName),
+	}
+
+	// Per-type BufferType dimensions are built once and reused on every tick.
+	bufferDimensions := make(map[string]*cloudwatch.Dimension)
+	bufferDimension := func(t string) *cloudwatch.Dimension {
+		if d, ok := bufferDimensions[t]; ok {
+			return d
+		}
+
+		d := &cloudwatch.Dimension{
+			Name:  aws.String("BufferType"),
+			Value: aws.String(t),
+		}
+
+		bufferDimensions[t] = d
+
+		return d
+	}
+
+	for {
+		select {
+		case <-tick.C:
+			// The slice is not pre-sized: reading the lengths of the shared
+			// maps here, outside their mutexes, would be a data race.
+			var metrics []*cloudwatch.MetricDatum
+
+			func() {
+				gaugesMutex.RLock()
+				defer gaugesMutex.RUnlock()
+
+				for name, g := range gauges {
+					metrics = append(metrics, &cloudwatch.MetricDatum{
+						Dimensions: []*cloudwatch.Dimension{dimension},
+						MetricName: aws.String(name),
+						Unit:       aws.String(g.unit),
+						Value:      aws.Float64(g.f()),
+					})
+				}
+			}()
+
+			func() {
+				bufferStatsMutex.Lock()
+				defer bufferStatsMutex.Unlock()
+
+				for t, size := range bufferDefaultSizes {
+					metrics = append(metrics, &cloudwatch.MetricDatum{
+						Dimensions: []*cloudwatch.Dimension{dimension, bufferDimension(t)},
+						MetricName: aws.String("BufferDefaultSize"),
+						Unit:       aws.String("Bytes"),
+						Value:      aws.Float64(float64(size)),
+					})
+				}
+
+				for t, size := range bufferMaxSizes {
+					metrics = append(metrics, &cloudwatch.MetricDatum{
+						Dimensions: []*cloudwatch.Dimension{dimension, bufferDimension(t)},
+						MetricName: aws.String("BufferMaximumSize"),
+						Unit:       aws.String("Bytes"),
+						Value:      aws.Float64(float64(size)),
+					})
+				}
+
+				for t, st := range bufferSizeStats {
+					metrics = append(metrics, &cloudwatch.MetricDatum{
+						Dimensions: []*cloudwatch.Dimension{dimension, bufferDimension(t)},
+						MetricName: aws.String("BufferSize"),
+						Unit:       aws.String("Bytes"),
+						StatisticValues: &cloudwatch.StatisticSet{
+							SampleCount: aws.Float64(float64(st.count)),
+							Sum:         aws.Float64(float64(st.sum)),
+							Minimum:     aws.Float64(float64(st.min)),
+							Maximum:     aws.Float64(float64(st.max)),
+						},
+					})
+
+					// Start a fresh set for the next interval so the same
+					// observations are not published again on every tick.
+					delete(bufferSizeStats, t)
+				}
+			}()
+
+			metrics = append(metrics, &cloudwatch.MetricDatum{
+				Dimensions: []*cloudwatch.Dimension{dimension},
+				MetricName: aws.String("RequestsInProgress"),
+				Unit:       aws.String("Count"),
+				Value:      aws.Float64(stats.RequestsInProgress()),
+			})
+
+			metrics = append(metrics, &cloudwatch.MetricDatum{
+				Dimensions: []*cloudwatch.Dimension{dimension},
+				MetricName: aws.String("ImagesInProgress"),
+				Unit:       aws.String("Count"),
+				Value:      aws.Float64(stats.ImagesInProgress()),
+			})
+
+			metrics = append(metrics, &cloudwatch.MetricDatum{
+				Dimensions: []*cloudwatch.Dimension{dimension},
+				MetricName: aws.String("ConcurrencyUtilization"),
+				Unit:       aws.String("Percent"),
+				Value: aws.Float64(
+					stats.RequestsInProgress() / float64(config.Concurrency) * 100.0,
+				),
+			})
+
+			_, err := client.PutMetricData(&cloudwatch.PutMetricDataInput{
+				Namespace:  aws.String(config.CloudWatchNamespace),
+				MetricData: metrics,
+			})
+			if err != nil {
+				logrus.Warnf("Can't send CloudWatch metrics: %s", err)
+			}
+		case <-collectorCtx.Done():
+			return
+		}
+	}
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index c7560f09..1c5aa579 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"net/http"
 
+	"github.com/imgproxy/imgproxy/v3/metrics/cloudwatch"
 	"github.com/imgproxy/imgproxy/v3/metrics/datadog"
 	"github.com/imgproxy/imgproxy/v3/metrics/newrelic"
 	"github.com/imgproxy/imgproxy/v3/metrics/otel"
@@ -23,6 +24,10 @@ func Init() error {
 		return err
 	}
 
+	if err := cloudwatch.Init(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -30,13 +35,15 @@ func Stop() {
 	newrelic.Stop()
 	datadog.Stop()
 	otel.Stop()
+	cloudwatch.Stop()
 }
 
 func Enabled() bool {
 	return prometheus.Enabled() ||
 		newrelic.Enabled() ||
 		datadog.Enabled() ||
-		otel.Enabled()
+		otel.Enabled() ||
+		cloudwatch.Enabled()
 }
 
 func StartRequest(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
@@ -130,16 +137,19 @@ func ObserveBufferSize(t string, size int) {
 	prometheus.ObserveBufferSize(t, size)
 	newrelic.ObserveBufferSize(t, size)
 	datadog.ObserveBufferSize(t, size)
+	cloudwatch.ObserveBufferSize(t, size)
 }
 
 func SetBufferDefaultSize(t string, size int) {
 	prometheus.SetBufferDefaultSize(t, size)
 	newrelic.SetBufferDefaultSize(t, size)
 	datadog.SetBufferDefaultSize(t, size)
+	cloudwatch.SetBufferDefaultSize(t, size)
 }
 
 func SetBufferMaxSize(t string, size int) {
 	prometheus.SetBufferMaxSize(t, size)
 	newrelic.SetBufferMaxSize(t, size)
 	datadog.SetBufferMaxSize(t, size)
+	cloudwatch.SetBufferMaxSize(t, size)
 }
diff --git a/vips/vips.go b/vips/vips.go
index da5ed60d..e6419874 100644
--- a/vips/vips.go
+++ b/vips/vips.go
@@ -21,6 +21,7 @@ import (
 	"github.com/imgproxy/imgproxy/v3/ierrors"
 	"github.com/imgproxy/imgproxy/v3/imagedata"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
+	"github.com/imgproxy/imgproxy/v3/metrics/cloudwatch"
 	"github.com/imgproxy/imgproxy/v3/metrics/datadog"
 	"github.com/imgproxy/imgproxy/v3/metrics/newrelic"
 	"github.com/imgproxy/imgproxy/v3/metrics/otel"
@@ -125,6 +126,10 @@ func Init() error {
 		GetAllocs,
 	)
 
+	cloudwatch.AddGaugeFunc("VipsMemory", "Bytes", GetMem)
+	cloudwatch.AddGaugeFunc("VipsMaxMemory", "Bytes", GetMemHighwater)
+	cloudwatch.AddGaugeFunc("VipsAllocs", "Count", GetAllocs)
+
 	return nil
 }
 