You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Return merged Resource on schema conflict (#4876)
* Return merged Resource on schema conflict * Add changes to changelog * Doc returned resource to have no schema URL * Refactor Merge based on feedback * Add the schema URLs to the returned error * Ensure no schema URL when merge conflict on detect * Replaced isErr with wantErr in TestNew * Update sdk/resource/auto_test.go Co-authored-by: Robert Pająk <pellared@hotmail.com> * Update TestDetect based on feedback --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
This commit is contained in:
@@ -13,6 +13,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
|
- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
|
||||||
- Experimental exemplar exporting is added to the metric SDK.
|
- Experimental exemplar exporting is added to the metric SDK.
|
||||||
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)
|
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)
|
||||||
|
- `ErrSchemaURLConflict` is added to `go.opentelemetry.io/otel/sdk/resource`.
|
||||||
|
This error is returned when a merge of two `Resource`s with different (non-empty) schema URL is attepted. (#4876)
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- The `Merge` and `New` functions in `go.opentelemetry.io/otel/sdk/resource` now returns a partial result if there is a schema URL merge conflict.
|
||||||
|
Instead of returning `nil` when two `Resource`s with different (non-empty) schema URLs are merged the merged `Resource`, along with the new `ErrSchemaURLConflict` error, is returned.
|
||||||
|
It is up to the user to decide if they want to use the returned `Resource` or not.
|
||||||
|
It may have desired attributes overwritten or include stale semantic conventions. (#4876)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
|
|||||||
+23
-2
@@ -41,8 +41,20 @@ type Detector interface {
|
|||||||
// must never be done outside of a new major release.
|
// must never be done outside of a new major release.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Detect calls all input detectors sequentially and merges each result with the previous one.
|
// Detect returns a new [Resource] merged from all the Resources each of the
|
||||||
// It returns the merged error too.
|
// detectors produces. Each of the detectors are called sequentially, in the
|
||||||
|
// order they are passed, merging the produced resource into the previous.
|
||||||
|
//
|
||||||
|
// This may return a partial Resource along with an error containing
|
||||||
|
// [ErrPartialResource] if that error is returned from a detector. It may also
|
||||||
|
// return a merge-conflicting Resource along with an error containing
|
||||||
|
// [ErrSchemaURLConflict] if merging Resources from different detectors results
|
||||||
|
// in a schema URL conflict. It is up to the caller to determine if this
|
||||||
|
// returned Resource should be used or not.
|
||||||
|
//
|
||||||
|
// If one of the detectors returns an error that is not [ErrPartialResource],
|
||||||
|
// the resource produced by the detector will not be merged and the returned
|
||||||
|
// error will wrap that detector's error.
|
||||||
func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) {
|
func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) {
|
||||||
r := new(Resource)
|
r := new(Resource)
|
||||||
return r, detect(ctx, r, detectors)
|
return r, detect(ctx, r, detectors)
|
||||||
@@ -50,6 +62,10 @@ func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) {
|
|||||||
|
|
||||||
// detect runs all detectors using ctx and merges the result into res. This
|
// detect runs all detectors using ctx and merges the result into res. This
|
||||||
// assumes res is allocated and not nil, it will panic otherwise.
|
// assumes res is allocated and not nil, it will panic otherwise.
|
||||||
|
//
|
||||||
|
// If the detectors or merging resources produces any errors (i.e.
|
||||||
|
// [ErrPartialResource] [ErrSchemaURLConflict]), a single error wrapping all of
|
||||||
|
// these errors will be returned. Otherwise, nil is returned.
|
||||||
func detect(ctx context.Context, res *Resource, detectors []Detector) error {
|
func detect(ctx context.Context, res *Resource, detectors []Detector) error {
|
||||||
var (
|
var (
|
||||||
r *Resource
|
r *Resource
|
||||||
@@ -78,6 +94,11 @@ func detect(ctx context.Context, res *Resource, detectors []Detector) error {
|
|||||||
if len(errs) == 0 {
|
if len(errs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if errors.Is(errs, ErrSchemaURLConflict) {
|
||||||
|
// If there has been a merge conflict, ensure the resource has no
|
||||||
|
// schema URL.
|
||||||
|
res.schemaURL = ""
|
||||||
|
}
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+61
-19
@@ -16,47 +16,89 @@ package resource_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type detector struct {
|
||||||
|
SchemaURL string
|
||||||
|
Attributes []attribute.KeyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDetector(schemaURL string, attrs ...attribute.KeyValue) resource.Detector {
|
||||||
|
return detector{schemaURL, attrs}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d detector) Detect(context.Context) (*resource.Resource, error) {
|
||||||
|
return resource.NewWithAttributes(d.SchemaURL, d.Attributes...), nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestDetect(t *testing.T) {
|
func TestDetect(t *testing.T) {
|
||||||
|
v130 := "https://opentelemetry.io/schemas/1.3.0"
|
||||||
|
v140 := "https://opentelemetry.io/schemas/1.4.0"
|
||||||
|
v150 := "https://opentelemetry.io/schemas/1.5.0"
|
||||||
|
|
||||||
|
alice := attribute.String("name", "Alice")
|
||||||
|
bob := attribute.String("name", "Bob")
|
||||||
|
carol := attribute.String("name", "Carol")
|
||||||
|
|
||||||
|
admin := attribute.Bool("admin", true)
|
||||||
|
user := attribute.Bool("admin", false)
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
schema1, schema2 string
|
detectors []resource.Detector
|
||||||
isErr bool
|
want *resource.Resource
|
||||||
|
wantErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "different schema urls",
|
name: "two different schema urls",
|
||||||
schema1: "https://opentelemetry.io/schemas/1.3.0",
|
detectors: []resource.Detector{
|
||||||
schema2: "https://opentelemetry.io/schemas/1.4.0",
|
newDetector(v130, alice, admin),
|
||||||
isErr: true,
|
newDetector(v140, bob, user),
|
||||||
|
},
|
||||||
|
want: resource.NewSchemaless(bob, user),
|
||||||
|
wantErr: resource.ErrSchemaURLConflict,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "same schema url",
|
name: "three different schema urls",
|
||||||
schema1: "https://opentelemetry.io/schemas/1.4.0",
|
detectors: []resource.Detector{
|
||||||
schema2: "https://opentelemetry.io/schemas/1.4.0",
|
newDetector(v130, alice, admin),
|
||||||
isErr: false,
|
newDetector(v140, bob, user),
|
||||||
|
newDetector(v150, carol),
|
||||||
|
},
|
||||||
|
want: resource.NewSchemaless(carol, user),
|
||||||
|
wantErr: resource.ErrSchemaURLConflict,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "same schema url",
|
||||||
|
detectors: []resource.Detector{
|
||||||
|
newDetector(v140, alice, admin),
|
||||||
|
newDetector(v140, bob, user),
|
||||||
|
},
|
||||||
|
want: resource.NewWithAttributes(v140, bob, user),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) {
|
t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) {
|
||||||
d1 := resource.StringDetector(c.schema1, semconv.HostNameKey, os.Hostname)
|
r, err := resource.Detect(context.Background(), c.detectors...)
|
||||||
d2 := resource.StringDetector(c.schema2, semconv.HostNameKey, os.Hostname)
|
if c.wantErr != nil {
|
||||||
r, err := resource.Detect(context.Background(), d1, d2)
|
assert.ErrorIs(t, err, c.wantErr)
|
||||||
assert.NotNil(t, r)
|
if errors.Is(c.wantErr, resource.ErrSchemaURLConflict) {
|
||||||
if c.isErr {
|
assert.Zero(t, r.SchemaURL())
|
||||||
assert.Error(t, err)
|
}
|
||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
assert.Equal(t, c.want.SchemaURL(), r.SchemaURL())
|
||||||
|
assert.ElementsMatch(t, c.want.Attributes(), r.Attributes())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+52
-25
@@ -17,6 +17,7 @@ package resource // import "go.opentelemetry.io/otel/sdk/resource"
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
@@ -40,9 +41,20 @@ var (
|
|||||||
defaultResourceOnce sync.Once
|
defaultResourceOnce sync.Once
|
||||||
)
|
)
|
||||||
|
|
||||||
var errMergeConflictSchemaURL = errors.New("cannot merge resource due to conflicting Schema URL")
|
// ErrSchemaURLConflict is an error returned when two Resources are merged
|
||||||
|
// together that contain different, non-empty, schema URLs.
|
||||||
|
var ErrSchemaURLConflict = errors.New("conflicting Schema URL")
|
||||||
|
|
||||||
// New returns a Resource combined from the user-provided detectors.
|
// New returns a [Resource] built using opts.
|
||||||
|
//
|
||||||
|
// This may return a partial Resource along with an error containing
|
||||||
|
// [ErrPartialResource] if options that provide a [Detector] are used and that
|
||||||
|
// error is returned from one or more of the Detectors. It may also return a
|
||||||
|
// merge-conflict Resource along with an error containing
|
||||||
|
// [ErrSchemaURLConflict] if merging Resources from the opts results in a
|
||||||
|
// schema URL conflict (see [Resource.Merge] for more information). It is up to
|
||||||
|
// the caller to determine if this returned Resource should be used or not
|
||||||
|
// based on these errors.
|
||||||
func New(ctx context.Context, opts ...Option) (*Resource, error) {
|
func New(ctx context.Context, opts ...Option) (*Resource, error) {
|
||||||
cfg := config{}
|
cfg := config{}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@@ -146,16 +158,29 @@ func (r *Resource) Equal(eq *Resource) bool {
|
|||||||
return r.Equivalent() == eq.Equivalent()
|
return r.Equivalent() == eq.Equivalent()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge creates a new resource by combining resource a and b.
|
// Merge creates a new [Resource] by merging a and b.
|
||||||
//
|
//
|
||||||
// If there are common keys between resource a and b, then the value
|
// If there are common keys between a and b, then the value from b will
|
||||||
// from resource b will overwrite the value from resource a, even
|
// overwrite the value from a, even if b's value is empty.
|
||||||
// if resource b's value is empty.
|
|
||||||
//
|
//
|
||||||
// The SchemaURL of the resources will be merged according to the spec rules:
|
// The SchemaURL of the resources will be merged according to the
|
||||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/resource/sdk.md#merge
|
// [OpenTelemetry specification rules]:
|
||||||
// If the resources have different non-empty schemaURL an empty resource and an error
|
//
|
||||||
// will be returned.
|
// - If a's schema URL is empty then the returned Resource's schema URL will
|
||||||
|
// be set to the schema URL of b,
|
||||||
|
// - Else if b's schema URL is empty then the returned Resource's schema URL
|
||||||
|
// will be set to the schema URL of a,
|
||||||
|
// - Else if the schema URLs of a and b are the same then that will be the
|
||||||
|
// schema URL of the returned Resource,
|
||||||
|
// - Else this is a merging error. If the resources have different,
|
||||||
|
// non-empty, schema URLs an error containing [ErrSchemaURLConflict] will
|
||||||
|
// be returned with the merged Resource. The merged Resource will have an
|
||||||
|
// empty schema URL. It may be the case that some unintended attributes
|
||||||
|
// have been overwritten or old semantic conventions persisted in the
|
||||||
|
// returned Resource. It is up to the caller to determine if this returned
|
||||||
|
// Resource should be used or not.
|
||||||
|
//
|
||||||
|
// [OpenTelemetry specification rules]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/resource/sdk.md#merge
|
||||||
func Merge(a, b *Resource) (*Resource, error) {
|
func Merge(a, b *Resource) (*Resource, error) {
|
||||||
if a == nil && b == nil {
|
if a == nil && b == nil {
|
||||||
return Empty(), nil
|
return Empty(), nil
|
||||||
@@ -167,19 +192,6 @@ func Merge(a, b *Resource) (*Resource, error) {
|
|||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge the schema URL.
|
|
||||||
var schemaURL string
|
|
||||||
switch true {
|
|
||||||
case a.schemaURL == "":
|
|
||||||
schemaURL = b.schemaURL
|
|
||||||
case b.schemaURL == "":
|
|
||||||
schemaURL = a.schemaURL
|
|
||||||
case a.schemaURL == b.schemaURL:
|
|
||||||
schemaURL = a.schemaURL
|
|
||||||
default:
|
|
||||||
return Empty(), errMergeConflictSchemaURL
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key()
|
// Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key()
|
||||||
// Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...)
|
// Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...)
|
||||||
mi := attribute.NewMergeIterator(b.Set(), a.Set())
|
mi := attribute.NewMergeIterator(b.Set(), a.Set())
|
||||||
@@ -187,8 +199,23 @@ func Merge(a, b *Resource) (*Resource, error) {
|
|||||||
for mi.Next() {
|
for mi.Next() {
|
||||||
combine = append(combine, mi.Attribute())
|
combine = append(combine, mi.Attribute())
|
||||||
}
|
}
|
||||||
merged := NewWithAttributes(schemaURL, combine...)
|
|
||||||
return merged, nil
|
switch {
|
||||||
|
case a.schemaURL == "":
|
||||||
|
return NewWithAttributes(b.schemaURL, combine...), nil
|
||||||
|
case b.schemaURL == "":
|
||||||
|
return NewWithAttributes(a.schemaURL, combine...), nil
|
||||||
|
case a.schemaURL == b.schemaURL:
|
||||||
|
return NewWithAttributes(a.schemaURL, combine...), nil
|
||||||
|
}
|
||||||
|
// Return the merged resource with an appropriate error. It is up to
|
||||||
|
// the user to decide if the returned resource can be used or not.
|
||||||
|
return NewSchemaless(combine...), fmt.Errorf(
|
||||||
|
"%w: %s and %s",
|
||||||
|
ErrSchemaURLConflict,
|
||||||
|
a.schemaURL,
|
||||||
|
b.schemaURL,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Empty returns an instance of Resource with no attributes. It is
|
// Empty returns an instance of Resource with no attributes. It is
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ func TestMerge(t *testing.T) {
|
|||||||
name: "Merge with different schemas",
|
name: "Merge with different schemas",
|
||||||
a: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.4.0", kv41),
|
a: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.4.0", kv41),
|
||||||
b: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.3.0", kv42),
|
b: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.3.0", kv42),
|
||||||
want: nil,
|
want: []attribute.KeyValue{kv42},
|
||||||
isErr: true,
|
isErr: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -324,7 +324,7 @@ func TestNew(t *testing.T) {
|
|||||||
|
|
||||||
resourceValues map[string]string
|
resourceValues map[string]string
|
||||||
schemaURL string
|
schemaURL string
|
||||||
isErr bool
|
wantErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "No Options returns empty resource",
|
name: "No Options returns empty resource",
|
||||||
@@ -406,9 +406,14 @@ func TestNew(t *testing.T) {
|
|||||||
),
|
),
|
||||||
resource.WithSchemaURL("https://opentelemetry.io/schemas/1.1.0"),
|
resource.WithSchemaURL("https://opentelemetry.io/schemas/1.1.0"),
|
||||||
},
|
},
|
||||||
resourceValues: map[string]string{},
|
resourceValues: map[string]string{
|
||||||
schemaURL: "",
|
string(semconv.HostNameKey): func() (hostname string) {
|
||||||
isErr: true,
|
hostname, _ = os.Hostname()
|
||||||
|
return hostname
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
schemaURL: "",
|
||||||
|
wantErr: resource.ErrSchemaURLConflict,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "With conflicting detector schema urls",
|
name: "With conflicting detector schema urls",
|
||||||
@@ -420,9 +425,14 @@ func TestNew(t *testing.T) {
|
|||||||
),
|
),
|
||||||
resource.WithSchemaURL("https://opentelemetry.io/schemas/1.2.0"),
|
resource.WithSchemaURL("https://opentelemetry.io/schemas/1.2.0"),
|
||||||
},
|
},
|
||||||
resourceValues: map[string]string{},
|
resourceValues: map[string]string{
|
||||||
schemaURL: "",
|
string(semconv.HostNameKey): func() (hostname string) {
|
||||||
isErr: true,
|
hostname, _ = os.Hostname()
|
||||||
|
return hostname
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
schemaURL: "",
|
||||||
|
wantErr: resource.ErrSchemaURLConflict,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tc {
|
for _, tt := range tc {
|
||||||
@@ -436,10 +446,10 @@ func TestNew(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
res, err := resource.New(ctx, tt.options...)
|
res, err := resource.New(ctx, tt.options...)
|
||||||
|
|
||||||
if tt.isErr {
|
if tt.wantErr != nil {
|
||||||
require.Error(t, err)
|
assert.ErrorIs(t, err, tt.wantErr)
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.EqualValues(t, tt.resourceValues, toMap(res))
|
require.EqualValues(t, tt.resourceValues, toMap(res))
|
||||||
|
|||||||
Reference in New Issue
Block a user