You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-07-15 01:04:25 +02:00
Add single instrument callback and split metric instrument configuration (#3507)
* Split metric inst config Instead of having the same configuration for both the Synchronous and Asynchronous instruments, use specific options for both. * Use Async/Sync opt for appropriate inst * Update noop inst providers * Update internal global impl * Update sdk * Remove unused method for callbackOption * Test instrument configuration * Lint imports * Add changes to changelog * Refactor callbacks and further split opts Define callbacks to return the value observed. Because of the different types returned for different observables, the callbacks and options are move to the sync/async packages. * Update noop impl * Fix example_test.go * Fix internal impl * Update Callbacks Return observations for distinct attr sets. * Refactor common code in sdk/metric inst provider * Update examples and prom exporter * Generalize callback * Update changelog * Add unit tests for callback * Add meter tests for cbacks on creation * Rename Observations to Measurements * Update Callback to accept an Observer * Update SDK impl * Move conf to instrument pkg * Apply suggestions from code review
This commit is contained in:
@ -10,6 +10,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
### Added
|
||||
|
||||
- The `WithInt64Callback` option is added to `go.opentelemetry.io/otel/metric/instrument` to configure int64 Observer callbacks during their creation. (#3507)
|
||||
- The `WithFloat64Callback` option is added to `go.opentelemetry.io/otel/metric/instrument` to configure float64 Observer callbacks during their creation. (#3507)
|
||||
- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
|
||||
This `Registration` can be used to unregister callbacks. (#3522)
|
||||
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
|
||||
@ -30,6 +32,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
### Changed
|
||||
|
||||
- Instrument configuration in `go.opentelemetry.io/otel/metric/instrument` is split into specific options and confguration based on the instrument type. (#3507)
|
||||
- Use the added `Int64Option` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/syncint64`.
|
||||
- Use the added `Float64Option` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/syncfloat64`.
|
||||
- Use the added `Int64ObserverOption` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/asyncint64`.
|
||||
- Use the added `Float64ObserverOption` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/asyncfloat64`.
|
||||
- The `InstrumentProvider` from `go.opentelemetry.io/otel/sdk/metric/asyncint64` is removed.
|
||||
Use the new creation methods of the `Meter` in `go.opentelemetry.io/otel/sdk/metric` instead. (#3530)
|
||||
- The `Counter` method is replaced by `Meter.Int64ObservableCounter`
|
||||
|
101
metric/instrument/asyncfloat64.go
Normal file
101
metric/instrument/asyncfloat64.go
Normal file
@ -0,0 +1,101 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
// Float64Observer is a recorder of float64 measurement values.
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type Float64Observer interface {
|
||||
Asynchronous
|
||||
|
||||
// Observe records the measurement value for a set of attributes.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of
|
||||
// the registered callback it should have no effect on the instrument, and
|
||||
// an error will be reported via the error handler.
|
||||
Observe(ctx context.Context, value float64, attributes ...attribute.KeyValue)
|
||||
}
|
||||
|
||||
// Float64Callback is a function registered with a Meter that makes
|
||||
// observations for a Float64Observer it is registered with.
|
||||
//
|
||||
// The function needs to complete in a finite amount of time and the deadline
|
||||
// of the passed context is expected to be honored.
|
||||
//
|
||||
// The function needs to make unique observations across all registered
|
||||
// Float64Callbacks. Meaning, it should not report measurements with the same
|
||||
// attributes as another Float64Callbacks also registered for the same
|
||||
// instrument.
|
||||
//
|
||||
// The function needs to be concurrent safe.
|
||||
type Float64Callback func(context.Context, Float64Observer) error
|
||||
|
||||
// Float64ObserverConfig contains options for Asynchronous instruments that
|
||||
// observe float64 values.
|
||||
type Float64ObserverConfig struct {
|
||||
description string
|
||||
unit unit.Unit
|
||||
callbacks []Float64Callback
|
||||
}
|
||||
|
||||
// NewFloat64ObserverConfig returns a new Float64ObserverConfig with all opts
|
||||
// applied.
|
||||
func NewFloat64ObserverConfig(opts ...Float64ObserverOption) Float64ObserverConfig {
|
||||
var config Float64ObserverConfig
|
||||
for _, o := range opts {
|
||||
config = o.applyFloat64Observer(config)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// Description returns the Config description.
|
||||
func (c Float64ObserverConfig) Description() string {
|
||||
return c.description
|
||||
}
|
||||
|
||||
// Unit returns the Config unit.
|
||||
func (c Float64ObserverConfig) Unit() unit.Unit {
|
||||
return c.unit
|
||||
}
|
||||
|
||||
// Callbacks returns the Config callbacks.
|
||||
func (c Float64ObserverConfig) Callbacks() []Float64Callback {
|
||||
return c.callbacks
|
||||
}
|
||||
|
||||
// Float64ObserverOption applies options to float64 Observer instruments.
|
||||
type Float64ObserverOption interface {
|
||||
applyFloat64Observer(Float64ObserverConfig) Float64ObserverConfig
|
||||
}
|
||||
|
||||
type float64ObserverOptionFunc func(Float64ObserverConfig) Float64ObserverConfig
|
||||
|
||||
func (fn float64ObserverOptionFunc) applyFloat64Observer(cfg Float64ObserverConfig) Float64ObserverConfig {
|
||||
return fn(cfg)
|
||||
}
|
||||
|
||||
// WithFloat64Callback adds callback to be called for an instrument.
|
||||
func WithFloat64Callback(callback Float64Callback) Float64ObserverOption {
|
||||
return float64ObserverOptionFunc(func(cfg Float64ObserverConfig) Float64ObserverConfig {
|
||||
cfg.callbacks = append(cfg.callbacks, callback)
|
||||
return cfg
|
||||
})
|
||||
}
|
@ -14,53 +14,28 @@
|
||||
|
||||
package asyncfloat64 // import "go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
|
||||
|
||||
import (
|
||||
"context"
|
||||
import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
)
|
||||
|
||||
// Counter is an instrument that records increasing values.
|
||||
// Counter is an instrument used to asynchronously record increasing float64
|
||||
// measurements once per a measurement collection cycle. The Observe method is
|
||||
// used to record the measured state of the instrument when it is called.
|
||||
// Implementations will assume the observed value to be the cumulative sum of
|
||||
// the count.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type Counter interface {
|
||||
// Observe records the state of the instrument to be x. Implementations
|
||||
// will assume x to be the cumulative sum of the count.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of the
|
||||
// registered callback it should have no effect on the instrument, and an
|
||||
// error will be reported via the error handler.
|
||||
Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue)
|
||||
type Counter interface{ instrument.Float64Observer }
|
||||
|
||||
instrument.Asynchronous
|
||||
}
|
||||
|
||||
// UpDownCounter is an instrument that records increasing or decreasing values.
|
||||
// UpDownCounter is an instrument used to asynchronously record float64
|
||||
// measurements once per a measurement collection cycle. The Observe method is
|
||||
// used to record the measured state of the instrument when it is called.
|
||||
// Implementations will assume the observed value to be the cumulative sum of
|
||||
// the count.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type UpDownCounter interface {
|
||||
// Observe records the state of the instrument to be x. Implementations
|
||||
// will assume x to be the cumulative sum of the count.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of the
|
||||
// registered callback it should have no effect on the instrument, and an
|
||||
// error will be reported via the error handler.
|
||||
Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue)
|
||||
type UpDownCounter interface{ instrument.Float64Observer }
|
||||
|
||||
instrument.Asynchronous
|
||||
}
|
||||
|
||||
// Gauge is an instrument that records independent readings.
|
||||
// Gauge is an instrument used to asynchronously record instantaneous float64
|
||||
// measurements once per a measurement collection cycle.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type Gauge interface {
|
||||
// Observe records the state of the instrument to be x.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of the
|
||||
// registered callback it should have no effect on the instrument, and an
|
||||
// error will be reported via the error handler.
|
||||
Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue)
|
||||
|
||||
instrument.Asynchronous
|
||||
}
|
||||
type Gauge interface{ instrument.Float64Observer }
|
||||
|
62
metric/instrument/asyncfloat64_test.go
Normal file
62
metric/instrument/asyncfloat64_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
func TestFloat64ObserverOptions(t *testing.T) {
|
||||
const (
|
||||
token float64 = 43
|
||||
desc = "Instrument description."
|
||||
uBytes = unit.Bytes
|
||||
)
|
||||
|
||||
got := NewFloat64ObserverConfig(
|
||||
WithDescription(desc),
|
||||
WithUnit(uBytes),
|
||||
WithFloat64Callback(func(ctx context.Context, o Float64Observer) error {
|
||||
o.Observe(ctx, token)
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
assert.Equal(t, desc, got.Description(), "description")
|
||||
assert.Equal(t, uBytes, got.Unit(), "unit")
|
||||
|
||||
// Functions are not comparable.
|
||||
cBacks := got.Callbacks()
|
||||
require.Len(t, cBacks, 1, "callbacks")
|
||||
o := &float64Observer{}
|
||||
err := cBacks[0](context.Background(), o)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, token, o.got, "callback not set")
|
||||
}
|
||||
|
||||
type float64Observer struct {
|
||||
Asynchronous
|
||||
got float64
|
||||
}
|
||||
|
||||
func (o *float64Observer) Observe(_ context.Context, v float64, _ ...attribute.KeyValue) {
|
||||
o.got = v
|
||||
}
|
102
metric/instrument/asyncint64.go
Normal file
102
metric/instrument/asyncint64.go
Normal file
@ -0,0 +1,102 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
// Int64Observer is a recorder of int64 measurement values.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type Int64Observer interface {
|
||||
Asynchronous
|
||||
|
||||
// Observe records the measurement value for a set of attributes.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of
|
||||
// the registered callback it should have no effect on the instrument, and
|
||||
// an error will be reported via the error handler.
|
||||
Observe(ctx context.Context, value int64, attributes ...attribute.KeyValue)
|
||||
}
|
||||
|
||||
// Int64Callback is a function registered with a Meter that makes
|
||||
// observations for an Int64Observer it is registered with.
|
||||
//
|
||||
// The function needs to complete in a finite amount of time and the deadline
|
||||
// of the passed context is expected to be honored.
|
||||
//
|
||||
// The function needs to make unique observations across all registered
|
||||
// Int64Callback. Meaning, it should not report measurements with the same
|
||||
// attributes as another Int64Callbacks also registered for the same
|
||||
// instrument.
|
||||
//
|
||||
// The function needs to be concurrent safe.
|
||||
type Int64Callback func(context.Context, Int64Observer) error
|
||||
|
||||
// Int64ObserverConfig contains options for Asynchronous instruments that
|
||||
// observe int64 values.
|
||||
type Int64ObserverConfig struct {
|
||||
description string
|
||||
unit unit.Unit
|
||||
callbacks []Int64Callback
|
||||
}
|
||||
|
||||
// NewInt64ObserverConfig returns a new Int64ObserverConfig with all opts
|
||||
// applied.
|
||||
func NewInt64ObserverConfig(opts ...Int64ObserverOption) Int64ObserverConfig {
|
||||
var config Int64ObserverConfig
|
||||
for _, o := range opts {
|
||||
config = o.applyInt64Observer(config)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// Description returns the Config description.
|
||||
func (c Int64ObserverConfig) Description() string {
|
||||
return c.description
|
||||
}
|
||||
|
||||
// Unit returns the Config unit.
|
||||
func (c Int64ObserverConfig) Unit() unit.Unit {
|
||||
return c.unit
|
||||
}
|
||||
|
||||
// Callbacks returns the Config callbacks.
|
||||
func (c Int64ObserverConfig) Callbacks() []Int64Callback {
|
||||
return c.callbacks
|
||||
}
|
||||
|
||||
// Int64ObserverOption applies options to int64 Observer instruments.
|
||||
type Int64ObserverOption interface {
|
||||
applyInt64Observer(Int64ObserverConfig) Int64ObserverConfig
|
||||
}
|
||||
|
||||
type int64ObserverOptionFunc func(Int64ObserverConfig) Int64ObserverConfig
|
||||
|
||||
func (fn int64ObserverOptionFunc) applyInt64Observer(cfg Int64ObserverConfig) Int64ObserverConfig {
|
||||
return fn(cfg)
|
||||
}
|
||||
|
||||
// WithInt64Callback adds callback to be called for an instrument.
|
||||
func WithInt64Callback(callback Int64Callback) Int64ObserverOption {
|
||||
return int64ObserverOptionFunc(func(cfg Int64ObserverConfig) Int64ObserverConfig {
|
||||
cfg.callbacks = append(cfg.callbacks, callback)
|
||||
return cfg
|
||||
})
|
||||
}
|
@ -14,53 +14,28 @@
|
||||
|
||||
package asyncint64 // import "go.opentelemetry.io/otel/metric/instrument/asyncint64"
|
||||
|
||||
import (
|
||||
"context"
|
||||
import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
)
|
||||
|
||||
// Counter is an instrument that records increasing values.
|
||||
// Counter is an instrument used to asynchronously record increasing int64
|
||||
// measurements once per a measurement collection cycle. The Observe method is
|
||||
// used to record the measured state of the instrument when it is called.
|
||||
// Implementations will assume the observed value to be the cumulative sum of
|
||||
// the count.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type Counter interface {
|
||||
// Observe records the state of the instrument to be x. Implementations
|
||||
// will assume x to be the cumulative sum of the count.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of the
|
||||
// registered callback it should have no effect on the instrument, and an
|
||||
// error will be reported via the error handler.
|
||||
Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue)
|
||||
type Counter interface{ instrument.Int64Observer }
|
||||
|
||||
instrument.Asynchronous
|
||||
}
|
||||
|
||||
// UpDownCounter is an instrument that records increasing or decreasing values.
|
||||
// UpDownCounter is an instrument used to asynchronously record int64
|
||||
// measurements once per a measurement collection cycle. The Observe method is
|
||||
// used to record the measured state of the instrument when it is called.
|
||||
// Implementations will assume the observed value to be the cumulative sum of
|
||||
// the count.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type UpDownCounter interface {
|
||||
// Observe records the state of the instrument to be x. Implementations
|
||||
// will assume x to be the cumulative sum of the count.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of the
|
||||
// registered callback it should have no effect on the instrument, and an
|
||||
// error will be reported via the error handler.
|
||||
Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue)
|
||||
type UpDownCounter interface{ instrument.Int64Observer }
|
||||
|
||||
instrument.Asynchronous
|
||||
}
|
||||
|
||||
// Gauge is an instrument that records independent readings.
|
||||
// Gauge is an instrument used to asynchronously record instantaneous int64
|
||||
// measurements once per a measurement collection cycle.
|
||||
//
|
||||
// Warning: methods may be added to this interface in minor releases.
|
||||
type Gauge interface {
|
||||
// Observe records the state of the instrument to be x.
|
||||
//
|
||||
// It is only valid to call this within a callback. If called outside of the
|
||||
// registered callback it should have no effect on the instrument, and an
|
||||
// error will be reported via the error handler.
|
||||
Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue)
|
||||
|
||||
instrument.Asynchronous
|
||||
}
|
||||
type Gauge interface{ instrument.Int64Observer }
|
||||
|
62
metric/instrument/asyncint64_test.go
Normal file
62
metric/instrument/asyncint64_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
func TestInt64ObserverOptions(t *testing.T) {
|
||||
const (
|
||||
token int64 = 43
|
||||
desc = "Instrument description."
|
||||
uBytes = unit.Bytes
|
||||
)
|
||||
|
||||
got := NewInt64ObserverConfig(
|
||||
WithDescription(desc),
|
||||
WithUnit(uBytes),
|
||||
WithInt64Callback(func(ctx context.Context, o Int64Observer) error {
|
||||
o.Observe(ctx, token)
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
assert.Equal(t, desc, got.Description(), "description")
|
||||
assert.Equal(t, uBytes, got.Unit(), "unit")
|
||||
|
||||
// Functions are not comparable.
|
||||
cBacks := got.Callbacks()
|
||||
require.Len(t, cBacks, 1, "callbacks")
|
||||
o := &int64Observer{}
|
||||
err := cBacks[0](context.Background(), o)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, token, o.got, "callback not set")
|
||||
}
|
||||
|
||||
type int64Observer struct {
|
||||
Asynchronous
|
||||
got int64
|
||||
}
|
||||
|
||||
func (o *int64Observer) Observe(_ context.Context, v int64, _ ...attribute.KeyValue) {
|
||||
o.got = v
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import "go.opentelemetry.io/otel/metric/unit"
|
||||
|
||||
// Config contains options for metric instrument descriptors.
|
||||
type Config struct {
|
||||
description string
|
||||
unit unit.Unit
|
||||
}
|
||||
|
||||
// Description describes the instrument in human-readable terms.
|
||||
func (cfg Config) Description() string {
|
||||
return cfg.description
|
||||
}
|
||||
|
||||
// Unit describes the measurement unit for an instrument.
|
||||
func (cfg Config) Unit() unit.Unit {
|
||||
return cfg.unit
|
||||
}
|
||||
|
||||
// Option is an interface for applying metric instrument options.
|
||||
type Option interface {
|
||||
applyInstrument(Config) Config
|
||||
}
|
||||
|
||||
// NewConfig creates a new Config and applies all the given options.
|
||||
func NewConfig(opts ...Option) Config {
|
||||
var config Config
|
||||
for _, o := range opts {
|
||||
config = o.applyInstrument(config)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
type optionFunc func(Config) Config
|
||||
|
||||
func (fn optionFunc) applyInstrument(cfg Config) Config {
|
||||
return fn(cfg)
|
||||
}
|
||||
|
||||
// WithDescription applies provided description.
|
||||
func WithDescription(desc string) Option {
|
||||
return optionFunc(func(cfg Config) Config {
|
||||
cfg.description = desc
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
// WithUnit applies provided unit.
|
||||
func WithUnit(u unit.Unit) Option {
|
||||
return optionFunc(func(cfg Config) Config {
|
||||
cfg.unit = u
|
||||
return cfg
|
||||
})
|
||||
}
|
@ -14,6 +14,8 @@
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import "go.opentelemetry.io/otel/metric/unit"
|
||||
|
||||
// Asynchronous instruments are instruments that are updated within a Callback.
|
||||
// If an instrument is observed outside of it's callback it should be an error.
|
||||
//
|
||||
@ -28,3 +30,61 @@ type Asynchronous interface {
|
||||
type Synchronous interface {
|
||||
synchronous()
|
||||
}
|
||||
|
||||
// Option applies options to all instruments.
|
||||
type Option interface {
|
||||
Float64ObserverOption
|
||||
Int64ObserverOption
|
||||
Float64Option
|
||||
Int64Option
|
||||
}
|
||||
|
||||
type descOpt string
|
||||
|
||||
func (o descOpt) applyFloat64(c Float64Config) Float64Config {
|
||||
c.description = string(o)
|
||||
return c
|
||||
}
|
||||
|
||||
func (o descOpt) applyInt64(c Int64Config) Int64Config {
|
||||
c.description = string(o)
|
||||
return c
|
||||
}
|
||||
|
||||
func (o descOpt) applyFloat64Observer(c Float64ObserverConfig) Float64ObserverConfig {
|
||||
c.description = string(o)
|
||||
return c
|
||||
}
|
||||
|
||||
func (o descOpt) applyInt64Observer(c Int64ObserverConfig) Int64ObserverConfig {
|
||||
c.description = string(o)
|
||||
return c
|
||||
}
|
||||
|
||||
// WithDescription sets the instrument description.
|
||||
func WithDescription(desc string) Option { return descOpt(desc) }
|
||||
|
||||
type unitOpt unit.Unit
|
||||
|
||||
func (o unitOpt) applyFloat64(c Float64Config) Float64Config {
|
||||
c.unit = unit.Unit(o)
|
||||
return c
|
||||
}
|
||||
|
||||
func (o unitOpt) applyInt64(c Int64Config) Int64Config {
|
||||
c.unit = unit.Unit(o)
|
||||
return c
|
||||
}
|
||||
|
||||
func (o unitOpt) applyFloat64Observer(c Float64ObserverConfig) Float64ObserverConfig {
|
||||
c.unit = unit.Unit(o)
|
||||
return c
|
||||
}
|
||||
|
||||
func (o unitOpt) applyInt64Observer(c Int64ObserverConfig) Int64ObserverConfig {
|
||||
c.unit = unit.Unit(o)
|
||||
return c
|
||||
}
|
||||
|
||||
// WithUnit sets the instrument unit.
|
||||
func WithUnit(u unit.Unit) Option { return unitOpt(u) }
|
||||
|
51
metric/instrument/syncfloat64.go
Normal file
51
metric/instrument/syncfloat64.go
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
// Float64Config contains options for Asynchronous instruments that
|
||||
// observe float64 values.
|
||||
type Float64Config struct {
|
||||
description string
|
||||
unit unit.Unit
|
||||
}
|
||||
|
||||
// Float64Config contains options for Synchronous instruments that record
|
||||
// float64 values.
|
||||
func NewFloat64Config(opts ...Float64Option) Float64Config {
|
||||
var config Float64Config
|
||||
for _, o := range opts {
|
||||
config = o.applyFloat64(config)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// Description returns the Config description.
|
||||
func (c Float64Config) Description() string {
|
||||
return c.description
|
||||
}
|
||||
|
||||
// Unit returns the Config unit.
|
||||
func (c Float64Config) Unit() unit.Unit {
|
||||
return c.unit
|
||||
}
|
||||
|
||||
// Float64Option applies options to synchronous float64 instruments.
|
||||
type Float64Option interface {
|
||||
applyFloat64(Float64Config) Float64Config
|
||||
}
|
35
metric/instrument/syncfloat64_test.go
Normal file
35
metric/instrument/syncfloat64_test.go
Normal file
@ -0,0 +1,35 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
func TestFloat64Options(t *testing.T) {
|
||||
const (
|
||||
token float64 = 43
|
||||
desc = "Instrument description."
|
||||
uBytes = unit.Bytes
|
||||
)
|
||||
|
||||
got := NewFloat64Config(WithDescription(desc), WithUnit(uBytes))
|
||||
assert.Equal(t, desc, got.Description(), "description")
|
||||
assert.Equal(t, uBytes, got.Unit(), "unit")
|
||||
}
|
51
metric/instrument/syncint64.go
Normal file
51
metric/instrument/syncint64.go
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
// Int64Config contains options for Synchronous instruments that record int64
|
||||
// values.
|
||||
type Int64Config struct {
|
||||
description string
|
||||
unit unit.Unit
|
||||
}
|
||||
|
||||
// NewInt64Config returns a new Int64Config with all opts
|
||||
// applied.
|
||||
func NewInt64Config(opts ...Int64Option) Int64Config {
|
||||
var config Int64Config
|
||||
for _, o := range opts {
|
||||
config = o.applyInt64(config)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// Description returns the Config description.
|
||||
func (c Int64Config) Description() string {
|
||||
return c.description
|
||||
}
|
||||
|
||||
// Unit returns the Config unit.
|
||||
func (c Int64Config) Unit() unit.Unit {
|
||||
return c.unit
|
||||
}
|
||||
|
||||
// Int64Option applies options to synchronous int64 instruments.
|
||||
type Int64Option interface {
|
||||
applyInt64(Int64Config) Int64Config
|
||||
}
|
35
metric/instrument/syncint64_test.go
Normal file
35
metric/instrument/syncint64_test.go
Normal file
@ -0,0 +1,35 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package instrument // import "go.opentelemetry.io/otel/metric/instrument"
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
)
|
||||
|
||||
func TestInt64Options(t *testing.T) {
|
||||
const (
|
||||
token int64 = 43
|
||||
desc = "Instrument description."
|
||||
uBytes = unit.Bytes
|
||||
)
|
||||
|
||||
got := NewInt64Config(WithDescription(desc), WithUnit(uBytes))
|
||||
assert.Equal(t, desc, got.Description(), "description")
|
||||
assert.Equal(t, uBytes, got.Unit(), "unit")
|
||||
}
|
@ -30,7 +30,7 @@ import (
|
||||
|
||||
type afCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Float64ObserverOption
|
||||
|
||||
delegate atomic.Value //asyncfloat64.Counter
|
||||
|
||||
@ -61,7 +61,7 @@ func (i *afCounter) unwrap() instrument.Asynchronous {
|
||||
|
||||
type afUpDownCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Float64ObserverOption
|
||||
|
||||
delegate atomic.Value //asyncfloat64.UpDownCounter
|
||||
|
||||
@ -92,7 +92,7 @@ func (i *afUpDownCounter) unwrap() instrument.Asynchronous {
|
||||
|
||||
type afGauge struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Float64ObserverOption
|
||||
|
||||
delegate atomic.Value //asyncfloat64.Gauge
|
||||
|
||||
@ -123,7 +123,7 @@ func (i *afGauge) unwrap() instrument.Asynchronous {
|
||||
|
||||
type aiCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Int64ObserverOption
|
||||
|
||||
delegate atomic.Value //asyncint64.Counter
|
||||
|
||||
@ -154,7 +154,7 @@ func (i *aiCounter) unwrap() instrument.Asynchronous {
|
||||
|
||||
type aiUpDownCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Int64ObserverOption
|
||||
|
||||
delegate atomic.Value //asyncint64.UpDownCounter
|
||||
|
||||
@ -185,7 +185,7 @@ func (i *aiUpDownCounter) unwrap() instrument.Asynchronous {
|
||||
|
||||
type aiGauge struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Int64ObserverOption
|
||||
|
||||
delegate atomic.Value //asyncint64.Gauge
|
||||
|
||||
@ -217,7 +217,7 @@ func (i *aiGauge) unwrap() instrument.Asynchronous {
|
||||
// Sync Instruments.
|
||||
type sfCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Float64Option
|
||||
|
||||
delegate atomic.Value //syncfloat64.Counter
|
||||
|
||||
@ -241,7 +241,7 @@ func (i *sfCounter) Add(ctx context.Context, incr float64, attrs ...attribute.Ke
|
||||
|
||||
type sfUpDownCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Float64Option
|
||||
|
||||
delegate atomic.Value //syncfloat64.UpDownCounter
|
||||
|
||||
@ -265,7 +265,7 @@ func (i *sfUpDownCounter) Add(ctx context.Context, incr float64, attrs ...attrib
|
||||
|
||||
type sfHistogram struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Float64Option
|
||||
|
||||
delegate atomic.Value //syncfloat64.Histogram
|
||||
|
||||
@ -289,7 +289,7 @@ func (i *sfHistogram) Record(ctx context.Context, x float64, attrs ...attribute.
|
||||
|
||||
type siCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Int64Option
|
||||
|
||||
delegate atomic.Value //syncint64.Counter
|
||||
|
||||
@ -313,7 +313,7 @@ func (i *siCounter) Add(ctx context.Context, x int64, attrs ...attribute.KeyValu
|
||||
|
||||
type siUpDownCounter struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Int64Option
|
||||
|
||||
delegate atomic.Value //syncint64.UpDownCounter
|
||||
|
||||
@ -337,7 +337,7 @@ func (i *siUpDownCounter) Add(ctx context.Context, x int64, attrs ...attribute.K
|
||||
|
||||
type siHistogram struct {
|
||||
name string
|
||||
opts []instrument.Option
|
||||
opts []instrument.Int64Option
|
||||
|
||||
delegate atomic.Value //syncint64.Histogram
|
||||
|
||||
|
@ -147,7 +147,7 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
|
||||
m.registry.Init()
|
||||
}
|
||||
|
||||
func (m *meter) Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) {
|
||||
func (m *meter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64Counter(name, options...)
|
||||
}
|
||||
@ -158,7 +158,7 @@ func (m *meter) Int64Counter(name string, options ...instrument.Option) (syncint
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) {
|
||||
func (m *meter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64UpDownCounter(name, options...)
|
||||
}
|
||||
@ -169,7 +169,7 @@ func (m *meter) Int64UpDownCounter(name string, options ...instrument.Option) (s
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) {
|
||||
func (m *meter) Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64Histogram(name, options...)
|
||||
}
|
||||
@ -180,7 +180,7 @@ func (m *meter) Int64Histogram(name string, options ...instrument.Option) (synci
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) {
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64ObservableCounter(name, options...)
|
||||
}
|
||||
@ -191,7 +191,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...instrument.Option
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) {
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64ObservableUpDownCounter(name, options...)
|
||||
}
|
||||
@ -202,7 +202,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) {
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64ObservableGauge(name, options...)
|
||||
}
|
||||
@ -213,7 +213,7 @@ func (m *meter) Int64ObservableGauge(name string, options ...instrument.Option)
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) {
|
||||
func (m *meter) Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64Counter(name, options...)
|
||||
}
|
||||
@ -224,7 +224,7 @@ func (m *meter) Float64Counter(name string, options ...instrument.Option) (syncf
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) {
|
||||
func (m *meter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64UpDownCounter(name, options...)
|
||||
}
|
||||
@ -235,7 +235,7 @@ func (m *meter) Float64UpDownCounter(name string, options ...instrument.Option)
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) {
|
||||
func (m *meter) Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64Histogram(name, options...)
|
||||
}
|
||||
@ -246,7 +246,7 @@ func (m *meter) Float64Histogram(name string, options ...instrument.Option) (syn
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) {
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64ObservableCounter(name, options...)
|
||||
}
|
||||
@ -257,7 +257,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...instrument.Opti
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64ObservableUpDownCounter(name, options...)
|
||||
}
|
||||
@ -268,7 +268,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrumen
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) {
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64ObservableGauge(name, options...)
|
||||
}
|
||||
|
@ -55,62 +55,62 @@ type testMeter struct {
|
||||
callbacks []func(context.Context)
|
||||
}
|
||||
|
||||
func (m *testMeter) Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) {
|
||||
func (m *testMeter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) {
|
||||
m.siCount++
|
||||
return &testCountingIntInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) {
|
||||
func (m *testMeter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) {
|
||||
m.siUDCount++
|
||||
return &testCountingIntInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) {
|
||||
func (m *testMeter) Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) {
|
||||
m.siHist++
|
||||
return &testCountingIntInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) {
|
||||
func (m *testMeter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) {
|
||||
m.aiCount++
|
||||
return &testCountingIntInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) {
|
||||
func (m *testMeter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) {
|
||||
m.aiUDCount++
|
||||
return &testCountingIntInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) {
|
||||
func (m *testMeter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) {
|
||||
m.aiGauge++
|
||||
return &testCountingIntInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) {
|
||||
func (m *testMeter) Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) {
|
||||
m.sfCount++
|
||||
return &testCountingFloatInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) {
|
||||
func (m *testMeter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) {
|
||||
m.sfUDCount++
|
||||
return &testCountingFloatInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) {
|
||||
func (m *testMeter) Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) {
|
||||
m.sfHist++
|
||||
return &testCountingFloatInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) {
|
||||
func (m *testMeter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) {
|
||||
m.afCount++
|
||||
return &testCountingFloatInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
|
||||
func (m *testMeter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) {
|
||||
m.afUDCount++
|
||||
return &testCountingFloatInstrument{}, nil
|
||||
}
|
||||
|
||||
func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) {
|
||||
func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) {
|
||||
m.afGauge++
|
||||
return &testCountingFloatInstrument{}, nil
|
||||
}
|
||||
|
@ -44,56 +44,56 @@ type Meter interface {
|
||||
// Int64Counter returns a new instrument identified by name and configured
|
||||
// with options. The instrument is used to synchronously record increasing
|
||||
// int64 measurements during a computational operation.
|
||||
Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error)
|
||||
Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error)
|
||||
// Int64UpDownCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// int64 measurements during a computational operation.
|
||||
Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error)
|
||||
Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error)
|
||||
// Int64Histogram returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// the distribution of int64 measurements during a computational operation.
|
||||
Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error)
|
||||
Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error)
|
||||
// Int64ObservableCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// increasing int64 measurements once per a measurement collection cycle.
|
||||
Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error)
|
||||
Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error)
|
||||
// Int64ObservableUpDownCounter returns a new instrument identified by name
|
||||
// and configured with options. The instrument is used to asynchronously
|
||||
// record int64 measurements once per a measurement collection cycle.
|
||||
Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error)
|
||||
Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error)
|
||||
// Int64ObservableGauge returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// instantaneous int64 measurements once per a measurement collection
|
||||
// cycle.
|
||||
Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error)
|
||||
Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error)
|
||||
|
||||
// Float64Counter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// increasing float64 measurements during a computational operation.
|
||||
Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error)
|
||||
Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error)
|
||||
// Float64UpDownCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// float64 measurements during a computational operation.
|
||||
Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error)
|
||||
Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error)
|
||||
// Float64Histogram returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// the distribution of float64 measurements during a computational
|
||||
// operation.
|
||||
Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error)
|
||||
Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error)
|
||||
// Float64ObservableCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// increasing float64 measurements once per a measurement collection cycle.
|
||||
Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error)
|
||||
Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error)
|
||||
// Float64ObservableUpDownCounter returns a new instrument identified by
|
||||
// name and configured with options. The instrument is used to
|
||||
// asynchronously record float64 measurements once per a measurement
|
||||
// collection cycle.
|
||||
Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error)
|
||||
Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error)
|
||||
// Float64ObservableGauge returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// instantaneous float64 measurements once per a measurement collection
|
||||
// cycle.
|
||||
Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error)
|
||||
Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error)
|
||||
|
||||
// RegisterCallback registers f to be called during the collection of a
|
||||
// measurement cycle.
|
||||
|
@ -43,51 +43,51 @@ func NewNoopMeter() Meter {
|
||||
|
||||
type noopMeter struct{}
|
||||
|
||||
func (noopMeter) Int64Counter(string, ...instrument.Option) (syncint64.Counter, error) {
|
||||
func (noopMeter) Int64Counter(string, ...instrument.Int64Option) (syncint64.Counter, error) {
|
||||
return nonrecordingSyncInt64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Int64UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) {
|
||||
func (noopMeter) Int64UpDownCounter(string, ...instrument.Int64Option) (syncint64.UpDownCounter, error) {
|
||||
return nonrecordingSyncInt64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Int64Histogram(string, ...instrument.Option) (syncint64.Histogram, error) {
|
||||
func (noopMeter) Int64Histogram(string, ...instrument.Int64Option) (syncint64.Histogram, error) {
|
||||
return nonrecordingSyncInt64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Int64ObservableCounter(string, ...instrument.Option) (asyncint64.Counter, error) {
|
||||
func (noopMeter) Int64ObservableCounter(string, ...instrument.Int64ObserverOption) (asyncint64.Counter, error) {
|
||||
return nonrecordingAsyncInt64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Int64ObservableUpDownCounter(string, ...instrument.Option) (asyncint64.UpDownCounter, error) {
|
||||
func (noopMeter) Int64ObservableUpDownCounter(string, ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) {
|
||||
return nonrecordingAsyncInt64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Int64ObservableGauge(string, ...instrument.Option) (asyncint64.Gauge, error) {
|
||||
func (noopMeter) Int64ObservableGauge(string, ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) {
|
||||
return nonrecordingAsyncInt64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Float64Counter(string, ...instrument.Option) (syncfloat64.Counter, error) {
|
||||
func (noopMeter) Float64Counter(string, ...instrument.Float64Option) (syncfloat64.Counter, error) {
|
||||
return nonrecordingSyncFloat64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Float64UpDownCounter(string, ...instrument.Option) (syncfloat64.UpDownCounter, error) {
|
||||
func (noopMeter) Float64UpDownCounter(string, ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) {
|
||||
return nonrecordingSyncFloat64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Float64Histogram(string, ...instrument.Option) (syncfloat64.Histogram, error) {
|
||||
func (noopMeter) Float64Histogram(string, ...instrument.Float64Option) (syncfloat64.Histogram, error) {
|
||||
return nonrecordingSyncFloat64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Float64ObservableCounter(string, ...instrument.Option) (asyncfloat64.Counter, error) {
|
||||
func (noopMeter) Float64ObservableCounter(string, ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) {
|
||||
return nonrecordingAsyncFloat64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Float64ObservableUpDownCounter(string, ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
|
||||
func (noopMeter) Float64ObservableUpDownCounter(string, ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) {
|
||||
return nonrecordingAsyncFloat64Instrument{}, nil
|
||||
}
|
||||
|
||||
func (noopMeter) Float64ObservableGauge(string, ...instrument.Option) (asyncfloat64.Gauge, error) {
|
||||
func (noopMeter) Float64ObservableGauge(string, ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) {
|
||||
return nonrecordingAsyncFloat64Instrument{}, nil
|
||||
}
|
||||
|
||||
@ -110,15 +110,15 @@ var (
|
||||
_ asyncfloat64.Gauge = nonrecordingAsyncFloat64Instrument{}
|
||||
)
|
||||
|
||||
func (n nonrecordingAsyncFloat64Instrument) Counter(string, ...instrument.Option) (asyncfloat64.Counter, error) {
|
||||
func (n nonrecordingAsyncFloat64Instrument) Counter(string, ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingAsyncFloat64Instrument) UpDownCounter(string, ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
|
||||
func (n nonrecordingAsyncFloat64Instrument) UpDownCounter(string, ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingAsyncFloat64Instrument) Gauge(string, ...instrument.Option) (asyncfloat64.Gauge, error) {
|
||||
func (n nonrecordingAsyncFloat64Instrument) Gauge(string, ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
@ -136,15 +136,15 @@ var (
|
||||
_ asyncint64.Gauge = nonrecordingAsyncInt64Instrument{}
|
||||
)
|
||||
|
||||
func (n nonrecordingAsyncInt64Instrument) Counter(string, ...instrument.Option) (asyncint64.Counter, error) {
|
||||
func (n nonrecordingAsyncInt64Instrument) Counter(string, ...instrument.Int64ObserverOption) (asyncint64.Counter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingAsyncInt64Instrument) UpDownCounter(string, ...instrument.Option) (asyncint64.UpDownCounter, error) {
|
||||
func (n nonrecordingAsyncInt64Instrument) UpDownCounter(string, ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingAsyncInt64Instrument) Gauge(string, ...instrument.Option) (asyncint64.Gauge, error) {
|
||||
func (n nonrecordingAsyncInt64Instrument) Gauge(string, ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
@ -161,15 +161,15 @@ var (
|
||||
_ syncfloat64.Histogram = nonrecordingSyncFloat64Instrument{}
|
||||
)
|
||||
|
||||
func (n nonrecordingSyncFloat64Instrument) Counter(string, ...instrument.Option) (syncfloat64.Counter, error) {
|
||||
func (n nonrecordingSyncFloat64Instrument) Counter(string, ...instrument.Float64Option) (syncfloat64.Counter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingSyncFloat64Instrument) UpDownCounter(string, ...instrument.Option) (syncfloat64.UpDownCounter, error) {
|
||||
func (n nonrecordingSyncFloat64Instrument) UpDownCounter(string, ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingSyncFloat64Instrument) Histogram(string, ...instrument.Option) (syncfloat64.Histogram, error) {
|
||||
func (n nonrecordingSyncFloat64Instrument) Histogram(string, ...instrument.Float64Option) (syncfloat64.Histogram, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
@ -191,15 +191,15 @@ var (
|
||||
_ syncint64.Histogram = nonrecordingSyncInt64Instrument{}
|
||||
)
|
||||
|
||||
func (n nonrecordingSyncInt64Instrument) Counter(string, ...instrument.Option) (syncint64.Counter, error) {
|
||||
func (n nonrecordingSyncInt64Instrument) Counter(string, ...instrument.Int64Option) (syncint64.Counter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingSyncInt64Instrument) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) {
|
||||
func (n nonrecordingSyncInt64Instrument) UpDownCounter(string, ...instrument.Int64Option) (syncint64.UpDownCounter, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n nonrecordingSyncInt64Instrument) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) {
|
||||
func (n nonrecordingSyncInt64Instrument) Histogram(string, ...instrument.Int64Option) (syncint64.Histogram, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ type meter struct {
|
||||
aggregations []metricdata.Aggregation
|
||||
}
|
||||
|
||||
func (p *meter) Int64Counter(string, ...instrument.Option) (syncint64.Counter, error) {
|
||||
func (p *meter) Int64Counter(string, ...instrument.Int64Option) (syncint64.Counter, error) {
|
||||
// This is an example of how a meter would create an aggregator for a new
|
||||
// counter. At this point the provider would determine the aggregation and
|
||||
// temporality to used based on the Reader and View configuration. Assume
|
||||
@ -47,7 +47,7 @@ func (p *meter) Int64Counter(string, ...instrument.Option) (syncint64.Counter, e
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (p *meter) Int64UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) {
|
||||
func (p *meter) Int64UpDownCounter(string, ...instrument.Int64Option) (syncint64.UpDownCounter, error) {
|
||||
// This is an example of how a meter would create an aggregator for a new
|
||||
// up-down counter. At this point the provider would determine the
|
||||
// aggregation and temporality to used based on the Reader and View
|
||||
@ -64,7 +64,7 @@ func (p *meter) Int64UpDownCounter(string, ...instrument.Option) (syncint64.UpDo
|
||||
return upDownCount, nil
|
||||
}
|
||||
|
||||
func (p *meter) Int64Histogram(string, ...instrument.Option) (syncint64.Histogram, error) {
|
||||
func (p *meter) Int64Histogram(string, ...instrument.Int64Option) (syncint64.Histogram, error) {
|
||||
// This is an example of how a meter would create an aggregator for a new
|
||||
// histogram. At this point the provider would determine the aggregation
|
||||
// and temporality to used based on the Reader and View configuration.
|
||||
|
@ -15,12 +15,15 @@
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
)
|
||||
|
||||
@ -31,8 +34,8 @@ import (
|
||||
type meter struct {
|
||||
pipes pipelines
|
||||
|
||||
instProviderInt64 *instProvider[int64]
|
||||
instProviderFloat64 *instProvider[float64]
|
||||
int64IP *instProvider[int64]
|
||||
float64IP *instProvider[float64]
|
||||
}
|
||||
|
||||
func newMeter(s instrumentation.Scope, p pipelines) *meter {
|
||||
@ -46,9 +49,9 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
|
||||
fc := newInstrumentCache[float64](nil, &viewCache)
|
||||
|
||||
return &meter{
|
||||
pipes: p,
|
||||
instProviderInt64: newInstProvider(s, p, ic),
|
||||
instProviderFloat64: newInstProvider(s, p, fc),
|
||||
pipes: p,
|
||||
int64IP: newInstProvider(s, p, ic),
|
||||
float64IP: newInstProvider(s, p, fc),
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,85 +61,145 @@ var _ metric.Meter = (*meter)(nil)
|
||||
// Int64Counter returns a new instrument identified by name and configured with
|
||||
// options. The instrument is used to synchronously record increasing int64
|
||||
// measurements during a computational operation.
|
||||
func (m *meter) Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) {
|
||||
return m.instProviderInt64.lookup(InstrumentKindCounter, name, options)
|
||||
func (m *meter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) {
|
||||
cfg := instrument.NewInt64Config(options...)
|
||||
const kind = InstrumentKindCounter
|
||||
return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
}
|
||||
|
||||
// Int64UpDownCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// int64 measurements during a computational operation.
|
||||
func (m *meter) Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) {
|
||||
return m.instProviderInt64.lookup(InstrumentKindUpDownCounter, name, options)
|
||||
func (m *meter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) {
|
||||
cfg := instrument.NewInt64Config(options...)
|
||||
const kind = InstrumentKindUpDownCounter
|
||||
return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
}
|
||||
|
||||
// Int64Histogram returns a new instrument identified by name and configured
|
||||
// with options. The instrument is used to synchronously record the
|
||||
// distribution of int64 measurements during a computational operation.
|
||||
func (m *meter) Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) {
|
||||
return m.instProviderInt64.lookup(InstrumentKindHistogram, name, options)
|
||||
func (m *meter) Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) {
|
||||
cfg := instrument.NewInt64Config(options...)
|
||||
const kind = InstrumentKindHistogram
|
||||
return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
}
|
||||
|
||||
// Int64ObservableCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// increasing int64 measurements once per a measurement collection cycle.
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) {
|
||||
return m.instProviderInt64.lookup(InstrumentKindObservableCounter, name, options)
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) {
|
||||
cfg := instrument.NewInt64ObserverConfig(options...)
|
||||
const kind = InstrumentKindObservableCounter
|
||||
p := int64ObservProvider{m.int64IP}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.registerCallbacks(inst, cfg.Callbacks())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// Int64ObservableUpDownCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// int64 measurements once per a measurement collection cycle.
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) {
|
||||
return m.instProviderInt64.lookup(InstrumentKindObservableUpDownCounter, name, options)
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) {
|
||||
cfg := instrument.NewInt64ObserverConfig(options...)
|
||||
const kind = InstrumentKindObservableUpDownCounter
|
||||
p := int64ObservProvider{m.int64IP}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.registerCallbacks(inst, cfg.Callbacks())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// Int64ObservableGauge returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// instantaneous int64 measurements once per a measurement collection cycle.
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) {
|
||||
return m.instProviderInt64.lookup(InstrumentKindObservableGauge, name, options)
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) {
|
||||
cfg := instrument.NewInt64ObserverConfig(options...)
|
||||
const kind = InstrumentKindObservableGauge
|
||||
p := int64ObservProvider{m.int64IP}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.registerCallbacks(inst, cfg.Callbacks())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// Float64Counter returns a new instrument identified by name and configured
|
||||
// with options. The instrument is used to synchronously record increasing
|
||||
// float64 measurements during a computational operation.
|
||||
func (m *meter) Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) {
|
||||
return m.instProviderFloat64.lookup(InstrumentKindCounter, name, options)
|
||||
func (m *meter) Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) {
|
||||
cfg := instrument.NewFloat64Config(options...)
|
||||
const kind = InstrumentKindCounter
|
||||
return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
}
|
||||
|
||||
// Float64UpDownCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to synchronously record
|
||||
// float64 measurements during a computational operation.
|
||||
func (m *meter) Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) {
|
||||
return m.instProviderFloat64.lookup(InstrumentKindUpDownCounter, name, options)
|
||||
func (m *meter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) {
|
||||
cfg := instrument.NewFloat64Config(options...)
|
||||
const kind = InstrumentKindUpDownCounter
|
||||
return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
}
|
||||
|
||||
// Float64Histogram returns a new instrument identified by name and configured
|
||||
// with options. The instrument is used to synchronously record the
|
||||
// distribution of float64 measurements during a computational operation.
|
||||
func (m *meter) Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) {
|
||||
return m.instProviderFloat64.lookup(InstrumentKindHistogram, name, options)
|
||||
func (m *meter) Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) {
|
||||
cfg := instrument.NewFloat64Config(options...)
|
||||
const kind = InstrumentKindHistogram
|
||||
return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
}
|
||||
|
||||
// Float64ObservableCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// increasing float64 measurements once per a measurement collection cycle.
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) {
|
||||
return m.instProviderFloat64.lookup(InstrumentKindObservableCounter, name, options)
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) {
|
||||
cfg := instrument.NewFloat64ObserverConfig(options...)
|
||||
const kind = InstrumentKindObservableCounter
|
||||
p := float64ObservProvider{m.float64IP}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.registerCallbacks(inst, cfg.Callbacks())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// Float64ObservableUpDownCounter returns a new instrument identified by name
|
||||
// and configured with options. The instrument is used to asynchronously record
|
||||
// float64 measurements once per a measurement collection cycle.
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
|
||||
return m.instProviderFloat64.lookup(InstrumentKindObservableUpDownCounter, name, options)
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) {
|
||||
cfg := instrument.NewFloat64ObserverConfig(options...)
|
||||
const kind = InstrumentKindObservableUpDownCounter
|
||||
p := float64ObservProvider{m.float64IP}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.registerCallbacks(inst, cfg.Callbacks())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// Float64ObservableGauge returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// instantaneous float64 measurements once per a measurement collection cycle.
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) {
|
||||
return m.instProviderFloat64.lookup(InstrumentKindObservableGauge, name, options)
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) {
|
||||
cfg := instrument.NewFloat64ObserverConfig(options...)
|
||||
const kind = InstrumentKindObservableGauge
|
||||
p := float64ObservProvider{m.float64IP}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.registerCallbacks(inst, cfg.Callbacks())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// RegisterCallback registers the function f to be called when any of the
|
||||
@ -148,18 +211,18 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callb
|
||||
switch t := inst.(type) {
|
||||
case *instrumentImpl[int64]:
|
||||
if len(t.aggregators) > 0 {
|
||||
return m.registerCallback(f)
|
||||
return m.registerMultiCallback(f)
|
||||
}
|
||||
case *instrumentImpl[float64]:
|
||||
if len(t.aggregators) > 0 {
|
||||
return m.registerCallback(f)
|
||||
return m.registerMultiCallback(f)
|
||||
}
|
||||
default:
|
||||
// Instrument external to the SDK. For example, an instrument from
|
||||
// the "go.opentelemetry.io/otel/metric/internal/global" package.
|
||||
//
|
||||
// Fail gracefully here, assume a valid instrument.
|
||||
return m.registerCallback(f)
|
||||
return m.registerMultiCallback(f)
|
||||
}
|
||||
}
|
||||
// All insts use drop aggregation.
|
||||
@ -172,30 +235,64 @@ func (noopRegister) Unregister() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *meter) registerCallback(c metric.Callback) (metric.Registration, error) {
|
||||
return m.pipes.registerCallback(c), nil
|
||||
func (m *meter) registerMultiCallback(c metric.Callback) (metric.Registration, error) {
|
||||
return m.pipes.registerMultiCallback(c), nil
|
||||
}
|
||||
|
||||
// instProvider provides all OpenTelemetry instruments.
|
||||
type instProvider[N int64 | float64] struct {
|
||||
scope instrumentation.Scope
|
||||
pipes pipelines
|
||||
resolve resolver[N]
|
||||
}
|
||||
|
||||
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
|
||||
return &instProvider[N]{scope: s, resolve: newResolver(p, c)}
|
||||
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
|
||||
}
|
||||
|
||||
// lookup returns the resolved instrumentImpl.
|
||||
func (p *instProvider[N]) lookup(kind InstrumentKind, name string, opts []instrument.Option) (*instrumentImpl[N], error) {
|
||||
cfg := instrument.NewConfig(opts...)
|
||||
i := Instrument{
|
||||
func (p *instProvider[N]) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (*instrumentImpl[N], error) {
|
||||
inst := Instrument{
|
||||
Name: name,
|
||||
Description: cfg.Description(),
|
||||
Unit: cfg.Unit(),
|
||||
Description: desc,
|
||||
Unit: u,
|
||||
Kind: kind,
|
||||
Scope: p.scope,
|
||||
}
|
||||
aggs, err := p.resolve.Aggregators(i)
|
||||
aggs, err := p.resolve.Aggregators(inst)
|
||||
return &instrumentImpl[N]{aggregators: aggs}, err
|
||||
}
|
||||
|
||||
type int64ObservProvider struct{ *instProvider[int64] }
|
||||
|
||||
func (p int64ObservProvider) registerCallbacks(inst *instrumentImpl[int64], cBacks []instrument.Int64Callback) {
|
||||
if inst == nil {
|
||||
// Drop aggregator.
|
||||
return
|
||||
}
|
||||
|
||||
for _, cBack := range cBacks {
|
||||
p.pipes.registerCallback(p.callback(inst, cBack))
|
||||
}
|
||||
}
|
||||
|
||||
func (p int64ObservProvider) callback(i *instrumentImpl[int64], f instrument.Int64Callback) func(context.Context) error {
|
||||
return func(ctx context.Context) error { return f(ctx, i) }
|
||||
}
|
||||
|
||||
type float64ObservProvider struct{ *instProvider[float64] }
|
||||
|
||||
func (p float64ObservProvider) registerCallbacks(inst *instrumentImpl[float64], cBacks []instrument.Float64Callback) {
|
||||
if inst == nil {
|
||||
// Drop aggregator.
|
||||
return
|
||||
}
|
||||
|
||||
for _, cBack := range cBacks {
|
||||
p.pipes.registerCallback(p.callback(inst, cBack))
|
||||
}
|
||||
}
|
||||
|
||||
func (p float64ObservProvider) callback(i *instrumentImpl[float64], f instrument.Float64Callback) func(context.Context) error {
|
||||
return func(ctx context.Context) error { return f(ctx, i) }
|
||||
}
|
||||
|
@ -167,6 +167,7 @@ func TestCallbackUnregisterConcurrency(t *testing.T) {
|
||||
|
||||
// Instruments should produce correct ResourceMetrics.
|
||||
func TestMeterCreatesInstruments(t *testing.T) {
|
||||
attrs := []attribute.KeyValue{attribute.String("name", "alice")}
|
||||
seven := 7.0
|
||||
testCases := []struct {
|
||||
name string
|
||||
@ -176,7 +177,11 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableInt64Count",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
ctr, err := m.Int64ObservableCounter("aint")
|
||||
cback := func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Int64ObservableCounter("aint", instrument.WithInt64Callback(cback))
|
||||
assert.NoError(t, err)
|
||||
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 3)
|
||||
@ -192,6 +197,7 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: attribute.NewSet(attrs...), Value: 4},
|
||||
{Value: 3},
|
||||
},
|
||||
},
|
||||
@ -200,7 +206,11 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableInt64UpDownCount",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
ctr, err := m.Int64ObservableUpDownCounter("aint")
|
||||
cback := func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Int64ObservableUpDownCounter("aint", instrument.WithInt64Callback(cback))
|
||||
assert.NoError(t, err)
|
||||
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 11)
|
||||
@ -216,6 +226,7 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: false,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: attribute.NewSet(attrs...), Value: 4},
|
||||
{Value: 11},
|
||||
},
|
||||
},
|
||||
@ -224,7 +235,11 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableInt64Gauge",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
gauge, err := m.Int64ObservableGauge("agauge")
|
||||
cback := func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
return nil
|
||||
}
|
||||
gauge, err := m.Int64ObservableGauge("agauge", instrument.WithInt64Callback(cback))
|
||||
assert.NoError(t, err)
|
||||
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
|
||||
gauge.Observe(ctx, 11)
|
||||
@ -238,6 +253,7 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
Name: "agauge",
|
||||
Data: metricdata.Gauge[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: attribute.NewSet(attrs...), Value: 4},
|
||||
{Value: 11},
|
||||
},
|
||||
},
|
||||
@ -246,7 +262,11 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableFloat64Count",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
ctr, err := m.Float64ObservableCounter("afloat")
|
||||
cback := func(ctx context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Float64ObservableCounter("afloat", instrument.WithFloat64Callback(cback))
|
||||
assert.NoError(t, err)
|
||||
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 3)
|
||||
@ -262,6 +282,7 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{Attributes: attribute.NewSet(attrs...), Value: 4},
|
||||
{Value: 3},
|
||||
},
|
||||
},
|
||||
@ -270,7 +291,11 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableFloat64UpDownCount",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
ctr, err := m.Float64ObservableUpDownCounter("afloat")
|
||||
cback := func(ctx context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Float64ObservableUpDownCounter("afloat", instrument.WithFloat64Callback(cback))
|
||||
assert.NoError(t, err)
|
||||
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 11)
|
||||
@ -286,6 +311,7 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: false,
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{Attributes: attribute.NewSet(attrs...), Value: 4},
|
||||
{Value: 11},
|
||||
},
|
||||
},
|
||||
@ -294,7 +320,11 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableFloat64Gauge",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
gauge, err := m.Float64ObservableGauge("agauge")
|
||||
cback := func(ctx context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
return nil
|
||||
}
|
||||
gauge, err := m.Float64ObservableGauge("agauge", instrument.WithFloat64Callback(cback))
|
||||
assert.NoError(t, err)
|
||||
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
|
||||
gauge.Observe(ctx, 11)
|
||||
@ -308,6 +338,7 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
Name: "agauge",
|
||||
Data: metricdata.Gauge[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{Attributes: attribute.NewSet(attrs...), Value: 4},
|
||||
{Value: 11},
|
||||
},
|
||||
},
|
||||
|
@ -76,8 +76,9 @@ type pipeline struct {
|
||||
views []View
|
||||
|
||||
sync.Mutex
|
||||
aggregations map[instrumentation.Scope][]instrumentSync
|
||||
callbacks list.List
|
||||
aggregations map[instrumentation.Scope][]instrumentSync
|
||||
callbacks []func(context.Context) error
|
||||
multiCallbacks list.List
|
||||
}
|
||||
|
||||
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
||||
@ -95,14 +96,23 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
|
||||
p.aggregations[scope] = append(p.aggregations[scope], iSync)
|
||||
}
|
||||
|
||||
// addCallback registers a callback to be run when `produce()` is called.
|
||||
func (p *pipeline) addCallback(c metric.Callback) (unregister func()) {
|
||||
// addCallback registers a single instrument callback to be run when
|
||||
// `produce()` is called.
|
||||
func (p *pipeline) addCallback(cback func(context.Context) error) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
e := p.callbacks.PushBack(c)
|
||||
p.callbacks = append(p.callbacks, cback)
|
||||
}
|
||||
|
||||
// addMultiCallback registers a multi-instrument callback to be run when
|
||||
// `produce()` is called.
|
||||
func (p *pipeline) addMultiCallback(c metric.Callback) (unregister func()) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
e := p.multiCallbacks.PushBack(c)
|
||||
return func() {
|
||||
p.Lock()
|
||||
p.callbacks.Remove(e)
|
||||
p.multiCallbacks.Remove(e)
|
||||
p.Unlock()
|
||||
}
|
||||
}
|
||||
@ -124,7 +134,17 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
for e := p.callbacks.Front(); e != nil; e = e.Next() {
|
||||
var errs multierror
|
||||
for _, c := range p.callbacks {
|
||||
// TODO make the callbacks parallel. ( #3034 )
|
||||
if err := c(ctx); err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
return metricdata.ResourceMetrics{}, err
|
||||
}
|
||||
}
|
||||
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
|
||||
// TODO make the callbacks parallel. ( #3034 )
|
||||
f := e.Value.(metric.Callback)
|
||||
f(ctx)
|
||||
@ -159,7 +179,7 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
|
||||
return metricdata.ResourceMetrics{
|
||||
Resource: p.resource,
|
||||
ScopeMetrics: sm,
|
||||
}, nil
|
||||
}, errs.errorOrNil()
|
||||
}
|
||||
|
||||
// inserter facilitates inserting of new instruments from a single scope into a
|
||||
@ -447,10 +467,16 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli
|
||||
return pipes
|
||||
}
|
||||
|
||||
func (p pipelines) registerCallback(c metric.Callback) metric.Registration {
|
||||
func (p pipelines) registerCallback(cback func(context.Context) error) {
|
||||
for _, pipe := range p {
|
||||
pipe.addCallback(cback)
|
||||
}
|
||||
}
|
||||
|
||||
func (p pipelines) registerMultiCallback(c metric.Callback) metric.Registration {
|
||||
unregs := make([]func(), len(p))
|
||||
for i, pipe := range p {
|
||||
unregs[i] = pipe.addCallback(c)
|
||||
unregs[i] = pipe.addMultiCallback(c)
|
||||
}
|
||||
return unregisterFuncs(unregs)
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ func TestEmptyPipeline(t *testing.T) {
|
||||
})
|
||||
|
||||
require.NotPanics(t, func() {
|
||||
pipe.addCallback(func(ctx context.Context) {})
|
||||
pipe.addMultiCallback(func(ctx context.Context) {})
|
||||
})
|
||||
|
||||
output, err = pipe.produce(context.Background())
|
||||
@ -78,7 +78,7 @@ func TestNewPipeline(t *testing.T) {
|
||||
})
|
||||
|
||||
require.NotPanics(t, func() {
|
||||
pipe.addCallback(func(ctx context.Context) {})
|
||||
pipe.addMultiCallback(func(ctx context.Context) {})
|
||||
})
|
||||
|
||||
output, err = pipe.produce(context.Background())
|
||||
@ -121,7 +121,7 @@ func TestPipelineConcurrency(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
pipe.addCallback(func(ctx context.Context) {})
|
||||
pipe.addMultiCallback(func(ctx context.Context) {})
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
Reference in New Issue
Block a user