You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Use sync.Map and atomics to improve sum performance (#7427)
Alternative to https://github.com/open-telemetry/opentelemetry-go/pull/7380 This uses a sync.Map and atomics for the sum's counter value. This intentionally introduces a new race condition that didn't previously exist: * It is possible for the exemplar to be recorded in the batch of metrics after the add() for cumulative sum aggregations. For cumulative, this isn't a huge issue since exemplars are expected to persist across collection cycles. This is difficult to fix because we can't manage the internal storage of an exemplar.Reservoir (to atomically swap between hot and cold storage). If we are able to make assumptions about how exemplar reservoirs are managed (i.e. that the number of and order of exemplars returned is always the same), then we could possibly fix this by merging at export time. ### Alternatives Considered #### RWLock for the map instead of sync.Map This is significantly less performant. #### Single sync.Map without hotColdWaitGroup Deleting keys from the sync.Map concurrently with measurements (during Clear() of the sync.Map) can cause measurements to be made to a counter that has already been read, exported and deleted. This can produce incorrect sums when delta is used. Instead, atomically switching writes to a completely empty sync.Map and waiting for writes to the previous sync.Map complete eliminates this issue. #### Use two sync.Map for cumulative sums One idea I explored was doing a hot-cold swap for cumulative sums just like we do for delta sums. We would swap the hot and cold sync.Maps, wait for writes to the cold sync.Map to complete while new writes go to the hot map. Then, once we are done reading the cold map, we could merge the contents of the cold map back into the new hot map. This approach has two issues: * It isn't possible to "merge" one exemplar reservoir into another. This is an issue for persistent exemplars that aren't overwritten in a collection interval. * We can't keep a consistent set of keys in overflow scenarios. Measurements that are made to the hot map before the merge of the cold into hot that should have been overflows will be added as new attribute sets. That, in turn, means we will need to change previously-exported attribute sets to the overflow set, which will cause issues for users. ### Benchmarks Parallel: ``` goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: AMD EPYC 7B12 │ main24.txt │ new24_new.txt │ │ sec/op │ sec/op vs base │ SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/0-24 255.65n ± 13% 68.06n ± 3% -73.38% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/1-24 286.70n ± 8% 67.66n ± 4% -76.40% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/10-24 287.15n ± 14% 69.90n ± 3% -75.66% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/0-24 244.75n ± 9% 68.83n ± 4% -71.88% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/1-24 267.20n ± 14% 65.86n ± 3% -75.35% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/10-24 291.50n ± 13% 66.59n ± 11% -77.15% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/0-24 247.85n ± 7% 66.06n ± 3% -73.34% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/1-24 286.75n ± 10% 68.52n ± 2% -76.10% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/10-24 289.50n ± 20% 67.45n ± 4% -76.70% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/0-24 246.25n ± 14% 66.69n ± 2% -72.92% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/1-24 289.55n ± 9% 65.54n ± 5% -77.36% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/10-24 286.05n ± 14% 67.55n ± 2% -76.39% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/0-24 254.8n ± 23% 225.9n ± 17% -11.32% (p=0.026 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/1-24 304.4n ± 13% 234.4n ± 19% -23.01% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/10-24 308.9n ± 20% 217.6n ± 10% -29.56% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/0-24 267.8n ± 14% 220.1n ± 19% -17.80% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/1-24 274.1n ± 21% 226.5n ± 5% -17.38% (p=0.024 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/10-24 239.0n ± 14% 236.1n ± 18% ~ (p=0.589 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/0-24 223.7n ± 11% 234.8n ± 7% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/1-24 253.9n ± 10% 244.8n ± 11% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/10-24 272.6n ± 7% 250.0n ± 12% -8.33% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/0-24 232.6n ± 4% 232.2n ± 8% ~ (p=0.937 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/1-24 276.7n ± 20% 249.2n ± 11% ~ (p=0.485 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/10-24 265.9n ± 18% 246.4n ± 9% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/0-24 294.0n ± 11% 269.0n ± 5% -8.47% (p=0.015 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/1-24 314.6n ± 10% 268.8n ± 6% -14.54% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/10-24 303.9n ± 11% 285.4n ± 4% ~ (p=0.180 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/0-24 274.7n ± 13% 262.9n ± 7% ~ (p=0.145 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/1-24 296.1n ± 6% 288.9n ± 9% ~ (p=0.180 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/10-24 276.0n ± 14% 299.4n ± 12% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/0-24 191.4n ± 4% 176.0n ± 3% -8.05% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/1-24 223.2n ± 8% 172.8n ± 3% -22.54% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/10-24 265.7n ± 19% 172.2n ± 2% -35.21% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/0-24 179.4n ± 18% 171.0n ± 3% -4.74% (p=0.009 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/1-24 209.1n ± 16% 175.4n ± 5% -16.07% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/10-24 222.5n ± 17% 175.6n ± 4% -21.08% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/0-24 194.4n ± 11% 176.9n ± 5% -9.03% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/1-24 207.5n ± 13% 175.1n ± 2% -15.66% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/10-24 243.7n ± 13% 172.6n ± 3% -29.15% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/0-24 218.3n ± 10% 177.6n ± 2% -18.67% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/1-24 193.5n ± 10% 176.1n ± 2% -8.99% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/10-24 192.8n ± 11% 173.7n ± 2% -9.91% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/0-24 185.1n ± 9% 204.8n ± 9% +10.61% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/1-24 218.8n ± 14% 229.7n ± 16% ~ (p=0.310 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/10-24 242.7n ± 8% 209.1n ± 18% -13.84% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/0-24 182.8n ± 42% 255.2n ± 8% +39.67% (p=0.015 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/1-24 198.0n ± 7% 280.6n ± 22% +41.72% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/10-24 236.3n ± 18% 261.7n ± 8% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/0-24 223.2n ± 9% 226.9n ± 4% ~ (p=0.965 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/1-24 270.1n ± 10% 280.2n ± 6% ~ (p=0.143 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/10-24 257.2n ± 7% 252.0n ± 7% ~ (p=0.485 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/0-24 277.0n ± 5% 310.4n ± 12% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/1-24 287.3n ± 9% 271.2n ± 12% ~ (p=0.699 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/10-24 281.8n ± 9% 316.5n ± 22% +12.29% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/0-24 289.1n ± 9% 297.1n ± 12% ~ (p=0.310 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/1-24 277.8n ± 6% 353.1n ± 11% +27.11% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/10-24 281.8n ± 11% 352.2n ± 16% +24.94% (p=0.009 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/0-24 294.1n ± 7% 317.5n ± 9% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/1-24 281.7n ± 10% 332.1n ± 8% +17.89% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/10-24 238.9n ± 12% 318.1n ± 9% +33.13% (p=0.002 n=6) geomean 251.9n 184.4n -26.77% ``` Single-threaded: ``` goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: Intel(R) Xeon(R) CPU @ 2.20GHz │ main1.txt │ sync1.txt │ │ sec/op │ sec/op vs base │ SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/0 109.8n ± 7% 113.4n ± 23% ~ (p=1.000 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/1 115.0n ± 4% 113.3n ± 20% ~ (p=0.729 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Counter/Attributes/10 177.1n ± 34% 110.2n ± 16% -37.78% (p=0.009 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/0 110.5n ± 42% 109.2n ± 19% ~ (p=0.457 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/1 118.8n ± 2% 118.4n ± 5% ~ (p=0.619 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Counter/Attributes/10 119.0n ± 2% 116.8n ± 42% ~ (p=0.699 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/0 106.9n ± 1% 102.5n ± 5% -4.16% (p=0.030 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/1 117.2n ± 2% 116.9n ± 7% ~ (p=1.000 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64UpDownCounter/Attributes/10 115.4n ± 1% 115.1n ± 5% ~ (p=0.937 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/0 109.5n ± 5% 104.2n ± 8% -4.84% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/1 118.7n ± 14% 113.8n ± 35% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64UpDownCounter/Attributes/10 116.6n ± 1% 116.8n ± 8% ~ (p=0.968 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/0 106.6n ± 4% 109.4n ± 5% ~ (p=0.093 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/1 114.7n ± 4% 117.9n ± 4% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Gauge/Attributes/10 115.2n ± 4% 114.5n ± 1% ~ (p=0.162 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/0 109.4n ± 5% 107.5n ± 3% ~ (p=0.132 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/1 118.3n ± 2% 117.9n ± 3% ~ (p=0.589 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Gauge/Attributes/10 117.7n ± 2% 120.8n ± 14% ~ (p=0.093 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/0 96.78n ± 1% 99.37n ± 3% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/1 103.0n ± 3% 116.5n ± 26% +13.16% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/10 102.8n ± 1% 107.6n ± 22% +4.67% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/0 93.95n ± 22% 99.88n ± 18% +6.32% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/1 102.7n ± 5% 106.2n ± 6% ~ (p=0.089 n=6) SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/10 104.1n ± 4% 108.3n ± 27% +4.03% (p=0.026 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/0 146.3n ± 1% 154.0n ± 24% +5.23% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/1 154.8n ± 3% 161.2n ± 2% +4.20% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialInt64Histogram/Attributes/10 155.5n ± 1% 164.0n ± 4% +5.43% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/0 145.9n ± 2% 159.7n ± 12% +9.42% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/1 155.2n ± 0% 164.0n ± 6% +5.70% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsDisabled/ExponentialFloat64Histogram/Attributes/10 219.3n ± 29% 159.5n ± 3% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/0 263.6n ± 36% 177.2n ± 1% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/1 189.1n ± 8% 190.4n ± 12% ~ (p=0.589 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Counter/Attributes/10 184.3n ± 3% 189.4n ± 6% ~ (p=0.065 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/0 180.7n ± 1% 182.7n ± 2% ~ (p=0.457 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/1 192.8n ± 9% 192.0n ± 1% ~ (p=1.000 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Counter/Attributes/10 192.3n ± 4% 190.2n ± 4% ~ (p=0.093 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/0 176.5n ± 2% 181.7n ± 4% +2.95% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/1 184.0n ± 4% 192.0n ± 1% +4.32% (p=0.015 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64UpDownCounter/Attributes/10 184.4n ± 1% 195.2n ± 3% +5.83% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/0 183.0n ± 3% 177.4n ± 5% -3.06% (p=0.048 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/1 194.4n ± 4% 188.1n ± 5% ~ (p=0.084 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64UpDownCounter/Attributes/10 193.0n ± 5% 194.1n ± 5% ~ (p=0.699 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/0 178.4n ± 14% 185.6n ± 29% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/1 189.0n ± 8% 193.2n ± 2% ~ (p=0.132 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Gauge/Attributes/10 197.7n ± 5% 198.8n ± 2% ~ (p=0.619 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/0 185.5n ± 3% 188.8n ± 4% ~ (p=0.310 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/1 191.2n ± 3% 190.2n ± 7% ~ (p=0.732 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Gauge/Attributes/10 186.8n ± 2% 197.1n ± 6% +5.54% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/0 224.2n ± 4% 227.3n ± 2% ~ (p=0.394 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/1 232.5n ± 3% 242.5n ± 5% ~ (p=0.132 n=6) SyncMeasure/NoView/ExemplarsEnabled/Int64Histogram/Attributes/10 232.5n ± 3% 237.1n ± 5% +2.00% (p=0.045 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/0 227.5n ± 2% 238.5n ± 5% +4.81% (p=0.017 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/1 239.4n ± 8% 250.1n ± 6% ~ (p=0.240 n=6) SyncMeasure/NoView/ExemplarsEnabled/Float64Histogram/Attributes/10 241.5n ± 4% 254.0n ± 2% +5.18% (p=0.004 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/0 231.1n ± 5% 239.2n ± 3% ~ (p=0.084 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/1 260.2n ± 16% 253.8n ± 4% ~ (p=0.190 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialInt64Histogram/Attributes/10 234.3n ± 1% 246.8n ± 2% +5.29% (p=0.002 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/0 221.8n ± 6% 232.0n ± 4% +4.58% (p=0.037 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/1 228.2n ± 7% 240.6n ± 1% +5.41% (p=0.041 n=6) SyncMeasure/NoView/ExemplarsEnabled/ExponentialFloat64Histogram/Attributes/10 228.6n ± 7% 244.7n ± 1% +7.04% (p=0.015 n=6) geomean 158.1n 158.1n +0.00% ``` --------- Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
		| @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm | ||||
| - The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421) | ||||
| - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. | ||||
|   If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) | ||||
| - Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) | ||||
|  | ||||
| <!-- Released section --> | ||||
| <!-- Don't change this section unless doing release --> | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
| 	"context" | ||||
| 	"math" | ||||
| 	"math/rand/v2" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| @@ -37,6 +38,7 @@ var _ Reservoir = &FixedSizeReservoir{} | ||||
| type FixedSizeReservoir struct { | ||||
| 	reservoir.ConcurrentSafe | ||||
| 	*storage | ||||
| 	mu sync.Mutex | ||||
|  | ||||
| 	// count is the number of measurement seen. | ||||
| 	count int64 | ||||
| @@ -192,6 +194,8 @@ func (r *FixedSizeReservoir) advance() { | ||||
| // | ||||
| // The Reservoir state is preserved after this call. | ||||
| func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { | ||||
| 	r.mu.Lock() | ||||
| 	defer r.mu.Unlock() | ||||
| 	r.storage.Collect(dest) | ||||
| 	// Call reset here even though it will reset r.count and restart the random | ||||
| 	// number series. This will persist any old exemplars as long as no new | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
| 	"context" | ||||
| 	"slices" | ||||
| 	"sort" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| @@ -42,6 +43,7 @@ var _ Reservoir = &HistogramReservoir{} | ||||
| type HistogramReservoir struct { | ||||
| 	reservoir.ConcurrentSafe | ||||
| 	*storage | ||||
| 	mu sync.Mutex | ||||
|  | ||||
| 	// bounds are bucket bounds in ascending order. | ||||
| 	bounds []float64 | ||||
| @@ -76,3 +78,12 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a | ||||
| 	defer r.mu.Unlock() | ||||
| 	r.store(idx, m) | ||||
| } | ||||
|  | ||||
| // Collect returns all the held exemplars. | ||||
| // | ||||
| // The Reservoir state is preserved after this call. | ||||
| func (r *HistogramReservoir) Collect(dest *[]Exemplar) { | ||||
| 	r.mu.Lock() | ||||
| 	defer r.mu.Unlock() | ||||
| 	r.storage.Collect(dest) | ||||
| } | ||||
|   | ||||
| @@ -5,7 +5,6 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| @@ -14,7 +13,6 @@ import ( | ||||
|  | ||||
| // storage is an exemplar storage for [Reservoir] implementations. | ||||
| type storage struct { | ||||
| 	mu sync.Mutex | ||||
| 	// measurements are the measurements sampled. | ||||
| 	// | ||||
| 	// This does not use []metricdata.Exemplar because it potentially would | ||||
| @@ -34,8 +32,6 @@ func (r *storage) store(idx int, m measurement) { | ||||
| // | ||||
| // The Reservoir state is preserved after this call. | ||||
| func (r *storage) Collect(dest *[]Exemplar) { | ||||
| 	r.mu.Lock() | ||||
| 	defer r.mu.Unlock() | ||||
| 	*dest = reset(*dest, len(r.measurements), len(r.measurements)) | ||||
| 	var n int | ||||
| 	for _, m := range r.measurements { | ||||
|   | ||||
| @@ -110,12 +110,13 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati | ||||
|  | ||||
| // Sum returns a sum aggregate function input and output. | ||||
| func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { | ||||
| 	s := newSum[N](monotonic, b.AggregationLimit, b.resFunc()) | ||||
| 	switch b.Temporality { | ||||
| 	case metricdata.DeltaTemporality: | ||||
| 		return b.filter(s.measure), s.delta | ||||
| 		s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc()) | ||||
| 		return b.filter(s.measure), s.collect | ||||
| 	default: | ||||
| 		return b.filter(s.measure), s.cumulative | ||||
| 		s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc()) | ||||
| 		return b.filter(s.measure), s.collect | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										184
									
								
								sdk/metric/internal/aggregate/atomic.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										184
									
								
								sdk/metric/internal/aggregate/atomic.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,184 @@ | ||||
| // Copyright The OpenTelemetry Authors | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|  | ||||
| package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" | ||||
|  | ||||
| import ( | ||||
| 	"math" | ||||
| 	"runtime" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| ) | ||||
|  | ||||
| // atomicCounter is an efficient way of adding to a number which is either an | ||||
| // int64 or float64. It is designed to be efficient when adding whole | ||||
| // numbers, regardless of whether N is an int64 or float64. | ||||
| // | ||||
| // Inspired by the Prometheus counter implementation: | ||||
| // https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108 | ||||
| type atomicCounter[N int64 | float64] struct { | ||||
| 	// nFloatBits contains only the non-integer portion of the counter. | ||||
| 	nFloatBits atomic.Uint64 | ||||
| 	// nInt contains only the integer portion of the counter. | ||||
| 	nInt atomic.Int64 | ||||
| } | ||||
|  | ||||
| // load returns the current value. The caller must ensure all calls to add have | ||||
| // returned prior to calling load. | ||||
| func (n *atomicCounter[N]) load() N { | ||||
| 	fval := math.Float64frombits(n.nFloatBits.Load()) | ||||
| 	ival := n.nInt.Load() | ||||
| 	return N(fval + float64(ival)) | ||||
| } | ||||
|  | ||||
| func (n *atomicCounter[N]) add(value N) { | ||||
| 	ival := int64(value) | ||||
| 	// This case is where the value is an int, or if it is a whole-numbered float. | ||||
| 	if float64(ival) == float64(value) { | ||||
| 		n.nInt.Add(ival) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Value must be a float below. | ||||
| 	for { | ||||
| 		oldBits := n.nFloatBits.Load() | ||||
| 		newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value)) | ||||
| 		if n.nFloatBits.CompareAndSwap(oldBits, newBits) { | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // hotColdWaitGroup is a synchronization primitive which enables lockless | ||||
| // writes for concurrent writers and enables a reader to acquire exclusive | ||||
| // access to a snapshot of state including only completed operations. | ||||
| // Conceptually, it can be thought of as a "hot" wait group, | ||||
| // and a "cold" wait group, with the ability for the reader to atomically swap | ||||
| // the hot and cold wait groups, and wait for the now-cold wait group to | ||||
| // complete. | ||||
| // | ||||
| // Inspired by the prometheus/client_golang histogram implementation: | ||||
| // https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725 | ||||
| // | ||||
| // Usage: | ||||
| // | ||||
| //	var hcwg hotColdWaitGroup | ||||
| //	var data [2]any | ||||
| // | ||||
| //	func write() { | ||||
| //	  hotIdx := hcwg.start() | ||||
| //	  defer hcwg.done(hotIdx) | ||||
| //	  // modify data without locking | ||||
| //	  data[hotIdx].update() | ||||
| //	} | ||||
| // | ||||
| //	func read() { | ||||
| //	  coldIdx := hcwg.swapHotAndWait() | ||||
| //	  // read data now that all writes to the cold data have completed. | ||||
| //	  data[coldIdx].read() | ||||
| //	} | ||||
| type hotColdWaitGroup struct { | ||||
| 	// startedCountAndHotIdx contains a 63-bit counter in the lower bits, | ||||
| 	// and a 1 bit hot index to denote which of the two data-points new | ||||
| 	// measurements to write to. These are contained together so that read() | ||||
| 	// can atomically swap the hot bit, reset the started writes to zero, and | ||||
| 	// read the number writes that were started prior to the hot bit being | ||||
| 	// swapped. | ||||
| 	startedCountAndHotIdx atomic.Uint64 | ||||
| 	// endedCounts is the number of writes that have completed to each | ||||
| 	// dataPoint. | ||||
| 	endedCounts [2]atomic.Uint64 | ||||
| } | ||||
|  | ||||
| // start returns the hot index that the writer should write to. The returned | ||||
| // hot index is 0 or 1. The caller must call done(hot index) after it finishes | ||||
| // its operation. start() is safe to call concurrently with other methods. | ||||
| func (l *hotColdWaitGroup) start() uint64 { | ||||
| 	// We increment h.startedCountAndHotIdx so that the counter in the lower | ||||
| 	// 63 bits gets incremented. At the same time, we get the new value | ||||
| 	// back, which we can use to return the currently-hot index. | ||||
| 	return l.startedCountAndHotIdx.Add(1) >> 63 | ||||
| } | ||||
|  | ||||
| // done signals to the reader that an operation has fully completed. | ||||
| // done is safe to call concurrently. | ||||
| func (l *hotColdWaitGroup) done(hotIdx uint64) { | ||||
| 	l.endedCounts[hotIdx].Add(1) | ||||
| } | ||||
|  | ||||
| // swapHotAndWait swaps the hot bit, waits for all start() calls to be done(), | ||||
| // and then returns the now-cold index for the reader to read from. The | ||||
| // returned index is 0 or 1. swapHotAndWait must not be called concurrently. | ||||
| func (l *hotColdWaitGroup) swapHotAndWait() uint64 { | ||||
| 	n := l.startedCountAndHotIdx.Load() | ||||
| 	coldIdx := (^n) >> 63 | ||||
| 	// Swap the hot and cold index while resetting the started measurements | ||||
| 	// count to zero. | ||||
| 	n = l.startedCountAndHotIdx.Swap((coldIdx << 63)) | ||||
| 	hotIdx := n >> 63 | ||||
| 	startedCount := n & ((1 << 63) - 1) | ||||
| 	// Wait for all measurements to the previously-hot map to finish. | ||||
| 	for startedCount != l.endedCounts[hotIdx].Load() { | ||||
| 		runtime.Gosched() // Let measurements complete. | ||||
| 	} | ||||
| 	// reset the number of ended operations | ||||
| 	l.endedCounts[hotIdx].Store(0) | ||||
| 	return hotIdx | ||||
| } | ||||
|  | ||||
| // limitedSyncMap is a sync.Map which enforces the aggregation limit on | ||||
| // attribute sets and provides a Len() function. | ||||
| type limitedSyncMap struct { | ||||
| 	sync.Map | ||||
| 	aggLimit int | ||||
| 	len      int | ||||
| 	lenMux   sync.Mutex | ||||
| } | ||||
|  | ||||
| func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(attribute.Set) any) any { | ||||
| 	actual, loaded := m.Load(fltrAttr.Equivalent()) | ||||
| 	if loaded { | ||||
| 		return actual | ||||
| 	} | ||||
| 	// If the overflow set exists, assume we have already overflowed and don't | ||||
| 	// bother with the slow path below. | ||||
| 	actual, loaded = m.Load(overflowSet.Equivalent()) | ||||
| 	if loaded { | ||||
| 		return actual | ||||
| 	} | ||||
| 	// Slow path: add a new attribute set. | ||||
| 	m.lenMux.Lock() | ||||
| 	defer m.lenMux.Unlock() | ||||
|  | ||||
| 	// re-fetch now that we hold the lock to ensure we don't use the overflow | ||||
| 	// set unless we are sure the attribute set isn't being written | ||||
| 	// concurrently. | ||||
| 	actual, loaded = m.Load(fltrAttr.Equivalent()) | ||||
| 	if loaded { | ||||
| 		return actual | ||||
| 	} | ||||
|  | ||||
| 	if m.aggLimit > 0 && m.len >= m.aggLimit-1 { | ||||
| 		fltrAttr = overflowSet | ||||
| 	} | ||||
| 	actual, loaded = m.LoadOrStore(fltrAttr.Equivalent(), newValue(fltrAttr)) | ||||
| 	if !loaded { | ||||
| 		m.len++ | ||||
| 	} | ||||
| 	return actual | ||||
| } | ||||
|  | ||||
| func (m *limitedSyncMap) Clear() { | ||||
| 	m.lenMux.Lock() | ||||
| 	defer m.lenMux.Unlock() | ||||
| 	m.len = 0 | ||||
| 	m.Map.Clear() | ||||
| } | ||||
|  | ||||
| func (m *limitedSyncMap) Len() int { | ||||
| 	m.lenMux.Lock() | ||||
| 	defer m.lenMux.Unlock() | ||||
| 	return m.len | ||||
| } | ||||
							
								
								
									
										78
									
								
								sdk/metric/internal/aggregate/atomic_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								sdk/metric/internal/aggregate/atomic_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,78 @@ | ||||
| // Copyright The OpenTelemetry Authors | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|  | ||||
| package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" | ||||
|  | ||||
| import ( | ||||
| 	"math" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
|  | ||||
| func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) { | ||||
| 	var wg sync.WaitGroup | ||||
| 	var aSum atomicCounter[float64] | ||||
| 	for _, in := range []float64{ | ||||
| 		0.2, | ||||
| 		0.25, | ||||
| 		1.6, | ||||
| 		10.55, | ||||
| 		42.4, | ||||
| 	} { | ||||
| 		wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
| 			aSum.add(in) | ||||
| 		}() | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	assert.Equal(t, float64(55), math.Round(aSum.load())) | ||||
| } | ||||
|  | ||||
| func TestAtomicSumAddIntConcurrentSafe(t *testing.T) { | ||||
| 	var wg sync.WaitGroup | ||||
| 	var aSum atomicCounter[int64] | ||||
| 	for _, in := range []int64{ | ||||
| 		1, | ||||
| 		2, | ||||
| 		3, | ||||
| 		4, | ||||
| 		5, | ||||
| 	} { | ||||
| 		wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
| 			aSum.add(in) | ||||
| 		}() | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	assert.Equal(t, int64(15), aSum.load()) | ||||
| } | ||||
|  | ||||
| func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { | ||||
| 	var wg sync.WaitGroup | ||||
| 	hcwg := &hotColdWaitGroup{} | ||||
| 	var data [2]uint64 | ||||
| 	for range 5 { | ||||
| 		wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
| 			hotIdx := hcwg.start() | ||||
| 			defer hcwg.done(hotIdx) | ||||
| 			atomic.AddUint64(&data[hotIdx], 1) | ||||
| 		}() | ||||
| 	} | ||||
| 	for range 2 { | ||||
| 		readIdx := hcwg.swapHotAndWait() | ||||
| 		assert.NotPanics(t, func() { | ||||
| 			// reading without using atomics should not panic since we are | ||||
| 			// reading from the cold element, and have waited for all writes to | ||||
| 			// finish. | ||||
| 			t.Logf("read value %+v", data[readIdx]) | ||||
| 		}) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| } | ||||
| @@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64]( | ||||
| 		maxScale: maxScale, | ||||
|  | ||||
| 		newRes: r, | ||||
| 		limit:  newLimiter[*expoHistogramDataPoint[N]](limit), | ||||
| 		limit:  newLimiter[expoHistogramDataPoint[N]](limit), | ||||
| 		values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]), | ||||
|  | ||||
| 		start: now(), | ||||
| @@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct { | ||||
| 	maxScale int32 | ||||
|  | ||||
| 	newRes   func(attribute.Set) FilteredExemplarReservoir[N] | ||||
| 	limit    limiter[*expoHistogramDataPoint[N]] | ||||
| 	limit    limiter[expoHistogramDataPoint[N]] | ||||
| 	values   map[attribute.Distinct]*expoHistogramDataPoint[N] | ||||
| 	valuesMu sync.Mutex | ||||
|  | ||||
|   | ||||
| @@ -54,12 +54,12 @@ func NewFilteredExemplarReservoir[N int64 | float64]( | ||||
|  | ||||
| func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { | ||||
| 	if f.filter(ctx) { | ||||
| 		// only record the current time if we are sampling this measurement. | ||||
| 		ts := time.Now() | ||||
| 		if !f.concurrentSafe { | ||||
| 			f.reservoirMux.Lock() | ||||
| 			defer f.reservoirMux.Unlock() | ||||
| 		} | ||||
| 		// only record the current time if we are sampling this measurement. | ||||
| 		f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -52,7 +52,7 @@ type histValues[N int64 | float64] struct { | ||||
| 	bounds   []float64 | ||||
|  | ||||
| 	newRes   func(attribute.Set) FilteredExemplarReservoir[N] | ||||
| 	limit    limiter[*buckets[N]] | ||||
| 	limit    limiter[buckets[N]] | ||||
| 	values   map[attribute.Distinct]*buckets[N] | ||||
| 	valuesMu sync.Mutex | ||||
| } | ||||
| @@ -74,7 +74,7 @@ func newHistValues[N int64 | float64]( | ||||
| 		noSum:    noSum, | ||||
| 		bounds:   b, | ||||
| 		newRes:   r, | ||||
| 		limit:    newLimiter[*buckets[N]](limit), | ||||
| 		limit:    newLimiter[buckets[N]](limit), | ||||
| 		values:   make(map[attribute.Distinct]*buckets[N]), | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -23,7 +23,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredEx | ||||
| 	return &lastValue[N]{ | ||||
| 		newRes: r, | ||||
| 		limit:  newLimiter[datapoint[N]](limit), | ||||
| 		values: make(map[attribute.Distinct]datapoint[N]), | ||||
| 		values: make(map[attribute.Distinct]*datapoint[N]), | ||||
| 		start:  now(), | ||||
| 	} | ||||
| } | ||||
| @@ -34,7 +34,7 @@ type lastValue[N int64 | float64] struct { | ||||
|  | ||||
| 	newRes func(attribute.Set) FilteredExemplarReservoir[N] | ||||
| 	limit  limiter[datapoint[N]] | ||||
| 	values map[attribute.Distinct]datapoint[N] | ||||
| 	values map[attribute.Distinct]*datapoint[N] | ||||
| 	start  time.Time | ||||
| } | ||||
|  | ||||
| @@ -45,9 +45,10 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. | ||||
| 	d, ok := s.values[fltrAttr.Equivalent()] | ||||
| 	if !ok { | ||||
| 		fltrAttr = s.limit.Attributes(fltrAttr, s.values) | ||||
| 		d = s.values[fltrAttr.Equivalent()] | ||||
| 		d.res = s.newRes(fltrAttr) | ||||
| 		d.attrs = fltrAttr | ||||
| 		d = &datapoint[N]{ | ||||
| 			res:   s.newRes(fltrAttr), | ||||
| 			attrs: fltrAttr, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	d.value = value | ||||
|   | ||||
| @@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] { | ||||
| // aggregation cardinality limit for the existing measurements. If it will, | ||||
| // overflowSet is returned. Otherwise, if it will not exceed the limit, or the | ||||
| // limit is not set (limit <= 0), attr is returned. | ||||
| func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set { | ||||
| func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set { | ||||
| 	if l.aggLimit > 0 { | ||||
| 		_, exists := measurements[attrs.Equivalent()] | ||||
| 		if !exists && len(measurements) >= l.aggLimit-1 { | ||||
|   | ||||
| @@ -12,7 +12,8 @@ import ( | ||||
| ) | ||||
|  | ||||
| func TestLimiterAttributes(t *testing.T) { | ||||
| 	m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} | ||||
| 	var val struct{} | ||||
| 	m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val} | ||||
| 	t.Run("NoLimit", func(t *testing.T) { | ||||
| 		l := newLimiter[struct{}](0) | ||||
| 		assert.Equal(t, alice, l.Attributes(alice, m)) | ||||
| @@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) { | ||||
| var limitedAttr attribute.Set | ||||
|  | ||||
| func BenchmarkLimiterAttributes(b *testing.B) { | ||||
| 	m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} | ||||
| 	var val struct{} | ||||
| 	m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val} | ||||
| 	l := newLimiter[struct{}](2) | ||||
|  | ||||
| 	b.ReportAllocs() | ||||
|   | ||||
| @@ -5,7 +5,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| @@ -13,65 +12,75 @@ import ( | ||||
| ) | ||||
|  | ||||
| type sumValue[N int64 | float64] struct { | ||||
| 	n     N | ||||
| 	n     atomicCounter[N] | ||||
| 	res   FilteredExemplarReservoir[N] | ||||
| 	attrs attribute.Set | ||||
| } | ||||
|  | ||||
| // valueMap is the storage for sums. | ||||
| type valueMap[N int64 | float64] struct { | ||||
| 	sync.Mutex | ||||
| 	values limitedSyncMap | ||||
| 	newRes func(attribute.Set) FilteredExemplarReservoir[N] | ||||
| 	limit  limiter[sumValue[N]] | ||||
| 	values map[attribute.Distinct]sumValue[N] | ||||
| } | ||||
|  | ||||
| func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { | ||||
| 	return &valueMap[N]{ | ||||
| 		newRes: r, | ||||
| 		limit:  newLimiter[sumValue[N]](limit), | ||||
| 		values: make(map[attribute.Distinct]sumValue[N]), | ||||
| 	} | ||||
| func (s *valueMap[N]) measure( | ||||
| 	ctx context.Context, | ||||
| 	value N, | ||||
| 	fltrAttr attribute.Set, | ||||
| 	droppedAttr []attribute.KeyValue, | ||||
| ) { | ||||
| 	sv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { | ||||
| 		return &sumValue[N]{ | ||||
| 			res:   s.newRes(attr), | ||||
| 			attrs: attr, | ||||
| 		} | ||||
| 	}).(*sumValue[N]) | ||||
| 	sv.n.add(value) | ||||
| 	// It is possible for collection to race with measurement and observe the | ||||
| 	// exemplar in the batch of metrics after the add() for cumulative sums. | ||||
| 	// This is an accepted tradeoff to avoid locking during measurement. | ||||
| 	sv.res.Offer(ctx, value, droppedAttr) | ||||
| } | ||||
|  | ||||
| func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
|  | ||||
| 	v, ok := s.values[fltrAttr.Equivalent()] | ||||
| 	if !ok { | ||||
| 		fltrAttr = s.limit.Attributes(fltrAttr, s.values) | ||||
| 		v = s.values[fltrAttr.Equivalent()] | ||||
| 		v.res = s.newRes(fltrAttr) | ||||
| 		v.attrs = fltrAttr | ||||
| 	} | ||||
|  | ||||
| 	v.n += value | ||||
| 	v.res.Offer(ctx, value, droppedAttr) | ||||
|  | ||||
| 	s.values[fltrAttr.Equivalent()] = v | ||||
| } | ||||
|  | ||||
| // newSum returns an aggregator that summarizes a set of measurements as their | ||||
| // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle | ||||
| // the measurements were made in. | ||||
| func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { | ||||
| 	return &sum[N]{ | ||||
| 		valueMap:  newValueMap[N](limit, r), | ||||
| // newDeltaSum returns an aggregator that summarizes a set of measurements as | ||||
| // their arithmetic sum. Each sum is scoped by attributes and the aggregation | ||||
| // cycle the measurements were made in. | ||||
| func newDeltaSum[N int64 | float64]( | ||||
| 	monotonic bool, | ||||
| 	limit int, | ||||
| 	r func(attribute.Set) FilteredExemplarReservoir[N], | ||||
| ) *deltaSum[N] { | ||||
| 	return &deltaSum[N]{ | ||||
| 		monotonic: monotonic, | ||||
| 		start:     now(), | ||||
| 		hotColdValMap: [2]valueMap[N]{ | ||||
| 			{ | ||||
| 				values: limitedSyncMap{aggLimit: limit}, | ||||
| 				newRes: r, | ||||
| 			}, | ||||
| 			{ | ||||
| 				values: limitedSyncMap{aggLimit: limit}, | ||||
| 				newRes: r, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // sum summarizes a set of measurements made as their arithmetic sum. | ||||
| type sum[N int64 | float64] struct { | ||||
| 	*valueMap[N] | ||||
|  | ||||
| // deltaSum is the storage for sums which resets every collection interval. | ||||
| type deltaSum[N int64 | float64] struct { | ||||
| 	monotonic bool | ||||
| 	start     time.Time | ||||
|  | ||||
| 	hcwg          hotColdWaitGroup | ||||
| 	hotColdValMap [2]valueMap[N] | ||||
| } | ||||
|  | ||||
| func (s *sum[N]) delta( | ||||
| func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { | ||||
| 	hotIdx := s.hcwg.start() | ||||
| 	defer s.hcwg.done(hotIdx) | ||||
| 	s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr) | ||||
| } | ||||
|  | ||||
| func (s *deltaSum[N]) collect( | ||||
| 	dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface | ||||
| ) int { | ||||
| 	t := now() | ||||
| @@ -82,33 +91,61 @@ func (s *sum[N]) delta( | ||||
| 	sData.Temporality = metricdata.DeltaTemporality | ||||
| 	sData.IsMonotonic = s.monotonic | ||||
|  | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
|  | ||||
| 	n := len(s.values) | ||||
| 	// delta always clears values on collection | ||||
| 	readIdx := s.hcwg.swapHotAndWait() | ||||
| 	// The len will not change while we iterate over values, since we waited | ||||
| 	// for all writes to finish to the cold values and len. | ||||
| 	n := s.hotColdValMap[readIdx].values.Len() | ||||
| 	dPts := reset(sData.DataPoints, n, n) | ||||
|  | ||||
| 	var i int | ||||
| 	for _, val := range s.values { | ||||
| 	s.hotColdValMap[readIdx].values.Range(func(_, value any) bool { | ||||
| 		val := value.(*sumValue[N]) | ||||
| 		collectExemplars(&dPts[i].Exemplars, val.res.Collect) | ||||
| 		dPts[i].Attributes = val.attrs | ||||
| 		dPts[i].StartTime = s.start | ||||
| 		dPts[i].Time = t | ||||
| 		dPts[i].Value = val.n | ||||
| 		collectExemplars(&dPts[i].Exemplars, val.res.Collect) | ||||
| 		dPts[i].Value = val.n.load() | ||||
| 		i++ | ||||
| 	} | ||||
| 	// Do not report stale values. | ||||
| 	clear(s.values) | ||||
| 		return true | ||||
| 	}) | ||||
| 	s.hotColdValMap[readIdx].values.Clear() | ||||
| 	// The delta collection cycle resets. | ||||
| 	s.start = t | ||||
|  | ||||
| 	sData.DataPoints = dPts | ||||
| 	*dest = sData | ||||
|  | ||||
| 	return n | ||||
| 	return i | ||||
| } | ||||
|  | ||||
| func (s *sum[N]) cumulative( | ||||
| // newCumulativeSum returns an aggregator that summarizes a set of measurements | ||||
| // as their arithmetic sum. Each sum is scoped by attributes and the | ||||
| // aggregation cycle the measurements were made in. | ||||
| func newCumulativeSum[N int64 | float64]( | ||||
| 	monotonic bool, | ||||
| 	limit int, | ||||
| 	r func(attribute.Set) FilteredExemplarReservoir[N], | ||||
| ) *cumulativeSum[N] { | ||||
| 	return &cumulativeSum[N]{ | ||||
| 		monotonic: monotonic, | ||||
| 		start:     now(), | ||||
| 		valueMap: valueMap[N]{ | ||||
| 			values: limitedSyncMap{aggLimit: limit}, | ||||
| 			newRes: r, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // deltaSum is the storage for sums which never reset. | ||||
| type cumulativeSum[N int64 | float64] struct { | ||||
| 	monotonic bool | ||||
| 	start     time.Time | ||||
|  | ||||
| 	valueMap[N] | ||||
| } | ||||
|  | ||||
| func (s *cumulativeSum[N]) collect( | ||||
| 	dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface | ||||
| ) int { | ||||
| 	t := now() | ||||
| @@ -119,30 +156,33 @@ func (s *sum[N]) cumulative( | ||||
| 	sData.Temporality = metricdata.CumulativeTemporality | ||||
| 	sData.IsMonotonic = s.monotonic | ||||
|  | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
|  | ||||
| 	n := len(s.values) | ||||
| 	dPts := reset(sData.DataPoints, n, n) | ||||
| 	// Values are being concurrently written while we iterate, so only use the | ||||
| 	// current length for capacity. | ||||
| 	dPts := reset(sData.DataPoints, 0, s.values.Len()) | ||||
|  | ||||
| 	var i int | ||||
| 	for _, value := range s.values { | ||||
| 		dPts[i].Attributes = value.attrs | ||||
| 		dPts[i].StartTime = s.start | ||||
| 		dPts[i].Time = t | ||||
| 		dPts[i].Value = value.n | ||||
| 		collectExemplars(&dPts[i].Exemplars, value.res.Collect) | ||||
| 	s.values.Range(func(_, value any) bool { | ||||
| 		val := value.(*sumValue[N]) | ||||
| 		newPt := metricdata.DataPoint[N]{ | ||||
| 			Attributes: val.attrs, | ||||
| 			StartTime:  s.start, | ||||
| 			Time:       t, | ||||
| 			Value:      val.n.load(), | ||||
| 		} | ||||
| 		collectExemplars(&newPt.Exemplars, val.res.Collect) | ||||
| 		dPts = append(dPts, newPt) | ||||
| 		// TODO (#3006): This will use an unbounded amount of memory if there | ||||
| 		// are unbounded number of attribute sets being aggregated. Attribute | ||||
| 		// sets that become "stale" need to be forgotten so this will not | ||||
| 		// overload the system. | ||||
| 		i++ | ||||
| 	} | ||||
| 		return true | ||||
| 	}) | ||||
|  | ||||
| 	sData.DataPoints = dPts | ||||
| 	*dest = sData | ||||
|  | ||||
| 	return n | ||||
| 	return i | ||||
| } | ||||
|  | ||||
| // newPrecomputedSum returns an aggregator that summarizes a set of | ||||
| @@ -154,27 +194,22 @@ func newPrecomputedSum[N int64 | float64]( | ||||
| 	r func(attribute.Set) FilteredExemplarReservoir[N], | ||||
| ) *precomputedSum[N] { | ||||
| 	return &precomputedSum[N]{ | ||||
| 		valueMap:  newValueMap[N](limit, r), | ||||
| 		monotonic: monotonic, | ||||
| 		start:     now(), | ||||
| 		deltaSum: newDeltaSum(monotonic, limit, r), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // precomputedSum summarizes a set of observations as their arithmetic sum. | ||||
| type precomputedSum[N int64 | float64] struct { | ||||
| 	*valueMap[N] | ||||
| 	*deltaSum[N] | ||||
|  | ||||
| 	monotonic bool | ||||
| 	start     time.Time | ||||
|  | ||||
| 	reported map[attribute.Distinct]N | ||||
| 	reported map[any]N | ||||
| } | ||||
|  | ||||
| func (s *precomputedSum[N]) delta( | ||||
| 	dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface | ||||
| ) int { | ||||
| 	t := now() | ||||
| 	newReported := make(map[attribute.Distinct]N) | ||||
| 	newReported := make(map[any]N) | ||||
|  | ||||
| 	// If *dest is not a metricdata.Sum, memory reuse is missed. In that case, | ||||
| 	// use the zero-value sData and hope for better alignment next cycle. | ||||
| @@ -182,27 +217,29 @@ func (s *precomputedSum[N]) delta( | ||||
| 	sData.Temporality = metricdata.DeltaTemporality | ||||
| 	sData.IsMonotonic = s.monotonic | ||||
|  | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
|  | ||||
| 	n := len(s.values) | ||||
| 	// delta always clears values on collection | ||||
| 	readIdx := s.hcwg.swapHotAndWait() | ||||
| 	// The len will not change while we iterate over values, since we waited | ||||
| 	// for all writes to finish to the cold values and len. | ||||
| 	n := s.hotColdValMap[readIdx].values.Len() | ||||
| 	dPts := reset(sData.DataPoints, n, n) | ||||
|  | ||||
| 	var i int | ||||
| 	for key, value := range s.values { | ||||
| 		delta := value.n - s.reported[key] | ||||
| 	s.hotColdValMap[readIdx].values.Range(func(key, value any) bool { | ||||
| 		val := value.(*sumValue[N]) | ||||
| 		n := val.n.load() | ||||
|  | ||||
| 		dPts[i].Attributes = value.attrs | ||||
| 		delta := n - s.reported[key] | ||||
| 		collectExemplars(&dPts[i].Exemplars, val.res.Collect) | ||||
| 		dPts[i].Attributes = val.attrs | ||||
| 		dPts[i].StartTime = s.start | ||||
| 		dPts[i].Time = t | ||||
| 		dPts[i].Value = delta | ||||
| 		collectExemplars(&dPts[i].Exemplars, value.res.Collect) | ||||
|  | ||||
| 		newReported[key] = value.n | ||||
| 		newReported[key] = n | ||||
| 		i++ | ||||
| 	} | ||||
| 	// Unused attribute sets do not report. | ||||
| 	clear(s.values) | ||||
| 		return true | ||||
| 	}) | ||||
| 	s.hotColdValMap[readIdx].values.Clear() | ||||
| 	s.reported = newReported | ||||
| 	// The delta collection cycle resets. | ||||
| 	s.start = t | ||||
| @@ -210,7 +247,7 @@ func (s *precomputedSum[N]) delta( | ||||
| 	sData.DataPoints = dPts | ||||
| 	*dest = sData | ||||
|  | ||||
| 	return n | ||||
| 	return i | ||||
| } | ||||
|  | ||||
| func (s *precomputedSum[N]) cumulative( | ||||
| @@ -224,27 +261,28 @@ func (s *precomputedSum[N]) cumulative( | ||||
| 	sData.Temporality = metricdata.CumulativeTemporality | ||||
| 	sData.IsMonotonic = s.monotonic | ||||
|  | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
|  | ||||
| 	n := len(s.values) | ||||
| 	// cumulative precomputed always clears values on collection | ||||
| 	readIdx := s.hcwg.swapHotAndWait() | ||||
| 	// The len will not change while we iterate over values, since we waited | ||||
| 	// for all writes to finish to the cold values and len. | ||||
| 	n := s.hotColdValMap[readIdx].values.Len() | ||||
| 	dPts := reset(sData.DataPoints, n, n) | ||||
|  | ||||
| 	var i int | ||||
| 	for _, val := range s.values { | ||||
| 	s.hotColdValMap[readIdx].values.Range(func(_, value any) bool { | ||||
| 		val := value.(*sumValue[N]) | ||||
| 		collectExemplars(&dPts[i].Exemplars, val.res.Collect) | ||||
| 		dPts[i].Attributes = val.attrs | ||||
| 		dPts[i].StartTime = s.start | ||||
| 		dPts[i].Time = t | ||||
| 		dPts[i].Value = val.n | ||||
| 		collectExemplars(&dPts[i].Exemplars, val.res.Collect) | ||||
|  | ||||
| 		dPts[i].Value = val.n.load() | ||||
| 		i++ | ||||
| 	} | ||||
| 	// Unused attribute sets do not report. | ||||
| 	clear(s.values) | ||||
| 		return true | ||||
| 	}) | ||||
| 	s.hotColdValMap[readIdx].values.Clear() | ||||
|  | ||||
| 	sData.DataPoints = dPts | ||||
| 	*dest = sData | ||||
|  | ||||
| 	return n | ||||
| 	return i | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user