You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-07-15 01:04:25 +02:00
Metrics: Rename sdk/export/metric.ExportKind to aggregation.Temporality (#2274)
* Rename ExportKind to aggregation.Temporality * Changelog uhhhhhhh * Apply suggestions from code review Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> * int->uint8 * go generate * go.sum * anual edit Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
## Changed
|
||||
|
||||
- Skip links with invalid span context. (#2275)
|
||||
- Metric SDK `export.ExportKind`, `export.ExportKindSelector` types have been renamed to `aggregation.Temporality` and `aggregation.TemporalitySelector` respectively to keep in line with current specification and protocol along with built-in selectors (e.g., `aggregation.CumulativeTemporalitySelector`, ...). (#2274)
|
||||
- The Metric `Exporter` interface now requires a `TemporalitySelector` method instead of an `ExportKindSelector`. (#2274)
|
||||
- Metrics API cleanup. The `metric/sdkapi` package has been created to relocate the API-to-SDK interface:
|
||||
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
|
||||
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
|
||||
|
@ -79,7 +79,7 @@ var _ export.Reader = &metricReader{}
|
||||
|
||||
// ForEach iterates through the metrics data, synthesizing an
|
||||
// export.Record with the appropriate aggregation for the exporter.
|
||||
func (d *metricReader) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error {
|
||||
func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export.Record) error) error {
|
||||
for _, m := range d.metrics {
|
||||
descriptor, err := convertDescriptor(m.Descriptor)
|
||||
if err != nil {
|
||||
|
@ -48,7 +48,7 @@ type fakeExporter struct {
|
||||
}
|
||||
|
||||
func (f *fakeExporter) Export(ctx context.Context, res *resource.Resource, ilr exportmetric.InstrumentationLibraryReader) error {
|
||||
return controllertest.ReadAll(ilr, export.StatelessExportKindSelector(),
|
||||
return controllertest.ReadAll(ilr, aggregation.StatelessTemporalitySelector(),
|
||||
func(_ instrumentation.Library, record exportmetric.Record) error {
|
||||
f.resource = res
|
||||
f.records = append(f.records, record)
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
"go.opentelemetry.io/otel/exporters/prometheus"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/global"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
@ -44,7 +44,7 @@ func initMeter() {
|
||||
selector.NewWithHistogramDistribution(
|
||||
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
|
||||
),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
processor.WithMemory(true),
|
||||
),
|
||||
)
|
||||
|
@ -33,8 +33,8 @@ var (
|
||||
|
||||
// Exporter exports metrics data in the OTLP wire format.
|
||||
type Exporter struct {
|
||||
client Client
|
||||
exportKindSelector metricsdk.ExportKindSelector
|
||||
client Client
|
||||
temporalitySelector aggregation.TemporalitySelector
|
||||
|
||||
mu sync.RWMutex
|
||||
started bool
|
||||
@ -96,8 +96,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *Exporter) ExportKindFor(descriptor *sdkapi.Descriptor, aggregatorKind aggregation.Kind) metricsdk.ExportKind {
|
||||
return e.exportKindSelector.ExportKindFor(descriptor, aggregatorKind)
|
||||
func (e *Exporter) TemporalityFor(descriptor *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
|
||||
return e.temporalitySelector.TemporalityFor(descriptor, kind)
|
||||
}
|
||||
|
||||
var _ metricsdk.Exporter = (*Exporter)(nil)
|
||||
@ -114,10 +114,10 @@ func New(ctx context.Context, client Client, opts ...Option) (*Exporter, error)
|
||||
// NewUnstarted constructs a new Exporter and does not start it.
|
||||
func NewUnstarted(client Client, opts ...Option) *Exporter {
|
||||
cfg := config{
|
||||
// Note: the default ExportKindSelector is specified
|
||||
// Note: the default TemporalitySelector is specified
|
||||
// as Cumulative:
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/731
|
||||
exportKindSelector: metricsdk.CumulativeExportKindSelector(),
|
||||
temporalitySelector: aggregation.CumulativeTemporalitySelector(),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@ -125,8 +125,8 @@ func NewUnstarted(client Client, opts ...Option) *Exporter {
|
||||
}
|
||||
|
||||
e := &Exporter{
|
||||
client: client,
|
||||
exportKindSelector: cfg.exportKindSelector,
|
||||
client: client,
|
||||
temporalitySelector: cfg.temporalitySelector,
|
||||
}
|
||||
|
||||
return e
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
@ -606,7 +607,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
|
||||
)
|
||||
}
|
||||
|
||||
func TestStatelessExportKind(t *testing.T) {
|
||||
func TestStatelessAggregationTemporality(t *testing.T) {
|
||||
type testcase struct {
|
||||
name string
|
||||
instrumentKind sdkapi.InstrumentKind
|
||||
@ -624,8 +625,8 @@ func TestStatelessExportKind(t *testing.T) {
|
||||
runMetricExportTests(
|
||||
t,
|
||||
[]otlpmetric.Option{
|
||||
otlpmetric.WithMetricExportKindSelector(
|
||||
metricsdk.StatelessExportKindSelector(),
|
||||
otlpmetric.WithMetricAggregationTemporalitySelector(
|
||||
aggregation.StatelessTemporalitySelector(),
|
||||
),
|
||||
},
|
||||
testerAResource,
|
||||
|
@ -72,12 +72,12 @@ func toNanos(t time.Time) uint64 {
|
||||
|
||||
// InstrumentationLibraryReader transforms all records contained in a checkpoint into
|
||||
// batched OTLP ResourceMetrics.
|
||||
func InstrumentationLibraryReader(ctx context.Context, exportSelector export.ExportKindSelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) {
|
||||
func InstrumentationLibraryReader(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) {
|
||||
var ilms []*metricpb.InstrumentationLibraryMetrics
|
||||
|
||||
err := ilmr.ForEach(func(lib instrumentation.Library, mr export.Reader) error {
|
||||
|
||||
records, errc := source(ctx, exportSelector, mr)
|
||||
records, errc := source(ctx, temporalitySelector, mr)
|
||||
|
||||
// Start a fixed number of goroutines to transform records.
|
||||
transformed := make(chan result)
|
||||
@ -86,7 +86,7 @@ func InstrumentationLibraryReader(ctx context.Context, exportSelector export.Exp
|
||||
for i := uint(0); i < numWorkers; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
transformer(ctx, exportSelector, records, transformed)
|
||||
transformer(ctx, temporalitySelector, records, transformed)
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
@ -134,14 +134,14 @@ func InstrumentationLibraryReader(ctx context.Context, exportSelector export.Exp
|
||||
// source starts a goroutine that sends each one of the Records yielded by
|
||||
// the Reader on the returned chan. Any error encountered will be sent
|
||||
// on the returned error chan after seeding is complete.
|
||||
func source(ctx context.Context, exportSelector export.ExportKindSelector, mr export.Reader) (<-chan export.Record, <-chan error) {
|
||||
func source(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, mr export.Reader) (<-chan export.Record, <-chan error) {
|
||||
errc := make(chan error, 1)
|
||||
out := make(chan export.Record)
|
||||
// Seed records into process.
|
||||
go func() {
|
||||
defer close(out)
|
||||
// No select is needed since errc is buffered.
|
||||
errc <- mr.ForEach(exportSelector, func(r export.Record) error {
|
||||
errc <- mr.ForEach(temporalitySelector, func(r export.Record) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ErrContextCanceled
|
||||
@ -155,9 +155,9 @@ func source(ctx context.Context, exportSelector export.ExportKindSelector, mr ex
|
||||
|
||||
// transformer transforms records read from the passed in chan into
|
||||
// OTLP Metrics which are sent on the out chan.
|
||||
func transformer(ctx context.Context, exportSelector export.ExportKindSelector, in <-chan export.Record, out chan<- result) {
|
||||
func transformer(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, in <-chan export.Record, out chan<- result) {
|
||||
for r := range in {
|
||||
m, err := Record(exportSelector, r)
|
||||
m, err := Record(temporalitySelector, r)
|
||||
// Propagate errors, but do not send empty results.
|
||||
if err == nil && m == nil {
|
||||
continue
|
||||
@ -237,7 +237,7 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) {
|
||||
|
||||
// Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg
|
||||
// error is returned if the Record Aggregator is not supported.
|
||||
func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricpb.Metric, error) {
|
||||
func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record) (*metricpb.Metric, error) {
|
||||
agg := r.Aggregation()
|
||||
switch agg.Kind() {
|
||||
case aggregation.MinMaxSumCountKind:
|
||||
@ -252,7 +252,7 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
|
||||
}
|
||||
return histogramPoint(r, exportSelector.ExportKindFor(r.Descriptor(), aggregation.HistogramKind), h)
|
||||
return histogramPoint(r, temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.HistogramKind), h)
|
||||
|
||||
case aggregation.SumKind:
|
||||
s, ok := agg.(aggregation.Sum)
|
||||
@ -263,7 +263,7 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sumPoint(r, sum, r.StartTime(), r.EndTime(), exportSelector.ExportKindFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic())
|
||||
return sumPoint(r, sum, r.StartTime(), r.EndTime(), temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic())
|
||||
|
||||
case aggregation.LastValueKind:
|
||||
lv, ok := agg.(aggregation.LastValue)
|
||||
@ -388,17 +388,17 @@ func gaugePoint(record export.Record, num number.Number, start, end time.Time) (
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func exportKindToTemporality(ek export.ExportKind) metricpb.AggregationTemporality {
|
||||
switch ek {
|
||||
case export.DeltaExportKind:
|
||||
func sdkTemporalityToTemporality(temporality aggregation.Temporality) metricpb.AggregationTemporality {
|
||||
switch temporality {
|
||||
case aggregation.DeltaTemporality:
|
||||
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
|
||||
case export.CumulativeExportKind:
|
||||
case aggregation.CumulativeTemporality:
|
||||
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
|
||||
}
|
||||
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED
|
||||
}
|
||||
|
||||
func sumPoint(record export.Record, num number.Number, start, end time.Time, ek export.ExportKind, monotonic bool) (*metricpb.Metric, error) {
|
||||
func sumPoint(record export.Record, num number.Number, start, end time.Time, temporality aggregation.Temporality, monotonic bool) (*metricpb.Metric, error) {
|
||||
desc := record.Descriptor()
|
||||
labels := record.Labels()
|
||||
|
||||
@ -413,7 +413,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, ek
|
||||
m.Data = &metricpb.Metric_Sum{
|
||||
Sum: &metricpb.Sum{
|
||||
IsMonotonic: monotonic,
|
||||
AggregationTemporality: exportKindToTemporality(ek),
|
||||
AggregationTemporality: sdkTemporalityToTemporality(temporality),
|
||||
DataPoints: []*metricpb.NumberDataPoint{
|
||||
{
|
||||
Value: &metricpb.NumberDataPoint_AsInt{
|
||||
@ -430,7 +430,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, ek
|
||||
m.Data = &metricpb.Metric_Sum{
|
||||
Sum: &metricpb.Sum{
|
||||
IsMonotonic: monotonic,
|
||||
AggregationTemporality: exportKindToTemporality(ek),
|
||||
AggregationTemporality: sdkTemporalityToTemporality(temporality),
|
||||
DataPoints: []*metricpb.NumberDataPoint{
|
||||
{
|
||||
Value: &metricpb.NumberDataPoint_AsDouble{
|
||||
@ -522,7 +522,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []ui
|
||||
}
|
||||
|
||||
// histogram transforms a Histogram Aggregator into an OTLP Metric.
|
||||
func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Histogram) (*metricpb.Metric, error) {
|
||||
func histogramPoint(record export.Record, temporality aggregation.Temporality, a aggregation.Histogram) (*metricpb.Metric, error) {
|
||||
desc := record.Descriptor()
|
||||
labels := record.Labels()
|
||||
boundaries, counts, err := histogramValues(a)
|
||||
@ -546,7 +546,7 @@ func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Hi
|
||||
Unit: string(desc.Unit()),
|
||||
Data: &metricpb.Metric_Histogram{
|
||||
Histogram: &metricpb.Histogram{
|
||||
AggregationTemporality: exportKindToTemporality(ek),
|
||||
AggregationTemporality: sdkTemporalityToTemporality(temporality),
|
||||
DataPoints: []*metricpb.HistogramDataPoint{
|
||||
{
|
||||
Sum: sum.CoerceToFloat64(desc.NumberKind()),
|
||||
|
@ -190,7 +190,7 @@ func TestSumIntDataPoints(t *testing.T) {
|
||||
value, err := ckpt.Sum()
|
||||
require.NoError(t, err)
|
||||
|
||||
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true); assert.NoError(t, err) {
|
||||
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true); assert.NoError(t, err) {
|
||||
assert.Nil(t, m.GetGauge())
|
||||
assert.Equal(t, &metricpb.Sum{
|
||||
AggregationTemporality: otelCumulative,
|
||||
@ -229,7 +229,7 @@ func TestSumFloatDataPoints(t *testing.T) {
|
||||
value, err := ckpt.Sum()
|
||||
require.NoError(t, err)
|
||||
|
||||
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.DeltaExportKind, false); assert.NoError(t, err) {
|
||||
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.DeltaTemporality, false); assert.NoError(t, err) {
|
||||
assert.Nil(t, m.GetGauge())
|
||||
assert.Equal(t, &metricpb.Sum{
|
||||
IsMonotonic: false,
|
||||
@ -367,7 +367,7 @@ func TestSumErrUnknownValueType(t *testing.T) {
|
||||
value, err := s.Sum()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true)
|
||||
_, err = sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true)
|
||||
assert.Error(t, err)
|
||||
if !errors.Is(err, ErrUnknownValueType) {
|
||||
t.Errorf("expected ErrUnknownValueType, got %v", err)
|
||||
@ -451,7 +451,7 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
|
||||
kind: kind,
|
||||
agg: agg,
|
||||
}
|
||||
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd))
|
||||
return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd))
|
||||
}
|
||||
|
||||
mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0])
|
||||
@ -483,7 +483,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
|
||||
makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) {
|
||||
desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind)
|
||||
labels := attribute.NewSet()
|
||||
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd))
|
||||
return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd))
|
||||
}
|
||||
|
||||
errEx := fmt.Errorf("timeout")
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
@ -40,7 +40,7 @@ import (
|
||||
// themselves.
|
||||
func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter, mcMetrics Collector) {
|
||||
selector := simple.NewWithInexpensiveDistribution()
|
||||
proc := processor.NewFactory(selector, exportmetric.StatelessExportKindSelector())
|
||||
proc := processor.NewFactory(selector, aggregation.StatelessTemporalitySelector())
|
||||
cont := controller.New(proc, controller.WithExporter(exp))
|
||||
require.NoError(t, cont.Start(ctx))
|
||||
|
||||
|
@ -14,7 +14,7 @@
|
||||
|
||||
package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
|
||||
|
||||
import metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
import "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
|
||||
// Option are setting options passed to an Exporter on creation.
|
||||
type Option interface {
|
||||
@ -28,15 +28,15 @@ func (fn exporterOptionFunc) apply(cfg *config) {
|
||||
}
|
||||
|
||||
type config struct {
|
||||
exportKindSelector metricsdk.ExportKindSelector
|
||||
temporalitySelector aggregation.TemporalitySelector
|
||||
}
|
||||
|
||||
// WithMetricExportKindSelector defines the ExportKindSelector used
|
||||
// for selecting AggregationTemporality (i.e., Cumulative vs. Delta
|
||||
// WithMetricAggregationTemporalitySelector defines the aggregation.TemporalitySelector used
|
||||
// for selecting aggregation.Temporality (i.e., Cumulative vs. Delta
|
||||
// aggregation). If not specified otherwise, exporter will use a
|
||||
// cumulative export kind selector.
|
||||
func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) Option {
|
||||
// cumulative temporality selector.
|
||||
func WithMetricAggregationTemporalitySelector(selector aggregation.TemporalitySelector) Option {
|
||||
return exporterOptionFunc(func(cfg *config) {
|
||||
cfg.exportKindSelector = selector
|
||||
cfg.temporalitySelector = selector
|
||||
})
|
||||
}
|
||||
|
@ -132,9 +132,9 @@ func (e *Exporter) Controller() *controller.Controller {
|
||||
return e.controller
|
||||
}
|
||||
|
||||
// ExportKindFor implements ExportKindSelector.
|
||||
func (e *Exporter) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) export.ExportKind {
|
||||
return export.CumulativeExportKindSelector().ExportKindFor(desc, kind)
|
||||
// TemporalityFor implements TemporalitySelector.
|
||||
func (e *Exporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
|
||||
return aggregation.CumulativeTemporalitySelector().TemporalityFor(desc, kind)
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler.
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/prometheus"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
@ -88,7 +88,7 @@ func newPipeline(config prometheus.Config, options ...controller.Option) (*prome
|
||||
selector.NewWithHistogramDistribution(
|
||||
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
|
||||
),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
processor.WithMemory(true),
|
||||
),
|
||||
options...,
|
||||
|
@ -47,8 +47,8 @@ type line struct {
|
||||
Timestamp *time.Time `json:"Timestamp,omitempty"`
|
||||
}
|
||||
|
||||
func (e *metricExporter) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) exportmetric.ExportKind {
|
||||
return exportmetric.StatelessExportKindSelector().ExportKindFor(desc, kind)
|
||||
func (e *metricExporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
|
||||
return aggregation.StatelessTemporalitySelector().TemporalityFor(desc, kind)
|
||||
}
|
||||
|
||||
func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reader exportmetric.InstrumentationLibraryReader) error {
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
@ -61,7 +61,7 @@ func newFixtureWithResource(t *testing.T, res *resource.Resource, opts ...stdout
|
||||
t.Fatal("Error building fixture: ", err)
|
||||
}
|
||||
aggSel := processortest.AggregatorSelector()
|
||||
proc := processor.NewFactory(aggSel, export.StatelessExportKindSelector())
|
||||
proc := processor.NewFactory(aggSel, aggregation.StatelessTemporalitySelector())
|
||||
cont := controller.New(proc,
|
||||
controller.WithExporter(exp),
|
||||
controller.WithResource(res),
|
||||
@ -87,7 +87,7 @@ func (fix testFixture) Output() string {
|
||||
func TestStdoutTimestamp(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
aggSel := processortest.AggregatorSelector()
|
||||
proc := processor.NewFactory(aggSel, export.CumulativeExportKindSelector())
|
||||
proc := processor.NewFactory(aggSel, aggregation.CumulativeTemporalitySelector())
|
||||
exporter, err := stdoutmetric.New(
|
||||
stdoutmetric.WithWriter(&buf),
|
||||
)
|
||||
|
@ -401,6 +401,7 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/itchyny/go-flags v1.5.0 h1:Z5q2ist2sfDjDlExVPBrMqlsEDxDR2h4zuOElB0OEYI=
|
||||
github.com/itchyny/go-flags v1.5.0/go.mod h1:lenkYuCobuxLBAd/HGFE4LRoW8D3B6iXRQfWYJ+MNbA=
|
||||
github.com/itchyny/gojq v0.12.5 h1:6SJ1BQ1VAwJAlIvLSIZmqHP/RUEq3qfVWvsRxrqhsD0=
|
||||
github.com/itchyny/gojq v0.12.5/go.mod h1:3e1hZXv+Kwvdp6V9HXpVrvddiHVApi5EDZwS+zLFeiE=
|
||||
|
117
sdk/export/metric/aggregation/temporality.go
Normal file
117
sdk/export/metric/aggregation/temporality.go
Normal file
@ -0,0 +1,117 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:generate stringer -type=Temporality
|
||||
|
||||
package aggregation // import "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
)
|
||||
|
||||
// Temporality indicates the temporal aggregation exported by an exporter.
|
||||
// These bits may be OR-d together when multiple exporters are in use.
|
||||
type Temporality uint8
|
||||
|
||||
const (
|
||||
// CumulativeTemporality indicates that an Exporter expects a
|
||||
// Cumulative Aggregation.
|
||||
CumulativeTemporality Temporality = 1
|
||||
|
||||
// DeltaTemporality indicates that an Exporter expects a
|
||||
// Delta Aggregation.
|
||||
DeltaTemporality Temporality = 2
|
||||
)
|
||||
|
||||
// Includes returns if t includes support for other temporality.
|
||||
func (t Temporality) Includes(other Temporality) bool {
|
||||
return t&other != 0
|
||||
}
|
||||
|
||||
// MemoryRequired returns whether an exporter of this temporality requires
|
||||
// memory to export correctly.
|
||||
func (t Temporality) MemoryRequired(mkind sdkapi.InstrumentKind) bool {
|
||||
switch mkind {
|
||||
case sdkapi.HistogramInstrumentKind, sdkapi.GaugeObserverInstrumentKind,
|
||||
sdkapi.CounterInstrumentKind, sdkapi.UpDownCounterInstrumentKind:
|
||||
// Delta-oriented instruments:
|
||||
return t.Includes(CumulativeTemporality)
|
||||
|
||||
case sdkapi.CounterObserverInstrumentKind, sdkapi.UpDownCounterObserverInstrumentKind:
|
||||
// Cumulative-oriented instruments:
|
||||
return t.Includes(DeltaTemporality)
|
||||
}
|
||||
// Something unexpected is happening--we could panic. This
|
||||
// will become an error when the exporter tries to access a
|
||||
// checkpoint, presumably, so let it be.
|
||||
return false
|
||||
}
|
||||
|
||||
type (
|
||||
constantTemporalitySelector Temporality
|
||||
statelessTemporalitySelector struct{}
|
||||
)
|
||||
|
||||
var (
|
||||
_ TemporalitySelector = constantTemporalitySelector(0)
|
||||
_ TemporalitySelector = statelessTemporalitySelector{}
|
||||
)
|
||||
|
||||
// ConstantTemporalitySelector returns an TemporalitySelector that returns
|
||||
// a constant Temporality.
|
||||
func ConstantTemporalitySelector(t Temporality) TemporalitySelector {
|
||||
return constantTemporalitySelector(t)
|
||||
}
|
||||
|
||||
// CumulativeTemporalitySelector returns an TemporalitySelector that
|
||||
// always returns CumulativeTemporality.
|
||||
func CumulativeTemporalitySelector() TemporalitySelector {
|
||||
return ConstantTemporalitySelector(CumulativeTemporality)
|
||||
}
|
||||
|
||||
// DeltaTemporalitySelector returns an TemporalitySelector that
|
||||
// always returns DeltaTemporality.
|
||||
func DeltaTemporalitySelector() TemporalitySelector {
|
||||
return ConstantTemporalitySelector(DeltaTemporality)
|
||||
}
|
||||
|
||||
// StatelessTemporalitySelector returns an TemporalitySelector that
|
||||
// always returns the Temporality that avoids long-term memory
|
||||
// requirements.
|
||||
func StatelessTemporalitySelector() TemporalitySelector {
|
||||
return statelessTemporalitySelector{}
|
||||
}
|
||||
|
||||
// TemporalityFor implements TemporalitySelector.
|
||||
func (c constantTemporalitySelector) TemporalityFor(_ *sdkapi.Descriptor, _ Kind) Temporality {
|
||||
return Temporality(c)
|
||||
}
|
||||
|
||||
// TemporalityFor implements TemporalitySelector.
|
||||
func (s statelessTemporalitySelector) TemporalityFor(desc *sdkapi.Descriptor, kind Kind) Temporality {
|
||||
if kind == SumKind && desc.InstrumentKind().PrecomputedSum() {
|
||||
return CumulativeTemporality
|
||||
}
|
||||
return DeltaTemporality
|
||||
}
|
||||
|
||||
// TemporalitySelector is a sub-interface of Exporter used to indicate
|
||||
// whether the Processor should compute Delta or Cumulative
|
||||
// Aggregations.
|
||||
type TemporalitySelector interface {
|
||||
// TemporalityFor should return the correct Temporality that
|
||||
// should be used when exporting data for the given metric
|
||||
// instrument and Aggregator kind.
|
||||
TemporalityFor(descriptor *sdkapi.Descriptor, aggregationKind Kind) Temporality
|
||||
}
|
25
sdk/export/metric/aggregation/temporality_string.go
Normal file
25
sdk/export/metric/aggregation/temporality_string.go
Normal file
@ -0,0 +1,25 @@
|
||||
// Code generated by "stringer -type=Temporality"; DO NOT EDIT.
|
||||
|
||||
package aggregation // import "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
|
||||
import "strconv"
|
||||
|
||||
func _() {
|
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{}
|
||||
_ = x[CumulativeTemporality-1]
|
||||
_ = x[DeltaTemporality-2]
|
||||
}
|
||||
|
||||
const _Temporality_name = "CumulativeTemporalityDeltaTemporality"
|
||||
|
||||
var _Temporality_index = [...]uint8{0, 21, 37}
|
||||
|
||||
func (i Temporality) String() string {
|
||||
i -= 1
|
||||
if i >= Temporality(len(_Temporality_index)-1) {
|
||||
return "Temporality(" + strconv.FormatInt(int64(i+1), 10) + ")"
|
||||
}
|
||||
return _Temporality_name[_Temporality_index[i]:_Temporality_index[i+1]]
|
||||
}
|
74
sdk/export/metric/aggregation/temporality_test.go
Normal file
74
sdk/export/metric/aggregation/temporality_test.go
Normal file
@ -0,0 +1,74 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aggregation
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/metrictest"
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
)
|
||||
|
||||
func TestTemporalityIncludes(t *testing.T) {
|
||||
require.True(t, CumulativeTemporality.Includes(CumulativeTemporality))
|
||||
require.True(t, DeltaTemporality.Includes(CumulativeTemporality|DeltaTemporality))
|
||||
}
|
||||
|
||||
var deltaMemoryTemporalties = []sdkapi.InstrumentKind{
|
||||
sdkapi.CounterObserverInstrumentKind,
|
||||
sdkapi.UpDownCounterObserverInstrumentKind,
|
||||
}
|
||||
|
||||
var cumulativeMemoryTemporalties = []sdkapi.InstrumentKind{
|
||||
sdkapi.HistogramInstrumentKind,
|
||||
sdkapi.GaugeObserverInstrumentKind,
|
||||
sdkapi.CounterInstrumentKind,
|
||||
sdkapi.UpDownCounterInstrumentKind,
|
||||
}
|
||||
|
||||
func TestTemporalityMemoryRequired(t *testing.T) {
|
||||
for _, kind := range deltaMemoryTemporalties {
|
||||
require.True(t, DeltaTemporality.MemoryRequired(kind))
|
||||
require.False(t, CumulativeTemporality.MemoryRequired(kind))
|
||||
}
|
||||
|
||||
for _, kind := range cumulativeMemoryTemporalties {
|
||||
require.True(t, CumulativeTemporality.MemoryRequired(kind))
|
||||
require.False(t, DeltaTemporality.MemoryRequired(kind))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTemporalitySelectors(t *testing.T) {
|
||||
cAggTemp := CumulativeTemporalitySelector()
|
||||
dAggTemp := DeltaTemporalitySelector()
|
||||
sAggTemp := StatelessTemporalitySelector()
|
||||
|
||||
for _, ikind := range append(deltaMemoryTemporalties, cumulativeMemoryTemporalties...) {
|
||||
desc := metrictest.NewDescriptor("instrument", ikind, number.Int64Kind)
|
||||
|
||||
var akind Kind
|
||||
if ikind.Adding() {
|
||||
akind = SumKind
|
||||
} else {
|
||||
akind = HistogramKind
|
||||
}
|
||||
require.Equal(t, CumulativeTemporality, cAggTemp.TemporalityFor(&desc, akind))
|
||||
require.Equal(t, DeltaTemporality, dAggTemp.TemporalityFor(&desc, akind))
|
||||
require.False(t, sAggTemp.TemporalityFor(&desc, akind).MemoryRequired(ikind))
|
||||
}
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
// Code generated by "stringer -type=ExportKind"; DO NOT EDIT.
|
||||
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
|
||||
import "strconv"
|
||||
|
||||
func _() {
|
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{}
|
||||
_ = x[CumulativeExportKind-1]
|
||||
_ = x[DeltaExportKind-2]
|
||||
}
|
||||
|
||||
const _ExportKind_name = "CumulativeExportKindDeltaExportKind"
|
||||
|
||||
var _ExportKind_index = [...]uint8{0, 20, 35}
|
||||
|
||||
func (i ExportKind) String() string {
|
||||
i -= 1
|
||||
if i < 0 || i >= ExportKind(len(_ExportKind_index)-1) {
|
||||
return "ExportKind(" + strconv.FormatInt(int64(i+1), 10) + ")"
|
||||
}
|
||||
return _ExportKind_name[_ExportKind_index[i]:_ExportKind_index[i+1]]
|
||||
}
|
@ -1,75 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/metrictest"
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
)
|
||||
|
||||
func TestExportKindIncludes(t *testing.T) {
|
||||
require.True(t, CumulativeExportKind.Includes(CumulativeExportKind))
|
||||
require.True(t, DeltaExportKind.Includes(CumulativeExportKind|DeltaExportKind))
|
||||
}
|
||||
|
||||
var deltaMemoryKinds = []sdkapi.InstrumentKind{
|
||||
sdkapi.CounterObserverInstrumentKind,
|
||||
sdkapi.UpDownCounterObserverInstrumentKind,
|
||||
}
|
||||
|
||||
var cumulativeMemoryKinds = []sdkapi.InstrumentKind{
|
||||
sdkapi.HistogramInstrumentKind,
|
||||
sdkapi.GaugeObserverInstrumentKind,
|
||||
sdkapi.CounterInstrumentKind,
|
||||
sdkapi.UpDownCounterInstrumentKind,
|
||||
}
|
||||
|
||||
func TestExportKindMemoryRequired(t *testing.T) {
|
||||
for _, kind := range deltaMemoryKinds {
|
||||
require.True(t, DeltaExportKind.MemoryRequired(kind))
|
||||
require.False(t, CumulativeExportKind.MemoryRequired(kind))
|
||||
}
|
||||
|
||||
for _, kind := range cumulativeMemoryKinds {
|
||||
require.True(t, CumulativeExportKind.MemoryRequired(kind))
|
||||
require.False(t, DeltaExportKind.MemoryRequired(kind))
|
||||
}
|
||||
}
|
||||
|
||||
func TestExportKindSelectors(t *testing.T) {
|
||||
ceks := CumulativeExportKindSelector()
|
||||
deks := DeltaExportKindSelector()
|
||||
seks := StatelessExportKindSelector()
|
||||
|
||||
for _, ikind := range append(deltaMemoryKinds, cumulativeMemoryKinds...) {
|
||||
desc := metrictest.NewDescriptor("instrument", ikind, number.Int64Kind)
|
||||
|
||||
var akind aggregation.Kind
|
||||
if ikind.Adding() {
|
||||
akind = aggregation.SumKind
|
||||
} else {
|
||||
akind = aggregation.HistogramKind
|
||||
}
|
||||
require.Equal(t, CumulativeExportKind, ceks.ExportKindFor(&desc, akind))
|
||||
require.Equal(t, DeltaExportKind, deks.ExportKindFor(&desc, akind))
|
||||
require.False(t, seks.ExportKindFor(&desc, akind).MemoryRequired(ikind))
|
||||
}
|
||||
}
|
@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:generate stringer -type=ExportKind
|
||||
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
|
||||
import (
|
||||
@ -219,20 +217,10 @@ type Exporter interface {
|
||||
// Processor that just completed collection.
|
||||
Export(ctx context.Context, resource *resource.Resource, reader InstrumentationLibraryReader) error
|
||||
|
||||
// ExportKindSelector is an interface used by the Processor
|
||||
// TemporalitySelector is an interface used by the Processor
|
||||
// in deciding whether to compute Delta or Cumulative
|
||||
// Aggregations when passing Records to this Exporter.
|
||||
ExportKindSelector
|
||||
}
|
||||
|
||||
// ExportKindSelector is a sub-interface of Exporter used to indicate
|
||||
// whether the Processor should compute Delta or Cumulative
|
||||
// Aggregations.
|
||||
type ExportKindSelector interface {
|
||||
// ExportKindFor should return the correct ExportKind that
|
||||
// should be used when exporting data for the given metric
|
||||
// instrument and Aggregator kind.
|
||||
ExportKindFor(descriptor *sdkapi.Descriptor, aggregatorKind aggregation.Kind) ExportKind
|
||||
aggregation.TemporalitySelector
|
||||
}
|
||||
|
||||
// InstrumentationLibraryReader is an interface for exporters to iterate
|
||||
@ -254,7 +242,7 @@ type Reader interface {
|
||||
// period. Each aggregated checkpoint returned by the
|
||||
// function parameter may return an error.
|
||||
//
|
||||
// The ExportKindSelector argument is used to determine
|
||||
// The TemporalitySelector argument is used to determine
|
||||
// whether the Record is computed using Delta or Cumulative
|
||||
// aggregation.
|
||||
//
|
||||
@ -262,7 +250,7 @@ type Reader interface {
|
||||
// expected from the Meter implementation. Any other kind
|
||||
// of error will immediately halt ForEach and return
|
||||
// the error to the caller.
|
||||
ForEach(kindSelector ExportKindSelector, recordFunc func(Record) error) error
|
||||
ForEach(tempSelector aggregation.TemporalitySelector, recordFunc func(Record) error) error
|
||||
|
||||
// Locker supports locking the checkpoint set. Collection
|
||||
// into the checkpoint set cannot take place (in case of a
|
||||
@ -364,90 +352,3 @@ func (r Record) StartTime() time.Time {
|
||||
func (r Record) EndTime() time.Time {
|
||||
return r.end
|
||||
}
|
||||
|
||||
// ExportKind indicates the kind of data exported by an exporter.
|
||||
// These bits may be OR-d together when multiple exporters are in use.
|
||||
type ExportKind int
|
||||
|
||||
const (
|
||||
// CumulativeExportKind indicates that an Exporter expects a
|
||||
// Cumulative Aggregation.
|
||||
CumulativeExportKind ExportKind = 1
|
||||
|
||||
// DeltaExportKind indicates that an Exporter expects a
|
||||
// Delta Aggregation.
|
||||
DeltaExportKind ExportKind = 2
|
||||
)
|
||||
|
||||
// Includes tests whether `kind` includes a specific kind of
|
||||
// exporter.
|
||||
func (kind ExportKind) Includes(has ExportKind) bool {
|
||||
return kind&has != 0
|
||||
}
|
||||
|
||||
// MemoryRequired returns whether an exporter of this kind requires
|
||||
// memory to export correctly.
|
||||
func (kind ExportKind) MemoryRequired(mkind sdkapi.InstrumentKind) bool {
|
||||
switch mkind {
|
||||
case sdkapi.HistogramInstrumentKind, sdkapi.GaugeObserverInstrumentKind,
|
||||
sdkapi.CounterInstrumentKind, sdkapi.UpDownCounterInstrumentKind:
|
||||
// Delta-oriented instruments:
|
||||
return kind.Includes(CumulativeExportKind)
|
||||
|
||||
case sdkapi.CounterObserverInstrumentKind, sdkapi.UpDownCounterObserverInstrumentKind:
|
||||
// Cumulative-oriented instruments:
|
||||
return kind.Includes(DeltaExportKind)
|
||||
}
|
||||
// Something unexpected is happening--we could panic. This
|
||||
// will become an error when the exporter tries to access a
|
||||
// checkpoint, presumably, so let it be.
|
||||
return false
|
||||
}
|
||||
|
||||
type (
|
||||
constantExportKindSelector ExportKind
|
||||
statelessExportKindSelector struct{}
|
||||
)
|
||||
|
||||
var (
|
||||
_ ExportKindSelector = constantExportKindSelector(0)
|
||||
_ ExportKindSelector = statelessExportKindSelector{}
|
||||
)
|
||||
|
||||
// ConstantExportKindSelector returns an ExportKindSelector that returns
|
||||
// a constant ExportKind, one that is either always cumulative or always delta.
|
||||
func ConstantExportKindSelector(kind ExportKind) ExportKindSelector {
|
||||
return constantExportKindSelector(kind)
|
||||
}
|
||||
|
||||
// CumulativeExportKindSelector returns an ExportKindSelector that
|
||||
// always returns CumulativeExportKind.
|
||||
func CumulativeExportKindSelector() ExportKindSelector {
|
||||
return ConstantExportKindSelector(CumulativeExportKind)
|
||||
}
|
||||
|
||||
// DeltaExportKindSelector returns an ExportKindSelector that
|
||||
// always returns DeltaExportKind.
|
||||
func DeltaExportKindSelector() ExportKindSelector {
|
||||
return ConstantExportKindSelector(DeltaExportKind)
|
||||
}
|
||||
|
||||
// StatelessExportKindSelector returns an ExportKindSelector that
|
||||
// always returns the ExportKind that avoids long-term memory
|
||||
// requirements.
|
||||
func StatelessExportKindSelector() ExportKindSelector {
|
||||
return statelessExportKindSelector{}
|
||||
}
|
||||
|
||||
// ExportKindFor implements ExportKindSelector.
|
||||
func (c constantExportKindSelector) ExportKindFor(_ *sdkapi.Descriptor, _ aggregation.Kind) ExportKind {
|
||||
return ExportKind(c)
|
||||
}
|
||||
|
||||
// ExportKindFor implements ExportKindSelector.
|
||||
func (s statelessExportKindSelector) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) ExportKind {
|
||||
if kind == aggregation.SumKind && desc.InstrumentKind().PrecomputedSum() {
|
||||
return CumulativeExportKind
|
||||
}
|
||||
return DeltaExportKind
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ func getMap(t *testing.T, cont *controller.Controller) map[string]float64 {
|
||||
require.NoError(t, cont.ForEach(
|
||||
func(_ instrumentation.Library, reader export.Reader) error {
|
||||
return reader.ForEach(
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
func(record export.Record) error {
|
||||
return out.AddRecord(record)
|
||||
},
|
||||
@ -115,7 +115,7 @@ func TestControllerUsesResource(t *testing.T) {
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) {
|
||||
sel := export.CumulativeExportKindSelector()
|
||||
sel := aggregation.CumulativeTemporalitySelector()
|
||||
exp := processortest.New(sel, attribute.DefaultEncoder())
|
||||
cont := controller.New(
|
||||
processor.NewFactory(
|
||||
@ -145,7 +145,7 @@ func TestStartNoExporter(t *testing.T) {
|
||||
cont := controller.New(
|
||||
processor.NewFactory(
|
||||
processortest.AggregatorSelector(),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
),
|
||||
controller.WithCollectPeriod(time.Second),
|
||||
controller.WithResource(resource.Empty()),
|
||||
@ -214,7 +214,7 @@ func TestObserverCanceled(t *testing.T) {
|
||||
cont := controller.New(
|
||||
processor.NewFactory(
|
||||
processortest.AggregatorSelector(),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
),
|
||||
controller.WithCollectPeriod(0),
|
||||
controller.WithCollectTimeout(time.Millisecond),
|
||||
@ -246,7 +246,7 @@ func TestObserverContext(t *testing.T) {
|
||||
cont := controller.New(
|
||||
processor.NewFactory(
|
||||
processortest.AggregatorSelector(),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
),
|
||||
controller.WithCollectTimeout(0),
|
||||
controller.WithResource(resource.Empty()),
|
||||
@ -278,7 +278,7 @@ type blockingExporter struct {
|
||||
func newBlockingExporter() *blockingExporter {
|
||||
return &blockingExporter{
|
||||
exporter: processortest.New(
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
),
|
||||
}
|
||||
@ -296,11 +296,8 @@ func (b *blockingExporter) Export(ctx context.Context, res *resource.Resource, o
|
||||
return err
|
||||
}
|
||||
|
||||
func (*blockingExporter) ExportKindFor(
|
||||
*sdkapi.Descriptor,
|
||||
aggregation.Kind,
|
||||
) export.ExportKind {
|
||||
return export.CumulativeExportKind
|
||||
func (*blockingExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality {
|
||||
return aggregation.CumulativeTemporality
|
||||
}
|
||||
|
||||
func TestExportTimeout(t *testing.T) {
|
||||
@ -308,7 +305,7 @@ func TestExportTimeout(t *testing.T) {
|
||||
cont := controller.New(
|
||||
processor.NewFactory(
|
||||
processortest.AggregatorSelector(),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
),
|
||||
controller.WithCollectPeriod(time.Second),
|
||||
controller.WithPushTimeout(time.Millisecond),
|
||||
@ -357,7 +354,7 @@ func TestExportTimeout(t *testing.T) {
|
||||
|
||||
func TestCollectAfterStopThenStartAgain(t *testing.T) {
|
||||
exp := processortest.New(
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
)
|
||||
cont := controller.New(
|
||||
@ -436,7 +433,7 @@ func TestCollectAfterStopThenStartAgain(t *testing.T) {
|
||||
|
||||
func TestRegistryFunction(t *testing.T) {
|
||||
exp := processortest.New(
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
)
|
||||
cont := controller.New(
|
||||
|
@ -24,7 +24,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/controllertest"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
@ -36,7 +36,7 @@ func TestPullNoCollect(t *testing.T) {
|
||||
puller := controller.New(
|
||||
processor.NewFactory(
|
||||
processortest.AggregatorSelector(),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
processor.WithMemory(true),
|
||||
),
|
||||
controller.WithCollectPeriod(0),
|
||||
@ -51,7 +51,7 @@ func TestPullNoCollect(t *testing.T) {
|
||||
|
||||
require.NoError(t, puller.Collect(ctx))
|
||||
records := processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord))
|
||||
require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord))
|
||||
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"counter.sum/A=B/": 10,
|
||||
@ -61,7 +61,7 @@ func TestPullNoCollect(t *testing.T) {
|
||||
|
||||
require.NoError(t, puller.Collect(ctx))
|
||||
records = processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord))
|
||||
require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord))
|
||||
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"counter.sum/A=B/": 20,
|
||||
@ -72,7 +72,7 @@ func TestPullWithCollect(t *testing.T) {
|
||||
puller := controller.New(
|
||||
processor.NewFactory(
|
||||
processortest.AggregatorSelector(),
|
||||
export.CumulativeExportKindSelector(),
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
processor.WithMemory(true),
|
||||
),
|
||||
controller.WithCollectPeriod(time.Second),
|
||||
@ -89,7 +89,7 @@ func TestPullWithCollect(t *testing.T) {
|
||||
|
||||
require.NoError(t, puller.Collect(ctx))
|
||||
records := processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord))
|
||||
require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord))
|
||||
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"counter.sum/A=B/": 10,
|
||||
@ -100,7 +100,7 @@ func TestPullWithCollect(t *testing.T) {
|
||||
// Cached value!
|
||||
require.NoError(t, puller.Collect(ctx))
|
||||
records = processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord))
|
||||
require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord))
|
||||
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"counter.sum/A=B/": 10,
|
||||
@ -112,7 +112,7 @@ func TestPullWithCollect(t *testing.T) {
|
||||
// Re-computed value!
|
||||
require.NoError(t, puller.Collect(ctx))
|
||||
records = processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord))
|
||||
require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord))
|
||||
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"counter.sum/A=B/": 20,
|
||||
|
@ -67,7 +67,7 @@ func init() {
|
||||
|
||||
func newExporter() *processortest.Exporter {
|
||||
return processortest.New(
|
||||
export.StatelessExportKindSelector(),
|
||||
aggregation.StatelessTemporalitySelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
)
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/benbjohnson/clock"
|
||||
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
|
||||
)
|
||||
@ -64,7 +65,7 @@ func (t MockTicker) C() <-chan time.Time {
|
||||
// metric).
|
||||
func ReadAll(
|
||||
reader export.InstrumentationLibraryReader,
|
||||
kind export.ExportKindSelector,
|
||||
kind aggregation.TemporalitySelector,
|
||||
apply func(instrumentation.Library, export.Record) error,
|
||||
) error {
|
||||
return reader.ForEach(func(library instrumentation.Library, reader export.Reader) error {
|
||||
|
@ -28,7 +28,7 @@ import (
|
||||
|
||||
type (
|
||||
Processor struct {
|
||||
export.ExportKindSelector
|
||||
aggregation.TemporalitySelector
|
||||
export.AggregatorSelector
|
||||
|
||||
state
|
||||
@ -118,32 +118,32 @@ var _ export.Reader = &state{}
|
||||
// ErrInconsistentState is returned when the sequence of collection's starts and finishes are incorrectly balanced.
|
||||
var ErrInconsistentState = fmt.Errorf("inconsistent processor state")
|
||||
|
||||
// ErrInvalidExportKind is returned for unknown metric.ExportKind.
|
||||
var ErrInvalidExportKind = fmt.Errorf("invalid export kind")
|
||||
// ErrInvalidTemporality is returned for unknown metric.Temporality.
|
||||
var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality")
|
||||
|
||||
// New returns a basic Processor that is also a Checkpointer using the provided
|
||||
// AggregatorSelector to select Aggregators. The ExportKindSelector
|
||||
// AggregatorSelector to select Aggregators. The TemporalitySelector
|
||||
// is consulted to determine the kind(s) of exporter that will consume
|
||||
// data, so that this Processor can prepare to compute Delta or
|
||||
// Cumulative Aggregations as needed.
|
||||
func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) *Processor {
|
||||
return NewFactory(aselector, eselector, opts...).NewCheckpointer().(*Processor)
|
||||
func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor {
|
||||
return NewFactory(aselector, tselector, opts...).NewCheckpointer().(*Processor)
|
||||
}
|
||||
|
||||
type factory struct {
|
||||
aselector export.AggregatorSelector
|
||||
eselector export.ExportKindSelector
|
||||
tselector aggregation.TemporalitySelector
|
||||
config config
|
||||
}
|
||||
|
||||
func NewFactory(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) export.CheckpointerFactory {
|
||||
func NewFactory(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) export.CheckpointerFactory {
|
||||
var config config
|
||||
for _, opt := range opts {
|
||||
opt.applyProcessor(&config)
|
||||
}
|
||||
return factory{
|
||||
aselector: aselector,
|
||||
eselector: eselector,
|
||||
tselector: tselector,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
@ -153,8 +153,8 @@ var _ export.CheckpointerFactory = factory{}
|
||||
func (f factory) NewCheckpointer() export.Checkpointer {
|
||||
now := time.Now()
|
||||
p := &Processor{
|
||||
AggregatorSelector: f.aselector,
|
||||
ExportKindSelector: f.eselector,
|
||||
AggregatorSelector: f.aselector,
|
||||
TemporalitySelector: f.tselector,
|
||||
state: state{
|
||||
values: map[stateKey]*stateValue{},
|
||||
processStart: now,
|
||||
@ -181,7 +181,7 @@ func (b *Processor) Process(accum export.Accumulation) error {
|
||||
// Check if there is an existing value.
|
||||
value, ok := b.state.values[key]
|
||||
if !ok {
|
||||
stateful := b.ExportKindFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind())
|
||||
stateful := b.TemporalityFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind())
|
||||
|
||||
newValue := &stateValue{
|
||||
labels: accum.Labels(),
|
||||
@ -227,7 +227,7 @@ func (b *Processor) Process(accum export.Accumulation) error {
|
||||
// instrument reports a PrecomputedSum to a DeltaExporter or
|
||||
// the reverse, a non-PrecomputedSum instrument with a
|
||||
// CumulativeExporter. This logic is encapsulated in
|
||||
// ExportKind.MemoryRequired(InstrumentKind).
|
||||
// Temporality.MemoryRequired(InstrumentKind).
|
||||
//
|
||||
// Case (b) occurs when the variable `sameCollection` is true,
|
||||
// indicating that the stateKey for Accumulation has already
|
||||
@ -340,7 +340,7 @@ func (b *Processor) FinishCollection() error {
|
||||
// ForEach iterates through the Reader, passing an
|
||||
// export.Record with the appropriate Cumulative or Delta aggregation
|
||||
// to an exporter.
|
||||
func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error {
|
||||
func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.Record) error) error {
|
||||
if b.startedCollection != b.finishedCollection {
|
||||
return ErrInconsistentState
|
||||
}
|
||||
@ -356,9 +356,9 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record
|
||||
continue
|
||||
}
|
||||
|
||||
ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind())
|
||||
switch ekind {
|
||||
case export.CumulativeExportKind:
|
||||
aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind())
|
||||
switch aggTemp {
|
||||
case aggregation.CumulativeTemporality:
|
||||
// If stateful, the sum has been computed. If stateless, the
|
||||
// input was already cumulative. Either way, use the checkpointed
|
||||
// value:
|
||||
@ -369,7 +369,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record
|
||||
}
|
||||
start = b.processStart
|
||||
|
||||
case export.DeltaExportKind:
|
||||
case aggregation.DeltaTemporality:
|
||||
// Precomputed sums are a special case.
|
||||
if mkind.PrecomputedSum() {
|
||||
agg = value.delta.Aggregation()
|
||||
@ -379,7 +379,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record
|
||||
start = b.intervalStart
|
||||
|
||||
default:
|
||||
return fmt.Errorf("%v: %w", ekind, ErrInvalidExportKind)
|
||||
return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality)
|
||||
}
|
||||
|
||||
if err := f(export.NewRecord(
|
||||
|
@ -47,7 +47,7 @@ func requireNotAfter(t *testing.T, t1, t2 time.Time) {
|
||||
// TestProcessor tests all the non-error paths in this package.
|
||||
func TestProcessor(t *testing.T) {
|
||||
type exportCase struct {
|
||||
kind export.ExportKind
|
||||
kind aggregation.Temporality
|
||||
}
|
||||
type instrumentCase struct {
|
||||
kind sdkapi.InstrumentKind
|
||||
@ -60,8 +60,8 @@ func TestProcessor(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range []exportCase{
|
||||
{kind: export.CumulativeExportKind},
|
||||
{kind: export.DeltaExportKind},
|
||||
{kind: aggregation.CumulativeTemporality},
|
||||
{kind: aggregation.DeltaTemporality},
|
||||
} {
|
||||
t.Run(tc.kind.String(), func(t *testing.T) {
|
||||
for _, ic := range []instrumentCase{
|
||||
@ -121,7 +121,7 @@ func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.Aggregator
|
||||
|
||||
func testProcessor(
|
||||
t *testing.T,
|
||||
ekind export.ExportKind,
|
||||
aggTemp aggregation.Temporality,
|
||||
mkind sdkapi.InstrumentKind,
|
||||
nkind number.Kind,
|
||||
akind aggregation.Kind,
|
||||
@ -134,7 +134,7 @@ func testProcessor(
|
||||
labs2 := []attribute.KeyValue{attribute.String("L2", "V")}
|
||||
|
||||
testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) {
|
||||
processor := basic.New(selector, export.ConstantExportKindSelector(ekind), basic.WithMemory(hasMemory))
|
||||
processor := basic.New(selector, aggregation.ConstantTemporalitySelector(aggTemp), basic.WithMemory(hasMemory))
|
||||
|
||||
instSuffix := fmt.Sprint(".", strings.ToLower(akind.String()))
|
||||
|
||||
@ -166,7 +166,7 @@ func testProcessor(
|
||||
_, canSub := subr.(export.Subtractor)
|
||||
|
||||
// Allow unsupported subraction case only when it is called for.
|
||||
require.True(t, mkind.PrecomputedSum() && ekind == export.DeltaExportKind && !canSub)
|
||||
require.True(t, mkind.PrecomputedSum() && aggTemp == aggregation.DeltaTemporality && !canSub)
|
||||
return
|
||||
} else if err != nil {
|
||||
t.Fatal("unexpected FinishCollection error: ", err)
|
||||
@ -190,7 +190,7 @@ func testProcessor(
|
||||
|
||||
// Test the final checkpoint state.
|
||||
records1 := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
err = reader.ForEach(export.ConstantExportKindSelector(ekind), records1.AddRecord)
|
||||
err = reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord)
|
||||
|
||||
// Test for an allowed error:
|
||||
if err != nil && err != aggregation.ErrNoSubtraction {
|
||||
@ -203,7 +203,7 @@ func testProcessor(
|
||||
// number of Accumulators, unless LastValue aggregation.
|
||||
// If a precomputed sum, we expect cumulative inputs.
|
||||
if mkind.PrecomputedSum() {
|
||||
if ekind == export.DeltaExportKind && akind != aggregation.LastValueKind {
|
||||
if aggTemp == aggregation.DeltaTemporality && akind != aggregation.LastValueKind {
|
||||
multiplier = int64(nAccum)
|
||||
} else if akind == aggregation.LastValueKind {
|
||||
multiplier = cumulativeMultiplier
|
||||
@ -211,7 +211,7 @@ func testProcessor(
|
||||
multiplier = cumulativeMultiplier * int64(nAccum)
|
||||
}
|
||||
} else {
|
||||
if ekind == export.CumulativeExportKind && akind != aggregation.LastValueKind {
|
||||
if aggTemp == aggregation.CumulativeTemporality && akind != aggregation.LastValueKind {
|
||||
multiplier = cumulativeMultiplier * int64(nAccum)
|
||||
} else if akind == aggregation.LastValueKind {
|
||||
multiplier = 1
|
||||
@ -223,7 +223,7 @@ func testProcessor(
|
||||
// Synchronous accumulate results from multiple accumulators,
|
||||
// use that number as the baseline multiplier.
|
||||
multiplier = int64(nAccum)
|
||||
if ekind == export.CumulativeExportKind {
|
||||
if aggTemp == aggregation.CumulativeTemporality {
|
||||
// If a cumulative exporter, include prior checkpoints.
|
||||
multiplier *= cumulativeMultiplier
|
||||
}
|
||||
@ -265,8 +265,8 @@ func testProcessor(
|
||||
|
||||
type bogusExporter struct{}
|
||||
|
||||
func (bogusExporter) ExportKindFor(*sdkapi.Descriptor, aggregation.Kind) export.ExportKind {
|
||||
return 1000000
|
||||
func (bogusExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality {
|
||||
return 100
|
||||
}
|
||||
|
||||
func (bogusExporter) Export(context.Context, export.Reader) error {
|
||||
@ -275,39 +275,39 @@ func (bogusExporter) Export(context.Context, export.Reader) error {
|
||||
|
||||
func TestBasicInconsistent(t *testing.T) {
|
||||
// Test double-start
|
||||
b := basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
|
||||
b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
b.StartCollection()
|
||||
b.StartCollection()
|
||||
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
|
||||
|
||||
// Test finish without start
|
||||
b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
|
||||
|
||||
// Test no finish
|
||||
b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
b.StartCollection()
|
||||
require.Equal(
|
||||
t,
|
||||
basic.ErrInconsistentState,
|
||||
b.ForEach(
|
||||
export.StatelessExportKindSelector(),
|
||||
aggregation.StatelessTemporalitySelector(),
|
||||
func(export.Record) error { return nil },
|
||||
),
|
||||
)
|
||||
|
||||
// Test no start
|
||||
b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
|
||||
accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})
|
||||
require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
|
||||
|
||||
// Test invalid kind:
|
||||
b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b.StartCollection()
|
||||
require.NoError(t, b.Process(accum))
|
||||
require.NoError(t, b.FinishCollection())
|
||||
@ -316,14 +316,14 @@ func TestBasicInconsistent(t *testing.T) {
|
||||
bogusExporter{},
|
||||
func(export.Record) error { return nil },
|
||||
)
|
||||
require.True(t, errors.Is(err, basic.ErrInvalidExportKind))
|
||||
require.True(t, errors.Is(err, basic.ErrInvalidTemporality))
|
||||
|
||||
}
|
||||
|
||||
func TestBasicTimestamps(t *testing.T) {
|
||||
beforeNew := time.Now()
|
||||
time.Sleep(time.Nanosecond)
|
||||
b := basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
|
||||
b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
time.Sleep(time.Nanosecond)
|
||||
afterNew := time.Now()
|
||||
|
||||
@ -336,7 +336,7 @@ func TestBasicTimestamps(t *testing.T) {
|
||||
|
||||
var start1, end1 time.Time
|
||||
|
||||
require.NoError(t, b.ForEach(export.StatelessExportKindSelector(), func(rec export.Record) error {
|
||||
require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
|
||||
start1 = rec.StartTime()
|
||||
end1 = rec.EndTime()
|
||||
return nil
|
||||
@ -353,7 +353,7 @@ func TestBasicTimestamps(t *testing.T) {
|
||||
|
||||
var start2, end2 time.Time
|
||||
|
||||
require.NoError(t, b.ForEach(export.StatelessExportKindSelector(), func(rec export.Record) error {
|
||||
require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
|
||||
start2 = rec.StartTime()
|
||||
end2 = rec.EndTime()
|
||||
return nil
|
||||
@ -370,12 +370,12 @@ func TestBasicTimestamps(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
ekindSel := export.CumulativeExportKindSelector()
|
||||
aggTempSel := aggregation.CumulativeTemporalitySelector()
|
||||
|
||||
desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterInstrumentKind, number.Int64Kind)
|
||||
selector := processorTest.AggregatorSelector()
|
||||
|
||||
processor := basic.New(selector, ekindSel, basic.WithMemory(false))
|
||||
processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
|
||||
reader := processor.Reader()
|
||||
|
||||
for i := 1; i < 3; i++ {
|
||||
@ -385,7 +385,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
|
||||
// Verify zero elements
|
||||
records := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{}, records.Map())
|
||||
|
||||
// Add 10
|
||||
@ -395,7 +395,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
|
||||
// Verify one element
|
||||
records = processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"inst.sum/A=B/": float64(i * 10),
|
||||
}, records.Map())
|
||||
@ -403,12 +403,12 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatefulNoMemoryDelta(t *testing.T) {
|
||||
ekindSel := export.DeltaExportKindSelector()
|
||||
aggTempSel := aggregation.DeltaTemporalitySelector()
|
||||
|
||||
desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
|
||||
selector := processorTest.AggregatorSelector()
|
||||
|
||||
processor := basic.New(selector, ekindSel, basic.WithMemory(false))
|
||||
processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
|
||||
reader := processor.Reader()
|
||||
|
||||
for i := 1; i < 3; i++ {
|
||||
@ -418,7 +418,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
|
||||
|
||||
// Verify zero elements
|
||||
records := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{}, records.Map())
|
||||
|
||||
// Add 10
|
||||
@ -428,7 +428,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
|
||||
|
||||
// Verify one element
|
||||
records = processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"inst.sum/A=B/": 10,
|
||||
}, records.Map())
|
||||
@ -436,15 +436,15 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMultiObserverSum(t *testing.T) {
|
||||
for _, ekindSel := range []export.ExportKindSelector{
|
||||
export.CumulativeExportKindSelector(),
|
||||
export.DeltaExportKindSelector(),
|
||||
for _, aggTempSel := range []aggregation.TemporalitySelector{
|
||||
aggregation.CumulativeTemporalitySelector(),
|
||||
aggregation.DeltaTemporalitySelector(),
|
||||
} {
|
||||
|
||||
desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
|
||||
selector := processorTest.AggregatorSelector()
|
||||
|
||||
processor := basic.New(selector, ekindSel, basic.WithMemory(false))
|
||||
processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
|
||||
reader := processor.Reader()
|
||||
|
||||
for i := 1; i < 3; i++ {
|
||||
@ -457,13 +457,13 @@ func TestMultiObserverSum(t *testing.T) {
|
||||
|
||||
// Multiplier is 1 for deltas, otherwise i.
|
||||
multiplier := i
|
||||
if ekindSel.ExportKindFor(&desc, aggregation.SumKind) == export.DeltaExportKind {
|
||||
if aggTempSel.TemporalityFor(&desc, aggregation.SumKind) == aggregation.DeltaTemporality {
|
||||
multiplier = 1
|
||||
}
|
||||
|
||||
// Verify one element
|
||||
records := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"observe.sum/A=B/": float64(3 * 10 * multiplier),
|
||||
}, records.Map())
|
||||
@ -473,7 +473,7 @@ func TestMultiObserverSum(t *testing.T) {
|
||||
|
||||
func TestCounterObserverEndToEnd(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
eselector := export.CumulativeExportKindSelector()
|
||||
eselector := aggregation.CumulativeTemporalitySelector()
|
||||
proc := basic.New(
|
||||
processorTest.AggregatorSelector(),
|
||||
eselector,
|
||||
|
@ -82,7 +82,7 @@ type (
|
||||
// Exporter is a testing implementation of export.Exporter that
|
||||
// assembles its results as a map[string]float64.
|
||||
Exporter struct {
|
||||
export.ExportKindSelector
|
||||
aggregation.TemporalitySelector
|
||||
output *Output
|
||||
exportCount int
|
||||
|
||||
@ -230,7 +230,7 @@ func NewOutput(labelEncoder attribute.Encoder) *Output {
|
||||
}
|
||||
|
||||
// ForEach implements export.Reader.
|
||||
func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) error) error {
|
||||
func (o *Output) ForEach(_ aggregation.TemporalitySelector, ff func(export.Record) error) error {
|
||||
for key, value := range o.m {
|
||||
if err := ff(export.NewRecord(
|
||||
key.desc,
|
||||
@ -281,7 +281,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource
|
||||
// is chosen, whichever is implemented by the underlying Aggregator.
|
||||
func (o *Output) Map() map[string]float64 {
|
||||
r := make(map[string]float64)
|
||||
err := o.ForEach(export.StatelessExportKindSelector(), func(record export.Record) error {
|
||||
err := o.ForEach(aggregation.StatelessTemporalitySelector(), func(record export.Record) error {
|
||||
for key, entry := range o.m {
|
||||
encoded := entry.labels.Encoded(o.labelEncoder)
|
||||
rencoded := entry.resource.Encoded(o.labelEncoder)
|
||||
@ -344,10 +344,10 @@ func (o *Output) AddAccumulation(acc export.Accumulation) error {
|
||||
//
|
||||
// Where in the example A=1,B=2 is the encoded labels and R=V is the
|
||||
// encoded resource value.
|
||||
func New(selector export.ExportKindSelector, encoder attribute.Encoder) *Exporter {
|
||||
func New(selector aggregation.TemporalitySelector, encoder attribute.Encoder) *Exporter {
|
||||
return &Exporter{
|
||||
ExportKindSelector: selector,
|
||||
output: NewOutput(encoder),
|
||||
TemporalitySelector: selector,
|
||||
output: NewOutput(encoder),
|
||||
}
|
||||
}
|
||||
|
||||
@ -356,7 +356,7 @@ func (e *Exporter) Export(_ context.Context, res *resource.Resource, ckpt export
|
||||
defer e.output.Unlock()
|
||||
e.exportCount++
|
||||
return ckpt.ForEach(func(library instrumentation.Library, mr export.Reader) error {
|
||||
return mr.ForEach(e.ExportKindSelector, func(r export.Record) error {
|
||||
return mr.ForEach(e.TemporalitySelector, func(r export.Record) error {
|
||||
if e.InjectErr != nil {
|
||||
if err := e.InjectErr(r); err != nil {
|
||||
return err
|
||||
@ -433,7 +433,7 @@ type metricReader struct {
|
||||
|
||||
var _ export.Reader = &metricReader{}
|
||||
|
||||
func (m *metricReader) ForEach(_ export.ExportKindSelector, fn func(export.Record) error) error {
|
||||
func (m *metricReader) ForEach(_ aggregation.TemporalitySelector, fn func(export.Record) error) error {
|
||||
for _, record := range m.records {
|
||||
if err := fn(record); err != nil && err != aggregation.ErrNoData {
|
||||
return err
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
metricsdk "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
@ -71,7 +72,7 @@ func TestProcessorTesting(t *testing.T) {
|
||||
|
||||
// Export the data and validate it again.
|
||||
exporter := processorTest.New(
|
||||
export.StatelessExportKindSelector(),
|
||||
aggregation.StatelessTemporalitySelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
)
|
||||
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
metricsdk "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
@ -91,7 +91,7 @@ func TestFilterProcessor(t *testing.T) {
|
||||
|
||||
// Test a filter with the ../basic Processor.
|
||||
func TestFilterBasicProcessor(t *testing.T) {
|
||||
basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExportKindSelector())
|
||||
basicProc := basic.New(processorTest.AggregatorSelector(), aggregation.CumulativeTemporalitySelector())
|
||||
accum := metricsdk.NewAccumulator(
|
||||
reducer.New(testFilter{}, basicProc),
|
||||
)
|
||||
|
Reference in New Issue
Block a user