1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Do not block Temporality/Aggregation on OTLP metric export (#4395)

* otlpmetricgrpc - no block temp/agg selection with export

* otlpmetrichttp - no block temp/agg selection with export

* Add test Export doesn't block Temporality or Aggregation

* Deprecate internal New and Exporter

* Add changes to changelog

* Apply suggestions from code review
This commit is contained in:
Tyler Yahn
2023-08-02 07:38:04 -07:00
committed by GitHub
parent 528a0cb34f
commit 378e51e365
10 changed files with 473 additions and 77 deletions
@@ -34,9 +34,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
@@ -47,9 +44,6 @@ type client struct {
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
// Keep it in sync with golang's DefaultTransport from net/http! We
@@ -70,9 +64,7 @@ var ourTransport = &http.Transport{
}
// newClient creates a new HTTP metric client.
func newClient(opts ...Option) (ominternal.Client, error) {
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
func newClient(cfg oconf.Config) (*client, error) {
httpClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.Metrics.Timeout,
@@ -111,25 +103,9 @@ func newClient(opts ...Option) (ominternal.Client, error) {
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}, nil
}
// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}
// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}
// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }
// Shutdown shuts down the client, freeing all resources.
func (c *client) Shutdown(ctx context.Context) error {
// The otlpmetric.Exporter synchronizes access to client methods and
@@ -28,20 +28,38 @@ import (
"github.com/stretchr/testify/require"
ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type clientShim struct {
*client
}
func (clientShim) Temporality(metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
func (clientShim) Aggregation(metric.InstrumentKind) aggregation.Aggregation {
return nil
}
func (clientShim) ForceFlush(ctx context.Context) error {
return ctx.Err()
}
func TestClient(t *testing.T) {
factory := func(rCh <-chan otest.ExportResult) (ominternal.Client, otest.Collector) {
coll, err := otest.NewHTTPCollector("", rCh)
require.NoError(t, err)
addr := coll.Addr().String()
client, err := newClient(WithEndpoint(addr), WithInsecure())
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
client, err := newClient(cfg)
require.NoError(t, err)
return client, coll
return clientShim{client}, coll
}
t.Run("Integration", otest.RunClientTests(factory))
@@ -16,27 +16,62 @@ package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpme
import (
"context"
"fmt"
"sync"
ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
// Exporter is a OpenTelemetry metric Exporter using protobufs over HTTP.
type Exporter struct {
wrapped *ominternal.Exporter
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
client interface {
UploadMetrics(context.Context, *metricpb.ResourceMetrics) error
Shutdown(context.Context) error
}
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
shutdownOnce sync.Once
}
func newExporter(c *client, cfg oconf.Config) (*Exporter, error) {
ts := cfg.Metrics.TemporalitySelector
if ts == nil {
ts = func(metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
}
as := cfg.Metrics.AggregationSelector
if as == nil {
as = metric.DefaultAggregationSelector
}
return &Exporter{
client: c,
temporalitySelector: ts,
aggregationSelector: as,
}, nil
}
// Temporality returns the Temporality to use for an instrument kind.
func (e *Exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return e.wrapped.Temporality(k)
return e.temporalitySelector(k)
}
// Aggregation returns the Aggregation to use for an instrument kind.
func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
return e.wrapped.Aggregation(k)
return e.aggregationSelector(k)
}
// Export transforms and transmits metric data to an OTLP receiver.
@@ -44,8 +79,20 @@ func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation
// This method returns an error if called after Shutdown.
// This method returns an error if the method is canceled by the passed context.
func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
err := e.wrapped.Export(ctx, rm)
global.Debug("OTLP/HTTP exporter export", "Data", rm)
defer global.Debug("OTLP/HTTP exporter export", "Data", rm)
otlpRm, err := transform.ResourceMetrics(rm)
// Best effort upload of transformable metrics.
e.clientMu.Lock()
upErr := e.client.UploadMetrics(ctx, otlpRm)
e.clientMu.Unlock()
if upErr != nil {
if err == nil {
return fmt.Errorf("failed to upload metrics: %w", upErr)
}
// Merge the two errors.
return fmt.Errorf("failed to upload incomplete metrics (%s): %w", err, upErr)
}
return err
}
@@ -56,7 +103,8 @@ func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) e
//
// This method is safe to call concurrently.
func (e *Exporter) ForceFlush(ctx context.Context) error {
return e.wrapped.ForceFlush(ctx)
// The exporter and client hold no state, nothing to flush.
return ctx.Err()
}
// Shutdown flushes all metric data held by an exporter and releases any held
@@ -67,7 +115,34 @@ func (e *Exporter) ForceFlush(ctx context.Context) error {
//
// This method is safe to call concurrently.
func (e *Exporter) Shutdown(ctx context.Context) error {
return e.wrapped.Shutdown(ctx)
err := errShutdown
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
return err
}
var errShutdown = fmt.Errorf("HTTP exporter is shutdown")
type shutdownClient struct{}
func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return err
}
return errShutdown
}
func (c shutdownClient) UploadMetrics(ctx context.Context, _ *metricpb.ResourceMetrics) error {
return c.err(ctx)
}
func (c shutdownClient) Shutdown(ctx context.Context) error {
return c.err(ctx)
}
// MarshalLog returns logging data about the Exporter.
@@ -79,10 +154,10 @@ func (e *Exporter) MarshalLog() interface{} {
// a PeriodicReader to export OpenTelemetry metric data to an OTLP receiving
// endpoint using protobufs over HTTP.
func New(_ context.Context, opts ...Option) (*Exporter, error) {
c, err := newClient(opts...)
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
c, err := newClient(cfg)
if err != nil {
return nil, err
}
exp := ominternal.New(c)
return &Exporter{exp}, nil
return newExporter(c, cfg)
}
@@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
func TestExporterClientConcurrentSafe(t *testing.T) {
const goroutines = 5
coll, err := otest.NewHTTPCollector("", nil)
require.NoError(t, err)
ctx := context.Background()
addr := coll.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
client, err := newClient(cfg)
require.NoError(t, err)
exp, err := newExporter(client, oconf.Config{})
require.NoError(t, err)
rm := new(metricdata.ResourceMetrics)
done := make(chan struct{})
first := make(chan struct{}, goroutines)
var wg sync.WaitGroup
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, exp.Export(ctx, rm))
assert.NoError(t, exp.ForceFlush(ctx))
// Ensure some work is done before shutting down.
first <- struct{}{}
for {
_ = exp.Export(ctx, rm)
_ = exp.ForceFlush(ctx)
select {
case <-done:
return
default:
}
}
}()
}
for i := 0; i < goroutines; i++ {
<-first
}
close(first)
assert.NoError(t, exp.Shutdown(ctx))
assert.ErrorIs(t, exp.Shutdown(ctx), errShutdown)
close(done)
wg.Wait()
}
func TestExporterDoesNotBlockTemporalityAndAggregation(t *testing.T) {
rCh := make(chan otest.ExportResult, 1)
coll, err := otest.NewHTTPCollector("", rCh)
require.NoError(t, err)
ctx := context.Background()
addr := coll.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
client, err := newClient(cfg)
require.NoError(t, err)
exp, err := newExporter(client, oconf.Config{})
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
rm := new(metricdata.ResourceMetrics)
t.Log("starting export")
require.NoError(t, exp.Export(ctx, rm))
t.Log("export complete")
}()
assert.Eventually(t, func() bool {
const inst = metric.InstrumentKindCounter
// These should not be blocked.
t.Log("getting temporality")
_ = exp.Temporality(inst)
t.Log("getting aggregation")
_ = exp.Aggregation(inst)
return true
}, time.Second, 10*time.Millisecond)
// Clear the export.
rCh <- otest.ExportResult{}
close(rCh)
wg.Wait()
}