1
0
mirror of https://github.com/imgproxy/imgproxy.git synced 2025-12-23 22:11:10 +02:00
Files
imgproxy/monitoring/cloudwatch/cloudwatch.go
2025-10-01 20:05:06 +02:00

193 lines
5.1 KiB
Go

package cloudwatch
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
cloudwatchTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/vips"
)
const (
	// defaultAwsRegion is the fallback AWS region, applied when neither the
	// AWS environment nor the config supplies one.
	defaultAwsRegion = "us-west-1"

	// putMetricsTimeout is the timeout for AWS CloudWatch PutMetrics calls.
	putMetricsTimeout = time.Second * 30
)
// CloudWatch holds CloudWatch client and configuration.
//
// Instances are built by New. When the config reports CloudWatch as disabled,
// only config and stats are populated; client and the collector context
// fields stay at their zero values.
type CloudWatch struct {
// config is the configuration passed to New.
config *Config
// stats is the source of the worker/request figures reported by the collector.
stats *stats.Stats
// client is the AWS CloudWatch client used to put metric data.
client *cloudwatch.Client
// collectorCtx controls the lifetime of the metrics collector goroutine.
collectorCtx context.Context
// collectorCtxCancel cancels collectorCtx; it is nil when no collector was started.
collectorCtxCancel context.CancelFunc
}
// New creates a new CloudWatch instance from config and stats.
//
// When config reports CloudWatch as disabled, the returned instance has no
// client and no collector goroutine is started. Otherwise the config is
// validated, the default AWS configuration is loaded using ctx, and a
// background metrics collector is started. The collector runs until ctx is
// cancelled or Stop is called.
//
// The region is config.Region when it is set, otherwise whatever
// LoadDefaultConfig resolved, otherwise defaultAwsRegion.
//
// A validation error is returned unchanged. A failure to load the AWS
// configuration is wrapped, so callers can still inspect the underlying
// error with errors.Is / errors.As.
func New(ctx context.Context, config *Config, stats *stats.Stats) (*CloudWatch, error) {
	cw := &CloudWatch{
		config: config,
		stats:  stats,
	}

	if !config.Enabled() {
		return cw, nil
	}

	if err := config.Validate(); err != nil {
		return nil, err
	}

	conf, err := awsConfig.LoadDefaultConfig(ctx)
	if err != nil {
		// %w (not %s) keeps the original error in the chain.
		return nil, fmt.Errorf("can't load CloudWatch config: %w", err)
	}

	// An explicitly configured region overrides the one resolved by the SDK.
	if len(config.Region) > 0 {
		conf.Region = config.Region
	}

	// Fall back to the default region if nothing provided one.
	if len(conf.Region) == 0 {
		conf.Region = defaultAwsRegion
	}

	cw.client = cloudwatch.NewFromConfig(conf)

	// The collector gets its own cancellable context so Stop can end it
	// independently of the parent ctx.
	cw.collectorCtx, cw.collectorCtxCancel = context.WithCancel(ctx)

	go cw.runMetricsCollector()

	return cw, nil
}
// Enabled reports whether CloudWatch is enabled, as decided by the
// underlying config.
func (cw *CloudWatch) Enabled() bool {
	enabled := cw.config.Enabled()
	return enabled
}
// Stop stops the CloudWatch metrics collection by cancelling the collector
// context. It does nothing when no collector was started.
func (cw *CloudWatch) Stop() {
	cancel := cw.collectorCtxCancel
	if cancel == nil {
		return
	}

	cancel()
}
// runMetricsCollector collects and sends metrics to CloudWatch.
//
// On every tick of config.MetricsInterval it snapshots the stats and vips
// figures and sends them in a single PutMetricData call. It returns once
// collectorCtx is done.
func (cw *CloudWatch) runMetricsCollector() {
	ticker := time.NewTicker(cw.config.MetricsInterval)
	defer ticker.Stop()

	// Every datum carries the same single ServiceName dimension.
	dims := []cloudwatchTypes.Dimension{{
		Name:  aws.String("ServiceName"),
		Value: aws.String(cw.config.ServiceName),
	}}

	ns := aws.String(cw.config.Namespace)

	// Metric names are created once and reused on every tick.
	var (
		nameWorkers                = aws.String("Workers")
		nameRequestsInProgress     = aws.String("RequestsInProgress")
		nameImagesInProgress       = aws.String("ImagesInProgress")
		nameConcurrencyUtilization = aws.String("ConcurrencyUtilization")
		nameWorkersUtilization     = aws.String("WorkersUtilization")
		nameVipsMemory             = aws.String("VipsMemory")
		nameVipsMaxMemory          = aws.String("VipsMaxMemory")
		nameVipsAllocs             = aws.String("VipsAllocs")
	)

	// datum builds one metric entry with the shared dimensions.
	datum := func(name *string, unit cloudwatchTypes.StandardUnit, value float64) cloudwatchTypes.MetricDatum {
		return cloudwatchTypes.MetricDatum{
			Dimensions: dims,
			MetricName: name,
			Unit:       unit,
			Value:      aws.Float64(value),
		}
	}

	// send performs one PutMetricData call bounded by putMetricsTimeout.
	// Failures are only logged; the collector keeps running.
	send := func(data []cloudwatchTypes.MetricDatum) {
		ctx, cancel := context.WithTimeout(cw.collectorCtx, putMetricsTimeout)
		defer cancel()

		input := cloudwatch.PutMetricDataInput{
			Namespace:  ns,
			MetricData: data,
		}

		if _, err := cw.client.PutMetricData(ctx, &input); err != nil {
			slog.Warn(fmt.Sprintf("can't send CloudWatch metrics: %s", err))
		}
	}

	for {
		select {
		case <-cw.collectorCtx.Done():
			return

		case <-ticker.C:
			send([]cloudwatchTypes.MetricDatum{
				datum(nameWorkers, cloudwatchTypes.StandardUnitCount, float64(cw.stats.WorkersNumber)),
				datum(nameRequestsInProgress, cloudwatchTypes.StandardUnitCount, cw.stats.RequestsInProgress()),
				datum(nameImagesInProgress, cloudwatchTypes.StandardUnitCount, cw.stats.ImagesInProgress()),
				// NOTE(review): ConcurrencyUtilization reports the same value as
				// WorkersUtilization; presumably kept as a legacy alias — confirm.
				datum(nameConcurrencyUtilization, cloudwatchTypes.StandardUnitPercent, cw.stats.WorkersUtilization()),
				datum(nameWorkersUtilization, cloudwatchTypes.StandardUnitPercent, cw.stats.WorkersUtilization()),
				datum(nameVipsMemory, cloudwatchTypes.StandardUnitBytes, vips.GetMem()),
				datum(nameVipsMaxMemory, cloudwatchTypes.StandardUnitBytes, vips.GetMemHighwater()),
				datum(nameVipsAllocs, cloudwatchTypes.StandardUnitCount, vips.GetAllocs()),
			})
		}
	}
}