
Histogram aggregator initial state (fix #735) (#736)

* Add a test

* Add comments and description options

* Another test

* Undo buffer re-use

* Mod tidy

* Precommit

* Again

* Copyright

* Undo rename
Joshua MacDonald 2020-05-18 09:44:33 -07:00 committed by GitHub
parent 6bc14ffd2c
commit 2dee67652a
3 changed files with 139 additions and 30 deletions

@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
sdk "go.opentelemetry.io/otel/sdk/metric"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
// This test demonstrates that it is relatively difficult to set up a
// Prometheus export pipeline:
//
// 1. The default boundaries are difficult to pass; they should be []float64 instead of []metric.Number
// 2. The push controller doesn't make sense because Prometheus is pull-based
//
// TODO: Address these issues; add Resources to the test.
func ExampleNewExportPipeline() {
// Create a meter
selector := simple.NewWithHistogramDistribution(nil)
exporter, err := prometheus.NewRawExporter(prometheus.Config{})
if err != nil {
panic(err)
}
integrator := integrator.New(selector, true)
meterImpl := sdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(meterImpl, "example")
ctx := context.Background()
// Use two instruments
counter := metric.Must(meter).NewInt64Counter(
"a.counter",
metric.WithDescription("Counts things"),
)
recorder := metric.Must(meter).NewInt64ValueRecorder(
"a.valuerecorder",
metric.WithDescription("Records values"),
)
counter.Add(ctx, 100, kv.String("key", "value"))
recorder.Record(ctx, 100, kv.String("key", "value"))
// Simulate a push
meterImpl.Collect(ctx)
err = exporter.Export(ctx, nil, integrator.CheckpointSet())
if err != nil {
panic(err)
}
// GET the HTTP endpoint
var input bytes.Buffer
resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/", &input)
if err != nil {
panic(err)
}
exporter.ServeHTTP(resp, req)
data, err := ioutil.ReadAll(resp.Result().Body)
if err != nil {
panic(err)
}
fmt.Print(string(data))
// Output:
// # HELP a_counter Counts things
// # TYPE a_counter counter
// a_counter{key="value"} 100
// # HELP a_valuerecorder Records values
// # TYPE a_valuerecorder histogram
// a_valuerecorder_bucket{key="value",le="+Inf"} 1
// a_valuerecorder_sum{key="value"} 100
// a_valuerecorder_count{key="value"} 1
}

@@ -24,6 +24,11 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
// Note: This code uses a Mutex to govern access to the exclusive
// aggregator state. This is in contrast to a lock-free approach
// (as in the Go prometheus client) that was reverted here:
// https://github.com/open-telemetry/opentelemetry-go/pull/669
type (
// Aggregator observes events and counts them in pre-determined buckets.
// It also calculates the sum and count of all events.
@@ -39,8 +44,7 @@ type (
// the sum and counts for all observed values and
// the less-than-or-equal bucket counts for the pre-determined boundaries.
state struct {
// all fields have to be aligned for 64-bit atomic operations.
buckets aggregator.Buckets
bucketCounts []metric.Number
count metric.Number
sum metric.Number
}
@@ -71,17 +75,12 @@ func New(desc *metric.Descriptor, boundaries []metric.Number) *Aggregator {
sort.Sort(&sortedBoundaries)
boundaries = sortedBoundaries.numbers
agg := Aggregator{
return &Aggregator{
kind: desc.NumberKind(),
boundaries: boundaries,
current: state{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]metric.Number, len(boundaries)+1),
},
},
current: emptyState(boundaries),
checkpoint: emptyState(boundaries),
}
return &agg
}
// Sum returns the sum of all values in the checkpoint.
@@ -102,7 +101,10 @@ func (c *Aggregator) Count() (int64, error) {
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint.buckets, nil
return aggregator.Buckets{
Boundaries: c.boundaries,
Counts: c.checkpoint.bucketCounts,
}, nil
}
// Checkpoint saves the current state and resets the current state to
@@ -111,16 +113,13 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, c.emptyState()
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
c.lock.Unlock()
}
func (c *Aggregator) emptyState() state {
func emptyState(boundaries []metric.Number) state {
return state{
buckets: aggregator.Buckets{
Boundaries: c.boundaries,
Counts: make([]metric.Number, len(c.boundaries)+1),
},
bucketCounts: make([]metric.Number, len(boundaries)+1),
}
}
@@ -141,7 +140,7 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
c.current.buckets.Counts[bucketID].AddUint64(1)
c.current.bucketCounts[bucketID].AddUint64(1)
return nil
}
@@ -156,8 +155,8 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count)
for i := 0; i < len(c.checkpoint.buckets.Counts); i++ {
c.checkpoint.buckets.Counts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.buckets.Counts[i])
for i := 0; i < len(c.checkpoint.bucketCounts); i++ {
c.checkpoint.bucketCounts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.bucketCounts[i])
}
return nil
}
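
The histogram.go change above keeps the mutex-guarded design described in the note at the top of the file and factors bucket allocation into emptyState(boundaries), so both the current and the checkpoint state begin with len(boundaries)+1 zeroed counts instead of a zero-valued state. Below is a minimal, self-contained sketch of that idea, not the SDK implementation: it uses plain float64/uint64 values in place of metric.Number, and the histAggregator/newAggregator names are made up for illustration.

// Sketch only (assumed, simplified types; not the OpenTelemetry SDK code).
package main

import (
	"fmt"
	"sort"
	"sync"
)

// state mirrors the diff: per-bucket counts live in a plain slice rather than
// inside an aggregator.Buckets value.
type state struct {
	bucketCounts []uint64
	count        uint64
	sum          float64
}

type histAggregator struct {
	lock       sync.Mutex // exclusive state, as in the note at the top of histogram.go
	boundaries []float64
	current    state
	checkpoint state
}

// emptyState allocates len(boundaries)+1 counts: one per boundary plus the
// overflow (+Inf) bucket.
func emptyState(boundaries []float64) state {
	return state{bucketCounts: make([]uint64, len(boundaries)+1)}
}

func newAggregator(boundaries []float64) *histAggregator {
	sorted := append([]float64(nil), boundaries...)
	sort.Float64s(sorted)
	return &histAggregator{
		boundaries: sorted,
		current:    emptyState(sorted),
		// The checkpoint is initialized too, so reading the histogram
		// before the first Checkpoint returns correctly sized, zeroed buckets.
		checkpoint: emptyState(sorted),
	}
}

func (c *histAggregator) Update(v float64) {
	bucket := sort.SearchFloat64s(c.boundaries, v) // index of the first boundary >= v
	c.lock.Lock()
	defer c.lock.Unlock()
	c.current.count++
	c.current.sum += v
	c.current.bucketCounts[bucket]++
}

// Checkpoint swaps the current state out under the lock and replaces it with a
// fresh emptyState, the same pattern as in the diff.
func (c *histAggregator) Checkpoint() {
	c.lock.Lock()
	c.checkpoint, c.current = c.current, emptyState(c.boundaries)
	c.lock.Unlock()
}

func main() {
	agg := newAggregator([]float64{10, 100})
	fmt.Println(len(agg.checkpoint.bucketCounts)) // 3, even before any Checkpoint
	agg.Update(42)
	agg.Checkpoint()
	fmt.Println(agg.checkpoint.bucketCounts) // [0 1 0]
}

Before this change the checkpoint field started out zero-valued, so a read before the first Checkpoint yielded empty buckets; the new TestHistogramInitial below verifies that a freshly constructed aggregator already reports counts and boundaries of the expected lengths.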

@@ -113,15 +113,28 @@ func histogram(t *testing.T, profile test.Profile, policy policy) {
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.Nil(t, err)
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg.checkpoint.buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts)
bCount := agg.checkpoint.bucketCounts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.bucketCounts)
}
}
func TestHistogramInitial(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, boundaries[profile.NumberKind])
buckets, err := agg.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries[profile.NumberKind])+1)
require.Equal(t, len(buckets.Boundaries), len(boundaries[profile.NumberKind]))
})
}
func TestHistogramMerge(t *testing.T) {
ctx := context.Background()
@@ -164,12 +177,12 @@ func TestHistogramMerge(t *testing.T) {
require.Equal(t, all.Count(), count, "Same count - absolute")
require.Nil(t, err)
require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(agg1.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg1.checkpoint.buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts)
bCount := agg1.checkpoint.bucketCounts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.bucketCounts)
}
})
}
@@ -191,8 +204,8 @@ func TestHistogramNotSet(t *testing.T) {
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.Nil(t, err)
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint.buckets.Counts {
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint.bucketCounts {
require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i)
}
})