mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-03 22:52:30 +02:00
Update the metric Export
interface to accept a *ResourceMetrics
instead of ResourceMetrics
(#3853)
* Change the signature of Export method * Pass tests for otlp exporter * Pass tests for otlp grpc and http packages * Update opencensus bridge * Refactor and pass tests for stdoutmetric package * Update periodic reader tests * Update changelog * Apply suggestions * Apply suggestions * Update CHANGELOG.md --------- Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
parent
3a40e65a38
commit
7fc24d2b14
@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Avoid creating new objects on all calls to `WithDeferredSetup` and `SkipContextSetup` in OpenTracing bridge. (#3833)
|
||||
- The `New` and `Detect` functions from `go.opentelemetry.io/otel/sdk/resource` return errors that wrap underlying errors instead of just containing the underlying error strings. (#3844)
|
||||
- Both the `Histogram` and `HistogramDataPoint` are redefined with a generic argument of `[N int64 | float64]` in `go.opentelemetry.io/otel/sdk/metric/metricdata`. (#3849)
|
||||
- The metric `Export` interface from `go.opentelemetry.io/otel/sdk/metric` accepts a `*ResourceMetrics` instead of `ResourceMetrics`. (#3853)
|
||||
|
||||
### Removed
|
||||
|
||||
|
@ -85,7 +85,7 @@ func (e *exporter) ExportMetrics(ctx context.Context, ocmetrics []*ocmetricdata.
|
||||
if len(otelmetrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
return e.base.Export(ctx, metricdata.ResourceMetrics{
|
||||
return e.base.Export(ctx, &metricdata.ResourceMetrics{
|
||||
Resource: e.res,
|
||||
ScopeMetrics: []metricdata.ScopeMetrics{
|
||||
{
|
||||
|
@ -278,9 +278,9 @@ type fakeExporter struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeExporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error {
|
||||
func (f *fakeExporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error {
|
||||
if f.err == nil {
|
||||
f.data = &data
|
||||
f.data = data
|
||||
}
|
||||
return f.err
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation
|
||||
}
|
||||
|
||||
// Export transforms and transmits metric data to an OTLP receiver.
|
||||
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
|
||||
func (e *exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
||||
otlpRm, err := transform.ResourceMetrics(rm)
|
||||
// Best effort upload of transformable metrics.
|
||||
e.clientMu.Lock()
|
||||
|
@ -60,7 +60,7 @@ func TestExporterClientConcurrency(t *testing.T) {
|
||||
const goroutines = 5
|
||||
|
||||
exp := New(&client{})
|
||||
rm := metricdata.ResourceMetrics{}
|
||||
rm := new(metricdata.ResourceMetrics)
|
||||
ctx := context.Background()
|
||||
|
||||
done := make(chan struct{})
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
// ResourceMetrics returns an OTLP ResourceMetrics generated from rm. If rm
|
||||
// contains invalid ScopeMetrics, an error will be returned along with an OTLP
|
||||
// ResourceMetrics that contains partial OTLP ScopeMetrics.
|
||||
func ResourceMetrics(rm metricdata.ResourceMetrics) (*mpb.ResourceMetrics, error) {
|
||||
func ResourceMetrics(rm *metricdata.ResourceMetrics) (*mpb.ResourceMetrics, error) {
|
||||
sms, err := ScopeMetrics(rm.ScopeMetrics)
|
||||
return &mpb.ResourceMetrics{
|
||||
Resource: &rpb.Resource{
|
||||
|
@ -345,7 +345,7 @@ var (
|
||||
},
|
||||
}
|
||||
|
||||
otelResourceMetrics = metricdata.ResourceMetrics{
|
||||
otelResourceMetrics = &metricdata.ResourceMetrics{
|
||||
Resource: otelRes,
|
||||
ScopeMetrics: otelScopeMetrics,
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ func TestConfig(t *testing.T) {
|
||||
exp, coll := factoryFunc(nil, WithHeaders(headers))
|
||||
t.Cleanup(coll.Shutdown)
|
||||
ctx := context.Background()
|
||||
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
// Ensure everything is flushed.
|
||||
require.NoError(t, exp.Shutdown(ctx))
|
||||
|
||||
@ -187,7 +187,7 @@ func TestConfig(t *testing.T) {
|
||||
t.Cleanup(coll.Shutdown)
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
err := exp.Export(ctx, metricdata.ResourceMetrics{})
|
||||
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
|
||||
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
|
||||
})
|
||||
|
||||
@ -197,7 +197,7 @@ func TestConfig(t *testing.T) {
|
||||
exp, coll := factoryFunc(nil, WithDialOption(grpc.WithUserAgent(customerUserAgent)))
|
||||
t.Cleanup(coll.Shutdown)
|
||||
ctx := context.Background()
|
||||
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
// Ensure everything is flushed.
|
||||
require.NoError(t, exp.Shutdown(ctx))
|
||||
|
||||
|
@ -70,7 +70,7 @@ func TestConfig(t *testing.T) {
|
||||
exp, coll := factoryFunc("", nil, WithHeaders(headers))
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
// Ensure everything is flushed.
|
||||
require.NoError(t, exp.Shutdown(ctx))
|
||||
|
||||
@ -94,7 +94,7 @@ func TestConfig(t *testing.T) {
|
||||
// Push this after Shutdown so the HTTP server doesn't hang.
|
||||
t.Cleanup(func() { close(rCh) })
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
err := exp.Export(ctx, metricdata.ResourceMetrics{})
|
||||
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
|
||||
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
|
||||
})
|
||||
|
||||
@ -103,7 +103,7 @@ func TestConfig(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
assert.Len(t, coll.Collect().Dump(), 1)
|
||||
})
|
||||
|
||||
@ -133,7 +133,7 @@ func TestConfig(t *testing.T) {
|
||||
// Push this after Shutdown so the HTTP server doesn't hang.
|
||||
t.Cleanup(func() { close(rCh) })
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}), "failed retry")
|
||||
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}), "failed retry")
|
||||
assert.Len(t, rCh, 0, "failed HTTP responses did not occur")
|
||||
})
|
||||
|
||||
@ -144,7 +144,7 @@ func TestConfig(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
assert.Len(t, coll.Collect().Dump(), 1)
|
||||
})
|
||||
|
||||
@ -155,7 +155,7 @@ func TestConfig(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
assert.Len(t, coll.Collect().Dump(), 1)
|
||||
})
|
||||
|
||||
@ -166,7 +166,7 @@ func TestConfig(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
assert.Len(t, coll.Collect().Dump(), 1)
|
||||
})
|
||||
|
||||
@ -176,7 +176,7 @@ func TestConfig(t *testing.T) {
|
||||
exp, coll := factoryFunc("", nil, WithHeaders(headers))
|
||||
ctx := context.Background()
|
||||
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
|
||||
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
|
||||
// Ensure everything is flushed.
|
||||
require.NoError(t, exp.Shutdown(ctx))
|
||||
|
||||
|
@ -153,7 +153,7 @@ func Example() {
|
||||
// This is where the sdk would be used to create a Meter and from that
|
||||
// instruments that would make measurements of your code. To simulate that
|
||||
// behavior, call export directly with mocked data.
|
||||
_ = exp.Export(ctx, mockData)
|
||||
_ = exp.Export(ctx, &mockData)
|
||||
|
||||
// Ensure the periodic reader is cleaned up by shutting down the sdk.
|
||||
_ = sdk.Shutdown(ctx)
|
||||
@ -325,6 +325,6 @@ func Example() {
|
||||
// }
|
||||
// }
|
||||
// ],
|
||||
// "ScopeMetrics": []
|
||||
// "ScopeMetrics": null
|
||||
// }
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation
|
||||
return e.aggregationSelector(k)
|
||||
}
|
||||
|
||||
func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error {
|
||||
func (e *exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Don't do anything if the context has already timed out.
|
||||
@ -71,7 +71,7 @@ func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics)
|
||||
// Context is still valid, continue.
|
||||
}
|
||||
if e.redactTimestamps {
|
||||
data = redactTimestamps(data)
|
||||
redactTimestamps(data)
|
||||
}
|
||||
return e.encVal.Load().(encoderHolder).Encode(data)
|
||||
}
|
||||
@ -90,26 +90,14 @@ func (e *exporter) Shutdown(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func redactTimestamps(orig metricdata.ResourceMetrics) metricdata.ResourceMetrics {
|
||||
rm := metricdata.ResourceMetrics{
|
||||
Resource: orig.Resource,
|
||||
ScopeMetrics: make([]metricdata.ScopeMetrics, len(orig.ScopeMetrics)),
|
||||
}
|
||||
func redactTimestamps(orig *metricdata.ResourceMetrics) {
|
||||
for i, sm := range orig.ScopeMetrics {
|
||||
rm.ScopeMetrics[i] = metricdata.ScopeMetrics{
|
||||
Scope: sm.Scope,
|
||||
Metrics: make([]metricdata.Metrics, len(sm.Metrics)),
|
||||
}
|
||||
for j, m := range sm.Metrics {
|
||||
rm.ScopeMetrics[i].Metrics[j] = metricdata.Metrics{
|
||||
Name: m.Name,
|
||||
Description: m.Description,
|
||||
Unit: m.Unit,
|
||||
Data: redactAggregationTimestamps(m.Data),
|
||||
}
|
||||
metrics := sm.Metrics
|
||||
for j, m := range metrics {
|
||||
data := m.Data
|
||||
orig.ScopeMetrics[i].Metrics[j].Data = redactAggregationTimestamps(data)
|
||||
}
|
||||
}
|
||||
return rm
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -83,7 +83,7 @@ func TestExporterHonorsContextErrors(t *testing.T) {
|
||||
exp, err := stdoutmetric.New(testEncoderOption())
|
||||
require.NoError(t, err)
|
||||
return func(ctx context.Context) error {
|
||||
var data metricdata.ResourceMetrics
|
||||
data := new(metricdata.ResourceMetrics)
|
||||
return exp.Export(ctx, data)
|
||||
}
|
||||
}))
|
||||
@ -91,7 +91,7 @@ func TestExporterHonorsContextErrors(t *testing.T) {
|
||||
|
||||
func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) {
|
||||
var (
|
||||
data metricdata.ResourceMetrics
|
||||
data = new(metricdata.ResourceMetrics)
|
||||
ctx = context.Background()
|
||||
exp, err = stdoutmetric.New(testEncoderOption())
|
||||
)
|
||||
|
@ -49,7 +49,7 @@ type Exporter interface {
|
||||
// The passed ResourceMetrics may be reused when the call completes. If an
|
||||
// exporter needs to hold this data after it returns, it needs to make a
|
||||
// copy.
|
||||
Export(context.Context, metricdata.ResourceMetrics) error
|
||||
Export(context.Context, *metricdata.ResourceMetrics) error
|
||||
|
||||
// ForceFlush flushes any metric data held by an exporter.
|
||||
//
|
||||
|
@ -223,7 +223,7 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
|
||||
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
|
||||
err := r.Collect(ctx, rm)
|
||||
if err == nil {
|
||||
err = r.export(ctx, *rm)
|
||||
err = r.export(ctx, rm)
|
||||
}
|
||||
r.rmPool.Put(rm)
|
||||
return err
|
||||
@ -275,7 +275,7 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricd
|
||||
}
|
||||
|
||||
// export exports metric data m using r's exporter.
|
||||
func (r *periodicReader) export(ctx context.Context, m metricdata.ResourceMetrics) error {
|
||||
func (r *periodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
|
||||
c, cancel := context.WithTimeout(ctx, r.timeout)
|
||||
defer cancel()
|
||||
return r.exporter.Export(c, m)
|
||||
@ -321,7 +321,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
|
||||
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
|
||||
err = r.collect(ctx, ph, m)
|
||||
if err == nil {
|
||||
err = r.export(ctx, *m)
|
||||
err = r.export(ctx, m)
|
||||
}
|
||||
r.rmPool.Put(m)
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ func TestIntervalEnvAndOption(t *testing.T) {
|
||||
type fnExporter struct {
|
||||
temporalityFunc TemporalitySelector
|
||||
aggregationFunc AggregationSelector
|
||||
exportFunc func(context.Context, metricdata.ResourceMetrics) error
|
||||
exportFunc func(context.Context, *metricdata.ResourceMetrics) error
|
||||
flushFunc func(context.Context) error
|
||||
shutdownFunc func(context.Context) error
|
||||
}
|
||||
@ -173,7 +173,7 @@ func (e *fnExporter) Aggregation(k InstrumentKind) aggregation.Aggregation {
|
||||
return DefaultAggregationSelector(k)
|
||||
}
|
||||
|
||||
func (e *fnExporter) Export(ctx context.Context, m metricdata.ResourceMetrics) error {
|
||||
func (e *fnExporter) Export(ctx context.Context, m *metricdata.ResourceMetrics) error {
|
||||
if e.exportFunc != nil {
|
||||
return e.exportFunc(ctx, m)
|
||||
}
|
||||
@ -204,7 +204,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
|
||||
ts.readerTestSuite.SetupTest()
|
||||
|
||||
e := &fnExporter{
|
||||
exportFunc: func(context.Context, metricdata.ResourceMetrics) error { return assert.AnError },
|
||||
exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError },
|
||||
flushFunc: func(context.Context) error { return assert.AnError },
|
||||
shutdownFunc: func(context.Context) error { return assert.AnError },
|
||||
}
|
||||
@ -282,9 +282,9 @@ func TestPeriodicReaderRun(t *testing.T) {
|
||||
otel.SetErrorHandler(eh)
|
||||
|
||||
exp := &fnExporter{
|
||||
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
|
||||
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
|
||||
// The testSDKProducer produces testResourceMetricsAB.
|
||||
assert.Equal(t, testResourceMetricsAB, m)
|
||||
assert.Equal(t, testResourceMetricsAB, *m)
|
||||
return assert.AnError
|
||||
},
|
||||
}
|
||||
@ -307,9 +307,9 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
|
||||
expFunc := func(t *testing.T) (exp Exporter, called *bool) {
|
||||
called = new(bool)
|
||||
return &fnExporter{
|
||||
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
|
||||
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
|
||||
// The testSDKProducer produces testResourceMetricsA.
|
||||
assert.Equal(t, testResourceMetricsAB, m)
|
||||
assert.Equal(t, testResourceMetricsAB, *m)
|
||||
*called = true
|
||||
return assert.AnError
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user