diff --git a/CHANGELOG.md b/CHANGELOG.md index febf6a41c..b9ee1c805 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#6710) - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#6710) - Validate exponential histogram scale range for Prometheus compatibility in `go.opentelemetry.io/otel/exporters/prometheus`. (#6822) +- Context cancellation during metric pipeline produce does not corrupt data in `go.opentelemetry.io/otel/sdk/metric`. (#6914) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 2240c26e9..7bdb699ca 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -121,6 +121,14 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) { // // This method is safe to call concurrently. func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error { + // Only check if context is already cancelled before starting, not inside or after callback loops. + // If this method returns after executing some callbacks but before running all aggregations, + // internal aggregation state can be corrupted and result in incorrect data returned + // by future produce calls. + if err := ctx.Err(); err != nil { + return err + } + p.Lock() defer p.Unlock() @@ -130,12 +138,6 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) if e := c(ctx); e != nil { err = errors.Join(err, e) } - if err := ctx.Err(); err != nil { - rm.Resource = nil - clear(rm.ScopeMetrics) // Erase elements to let GC collect objects. - rm.ScopeMetrics = rm.ScopeMetrics[:0] - return err - } } for e := p.multiCallbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) @@ -143,13 +145,6 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) if e := f(ctx); e != nil { err = errors.Join(err, e) } - if err := ctx.Err(); err != nil { - // This means the context expired before we finished running callbacks. - rm.Resource = nil - clear(rm.ScopeMetrics) // Erase elements to let GC collect objects. - rm.ScopeMetrics = rm.ScopeMetrics[:0] - return err - } } rm.Resource = p.resource diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index b1dc5dcbf..57d9f2901 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -613,3 +613,134 @@ func TestPipelineWithMultipleReaders(t *testing.T) { assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value) } } + +// TestPipelineProduceErrors tests the issue described in https://github.com/open-telemetry/opentelemetry-go/issues/6344. +// Earlier implementations of the pipeline produce method could corrupt metric data point state when the passed context +// was canceled during execution of callbacks. In this case, corroption was the result of some or all callbacks being +// invoked without instrument compAgg functions called. +func TestPipelineProduceErrors(t *testing.T) { + // Create a test pipeline with aggregations + pipeReader := NewManualReader() + pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter) + + // Set up an observable with callbacks + var testObsID observableID[int64] + aggBuilder := aggregate.Builder[int64]{Temporality: metricdata.CumulativeTemporality} + measure, _ := aggBuilder.Sum(true) + pipe.addInt64Measure(testObsID, []aggregate.Measure[int64]{measure}) + + // Add an aggregation that just sets the data point value to the number of times the aggregation is invoked + aggCallCount := 0 + inst := instrumentSync{ + name: "test-metric", + description: "test description", + unit: "test unit", + compAgg: func(dest *metricdata.Aggregation) int { + aggCallCount++ + + *dest = metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{{Value: int64(aggCallCount)}}, + } + return aggCallCount + }, + } + pipe.addSync(instrumentation.Scope{Name: "test"}, inst) + + ctx, cancelCtx := context.WithCancel(context.Background()) + var shouldCancelContext bool // When true, the second callback cancels ctx + var shouldReturnError bool // When true, the third callback returns an error + var callbackCounts [3]int + + // Callback 1: cancels the context during execution but continues to populate data + pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error { + callbackCounts[0]++ + for _, m := range pipe.int64Measures[testObsID] { + m(ctx, 123, *attribute.EmptySet()) + } + return nil + }) + + // Callback 2: populates int64 observable data + pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error { + callbackCounts[1]++ + if shouldCancelContext { + cancelCtx() + } + return nil + }) + + // Callback 3: return an error + pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error { + callbackCounts[2]++ + if shouldReturnError { + return fmt.Errorf("test callback error") + } + return nil + }) + + assertMetrics := func(rm *metricdata.ResourceMetrics, expectVal int64) { + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: inst.name, + Description: inst.description, + Unit: inst.unit, + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{{Value: expectVal}}, + }, + }, rm.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp()) + } + + t.Run("no errors", func(t *testing.T) { + var rm metricdata.ResourceMetrics + err := pipe.produce(ctx, &rm) + require.NoError(t, err) + + assert.Equal(t, [3]int{1, 1, 1}, callbackCounts) + assert.Equal(t, 1, aggCallCount) + + assertMetrics(&rm, 1) + }) + + t.Run("callback returns error", func(t *testing.T) { + shouldReturnError = true + + var rm metricdata.ResourceMetrics + err := pipe.produce(ctx, &rm) + require.ErrorContains(t, err, "test callback error") + + // Even though a callback returned an error, the agg function is still called + assert.Equal(t, [3]int{2, 2, 2}, callbackCounts) + assert.Equal(t, 2, aggCallCount) + + assertMetrics(&rm, 2) + }) + + t.Run("context canceled during produce", func(t *testing.T) { + shouldCancelContext = true + + var rm metricdata.ResourceMetrics + err := pipe.produce(ctx, &rm) + require.ErrorContains(t, err, "test callback error") + + // Even though the context was canceled midway through invoking callbacks, + // all remaining callbacks and agg functions are still called + assert.Equal(t, [3]int{3, 3, 3}, callbackCounts) + assert.Equal(t, 3, aggCallCount) + }) + + t.Run("context already cancelled", func(t *testing.T) { + var output metricdata.ResourceMetrics + err := pipe.produce(ctx, &output) + require.ErrorIs(t, err, context.Canceled) + + // No callbacks or agg functions are called since the context was canceled prior to invoking + // the produce method + assert.Equal(t, [3]int{3, 3, 3}, callbackCounts) + assert.Equal(t, 3, aggCallCount) + }) +}