mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-03 22:52:30 +02:00
Add HTTP/JSON to the otlp exporter (#1586)
* Add HTTP/JSON to the otlp exporter Co-Authored-By: Roy van de Water <72016+royvandewater@users.noreply.github.com> * PR fixup Co-authored-by: Roy van de Water <72016+royvandewater@users.noreply.github.com>
This commit is contained in:
parent
62e2a0f766
commit
7153ef2dc2
@ -8,6 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## Added
|
||||||
|
|
||||||
|
- Added `Marshler` config option to `otlphttp` to enable otlp over json or protobufs. (#1586)
|
||||||
### Removed
|
### Removed
|
||||||
|
|
||||||
- Removed the exported `SimpleSpanProcessor` and `BatchSpanProcessor` structs.
|
- Removed the exported `SimpleSpanProcessor` and `BatchSpanProcessor` structs.
|
||||||
|
@ -28,6 +28,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/jsonpb"
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/exporters/otlp"
|
"go.opentelemetry.io/otel/exporters/otlp"
|
||||||
colmetricspb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
|
colmetricspb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
|
||||||
@ -37,7 +40,8 @@ import (
|
|||||||
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
|
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
const contentType = "application/x-protobuf"
|
const contentTypeProto = "application/x-protobuf"
|
||||||
|
const contentTypeJSON = "application/json"
|
||||||
|
|
||||||
// Keep it in sync with golang's DefaultTransport from net/http! We
|
// Keep it in sync with golang's DefaultTransport from net/http! We
|
||||||
// have our own copy to avoid handling a situation where the
|
// have our own copy to avoid handling a situation where the
|
||||||
@ -142,7 +146,7 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
|
|||||||
pbRequest := &colmetricspb.ExportMetricsServiceRequest{
|
pbRequest := &colmetricspb.ExportMetricsServiceRequest{
|
||||||
ResourceMetrics: rms,
|
ResourceMetrics: rms,
|
||||||
}
|
}
|
||||||
rawRequest, err := pbRequest.Marshal()
|
rawRequest, err := d.marshal(pbRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -158,13 +162,21 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
|
|||||||
pbRequest := &coltracepb.ExportTraceServiceRequest{
|
pbRequest := &coltracepb.ExportTraceServiceRequest{
|
||||||
ResourceSpans: protoSpans,
|
ResourceSpans: protoSpans,
|
||||||
}
|
}
|
||||||
rawRequest, err := pbRequest.Marshal()
|
rawRequest, err := d.marshal(pbRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
|
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *driver) marshal(msg proto.Message) ([]byte, error) {
|
||||||
|
if d.cfg.marshaler == MarshalJSON {
|
||||||
|
s, err := (&jsonpb.Marshaler{}).MarshalToString(msg)
|
||||||
|
return []byte(s), err
|
||||||
|
}
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
|
func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
|
||||||
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
|
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
@ -267,7 +279,11 @@ func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Head
|
|||||||
headers.Set(k, v)
|
headers.Set(k, v)
|
||||||
}
|
}
|
||||||
contentLength := (int64)(len(rawRequest))
|
contentLength := (int64)(len(rawRequest))
|
||||||
headers.Set("Content-Type", contentType)
|
if d.cfg.marshaler == MarshalJSON {
|
||||||
|
headers.Set("Content-Type", contentTypeJSON)
|
||||||
|
} else {
|
||||||
|
headers.Set("Content-Type", contentTypeProto)
|
||||||
|
}
|
||||||
requestReader := bytes.NewBuffer(rawRequest)
|
requestReader := bytes.NewBuffer(rawRequest)
|
||||||
switch d.cfg.compression {
|
switch d.cfg.compression {
|
||||||
case NoCompression:
|
case NoCompression:
|
||||||
|
@ -105,6 +105,12 @@ func TestEndToEnd(t *testing.T) {
|
|||||||
ExpectedHeaders: testHeaders,
|
ExpectedHeaders: testHeaders,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "with json encoding",
|
||||||
|
opts: []otlphttp.Option{
|
||||||
|
otlphttp.WithMarshal(otlphttp.MarshalJSON),
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/jsonpb"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
@ -109,15 +110,25 @@ func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
request := collectormetricpb.ExportMetricsServiceRequest{}
|
request, err := unmarshalMetricsRequest(rawRequest, r.Header.Get("content-type"))
|
||||||
if err := request.Unmarshal(rawRequest); err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeReply(w, rawResponse, 0, c.injectContentType)
|
writeReply(w, rawResponse, 0, c.injectContentType)
|
||||||
c.metricLock.Lock()
|
c.metricLock.Lock()
|
||||||
defer c.metricLock.Unlock()
|
defer c.metricLock.Unlock()
|
||||||
c.metricsStorage.AddMetrics(&request)
|
c.metricsStorage.AddMetrics(request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func unmarshalMetricsRequest(rawRequest []byte, contentType string) (*collectormetricpb.ExportMetricsServiceRequest, error) {
|
||||||
|
request := &collectormetricpb.ExportMetricsServiceRequest{}
|
||||||
|
if contentType == "application/json" {
|
||||||
|
err := jsonpb.UnmarshalString(string(rawRequest), request)
|
||||||
|
return request, err
|
||||||
|
}
|
||||||
|
err := request.Unmarshal(rawRequest)
|
||||||
|
return request, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
|
func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -140,15 +151,26 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
request := collectortracepb.ExportTraceServiceRequest{}
|
|
||||||
if err := request.Unmarshal(rawRequest); err != nil {
|
request, err := unmarshalTraceRequest(rawRequest, r.Header.Get("content-type"))
|
||||||
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeReply(w, rawResponse, 0, c.injectContentType)
|
writeReply(w, rawResponse, 0, c.injectContentType)
|
||||||
c.spanLock.Lock()
|
c.spanLock.Lock()
|
||||||
defer c.spanLock.Unlock()
|
defer c.spanLock.Unlock()
|
||||||
c.spansStorage.AddSpans(&request)
|
c.spansStorage.AddSpans(request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func unmarshalTraceRequest(rawRequest []byte, contentType string) (*collectortracepb.ExportTraceServiceRequest, error) {
|
||||||
|
request := &collectortracepb.ExportTraceServiceRequest{}
|
||||||
|
if contentType == "application/json" {
|
||||||
|
err := jsonpb.UnmarshalString(string(rawRequest), request)
|
||||||
|
return request, err
|
||||||
|
}
|
||||||
|
err := request.Unmarshal(rawRequest)
|
||||||
|
return request, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockCollector) checkHeaders(r *http.Request) bool {
|
func (c *mockCollector) checkHeaders(r *http.Request) bool {
|
||||||
|
@ -48,6 +48,16 @@ const (
|
|||||||
DefaultBackoff time.Duration = 300 * time.Millisecond
|
DefaultBackoff time.Duration = 300 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Marshaler describes the kind of message format sent to the collector
|
||||||
|
type Marshaler int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MarshalProto tells the driver to send using the protobuf binary format.
|
||||||
|
MarshalProto Marshaler = iota
|
||||||
|
// MarshalJSON tells the driver to send using json format.
|
||||||
|
MarshalJSON
|
||||||
|
)
|
||||||
|
|
||||||
type config struct {
|
type config struct {
|
||||||
endpoint string
|
endpoint string
|
||||||
compression Compression
|
compression Compression
|
||||||
@ -58,6 +68,7 @@ type config struct {
|
|||||||
tlsCfg *tls.Config
|
tlsCfg *tls.Config
|
||||||
insecure bool
|
insecure bool
|
||||||
headers map[string]string
|
headers map[string]string
|
||||||
|
marshaler Marshaler
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option applies an option to the HTTP driver.
|
// Option applies an option to the HTTP driver.
|
||||||
@ -201,3 +212,16 @@ func (headersOption) private() {}
|
|||||||
func WithHeaders(headers map[string]string) Option {
|
func WithHeaders(headers map[string]string) Option {
|
||||||
return (headersOption)(headers)
|
return (headersOption)(headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type marshalerOption Marshaler
|
||||||
|
|
||||||
|
func (o marshalerOption) Apply(cfg *config) {
|
||||||
|
cfg.marshaler = Marshaler(o)
|
||||||
|
}
|
||||||
|
func (marshalerOption) private() {}
|
||||||
|
|
||||||
|
// WithMarshal tells the driver which wire format to use when sending to the
|
||||||
|
// collector. If unset, MarshalProto will be used
|
||||||
|
func WithMarshal(m Marshaler) Option {
|
||||||
|
return marshalerOption(m)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user