1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-23 22:34:47 +02:00
Files
opentelemetry-go/experimental/streaming/exporter/buffer/buffer.go
Joshua MacDonald a3ae50d8bc Remove experimental/streaming globals, add streaming example (#135)
* Remove globals from exp/streaming

* basic example

* Build the streaming example

* Update README.md for running streaming example

* Remove observer package

* Move observer to exporter

* Fix

* Re-run make circle-ci
2019-09-23 14:39:10 -07:00

75 lines
1.6 KiB
Go

// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"sync"
"sync/atomic"
"go.opentelemetry.io/experimental/streaming/exporter"
)
// Buffer queues events on a bounded channel and forwards them to a fixed set
// of observers from a single background goroutine. When the queue is full,
// Observe discards the event and counts it instead of blocking the caller.
//
// A Buffer must be created with NewBuffer; the zero value has no queue and no
// delivery goroutine.
type Buffer struct {
	// dropped counts events discarded by Observe. It is updated with
	// sync/atomic and is deliberately the first field: the first word of an
	// allocated struct is guaranteed to be 64-bit aligned, which 64-bit
	// atomic operations require on 32-bit platforms.
	dropped uint64

	// observers receive every dequeued event, in slice order.
	observers []exporter.Observer

	// events is the bounded queue between Observe and the run goroutine.
	events chan exporter.Event

	// wait tracks the run goroutine so Close can block until it has exited.
	wait sync.WaitGroup

	// close is closed by Close to tell the run goroutine to stop.
	close chan struct{}
}
// NewBuffer returns a Buffer whose queue holds up to size events and starts
// the background goroutine that forwards queued events to observers.
//
// A negative size is treated as zero (an unbuffered queue) rather than
// panicking inside make. The observers are copied, so a caller that passes an
// existing slice with "observers..." may keep modifying that slice without
// racing with the delivery goroutine.
//
// Call Close to stop the goroutine once the Buffer is no longer needed.
func NewBuffer(size int, observers ...exporter.Observer) *Buffer {
	if size < 0 {
		size = 0
	}

	b := &Buffer{
		observers: append([]exporter.Observer(nil), observers...),
		events:    make(chan exporter.Event, size),
		close:     make(chan struct{}),
	}

	// Register the goroutine before starting it so that a Close issued right
	// after NewBuffer returns always waits for run to finish.
	b.wait.Add(1)
	go b.run()

	return b
}
// Observe enqueues data for asynchronous delivery without ever blocking the
// caller. If the event cannot be accepted it is discarded and the dropped
// counter is incremented atomically.
//
// An event is dropped when the queue is full, or when Close has already been
// called: after Close the delivery goroutine stops reading the queue, so
// parking the event there would lose it without it ever being counted. The
// closed check is best-effort; an Observe that races with Close may still
// enqueue an event that is never delivered.
func (b *Buffer) Observe(data exporter.Event) {
	select {
	case <-b.close:
		// Shut down: account for the event instead of queueing it.
		atomic.AddUint64(&b.dropped, 1)
		return
	default:
	}

	select {
	case b.events <- data:
	default:
		// Queue is full; never block the producer.
		atomic.AddUint64(&b.dropped, 1)
	}
}
// Close signals the background goroutine to stop and blocks until it has
// exited.
//
// Calling Close again after a previous call is a no-op apart from the wait,
// instead of panicking with "close of closed channel". Concurrent calls to
// Close are not synchronized with each other and should be avoided.
func (b *Buffer) Close() {
	select {
	case <-b.close:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(b.close)
	}
	b.wait.Wait()
}
// run is the delivery loop started by NewBuffer. It forwards each queued event
// to every observer, in slice order, until the close channel is closed, and
// marks the WaitGroup done on exit so that Close can return.
//
// On shutdown the events that are already queued are delivered before the
// goroutine exits, so events accepted before Close are not silently discarded.
//
// A panic raised by an observer is recovered and ignored for that single call
// only; the remaining observers and all later events are still served, rather
// than one faulty observer ending delivery for everyone.
func (b *Buffer) run() {
	defer b.wait.Done()

	// notify isolates a single observer call so that a panic in it cannot
	// terminate the delivery loop.
	notify := func(obs exporter.Observer, ev exporter.Event) {
		defer func() {
			// Best-effort delivery: the panic is deliberately swallowed.
			_ = recover()
		}()
		obs.Observe(ev)
	}

	deliver := func(ev exporter.Event) {
		// TODO: This has to ensure ordered arrival,
		// e.g., put into a heap and delay observations.
		for _, obs := range b.observers {
			notify(obs, ev)
		}
	}

	for {
		select {
		case <-b.close:
			// Flush what is queued right now. The count is snapshotted so
			// that producers still calling Observe cannot keep Close waiting
			// indefinitely. This goroutine is the only receiver in this
			// file, so at least that many events are available and the
			// receives below do not block.
			for pending := len(b.events); pending > 0; pending-- {
				deliver(<-b.events)
			}
			return
		case ev := <-b.events:
			deliver(ev)
		}
	}
}