From 1ea4ee2717aac3260d38cfadfbed334df751fa27 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 19 Apr 2024 10:17:11 -0700 Subject: [PATCH] Add the otlploghttp HTTP client (#5224) * Add the HTTP client * Add client tests * Clean up * Fix merge --- exporters/otlp/otlplog/otlploghttp/client.go | 273 ++++++- .../otlp/otlplog/otlploghttp/client_test.go | 757 ++++++++++++++++++ exporters/otlp/otlplog/otlploghttp/config.go | 8 +- .../otlp/otlplog/otlploghttp/config_test.go | 7 +- exporters/otlp/otlplog/otlploghttp/go.mod | 3 +- 5 files changed, 1038 insertions(+), 10 deletions(-) create mode 100644 exporters/otlp/otlplog/otlploghttp/client_test.go diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index fc5d911f7..1ff61d0d6 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -4,9 +4,26 @@ package otlploghttp // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" import ( + "bytes" + "compress/gzip" "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "sync" + "time" + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/otel" + collogpb "go.opentelemetry.io/proto/slim/otlp/collector/logs/v1" logpb "go.opentelemetry.io/proto/slim/otlp/logs/v1" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" ) type client struct { @@ -26,6 +43,258 @@ func newNoopClient() *client { // newHTTPClient creates a new HTTP log client. func newHTTPClient(cfg config) (*client, error) { - // TODO: implement. - return &client{}, nil + hc := &http.Client{ + Transport: ourTransport, + Timeout: cfg.timeout.Value, + } + + if cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil { + clonedTransport := ourTransport.Clone() + hc.Transport = clonedTransport + + if cfg.tlsCfg.Value != nil { + clonedTransport.TLSClientConfig = cfg.tlsCfg.Value + } + if cfg.proxy.Value != nil { + clonedTransport.Proxy = cfg.proxy.Value + } + } + + u := &url.URL{ + Scheme: "https", + Host: cfg.endpoint.Value, + Path: cfg.path.Value, + } + if cfg.insecure.Value { + u.Scheme = "http" + } + // Body is set when this is cloned during upload. + req, err := http.NewRequest(http.MethodPost, u.String(), http.NoBody) + if err != nil { + return nil, err + } + + userAgent := "OTel Go OTLP over HTTP/protobuf logs exporter/" + Version() + req.Header.Set("User-Agent", userAgent) + + if n := len(cfg.headers.Value); n > 0 { + for k, v := range cfg.headers.Value { + req.Header.Set(k, v) + } + } + req.Header.Set("Content-Type", "application/x-protobuf") + + c := &httpClient{ + compression: cfg.compression.Value, + req: req, + requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate), + client: hc, + } + return &client{uploadLogs: c.uploadLogs}, nil +} + +type httpClient struct { + // req is cloned for every upload the client makes. + req *http.Request + compression Compression + requestFunc retry.RequestFunc + client *http.Client +} + +// Keep it in sync with golang's DefaultTransport from net/http! We +// have our own copy to avoid handling a situation where the +// DefaultTransport is overwritten with some different implementation +// of http.RoundTripper or it's modified by another package. +var ourTransport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, +} + +func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) error { + // The Exporter synchronizes access to client methods. This is not called + // after the Exporter is shutdown. Only thing to do here is send data. + + pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: data} + body, err := proto.Marshal(pbRequest) + if err != nil { + return err + } + request, err := c.newRequest(ctx, body) + if err != nil { + return err + } + + return c.requestFunc(ctx, func(iCtx context.Context) error { + select { + case <-iCtx.Done(): + return iCtx.Err() + default: + } + + request.reset(iCtx) + resp, err := c.client.Do(request.Request) + var urlErr *url.Error + if errors.As(err, &urlErr) && urlErr.Temporary() { + return newResponseError(http.Header{}) + } + if err != nil { + return err + } + + var rErr error + switch sc := resp.StatusCode; { + case sc >= 200 && sc <= 299: + // Success, do not retry. + + // Read the partial success message, if any. + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return err + } + if respData.Len() == 0 { + return nil + } + + if resp.Header.Get("Content-Type") == "application/x-protobuf" { + var respProto collogpb.ExportLogsServiceResponse + if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { + return err + } + + if respProto.PartialSuccess != nil { + msg := respProto.PartialSuccess.GetErrorMessage() + n := respProto.PartialSuccess.GetRejectedLogRecords() + if n != 0 || msg != "" { + err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n) + otel.Handle(err) + } + } + } + return nil + case sc == http.StatusTooManyRequests, + sc == http.StatusBadGateway, + sc == http.StatusServiceUnavailable, + sc == http.StatusGatewayTimeout: + // Retry-able failure. + rErr = newResponseError(resp.Header) + + // Going to retry, drain the body to reuse the connection. + if _, err := io.Copy(io.Discard, resp.Body); err != nil { + _ = resp.Body.Close() + return err + } + default: + rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status) + } + + if err := resp.Body.Close(); err != nil { + return err + } + return rErr + }) +} + +var gzPool = sync.Pool{ + New: func() interface{} { + w := gzip.NewWriter(io.Discard) + return w + }, +} + +func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, error) { + r := c.req.Clone(ctx) + req := request{Request: r} + + switch c.compression { + case NoCompression: + r.ContentLength = (int64)(len(body)) + req.bodyReader = bodyReader(body) + case GzipCompression: + // Ensure the content length is not used. + r.ContentLength = -1 + r.Header.Set("Content-Encoding", "gzip") + + gz := gzPool.Get().(*gzip.Writer) + defer gzPool.Put(gz) + + var b bytes.Buffer + gz.Reset(&b) + + if _, err := gz.Write(body); err != nil { + return req, err + } + // Close needs to be called to ensure body is fully written. + if err := gz.Close(); err != nil { + return req, err + } + + req.bodyReader = bodyReader(b.Bytes()) + } + + return req, nil +} + +// bodyReader returns a closure returning a new reader for buf. +func bodyReader(buf []byte) func() io.ReadCloser { + return func() io.ReadCloser { + return io.NopCloser(bytes.NewReader(buf)) + } +} + +// request wraps an http.Request with a resettable body reader. +type request struct { + *http.Request + + // bodyReader allows the same body to be used for multiple requests. + bodyReader func() io.ReadCloser +} + +// reset reinitializes the request Body and uses ctx for the request. +func (r *request) reset(ctx context.Context) { + r.Body = r.bodyReader() + r.Request = r.WithContext(ctx) +} + +// retryableError represents a request failure that can be retried. +type retryableError struct { + throttle int64 +} + +// newResponseError returns a retryableError and will extract any explicit +// throttle delay contained in headers. +func newResponseError(header http.Header) error { + var rErr retryableError + if v := header.Get("Retry-After"); v != "" { + if t, err := strconv.ParseInt(v, 10, 64); err == nil { + rErr.throttle = t + } + } + return rErr +} + +func (e retryableError) Error() string { + return "retry-able request failure" +} + +// evaluate returns if err is retry-able. If it is and it includes an explicit +// throttling delay, that delay is also returned. +func evaluate(err error) (bool, time.Duration) { + if err == nil { + return false, 0 + } + + rErr, ok := err.(retryableError) + if !ok { + return false, 0 + } + + return true, time.Duration(rErr.throttle) } diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go new file mode 100644 index 000000000..799d4022d --- /dev/null +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -0,0 +1,757 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlploghttp + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "errors" + "fmt" + "io" + "math/big" + "net" + "net/http" + "net/url" + "strings" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/otel" + collogpb "go.opentelemetry.io/proto/slim/otlp/collector/logs/v1" + cpb "go.opentelemetry.io/proto/slim/otlp/common/v1" + lpb "go.opentelemetry.io/proto/slim/otlp/logs/v1" + rpb "go.opentelemetry.io/proto/slim/otlp/resource/v1" + + "go.opentelemetry.io/otel/sdk/log" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +var ( + // Sat Jan 01 2000 00:00:00 GMT+0000. + ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0)) + obs = ts.Add(30 * time.Second) + + kvAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + }} + kvBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + }} + kvSrvName = &cpb.KeyValue{Key: "service.name", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + }} + kvSrvVer = &cpb.KeyValue{Key: "service.version", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + }} + + pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO + pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + + pbBodyA = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: "a", + }, + } + pbBodyB = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: "b", + }, + } + + spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1} + spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2} + traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + flagsA = byte(1) + flagsB = byte(0) + + logRecords = []*lpb.LogRecord{ + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevA, + SeverityText: "A", + Body: pbBodyA, + Attributes: []*cpb.KeyValue{kvAlice}, + Flags: uint32(flagsA), + TraceId: traceIDA, + SpanId: spanIDA, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevA, + SeverityText: "A", + Body: pbBodyA, + Attributes: []*cpb.KeyValue{kvBob}, + Flags: uint32(flagsA), + TraceId: traceIDA, + SpanId: spanIDA, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevB, + SeverityText: "B", + Body: pbBodyB, + Attributes: []*cpb.KeyValue{kvAlice}, + Flags: uint32(flagsB), + TraceId: traceIDB, + SpanId: spanIDB, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevB, + SeverityText: "B", + Body: pbBodyB, + Attributes: []*cpb.KeyValue{kvBob}, + Flags: uint32(flagsB), + TraceId: traceIDB, + SpanId: spanIDB, + }, + } + + scope = &cpb.InstrumentationScope{ + Name: "test/code/path", + Version: "v0.1.0", + } + scopeLogs = []*lpb.ScopeLogs{ + { + Scope: scope, + LogRecords: logRecords, + SchemaUrl: semconv.SchemaURL, + }, + } + + res = &rpb.Resource{ + Attributes: []*cpb.KeyValue{kvSrvName, kvSrvVer}, + } + resourceLogs = []*lpb.ResourceLogs{{ + Resource: res, + ScopeLogs: scopeLogs, + SchemaUrl: semconv.SchemaURL, + }} +) + +type exportResult struct { + Response *collogpb.ExportLogsServiceResponse + Err error +} + +// storage stores uploaded OTLP log data in their proto form. +type storage struct { + dataMu sync.Mutex + data []*lpb.ResourceLogs +} + +// newStorage returns a configure storage ready to store received requests. +func newStorage() *storage { + return &storage{} +} + +// Add adds the request to the Storage. +func (s *storage) Add(request *collogpb.ExportLogsServiceRequest) { + s.dataMu.Lock() + defer s.dataMu.Unlock() + s.data = append(s.data, request.ResourceLogs...) +} + +// Dump returns all added ResourceLogs and clears the storage. +func (s *storage) Dump() []*lpb.ResourceLogs { + s.dataMu.Lock() + defer s.dataMu.Unlock() + + var data []*lpb.ResourceLogs + data, s.data = s.data, []*lpb.ResourceLogs{} + return data +} + +var emptyExportLogsServiceResponse = func() []byte { + body := collogpb.ExportLogsServiceResponse{} + r, err := proto.Marshal(&body) + if err != nil { + panic(err) + } + return r +}() + +type httpResponseError struct { + Err error + Status int + Header http.Header +} + +func (e *httpResponseError) Error() string { + return fmt.Sprintf("%d: %s", e.Status, e.Err) +} + +func (e *httpResponseError) Unwrap() error { return e.Err } + +// httpCollector is an OTLP HTTP server that collects all requests it receives. +type httpCollector struct { + plainTextResponse bool + + headersMu sync.Mutex + headers http.Header + storage *storage + + resultCh <-chan exportResult + listener net.Listener + srv *http.Server +} + +// newHTTPCollector returns a *HTTPCollector that is listening at the provided +// endpoint. +// +// If endpoint is an empty string, the returned collector will be listening on +// the localhost interface at an OS chosen port, not use TLS, and listen at the +// default OTLP log endpoint path ("/v1/logs"). If the endpoint contains a +// prefix of "https" the server will generate weak self-signed TLS certificates +// and use them to server data. If the endpoint contains a path, that path will +// be used instead of the default OTLP metri endpoint path. +// +// If errCh is not nil, the collector will respond to HTTP requests with errors +// sent on that channel. This means that if errCh is not nil Export calls will +// block until an error is received. +func newHTTPCollector(endpoint string, resultCh <-chan exportResult, opts ...func(*httpCollector)) (*httpCollector, error) { + u, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + if u.Host == "" { + u.Host = "localhost:0" + } + if u.Path == "" { + u.Path = defaultPath + } + + c := &httpCollector{ + headers: http.Header{}, + storage: newStorage(), + resultCh: resultCh, + } + for _, opt := range opts { + opt(c) + } + + c.listener, err = net.Listen("tcp", u.Host) + if err != nil { + return nil, err + } + + mux := http.NewServeMux() + mux.Handle(u.Path, http.HandlerFunc(c.handler)) + c.srv = &http.Server{ + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + if u.Scheme == "https" { + cert, err := newWeakCertificate() + if err != nil { + return nil, err + } + c.srv.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + go func() { _ = c.srv.ServeTLS(c.listener, "", "") }() + } else { + go func() { _ = c.srv.Serve(c.listener) }() + } + return c, nil +} + +// withHTTPCollectorRespondingPlainText makes the HTTPCollector return +// a plaintext, instead of protobuf, response. +func withHTTPCollectorRespondingPlainText() func(*httpCollector) { + return func(s *httpCollector) { + s.plainTextResponse = true + } +} + +// Shutdown shuts down the HTTP server closing all open connections and +// listeners. +func (c *httpCollector) Shutdown(ctx context.Context) error { + return c.srv.Shutdown(ctx) +} + +// Addr returns the net.Addr c is listening at. +func (c *httpCollector) Addr() net.Addr { + return c.listener.Addr() +} + +// Collect returns the Storage holding all collected requests. +func (c *httpCollector) Collect() *storage { + return c.storage +} + +// Headers returns the headers received for all requests. +func (c *httpCollector) Headers() map[string][]string { + // Makes a copy. + c.headersMu.Lock() + defer c.headersMu.Unlock() + return c.headers.Clone() +} + +func (c *httpCollector) handler(w http.ResponseWriter, r *http.Request) { + c.respond(w, c.record(r)) +} + +func (c *httpCollector) record(r *http.Request) exportResult { + // Currently only supports protobuf. + if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" { + err := fmt.Errorf("content-type not supported: %s", v) + return exportResult{Err: err} + } + + body, err := c.readBody(r) + if err != nil { + return exportResult{Err: err} + } + pbRequest := &collogpb.ExportLogsServiceRequest{} + err = proto.Unmarshal(body, pbRequest) + if err != nil { + return exportResult{ + Err: &httpResponseError{ + Err: err, + Status: http.StatusInternalServerError, + }, + } + } + c.storage.Add(pbRequest) + + c.headersMu.Lock() + for k, vals := range r.Header { + for _, v := range vals { + c.headers.Add(k, v) + } + } + c.headersMu.Unlock() + + if c.resultCh != nil { + return <-c.resultCh + } + return exportResult{Err: err} +} + +func (c *httpCollector) readBody(r *http.Request) (body []byte, err error) { + var reader io.ReadCloser + switch r.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(r.Body) + if err != nil { + _ = reader.Close() + return nil, &httpResponseError{ + Err: err, + Status: http.StatusInternalServerError, + } + } + default: + reader = r.Body + } + + defer func() { + cErr := reader.Close() + if err == nil && cErr != nil { + err = &httpResponseError{ + Err: cErr, + Status: http.StatusInternalServerError, + } + } + }() + body, err = io.ReadAll(reader) + if err != nil { + err = &httpResponseError{ + Err: err, + Status: http.StatusInternalServerError, + } + } + return body, err +} + +func (c *httpCollector) respond(w http.ResponseWriter, resp exportResult) { + if resp.Err != nil { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + var e *httpResponseError + if errors.As(resp.Err, &e) { + for k, vals := range e.Header { + for _, v := range vals { + w.Header().Add(k, v) + } + } + w.WriteHeader(e.Status) + fmt.Fprintln(w, e.Error()) + } else { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintln(w, resp.Err.Error()) + } + return + } + + if c.plainTextResponse { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(http.StatusOK) + if resp.Response == nil { + _, _ = w.Write(emptyExportLogsServiceResponse) + } else { + r, err := proto.Marshal(resp.Response) + if err != nil { + panic(err) + } + _, _ = w.Write(r) + } +} + +// Based on https://golang.org/src/crypto/tls/generate_cert.go, +// simplified and weakened. +func newWeakCertificate() (tls.Certificate, error) { + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return tls.Certificate{}, err + } + notBefore := time.Now() + notAfter := notBefore.Add(time.Hour) + max := new(big.Int).Lsh(big.NewInt(1), 128) + sn, err := rand.Int(rand.Reader, max) + if err != nil { + return tls.Certificate{}, err + } + tmpl := x509.Certificate{ + SerialNumber: sn, + Subject: pkix.Name{Organization: []string{"otel-go"}}, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + DNSNames: []string{"localhost"}, + IPAddresses: []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)}, + } + derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv) + if err != nil { + return tls.Certificate{}, err + } + var certBuf bytes.Buffer + err = pem.Encode(&certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + if err != nil { + return tls.Certificate{}, err + } + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + return tls.Certificate{}, err + } + var privBuf bytes.Buffer + err = pem.Encode(&privBuf, &pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + if err != nil { + return tls.Certificate{}, err + } + return tls.X509KeyPair(certBuf.Bytes(), privBuf.Bytes()) +} + +func TestClient(t *testing.T) { + factory := func(rCh <-chan exportResult) (*client, *httpCollector) { + coll, err := newHTTPCollector("", rCh) + require.NoError(t, err) + + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := newConfig(opts) + client, err := newHTTPClient(cfg) + require.NoError(t, err) + return client, coll + } + + t.Run("ClientHonorsContextErrors", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + t.Run("DeadlineExceeded", func(t *testing.T) { + innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond) + t.Cleanup(innerCancel) + <-innerCtx.Done() + + c, _ := factory(nil) + assert.ErrorIs(t, c.uploadLogs(innerCtx, nil), context.DeadlineExceeded) + }) + + t.Run("Canceled", func(t *testing.T) { + innerCtx, innerCancel := context.WithCancel(ctx) + innerCancel() + + c, _ := factory(nil) + assert.ErrorIs(t, c.uploadLogs(innerCtx, nil), context.Canceled) + }) + }) + + t.Run("uploadLogs", func(t *testing.T) { + ctx := context.Background() + client, coll := factory(nil) + + require.NoError(t, client.uploadLogs(ctx, resourceLogs)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") + diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)) + if diff != "" { + t.Fatalf("unexpected ResourceLogs:\n%s", diff) + } + }) + + t.Run("PartialSuccess", func(t *testing.T) { + const n, msg = 2, "bad data" + rCh := make(chan exportResult, 3) + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, + }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + // Should not be logged. + RejectedLogRecords: 0, + ErrorMessage: "", + }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{}, + } + + ctx := context.Background() + client, _ := factory(rCh) + + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) + + errs := []error{} + eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) + otel.SetErrorHandler(eh) + + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + + require.Equal(t, 1, len(errs)) + want := fmt.Sprintf("%s (%d log records rejected)", msg, n) + assert.ErrorContains(t, errs[0], want) + }) +} + +func TestClientWithHTTPCollectorRespondingPlainText(t *testing.T) { + ctx := context.Background() + coll, err := newHTTPCollector("", nil, withHTTPCollectorRespondingPlainText()) + require.NoError(t, err) + + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := newConfig(opts) + client, err := newHTTPClient(cfg) + require.NoError(t, err) + + require.NoError(t, client.uploadLogs(ctx, make([]*lpb.ResourceLogs, 1))) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") +} + +func TestNewWithInvalidEndpoint(t *testing.T) { + ctx := context.Background() + exp, err := New(ctx, WithEndpoint("host:invalid-port")) + assert.Error(t, err) + assert.Nil(t, exp) +} + +func TestConfig(t *testing.T) { + factoryFunc := func(ePt string, rCh <-chan exportResult, o ...Option) (log.Exporter, *httpCollector) { + coll, err := newHTTPCollector(ePt, rCh) + require.NoError(t, err) + + opts := []Option{WithEndpoint(coll.Addr().String())} + if !strings.HasPrefix(strings.ToLower(ePt), "https") { + opts = append(opts, WithInsecure()) + } + opts = append(opts, o...) + + ctx := context.Background() + exp, err := New(ctx, opts...) + require.NoError(t, err) + return exp, coll + } + + t.Run("WithEndpointURL", func(t *testing.T) { + coll, err := newHTTPCollector("", nil) + require.NoError(t, err) + ctx := context.Background() + + target := "http://" + coll.Addr().String() + defaultPath + exp, err := New(ctx, WithEndpointURL(target)) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithHeaders", func(t *testing.T) { + key := http.CanonicalHeaderKey("my-custom-header") + headers := map[string]string{key: "custom-value"} + exp, coll := factoryFunc("", nil, WithHeaders(headers)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Regexp(t, "OTel Go OTLP over HTTP/protobuf logs exporter/[01]\\..*", got) + require.Contains(t, got, key) + assert.Equal(t, got[key], []string{headers[key]}) + }) + + t.Run("WithTimeout", func(t *testing.T) { + // Do not send on rCh so the Collector never responds to the client. + rCh := make(chan exportResult) + exp, coll := factoryFunc( + "", + rCh, + WithTimeout(time.Millisecond), + WithRetry(RetryConfig{Enabled: false}), + ) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + // Push this after Shutdown so the HTTP server doesn't hang. + t.Cleanup(func() { close(rCh) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + err := exp.Export(ctx, make([]log.Record, 1)) + assert.ErrorAs(t, err, new(retryableError)) + }) + + t.Run("WithCompressionGZip", func(t *testing.T) { + exp, coll := factoryFunc("", nil, WithCompression(GzipCompression)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithRetry", func(t *testing.T) { + emptyErr := errors.New("") + rCh := make(chan exportResult, 5) + header := http.Header{http.CanonicalHeaderKey("Retry-After"): {"10"}} + // All retryable errors. + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusServiceUnavailable, + Err: emptyErr, + Header: header, + }} + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusTooManyRequests, + Err: emptyErr, + }} + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusGatewayTimeout, + Err: emptyErr, + }} + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusBadGateway, + Err: emptyErr, + }} + rCh <- exportResult{} + exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{ + Enabled: true, + InitialInterval: time.Nanosecond, + MaxInterval: time.Millisecond, + MaxElapsedTime: time.Minute, + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + // Push this after Shutdown so the HTTP server doesn't hang. + t.Cleanup(func() { close(rCh) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1)), "failed retry") + assert.Len(t, rCh, 0, "failed HTTP responses did not occur") + }) + + t.Run("WithURLPath", func(t *testing.T) { + path := "/prefix/v2/logs" + ePt := fmt.Sprintf("http://localhost:0%s", path) + exp, coll := factoryFunc(ePt, nil, WithURLPath(path)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithTLSClientConfig", func(t *testing.T) { + ePt := "https://localhost:0" + tlsCfg := &tls.Config{InsecureSkipVerify: true} + exp, coll := factoryFunc(ePt, nil, WithTLSClientConfig(tlsCfg)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithCustomUserAgent", func(t *testing.T) { + key := http.CanonicalHeaderKey("user-agent") + headers := map[string]string{key: "custom-user-agent"} + exp, coll := factoryFunc("", nil, WithHeaders(headers)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Contains(t, got, key) + assert.Equal(t, got[key], []string{headers[key]}) + }) + + t.Run("WithProxy", func(t *testing.T) { + headerKeySetInProxy := http.CanonicalHeaderKey("X-Using-Proxy") + headerValueSetInProxy := "true" + exp, coll := factoryFunc("", nil, WithProxy(func(r *http.Request) (*url.URL, error) { + r.Header.Set(headerKeySetInProxy, headerValueSetInProxy) + return r.URL, nil + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Contains(t, got, headerKeySetInProxy) + assert.Equal(t, got[headerKeySetInProxy], []string{headerValueSetInProxy}) + }) +} diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index 5610434a7..3564e9caf 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -26,7 +26,7 @@ var ( defaultPath = "/v1/logs" defaultTimeout = 10 * time.Second defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment - defaultRetryCfg = RetryConfig(retry.DefaultConfig) + defaultRetryCfg = retry.DefaultConfig ) // Environment variable keys. @@ -93,7 +93,7 @@ type config struct { compression setting[Compression] timeout setting[time.Duration] proxy setting[HTTPTransportProxyFunc] - retryCfg setting[RetryConfig] + retryCfg setting[retry.Config] } func newConfig(options []Option) config { @@ -131,7 +131,7 @@ func newConfig(options []Option) config { fallback[HTTPTransportProxyFunc](defaultProxy), ) c.retryCfg = c.retryCfg.Resolve( - fallback[RetryConfig](defaultRetryCfg), + fallback[retry.Config](defaultRetryCfg), ) return c @@ -322,7 +322,7 @@ type RetryConfig retry.Config // after each error for no more than a total time of 1 minute. func WithRetry(rc RetryConfig) Option { return fnOpt(func(c config) config { - c.retryCfg = newSetting(rc) + c.retryCfg = newSetting(retry.Config(rc)) return c }) } diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index 72d03d8ef..bf95c4300 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" ) const ( @@ -79,7 +80,7 @@ func TestNewConfig(t *testing.T) { require.NoError(t, err, "testing TLS config") headers := map[string]string{"a": "A"} - rc := RetryConfig{} + rc := retry.Config{} testcases := []struct { name string @@ -107,7 +108,7 @@ func TestNewConfig(t *testing.T) { WithCompression(GzipCompression), WithHeaders(headers), WithTimeout(time.Second), - WithRetry(rc), + WithRetry(RetryConfig(rc)), // Do not test WithProxy. Requires func comparison. }, want: config{ @@ -293,7 +294,7 @@ func TestNewConfig(t *testing.T) { WithCompression(GzipCompression), WithHeaders(headers), WithTimeout(time.Second), - WithRetry(rc), + WithRetry(RetryConfig(rc)), }, want: config{ endpoint: newSetting("test"), diff --git a/exporters/otlp/otlplog/otlploghttp/go.mod b/exporters/otlp/otlplog/otlploghttp/go.mod index bb7321e0b..c629bc440 100644 --- a/exporters/otlp/otlplog/otlploghttp/go.mod +++ b/exporters/otlp/otlplog/otlploghttp/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/cenkalti/backoff/v4 v4.3.0 + github.com/google/go-cmp v0.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.25.0 go.opentelemetry.io/otel/log v0.0.1-alpha @@ -11,6 +12,7 @@ require ( go.opentelemetry.io/otel/sdk/log v0.0.0-20240403115316-6c6e1e7416e9 go.opentelemetry.io/otel/trace v1.25.0 go.opentelemetry.io/proto/slim/otlp v1.2.0 + google.golang.org/protobuf v1.33.0 ) require ( @@ -20,7 +22,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/metric v1.25.0 // indirect golang.org/x/sys v0.19.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect )