You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-07-17 01:12:45 +02:00
Use the label.Set.Equivalent value instead of an encoding in the batcher (#658)
This commit is contained in:
@ -159,7 +159,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
|
|||||||
// it could try again on the next scrape and no data would be lost, only resolution.
|
// it could try again on the next scrape and no data would be lost, only resolution.
|
||||||
//
|
//
|
||||||
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
|
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
|
||||||
batcher := ungrouped.New(selector, label.DefaultEncoder(), true)
|
batcher := ungrouped.New(selector, true)
|
||||||
pusher := push.New(batcher, exporter, period)
|
pusher := push.New(batcher, exporter, period)
|
||||||
pusher.Start()
|
pusher.Start()
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
batcher := ungrouped.New(selector, exporter.config.LabelEncoder, true)
|
batcher := ungrouped.New(selector, true)
|
||||||
pusher := push.New(batcher, exporter, period)
|
pusher := push.New(batcher, exporter, period)
|
||||||
pusher.Start()
|
pusher.Start()
|
||||||
|
|
||||||
|
@ -28,7 +28,6 @@ import (
|
|||||||
|
|
||||||
"go.opentelemetry.io/otel/api/core"
|
"go.opentelemetry.io/otel/api/core"
|
||||||
"go.opentelemetry.io/otel/api/key"
|
"go.opentelemetry.io/otel/api/key"
|
||||||
"go.opentelemetry.io/otel/api/label"
|
|
||||||
"go.opentelemetry.io/otel/api/metric"
|
"go.opentelemetry.io/otel/api/metric"
|
||||||
metricapi "go.opentelemetry.io/otel/api/metric"
|
metricapi "go.opentelemetry.io/otel/api/metric"
|
||||||
"go.opentelemetry.io/otel/exporters/otlp"
|
"go.opentelemetry.io/otel/exporters/otlp"
|
||||||
@ -112,7 +111,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
|
|||||||
}
|
}
|
||||||
|
|
||||||
selector := simple.NewWithExactMeasure()
|
selector := simple.NewWithExactMeasure()
|
||||||
batcher := ungrouped.New(selector, label.DefaultEncoder(), true)
|
batcher := ungrouped.New(selector, true)
|
||||||
pusher := push.New(batcher, exp, 60*time.Second)
|
pusher := push.New(batcher, exp, 60*time.Second)
|
||||||
pusher.Start()
|
pusher.Start()
|
||||||
|
|
||||||
|
@ -29,12 +29,11 @@ type (
|
|||||||
selector export.AggregationSelector
|
selector export.AggregationSelector
|
||||||
batchMap batchMap
|
batchMap batchMap
|
||||||
stateful bool
|
stateful bool
|
||||||
labelEncoder label.Encoder
|
|
||||||
}
|
}
|
||||||
|
|
||||||
batchKey struct {
|
batchKey struct {
|
||||||
descriptor *metric.Descriptor
|
descriptor *metric.Descriptor
|
||||||
encoded string
|
distinct label.Distinct
|
||||||
}
|
}
|
||||||
|
|
||||||
batchValue struct {
|
batchValue struct {
|
||||||
@ -48,12 +47,11 @@ type (
|
|||||||
var _ export.Batcher = &Batcher{}
|
var _ export.Batcher = &Batcher{}
|
||||||
var _ export.CheckpointSet = batchMap{}
|
var _ export.CheckpointSet = batchMap{}
|
||||||
|
|
||||||
func New(selector export.AggregationSelector, labelEncoder label.Encoder, stateful bool) *Batcher {
|
func New(selector export.AggregationSelector, stateful bool) *Batcher {
|
||||||
return &Batcher{
|
return &Batcher{
|
||||||
selector: selector,
|
selector: selector,
|
||||||
batchMap: batchMap{},
|
batchMap: batchMap{},
|
||||||
stateful: stateful,
|
stateful: stateful,
|
||||||
labelEncoder: labelEncoder,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,10 +61,9 @@ func (b *Batcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator
|
|||||||
|
|
||||||
func (b *Batcher) Process(_ context.Context, record export.Record) error {
|
func (b *Batcher) Process(_ context.Context, record export.Record) error {
|
||||||
desc := record.Descriptor()
|
desc := record.Descriptor()
|
||||||
encoded := record.Labels().Encoded(b.labelEncoder)
|
|
||||||
key := batchKey{
|
key := batchKey{
|
||||||
descriptor: desc,
|
descriptor: desc,
|
||||||
encoded: encoded,
|
distinct: record.Labels().Equivalent(),
|
||||||
}
|
}
|
||||||
agg := record.Aggregator()
|
agg := record.Aggregator()
|
||||||
value, ok := b.batchMap[key]
|
value, ok := b.batchMap[key]
|
||||||
|
@ -30,7 +30,7 @@ import (
|
|||||||
|
|
||||||
func TestUngroupedStateless(t *testing.T) {
|
func TestUngroupedStateless(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
b := ungrouped.New(test.NewAggregationSelector(), test.SdkEncoder, false)
|
b := ungrouped.New(test.NewAggregationSelector(), false)
|
||||||
|
|
||||||
// Set initial lastValue values
|
// Set initial lastValue values
|
||||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
|
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
|
||||||
@ -92,7 +92,7 @@ func TestUngroupedStateless(t *testing.T) {
|
|||||||
|
|
||||||
func TestUngroupedStateful(t *testing.T) {
|
func TestUngroupedStateful(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
b := ungrouped.New(test.NewAggregationSelector(), test.SdkEncoder, true)
|
b := ungrouped.New(test.NewAggregationSelector(), true)
|
||||||
|
|
||||||
counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
|
counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
|
||||||
caggA := counterA.Aggregator()
|
caggA := counterA.Aggregator()
|
||||||
|
Reference in New Issue
Block a user