mirror of
https://github.com/imgproxy/imgproxy.git
synced 2025-12-23 22:11:10 +02:00
193 lines
5.1 KiB
Go
193 lines
5.1 KiB
Go
package cloudwatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
awsConfig "github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
|
|
cloudwatchTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
|
|
|
"github.com/imgproxy/imgproxy/v3/monitoring/stats"
|
|
"github.com/imgproxy/imgproxy/v3/vips"
|
|
)
|
|
|
|
const (
|
|
// AWS CloudWatch PutMetrics timeout
|
|
putMetricsTimeout = 30 * time.Second
|
|
|
|
// default AWS region to set if neither aws env region nor config region are set
|
|
defaultAwsRegion = "us-west-1"
|
|
)
|
|
|
|
// CloudWatch holds CloudWatch client and configuration
|
|
type CloudWatch struct {
|
|
config *Config
|
|
stats *stats.Stats
|
|
|
|
client *cloudwatch.Client
|
|
|
|
collectorCtx context.Context
|
|
collectorCtxCancel context.CancelFunc
|
|
}
|
|
|
|
// New creates a new CloudWatch instance
|
|
func New(ctx context.Context, config *Config, stats *stats.Stats) (*CloudWatch, error) {
|
|
cw := &CloudWatch{
|
|
config: config,
|
|
stats: stats,
|
|
}
|
|
|
|
if !config.Enabled() {
|
|
return cw, nil
|
|
}
|
|
|
|
if err := config.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
conf, err := awsConfig.LoadDefaultConfig(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't load CloudWatch config: %s", err)
|
|
}
|
|
|
|
if len(config.Region) > 0 {
|
|
conf.Region = config.Region
|
|
}
|
|
|
|
if len(conf.Region) == 0 {
|
|
conf.Region = defaultAwsRegion
|
|
}
|
|
|
|
cw.client = cloudwatch.NewFromConfig(conf)
|
|
cw.collectorCtx, cw.collectorCtxCancel = context.WithCancel(ctx)
|
|
|
|
go cw.runMetricsCollector()
|
|
|
|
return cw, nil
|
|
}
|
|
|
|
// Enabled returns true if CloudWatch is enabled
|
|
func (cw *CloudWatch) Enabled() bool {
|
|
return cw.config.Enabled()
|
|
}
|
|
|
|
// Stop stops the CloudWatch metrics collection
|
|
func (cw *CloudWatch) Stop() {
|
|
if cw.collectorCtxCancel != nil {
|
|
cw.collectorCtxCancel()
|
|
}
|
|
}
|
|
|
|
// runMetricsCollector collects and sends metrics to CloudWatch
|
|
func (cw *CloudWatch) runMetricsCollector() {
|
|
tick := time.NewTicker(cw.config.MetricsInterval)
|
|
defer tick.Stop()
|
|
|
|
dimension := cloudwatchTypes.Dimension{
|
|
Name: aws.String("ServiceName"),
|
|
Value: aws.String(cw.config.ServiceName),
|
|
}
|
|
|
|
dimensions := []cloudwatchTypes.Dimension{dimension}
|
|
|
|
namespace := aws.String(cw.config.Namespace)
|
|
|
|
// metric names
|
|
metricNameWorkers := aws.String("Workers")
|
|
metricNameRequestsInProgress := aws.String("RequestsInProgress")
|
|
metricNameImagesInProgress := aws.String("ImagesInProgress")
|
|
metricNameConcurrencyUtilization := aws.String("ConcurrencyUtilization")
|
|
metricNameWorkersUtilization := aws.String("WorkersUtilization")
|
|
metricNameVipsMemory := aws.String("VipsMemory")
|
|
metricNameVipsMaxMemory := aws.String("VipsMaxMemory")
|
|
metricNameVipsAllocs := aws.String("VipsAllocs")
|
|
|
|
for {
|
|
select {
|
|
case <-tick.C:
|
|
// 8 is the number of metrics we send
|
|
metrics := make([]cloudwatchTypes.MetricDatum, 0, 8)
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameWorkers,
|
|
Unit: cloudwatchTypes.StandardUnitCount,
|
|
Value: aws.Float64(float64(cw.stats.WorkersNumber)),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameRequestsInProgress,
|
|
Unit: cloudwatchTypes.StandardUnitCount,
|
|
Value: aws.Float64(cw.stats.RequestsInProgress()),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameImagesInProgress,
|
|
Unit: cloudwatchTypes.StandardUnitCount,
|
|
Value: aws.Float64(cw.stats.ImagesInProgress()),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameConcurrencyUtilization,
|
|
Unit: cloudwatchTypes.StandardUnitPercent,
|
|
Value: aws.Float64(
|
|
cw.stats.WorkersUtilization(),
|
|
),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameWorkersUtilization,
|
|
Unit: cloudwatchTypes.StandardUnitPercent,
|
|
Value: aws.Float64(
|
|
cw.stats.WorkersUtilization(),
|
|
),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameVipsMemory,
|
|
Unit: cloudwatchTypes.StandardUnitBytes,
|
|
Value: aws.Float64(vips.GetMem()),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameVipsMaxMemory,
|
|
Unit: cloudwatchTypes.StandardUnitBytes,
|
|
Value: aws.Float64(vips.GetMemHighwater()),
|
|
})
|
|
|
|
metrics = append(metrics, cloudwatchTypes.MetricDatum{
|
|
Dimensions: dimensions,
|
|
MetricName: metricNameVipsAllocs,
|
|
Unit: cloudwatchTypes.StandardUnitCount,
|
|
Value: aws.Float64(vips.GetAllocs()),
|
|
})
|
|
|
|
input := cloudwatch.PutMetricDataInput{
|
|
Namespace: namespace,
|
|
MetricData: metrics,
|
|
}
|
|
|
|
func() {
|
|
ctx, cancel := context.WithTimeout(cw.collectorCtx, putMetricsTimeout)
|
|
defer cancel()
|
|
|
|
if _, err := cw.client.PutMetricData(ctx, &input); err != nil {
|
|
slog.Warn(fmt.Sprintf("can't send CloudWatch metrics: %s", err))
|
|
}
|
|
}()
|
|
case <-cw.collectorCtx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|