mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-09 13:37:12 +02:00
Remove internal StateLocker implementation (#688)
Fixes #657 With the changes in #667 and #669 to use a plain-old-mutex for concurrent access of Histogram and MinMaxSumCount aggregators, the StateLocker implementation is no longer used in the project. Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
This commit is contained in:
parent
44bd7e9421
commit
34bd998963
@ -1,110 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// StateLocker implements a two state lock algorithm that enabled lock free operations inside a state
|
||||
// and a global lock for switching between states. At every time, only one state is active and one cold state.
|
||||
// States are represented by int numbers 0 and 1.
|
||||
//
|
||||
// This was inspired by the algorithm used on the prometheus client library that can be found at:
|
||||
// https://github.com/prometheus/client_golang/blob/e7776d2c54305c1b62fdb113b5a7e9b944c5c27e/prometheus/histogram.go#L227
|
||||
//
|
||||
// To execute operations within the same state, call `Start()` before the operation and call `End(idx)`
|
||||
// to end this operation. The `idx` argument of `End()` is the index of the active state when the operation
|
||||
// started and it is returned by the `Start()` method. It is recommended to defer the call to `End(idx)`.
|
||||
//
|
||||
// One can change the active state by calling `SwapActiveState(fn)`. `fn` is a function that will be executed *before*
|
||||
// switching the active state. Operations such as preparing the new state shall be called by this function. This will
|
||||
// wait in-flight operations to end.
|
||||
//
|
||||
// Example workflow:
|
||||
// 1. State 0 is active.
|
||||
// 1.1 Operations to the active state can happen with `Start()` and `End(idx)` methods.
|
||||
// 2. Call to `SwitchState(fn)`
|
||||
// 2.1 run `fn` function to prepare the new state
|
||||
// 2.2 make state 1 active
|
||||
// 2.3 wait in-flight operations of the state 0 to end.
|
||||
// 3. State 1 is now active and every new operation are executed in it.
|
||||
//
|
||||
// `SwitchState(fn)` are synchronized with a mutex that can be access with the `Lock()` and `Unlock()` methods.
|
||||
// Access to the cold state must also be synchronized to ensure the cold state is not in the middle of state switch
|
||||
// since that could represent an invalid state.
|
||||
//
|
||||
type StateLocker struct {
|
||||
countsAndActiveIdx uint64
|
||||
finishedOperations [2]uint64
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// Start an operation that will happen on a state. The current active state is returned.
|
||||
// A call to `End(idx int)` must happens for every `Start()` call.
|
||||
func (c *StateLocker) Start() int {
|
||||
n := atomic.AddUint64(&c.countsAndActiveIdx, 1)
|
||||
return int(n >> 63)
|
||||
}
|
||||
|
||||
// End an operation that happened to the idx state.
|
||||
func (c *StateLocker) End(idx int) {
|
||||
atomic.AddUint64(&c.finishedOperations[idx], 1)
|
||||
}
|
||||
|
||||
// ColdIdx returns the index of the cold state.
|
||||
func (c *StateLocker) ColdIdx() int {
|
||||
return int((^c.countsAndActiveIdx) >> 63)
|
||||
}
|
||||
|
||||
// SwapActiveState swaps the cold and active states.
|
||||
//
|
||||
// This will wait all for in-flight operations that are happening to the current
|
||||
// active state to end, this ensure that all access to this state will be consistent.
|
||||
//
|
||||
// This is synchronized by a mutex.
|
||||
func (c *StateLocker) SwapActiveState(beforeFn func()) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if beforeFn != nil {
|
||||
// prepare the state change
|
||||
beforeFn()
|
||||
}
|
||||
|
||||
// Adding 1<<63 switches the active index (from 0 to 1 or from 1 to 0)
|
||||
// without touching the count bits.
|
||||
n := atomic.AddUint64(&c.countsAndActiveIdx, 1<<63)
|
||||
|
||||
// count represents how many operations have started *before* the state change.
|
||||
count := n & ((1 << 63) - 1)
|
||||
|
||||
activeFinishedOperations := &c.finishedOperations[n>>63]
|
||||
// coldFinishedOperations are the number of operations that have *ended* on the previous state.
|
||||
coldFinishedOperations := &c.finishedOperations[(^n)>>63]
|
||||
|
||||
// Await all cold writers to finish writing, when coldFinishedOperations == count, all in-flight operations
|
||||
// have finished and we can cleanly end the state change.
|
||||
for count != atomic.LoadUint64(coldFinishedOperations) {
|
||||
runtime.Gosched() // Let observations get work done.
|
||||
}
|
||||
|
||||
// Make sure that the new state keeps the same count of *ended* operations.
|
||||
atomic.AddUint64(activeFinishedOperations, count)
|
||||
atomic.StoreUint64(coldFinishedOperations, 0)
|
||||
}
|
@ -1,102 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestInflightOperationMustEndBeforeSwap(t *testing.T) {
|
||||
var swapped bool
|
||||
ch := make(chan struct{})
|
||||
|
||||
l := StateLocker{}
|
||||
op1 := l.Start()
|
||||
|
||||
go func() {
|
||||
l.SwapActiveState(func() {})
|
||||
swapped = true
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
|
||||
require.False(t, swapped, "Swap should wait the end of the in-flight operation.")
|
||||
|
||||
l.End(op1)
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
require.True(t, swapped, "Swap should've been completed. ")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatal("Swap was not concluded after 50 milliseconds.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureIndexIsConsistent(t *testing.T) {
|
||||
l := StateLocker{}
|
||||
op1 := l.Start()
|
||||
l.End(op1)
|
||||
|
||||
l.SwapActiveState(func() {})
|
||||
|
||||
op2 := l.Start()
|
||||
l.End(op2)
|
||||
|
||||
op3 := l.Start()
|
||||
l.End(op3)
|
||||
|
||||
l.SwapActiveState(func() {})
|
||||
|
||||
op4 := l.Start()
|
||||
l.End(op4)
|
||||
|
||||
require.Equal(t, op1, op4, "two operations separated by two swaps should have the same index.")
|
||||
require.Equal(t, op2, op3, "two operations with no swap in between should have the same index.")
|
||||
|
||||
require.Equal(t, 0, op1, "first index should be 0")
|
||||
require.Equal(t, 1, op2, "second index should be 1")
|
||||
}
|
||||
|
||||
func TestTwoSwapsCanHappenWithoutOperationsInBetween(t *testing.T) {
|
||||
l := StateLocker{}
|
||||
|
||||
require.Equal(t, 1, l.ColdIdx(), "first cold index should be 1")
|
||||
l.SwapActiveState(func() {})
|
||||
require.Equal(t, 0, l.ColdIdx(), "second cold index should be 0")
|
||||
l.SwapActiveState(func() {})
|
||||
require.Equal(t, 1, l.ColdIdx(), "third cold index should be 1")
|
||||
}
|
||||
|
||||
func BenchmarkStateLocker_StartEnd(b *testing.B) {
|
||||
l := StateLocker{}
|
||||
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
l.End(l.Start())
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkStateLocker_SwapActiveState(b *testing.B) {
|
||||
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
l := StateLocker{}
|
||||
l.SwapActiveState(func() {})
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user