You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Adds Histogram Aggregator (#433)
* histogram aggregator draft
* add tests for buckets
* naming stuffs
* docs
* add tests for buckets
* fix doc
* update year
* adds docs for Histogram
* docs for boundaries.
* addresses review comments
Change to less-than buckets. Add offset checks. Unexport fields that don't need to be exported. Fix tests when running on profile with int64 number kind.
* sort boundaries
* remove testing field
* fixes import order
* remove print 🙈
			
			
This commit is contained in:
		
				
					committed by
					
						 Rahul Patel
						Rahul Patel
					
				
			
			
				
	
			
			
			
						parent
						
							dcd0a10493
						
					
				
				
					commit
					2c460f0d97
				
			| @@ -63,6 +63,20 @@ type ( | ||||
| 		Points() ([]core.Number, error) | ||||
| 	} | ||||
|  | ||||
| 	// Buckets represents histogram buckets boundaries and counts. | ||||
| 	// | ||||
| 	// For a Histogram with N defined boundaries, e.g., [x, y, z], | ||||
| 	// there are N+1 counts: [-inf, x), [x, y), [y, z), [z, +inf]. | ||||
| 	Buckets struct { | ||||
| 		Boundaries []core.Number | ||||
| 		Counts     []core.Number | ||||
| 	} | ||||
|  | ||||
| 	// Histogram returns the count of events in pre-determined buckets. | ||||
| 	Histogram interface { | ||||
| 		Histogram() (Buckets, error) | ||||
| 	} | ||||
|  | ||||
| 	// MinMaxSumCount supports the Min, Max, Sum, and Count interfaces. | ||||
| 	MinMaxSumCount interface { | ||||
| 		Min | ||||
|   | ||||
							
								
								
									
										184
									
								
								sdk/metric/aggregator/histogram/histogram.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										184
									
								
								sdk/metric/aggregator/histogram/histogram.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,184 @@ | ||||
| // Copyright 2020, OpenTelemetry Authors | ||||
| // | ||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| // you may not use this file except in compliance with the License. | ||||
| // You may obtain a copy of the License at | ||||
| // | ||||
| //     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, software | ||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| // See the License for the specific language governing permissions and | ||||
| // limitations under the License. | ||||
|  | ||||
| package histogram // import "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sort" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/api/core" | ||||
| 	export "go.opentelemetry.io/otel/sdk/export/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/export/metric/aggregator" | ||||
| ) | ||||
|  | ||||
| type ( | ||||
| 	// Aggregator observe events and counts them in pre-determined buckets. | ||||
| 	// It also calculates the sum and count of all events. | ||||
| 	Aggregator struct { | ||||
| 		// state needs to be aligned for 64-bit atomic operations. | ||||
| 		current state | ||||
| 		// checkpoint needs to be aligned for 64-bit atomic operations. | ||||
| 		checkpoint state | ||||
| 		boundaries []core.Number | ||||
| 		kind       core.NumberKind | ||||
| 	} | ||||
|  | ||||
| 	// state represents the state of a histogram, consisting of | ||||
| 	// the sum and counts for all observed values and | ||||
| 	// the less than equal bucket count for the pre-determined boundaries. | ||||
| 	state struct { | ||||
| 		// all fields have to be aligned for 64-bit atomic operations. | ||||
| 		buckets aggregator.Buckets | ||||
| 		count   core.Number | ||||
| 		sum     core.Number | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| var _ export.Aggregator = &Aggregator{} | ||||
| var _ aggregator.Sum = &Aggregator{} | ||||
| var _ aggregator.Count = &Aggregator{} | ||||
| var _ aggregator.Histogram = &Aggregator{} | ||||
|  | ||||
| // New returns a new measure aggregator for computing Histograms. | ||||
| // | ||||
| // A Histogram observe events and counts them in pre-defined buckets. | ||||
| // And also provides the total sum and count of all observations. | ||||
| // | ||||
| // Note that this aggregator maintains each value using independent | ||||
| // atomic operations, which introduces the possibility that | ||||
| // checkpoints are inconsistent. | ||||
| func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { | ||||
| 	// Boundaries MUST be ordered otherwise the histogram could not | ||||
| 	// be properly computed. | ||||
| 	sortedBoundaries := numbers{ | ||||
| 		numbers: make([]core.Number, len(boundaries)), | ||||
| 		kind:    desc.NumberKind(), | ||||
| 	} | ||||
|  | ||||
| 	copy(sortedBoundaries.numbers, boundaries) | ||||
| 	sort.Sort(&sortedBoundaries) | ||||
| 	boundaries = sortedBoundaries.numbers | ||||
|  | ||||
| 	agg := Aggregator{ | ||||
| 		kind:       desc.NumberKind(), | ||||
| 		boundaries: boundaries, | ||||
| 		current: state{ | ||||
| 			buckets: aggregator.Buckets{ | ||||
| 				Boundaries: boundaries, | ||||
| 				Counts:     make([]core.Number, len(boundaries)+1), | ||||
| 			}, | ||||
| 		}, | ||||
| 		checkpoint: state{ | ||||
| 			buckets: aggregator.Buckets{ | ||||
| 				Boundaries: boundaries, | ||||
| 				Counts:     make([]core.Number, len(boundaries)+1), | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	return &agg | ||||
| } | ||||
|  | ||||
| // Sum returns the sum of all values in the checkpoint. | ||||
| func (c *Aggregator) Sum() (core.Number, error) { | ||||
| 	return c.checkpoint.sum, nil | ||||
| } | ||||
|  | ||||
| // Count returns the number of values in the checkpoint. | ||||
| func (c *Aggregator) Count() (int64, error) { | ||||
| 	return int64(c.checkpoint.count.AsUint64()), nil | ||||
| } | ||||
|  | ||||
| // Histogram returns the count of events in pre-determined buckets. | ||||
| func (c *Aggregator) Histogram() (aggregator.Buckets, error) { | ||||
| 	return c.checkpoint.buckets, nil | ||||
| } | ||||
|  | ||||
| // Checkpoint saves the current state and resets the current state to | ||||
| // the empty set.  Since no locks are taken, there is a chance that | ||||
| // the independent Sum, Count and Bucket Count are not consistent with each | ||||
| // other. | ||||
| func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { | ||||
| 	// N.B. There is no atomic operation that can update all three | ||||
| 	// values at once without a memory allocation. | ||||
| 	// | ||||
| 	// This aggregator is intended to trade this correctness for | ||||
| 	// speed. | ||||
| 	// | ||||
| 	// Therefore, atomically swap fields independently, knowing | ||||
| 	// that individually the three parts of this aggregation could | ||||
| 	// be spread across multiple collections in rare cases. | ||||
|  | ||||
| 	c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0)) | ||||
| 	c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0)) | ||||
|  | ||||
| 	for i := 0; i < len(c.checkpoint.buckets.Counts); i++ { | ||||
| 		c.checkpoint.buckets.Counts[i].SetUint64(c.current.buckets.Counts[i].SwapUint64Atomic(0)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Update adds the recorded measurement to the current data set. | ||||
| func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { | ||||
| 	kind := desc.NumberKind() | ||||
|  | ||||
| 	c.current.count.AddUint64Atomic(1) | ||||
| 	c.current.sum.AddNumberAtomic(kind, number) | ||||
|  | ||||
| 	for i, boundary := range c.boundaries { | ||||
| 		if number.CompareNumber(kind, boundary) < 0 { | ||||
| 			c.current.buckets.Counts[i].AddUint64Atomic(1) | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Observed event is bigger than all defined boundaries. | ||||
| 	c.current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Merge combines two data sets into one. | ||||
| func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error { | ||||
| 	o, _ := oa.(*Aggregator) | ||||
| 	if o == nil { | ||||
| 		return aggregator.NewInconsistentMergeError(c, oa) | ||||
| 	} | ||||
|  | ||||
| 	c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) | ||||
| 	c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) | ||||
|  | ||||
| 	for i := 0; i < len(c.current.buckets.Counts); i++ { | ||||
| 		c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i]) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // numbers is an auxiliary struct to order histogram bucket boundaries (slice of core.Number) | ||||
| type numbers struct { | ||||
| 	numbers []core.Number | ||||
| 	kind    core.NumberKind | ||||
| } | ||||
|  | ||||
| var _ sort.Interface = (*numbers)(nil) | ||||
|  | ||||
| func (n *numbers) Len() int { | ||||
| 	return len(n.numbers) | ||||
| } | ||||
|  | ||||
| func (n *numbers) Less(i, j int) bool { | ||||
| 	return -1 == n.numbers[i].CompareNumber(n.kind, n.numbers[j]) | ||||
| } | ||||
|  | ||||
| func (n *numbers) Swap(i, j int) { | ||||
| 	n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i] | ||||
| } | ||||
							
								
								
									
										259
									
								
								sdk/metric/aggregator/histogram/histogram_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										259
									
								
								sdk/metric/aggregator/histogram/histogram_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,259 @@ | ||||
| // Copyright 2020, OpenTelemetry Authors | ||||
| // | ||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| // you may not use this file except in compliance with the License. | ||||
| // You may obtain a copy of the License at | ||||
| // | ||||
| //     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, software | ||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| // See the License for the specific language governing permissions and | ||||
| // limitations under the License. | ||||
|  | ||||
| package histogram | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"sort" | ||||
| 	"testing" | ||||
| 	"unsafe" | ||||
|  | ||||
| 	"github.com/stretchr/testify/require" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/api/core" | ||||
| 	ottest "go.opentelemetry.io/otel/internal/testing" | ||||
| 	export "go.opentelemetry.io/otel/sdk/export/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/test" | ||||
| ) | ||||
|  | ||||
| const count = 100 | ||||
|  | ||||
| type policy struct { | ||||
| 	name     string | ||||
| 	absolute bool | ||||
| 	sign     func() int | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	positiveOnly = policy{ | ||||
| 		name:     "absolute", | ||||
| 		absolute: true, | ||||
| 		sign:     func() int { return +1 }, | ||||
| 	} | ||||
| 	negativeOnly = policy{ | ||||
| 		name:     "negative", | ||||
| 		absolute: false, | ||||
| 		sign:     func() int { return -1 }, | ||||
| 	} | ||||
| 	positiveAndNegative = policy{ | ||||
| 		name:     "positiveAndNegative", | ||||
| 		absolute: false, | ||||
| 		sign: func() int { | ||||
| 			if rand.Uint32() > math.MaxUint32/2 { | ||||
| 				return -1 | ||||
| 			} | ||||
| 			return 1 | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	boundaries = map[core.NumberKind][]core.Number{ | ||||
| 		core.Float64NumberKind: {core.NewFloat64Number(500), core.NewFloat64Number(250), core.NewFloat64Number(750)}, | ||||
| 		core.Int64NumberKind:   {core.NewInt64Number(500), core.NewInt64Number(250), core.NewInt64Number(750)}, | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| // Ensure struct alignment prior to running tests. | ||||
| func TestMain(m *testing.M) { | ||||
| 	fields := []ottest.FieldOffset{ | ||||
| 		{ | ||||
| 			Name:   "Aggregator.current", | ||||
| 			Offset: unsafe.Offsetof(Aggregator{}.current), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:   "Aggregator.checkpoint", | ||||
| 			Offset: unsafe.Offsetof(Aggregator{}.checkpoint), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:   "state.buckets", | ||||
| 			Offset: unsafe.Offsetof(state{}.buckets), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:   "state.sum", | ||||
| 			Offset: unsafe.Offsetof(state{}.sum), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:   "state.count", | ||||
| 			Offset: unsafe.Offsetof(state{}.count), | ||||
| 		}, | ||||
| 	} | ||||
| 	fmt.Println(fields) | ||||
|  | ||||
| 	if !ottest.Aligned8Byte(fields, os.Stderr) { | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
|  | ||||
| 	os.Exit(m.Run()) | ||||
| } | ||||
|  | ||||
| func TestHistogramAbsolute(t *testing.T) { | ||||
| 	test.RunProfiles(t, func(t *testing.T, profile test.Profile) { | ||||
| 		histogram(t, profile, positiveOnly) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func TestHistogramNegativeOnly(t *testing.T) { | ||||
| 	test.RunProfiles(t, func(t *testing.T, profile test.Profile) { | ||||
| 		histogram(t, profile, negativeOnly) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func TestHistogramPositiveAndNegative(t *testing.T) { | ||||
| 	test.RunProfiles(t, func(t *testing.T, profile test.Profile) { | ||||
| 		histogram(t, profile, positiveAndNegative) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // Validates count, sum and buckets for a given profile and policy | ||||
| func histogram(t *testing.T, profile test.Profile, policy policy) { | ||||
| 	ctx := context.Background() | ||||
| 	descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute) | ||||
|  | ||||
| 	agg := New(descriptor, boundaries[profile.NumberKind]) | ||||
|  | ||||
| 	all := test.NewNumbers(profile.NumberKind) | ||||
|  | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		x := profile.Random(policy.sign()) | ||||
| 		all.Append(x) | ||||
| 		test.CheckedUpdate(t, agg, x, descriptor) | ||||
| 	} | ||||
|  | ||||
| 	agg.Checkpoint(ctx, descriptor) | ||||
|  | ||||
| 	all.Sort() | ||||
|  | ||||
| 	asum, err := agg.Sum() | ||||
| 	sum := all.Sum() | ||||
| 	require.InEpsilon(t, | ||||
| 		sum.CoerceToFloat64(profile.NumberKind), | ||||
| 		asum.CoerceToFloat64(profile.NumberKind), | ||||
| 		0.000000001, | ||||
| 		"Same sum - "+policy.name) | ||||
| 	require.Nil(t, err) | ||||
|  | ||||
| 	count, err := agg.Count() | ||||
| 	require.Equal(t, all.Count(), count, "Same count -"+policy.name) | ||||
| 	require.Nil(t, err) | ||||
|  | ||||
| 	require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") | ||||
|  | ||||
| 	counts := calcBuckets(all.Points(), profile) | ||||
| 	for i, v := range counts { | ||||
| 		bCount := agg.checkpoint.buckets.Counts[i].AsUint64() | ||||
| 		require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestHistogramMerge(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
|  | ||||
| 	test.RunProfiles(t, func(t *testing.T, profile test.Profile) { | ||||
| 		descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) | ||||
|  | ||||
| 		agg1 := New(descriptor, boundaries[profile.NumberKind]) | ||||
| 		agg2 := New(descriptor, boundaries[profile.NumberKind]) | ||||
|  | ||||
| 		all := test.NewNumbers(profile.NumberKind) | ||||
|  | ||||
| 		for i := 0; i < count; i++ { | ||||
| 			x := profile.Random(+1) | ||||
| 			all.Append(x) | ||||
| 			test.CheckedUpdate(t, agg1, x, descriptor) | ||||
| 		} | ||||
| 		for i := 0; i < count; i++ { | ||||
| 			x := profile.Random(+1) | ||||
| 			all.Append(x) | ||||
| 			test.CheckedUpdate(t, agg2, x, descriptor) | ||||
| 		} | ||||
|  | ||||
| 		agg1.Checkpoint(ctx, descriptor) | ||||
| 		agg2.Checkpoint(ctx, descriptor) | ||||
|  | ||||
| 		test.CheckedMerge(t, agg1, agg2, descriptor) | ||||
|  | ||||
| 		all.Sort() | ||||
|  | ||||
| 		asum, err := agg1.Sum() | ||||
| 		sum := all.Sum() | ||||
| 		require.InEpsilon(t, | ||||
| 			sum.CoerceToFloat64(profile.NumberKind), | ||||
| 			asum.CoerceToFloat64(profile.NumberKind), | ||||
| 			0.000000001, | ||||
| 			"Same sum - absolute") | ||||
| 		require.Nil(t, err) | ||||
|  | ||||
| 		count, err := agg1.Count() | ||||
| 		require.Equal(t, all.Count(), count, "Same count - absolute") | ||||
| 		require.Nil(t, err) | ||||
|  | ||||
| 		require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") | ||||
|  | ||||
| 		counts := calcBuckets(all.Points(), profile) | ||||
| 		for i, v := range counts { | ||||
| 			bCount := agg1.checkpoint.buckets.Counts[i].AsUint64() | ||||
| 			require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts) | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func TestHistogramNotSet(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
|  | ||||
| 	test.RunProfiles(t, func(t *testing.T, profile test.Profile) { | ||||
| 		descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) | ||||
|  | ||||
| 		agg := New(descriptor, boundaries[profile.NumberKind]) | ||||
| 		agg.Checkpoint(ctx, descriptor) | ||||
|  | ||||
| 		asum, err := agg.Sum() | ||||
| 		require.Equal(t, core.Number(0), asum, "Empty checkpoint sum = 0") | ||||
| 		require.Nil(t, err) | ||||
|  | ||||
| 		count, err := agg.Count() | ||||
| 		require.Equal(t, int64(0), count, "Empty checkpoint count = 0") | ||||
| 		require.Nil(t, err) | ||||
|  | ||||
| 		require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") | ||||
| 		for i, bCount := range agg.checkpoint.buckets.Counts { | ||||
| 			require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func calcBuckets(points []core.Number, profile test.Profile) []uint64 { | ||||
| 	sortedBoundaries := numbers{ | ||||
| 		numbers: make([]core.Number, len(boundaries[profile.NumberKind])), | ||||
| 		kind:    profile.NumberKind, | ||||
| 	} | ||||
|  | ||||
| 	copy(sortedBoundaries.numbers, boundaries[profile.NumberKind]) | ||||
| 	sort.Sort(&sortedBoundaries) | ||||
| 	boundaries := sortedBoundaries.numbers | ||||
|  | ||||
| 	counts := make([]uint64, len(boundaries)+1) | ||||
| 	idx := 0 | ||||
| 	for _, p := range points { | ||||
| 		for idx < len(boundaries) && p.CompareNumber(profile.NumberKind, boundaries[idx]) != -1 { | ||||
| 			idx++ | ||||
| 		} | ||||
| 		counts[idx]++ | ||||
| 	} | ||||
|  | ||||
| 	return counts | ||||
| } | ||||
		Reference in New Issue
	
	Block a user