mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-07 13:31:42 +02:00
Fix TODO, use sync.Map to avoid blocking calls (#2381)
* Fix TODO, use sync.Map to avoid blocking calls Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> * Update sdk/metric/controller/basic/controller.go Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
parent
4d9d882c38
commit
196ccd7050
@ -56,14 +56,9 @@ var ErrControllerStarted = fmt.Errorf("controller already started")
|
||||
// using the export.Reader RWLock interface. Collection will
|
||||
// be blocked by a pull request in the basic controller.
|
||||
type Controller struct {
|
||||
// lock protects libraries and synchronizes Start() and Stop().
|
||||
lock sync.Mutex
|
||||
// TODO: libraries is synchronized by lock, but could be
|
||||
// accomplished using a sync.Map. The SDK specification will
|
||||
// probably require this, as the draft already states that
|
||||
// Stop() and MeterProvider.Meter() should not block each
|
||||
// other.
|
||||
libraries map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl
|
||||
// lock synchronizes Start() and Stop().
|
||||
lock sync.Mutex
|
||||
libraries sync.Map
|
||||
checkpointerFactory export.CheckpointerFactory
|
||||
|
||||
resource *resource.Resource
|
||||
@ -93,21 +88,18 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio
|
||||
SchemaURL: cfg.SchemaURL(),
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
m, ok := c.libraries[library]
|
||||
m, ok := c.libraries.Load(library)
|
||||
if !ok {
|
||||
checkpointer := c.checkpointerFactory.NewCheckpointer()
|
||||
accumulator := sdk.NewAccumulator(checkpointer)
|
||||
m = registry.NewUniqueInstrumentMeterImpl(&accumulatorCheckpointer{
|
||||
Accumulator: accumulator,
|
||||
checkpointer: checkpointer,
|
||||
library: library,
|
||||
})
|
||||
|
||||
c.libraries[library] = m
|
||||
m, _ = c.libraries.LoadOrStore(
|
||||
library,
|
||||
registry.NewUniqueInstrumentMeterImpl(&accumulatorCheckpointer{
|
||||
Accumulator: sdk.NewAccumulator(checkpointer),
|
||||
checkpointer: checkpointer,
|
||||
library: library,
|
||||
}))
|
||||
}
|
||||
return metric.WrapMeterImpl(m)
|
||||
return metric.WrapMeterImpl(m.(*registry.UniqueInstrumentMeterImpl))
|
||||
}
|
||||
|
||||
type accumulatorCheckpointer struct {
|
||||
@ -138,7 +130,6 @@ func New(checkpointerFactory export.CheckpointerFactory, opts ...Option) *Contro
|
||||
}
|
||||
}
|
||||
return &Controller{
|
||||
libraries: map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl{},
|
||||
checkpointerFactory: checkpointerFactory,
|
||||
exporter: c.Exporter,
|
||||
resource: c.Resource,
|
||||
@ -251,16 +242,14 @@ func (c *Controller) collect(ctx context.Context) error {
|
||||
// accumulatorList returns a snapshot of current accumulators
|
||||
// registered to this controller. This briefly locks the controller.
|
||||
func (c *Controller) accumulatorList() []*accumulatorCheckpointer {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
var r []*accumulatorCheckpointer
|
||||
for _, entry := range c.libraries {
|
||||
acc, ok := entry.MeterImpl().(*accumulatorCheckpointer)
|
||||
c.libraries.Range(func(key, value interface{}) bool {
|
||||
acc, ok := value.(*registry.UniqueInstrumentMeterImpl).MeterImpl().(*accumulatorCheckpointer)
|
||||
if ok {
|
||||
r = append(r, acc)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user