mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-05 22:54:18 +02:00
Switch MinMaxSumCount to a mutex lock instead of StateLocker (#667)
* Switch MinMaxSumCount to a mutex lock instead of StateLocker With multiple values being modified for each Update(), a single mutex lock and non-atomic operations ends up being faster than making each value update into an atomic operation. * Remove StateLocker implementation and comparison benchmarks * Remove field offset tests. No longer required with no atomics. Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
This commit is contained in:
parent
ee30252752
commit
bd16ce0cfa
@ -16,26 +16,25 @@ package minmaxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/internal"
|
||||
)
|
||||
|
||||
type (
|
||||
// Aggregator aggregates measure events, keeping only the max,
|
||||
// Aggregator aggregates measure events, keeping only the min, max,
|
||||
// sum, and count.
|
||||
Aggregator struct {
|
||||
// states has to be aligned for 64-bit atomic operations.
|
||||
states [2]state
|
||||
lock internal.StateLocker
|
||||
kind core.NumberKind
|
||||
lock sync.Mutex
|
||||
current state
|
||||
checkpoint state
|
||||
kind core.NumberKind
|
||||
}
|
||||
|
||||
state struct {
|
||||
// all fields have to be aligned for 64-bit atomic operations.
|
||||
count core.Number
|
||||
sum core.Number
|
||||
min core.Number
|
||||
@ -47,27 +46,18 @@ var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.MinMaxSumCount = &Aggregator{}
|
||||
|
||||
// New returns a new measure aggregator for computing min, max, sum, and
|
||||
// count. It does not compute quantile information other than Max.
|
||||
// count. It does not compute quantile information other than Min and Max.
|
||||
//
|
||||
// This aggregator uses the StateLocker pattern to guarantee
|
||||
// the count, sum, min and max are consistent within a checkpoint
|
||||
// This type uses a mutex for Update() and Checkpoint() concurrency.
|
||||
func New(desc *metric.Descriptor) *Aggregator {
|
||||
kind := desc.NumberKind()
|
||||
return &Aggregator{
|
||||
kind: kind,
|
||||
states: [2]state{
|
||||
{
|
||||
count: core.NewUint64Number(0),
|
||||
sum: kind.Zero(),
|
||||
min: kind.Maximum(),
|
||||
max: kind.Minimum(),
|
||||
},
|
||||
{
|
||||
count: core.NewUint64Number(0),
|
||||
sum: kind.Zero(),
|
||||
min: kind.Maximum(),
|
||||
max: kind.Minimum(),
|
||||
},
|
||||
current: state{
|
||||
count: core.NewUint64Number(0),
|
||||
sum: kind.Zero(),
|
||||
min: kind.Maximum(),
|
||||
max: kind.Minimum(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -76,14 +66,14 @@ func New(desc *metric.Descriptor) *Aggregator {
|
||||
func (c *Aggregator) Sum() (core.Number, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
return c.checkpoint().sum, nil
|
||||
return c.checkpoint.sum, nil
|
||||
}
|
||||
|
||||
// Count returns the number of values in the checkpoint.
|
||||
func (c *Aggregator) Count() (int64, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil
|
||||
return c.checkpoint.count.CoerceToInt64(core.Uint64NumberKind), nil
|
||||
}
|
||||
|
||||
// Min returns the minimum value in the checkpoint.
|
||||
@ -92,10 +82,10 @@ func (c *Aggregator) Count() (int64, error) {
|
||||
func (c *Aggregator) Min() (core.Number, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
|
||||
if c.checkpoint.count.IsZero(core.Uint64NumberKind) {
|
||||
return c.kind.Zero(), aggregator.ErrNoData
|
||||
}
|
||||
return c.checkpoint().min, nil
|
||||
return c.checkpoint.min, nil
|
||||
}
|
||||
|
||||
// Max returns the maximum value in the checkpoint.
|
||||
@ -104,63 +94,43 @@ func (c *Aggregator) Min() (core.Number, error) {
|
||||
func (c *Aggregator) Max() (core.Number, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
|
||||
if c.checkpoint.count.IsZero(core.Uint64NumberKind) {
|
||||
return c.kind.Zero(), aggregator.ErrNoData
|
||||
}
|
||||
return c.checkpoint().max, nil
|
||||
return c.checkpoint.max, nil
|
||||
}
|
||||
|
||||
// Checkpoint saves the current state and resets the current state to
|
||||
// the empty set.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
|
||||
c.lock.SwapActiveState(c.resetCheckpoint)
|
||||
c.lock.Lock()
|
||||
c.checkpoint, c.current = c.current, c.emptyState()
|
||||
c.lock.Unlock()
|
||||
}
|
||||
|
||||
// checkpoint returns the "cold" state, i.e. state collected prior to the
|
||||
// most recent Checkpoint() call
|
||||
func (c *Aggregator) checkpoint() *state {
|
||||
return &c.states[c.lock.ColdIdx()]
|
||||
}
|
||||
|
||||
func (c *Aggregator) resetCheckpoint() {
|
||||
checkpoint := c.checkpoint()
|
||||
|
||||
checkpoint.count.SetUint64(0)
|
||||
checkpoint.sum.SetNumber(c.kind.Zero())
|
||||
checkpoint.min.SetNumber(c.kind.Maximum())
|
||||
checkpoint.max.SetNumber(c.kind.Minimum())
|
||||
func (c *Aggregator) emptyState() state {
|
||||
kind := c.kind
|
||||
return state{
|
||||
count: core.NewUint64Number(0),
|
||||
sum: kind.Zero(),
|
||||
min: kind.Maximum(),
|
||||
max: kind.Minimum(),
|
||||
}
|
||||
}
|
||||
|
||||
// Update adds the recorded measurement to the current data set.
|
||||
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *metric.Descriptor) error {
|
||||
kind := desc.NumberKind()
|
||||
|
||||
cIdx := c.lock.Start()
|
||||
defer c.lock.End(cIdx)
|
||||
|
||||
current := &c.states[cIdx]
|
||||
current.count.AddUint64Atomic(1)
|
||||
current.sum.AddNumberAtomic(kind, number)
|
||||
|
||||
for {
|
||||
cmin := current.min.AsNumberAtomic()
|
||||
|
||||
if number.CompareNumber(kind, cmin) >= 0 {
|
||||
break
|
||||
}
|
||||
if current.min.CompareAndSwapNumber(cmin, number) {
|
||||
break
|
||||
}
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.current.count.AddInt64(1)
|
||||
c.current.sum.AddNumber(kind, number)
|
||||
if number.CompareNumber(kind, c.current.min) < 0 {
|
||||
c.current.min = number
|
||||
}
|
||||
for {
|
||||
cmax := current.max.AsNumberAtomic()
|
||||
|
||||
if number.CompareNumber(kind, cmax) <= 0 {
|
||||
break
|
||||
}
|
||||
if current.max.CompareAndSwapNumber(cmax, number) {
|
||||
break
|
||||
}
|
||||
if number.CompareNumber(kind, c.current.max) > 0 {
|
||||
c.current.max = number
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -172,22 +142,14 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
|
||||
return aggregator.NewInconsistentMergeError(c, oa)
|
||||
}
|
||||
|
||||
// Lock() synchronizes Merge() and Checkpoint() to ensure all operations of
|
||||
// Merge() are performed on the same state.
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
|
||||
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
|
||||
|
||||
current := c.checkpoint()
|
||||
ocheckpoint := o.checkpoint()
|
||||
|
||||
current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
|
||||
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)
|
||||
|
||||
if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 {
|
||||
current.min.SetNumber(ocheckpoint.min)
|
||||
if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
|
||||
c.checkpoint.min.SetNumber(o.checkpoint.min)
|
||||
}
|
||||
if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 {
|
||||
current.max.SetNumber(ocheckpoint.max)
|
||||
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
|
||||
c.checkpoint.max.SetNumber(o.checkpoint.max)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -18,15 +18,12 @@ import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"unsafe"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
ottest "go.opentelemetry.io/otel/internal/testing"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
)
|
||||
@ -62,37 +59,6 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// Ensure struct alignment prior to running tests.
|
||||
func TestMain(m *testing.M) {
|
||||
fields := []ottest.FieldOffset{
|
||||
{
|
||||
Name: "Aggregator.states",
|
||||
Offset: unsafe.Offsetof(Aggregator{}.states),
|
||||
},
|
||||
{
|
||||
Name: "state.count",
|
||||
Offset: unsafe.Offsetof(state{}.count),
|
||||
},
|
||||
{
|
||||
Name: "state.sum",
|
||||
Offset: unsafe.Offsetof(state{}.sum),
|
||||
},
|
||||
{
|
||||
Name: "state.min",
|
||||
Offset: unsafe.Offsetof(state{}.min),
|
||||
},
|
||||
{
|
||||
Name: "state.max",
|
||||
Offset: unsafe.Offsetof(state{}.max),
|
||||
},
|
||||
}
|
||||
if !ottest.Aligned8Byte(fields, os.Stderr) {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountAbsolute(t *testing.T) {
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
minMaxSumCount(t, profile, positiveOnly)
|
||||
|
@ -12,10 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// This test is too large for the race detector. This SDK uses no locks
|
||||
// that the race detector would help with, anyway.
|
||||
// +build !race
|
||||
|
||||
package metric_test
|
||||
|
||||
import (
|
||||
|
Loading…
Reference in New Issue
Block a user