You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
463 lines
12 KiB
Cheetah
463 lines
12 KiB
Cheetah
// Code created by gotmpl. DO NOT MODIFY.
|
|
// source: internal/shared/otlp/otlpmetric/otest/collector.go.tmpl
|
|
|
|
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package otest
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/elliptic"
|
|
"crypto/rand"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"crypto/x509/pkix" // nolint:depguard // This is for testing.
|
|
"encoding/pem"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/big"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"{{ .oconfImportPath }}"
|
|
collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
|
|
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
|
|
)
|
|
|
|
// Collector is the collection target a Client sends metric uploads to.
|
|
type Collector interface {
|
|
Collect() *Storage
|
|
}
|
|
|
|
type ExportResult struct {
|
|
Response *collpb.ExportMetricsServiceResponse
|
|
Err error
|
|
}
|
|
|
|
// Storage stores uploaded OTLP metric data in their proto form.
|
|
type Storage struct {
|
|
dataMu sync.Mutex
|
|
data []*mpb.ResourceMetrics
|
|
}
|
|
|
|
// NewStorage returns a configure storage ready to store received requests.
|
|
func NewStorage() *Storage {
|
|
return &Storage{}
|
|
}
|
|
|
|
// Add adds the request to the Storage.
|
|
func (s *Storage) Add(request *collpb.ExportMetricsServiceRequest) {
|
|
s.dataMu.Lock()
|
|
defer s.dataMu.Unlock()
|
|
s.data = append(s.data, request.ResourceMetrics...)
|
|
}
|
|
|
|
// Dump returns all added ResourceMetrics and clears the storage.
|
|
func (s *Storage) Dump() []*mpb.ResourceMetrics {
|
|
s.dataMu.Lock()
|
|
defer s.dataMu.Unlock()
|
|
|
|
var data []*mpb.ResourceMetrics
|
|
data, s.data = s.data, []*mpb.ResourceMetrics{}
|
|
return data
|
|
}
|
|
|
|
// GRPCCollector is an OTLP gRPC server that collects all requests it receives.
|
|
type GRPCCollector struct {
|
|
collpb.UnimplementedMetricsServiceServer
|
|
|
|
headersMu sync.Mutex
|
|
headers metadata.MD
|
|
storage *Storage
|
|
|
|
resultCh <-chan ExportResult
|
|
listener net.Listener
|
|
srv *grpc.Server
|
|
}
|
|
|
|
// NewGRPCCollector returns a *GRPCCollector that is listening at the provided
|
|
// endpoint.
|
|
//
|
|
// If endpoint is an empty string, the returned collector will be listening on
|
|
// the localhost interface at an OS chosen port.
|
|
//
|
|
// If errCh is not nil, the collector will respond to Export calls with errors
|
|
// sent on that channel. This means that if errCh is not nil Export calls will
|
|
// block until an error is received.
|
|
func NewGRPCCollector(endpoint string, resultCh <-chan ExportResult) (*GRPCCollector, error) {
|
|
if endpoint == "" {
|
|
endpoint = "localhost:0"
|
|
}
|
|
|
|
c := &GRPCCollector{
|
|
storage: NewStorage(),
|
|
resultCh: resultCh,
|
|
}
|
|
|
|
var err error
|
|
c.listener, err = net.Listen("tcp", endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.srv = grpc.NewServer()
|
|
collpb.RegisterMetricsServiceServer(c.srv, c)
|
|
go func() { _ = c.srv.Serve(c.listener) }()
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// Shutdown shuts down the gRPC server closing all open connections and
|
|
// listeners immediately.
|
|
func (c *GRPCCollector) Shutdown() { c.srv.Stop() }
|
|
|
|
// Addr returns the net.Addr c is listening at.
|
|
func (c *GRPCCollector) Addr() net.Addr {
|
|
return c.listener.Addr()
|
|
}
|
|
|
|
// Collect returns the Storage holding all collected requests.
|
|
func (c *GRPCCollector) Collect() *Storage {
|
|
return c.storage
|
|
}
|
|
|
|
// Headers returns the headers received for all requests.
|
|
func (c *GRPCCollector) Headers() map[string][]string {
|
|
// Makes a copy.
|
|
c.headersMu.Lock()
|
|
defer c.headersMu.Unlock()
|
|
return metadata.Join(c.headers)
|
|
}
|
|
|
|
// Export handles the export req.
|
|
func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsServiceRequest) (*collpb.ExportMetricsServiceResponse, error) {
|
|
c.storage.Add(req)
|
|
|
|
if h, ok := metadata.FromIncomingContext(ctx); ok {
|
|
c.headersMu.Lock()
|
|
c.headers = metadata.Join(c.headers, h)
|
|
c.headersMu.Unlock()
|
|
}
|
|
|
|
if c.resultCh != nil {
|
|
r := <-c.resultCh
|
|
if r.Response == nil {
|
|
return &collpb.ExportMetricsServiceResponse{}, r.Err
|
|
}
|
|
return r.Response, r.Err
|
|
}
|
|
return &collpb.ExportMetricsServiceResponse{}, nil
|
|
}
|
|
|
|
var emptyExportMetricsServiceResponse = func() []byte {
|
|
body := collpb.ExportMetricsServiceResponse{}
|
|
r, err := proto.Marshal(&body)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return r
|
|
}()
|
|
|
|
// HTTPResponseError is an error that also describes the HTTP response a
// collector should reply with.
type HTTPResponseError struct {
	// Err is the underlying error. It may be nil.
	Err error
	// Status is the HTTP status code of the response.
	Status int
	// Header holds additional headers to add to the response.
	Header http.Header
}

// Error returns the status code followed by the message of the wrapped
// error. When no error is wrapped, the standard text for the status code is
// used instead so the message never contains a formatting artifact.
func (e *HTTPResponseError) Error() string {
	if e.Err == nil {
		return fmt.Sprintf("%d: %s", e.Status, http.StatusText(e.Status))
	}
	return fmt.Sprintf("%d: %s", e.Status, e.Err)
}

// Unwrap returns the wrapped error, which may be nil.
func (e *HTTPResponseError) Unwrap() error { return e.Err }
|
|
|
|
// HTTPCollector is an OTLP HTTP server that collects all requests it receives.
|
|
type HTTPCollector struct {
|
|
plainTextResponse bool
|
|
|
|
headersMu sync.Mutex
|
|
headers http.Header
|
|
storage *Storage
|
|
|
|
resultCh <-chan ExportResult
|
|
listener net.Listener
|
|
srv *http.Server
|
|
}
|
|
|
|
// NewHTTPCollector returns a *HTTPCollector that is listening at the provided
|
|
// endpoint.
|
|
//
|
|
// If endpoint is an empty string, the returned collector will be listening on
|
|
// the localhost interface at an OS chosen port, not use TLS, and listen at the
|
|
// default OTLP metric endpoint path ("/v1/metrics"). If the endpoint contains
|
|
// a prefix of "https" the server will generate weak self-signed TLS
|
|
// certificates and use them to server data. If the endpoint contains a path,
|
|
// that path will be used instead of the default OTLP metric endpoint path.
|
|
//
|
|
// If errCh is not nil, the collector will respond to HTTP requests with errors
|
|
// sent on that channel. This means that if errCh is not nil Export calls will
|
|
// block until an error is received.
|
|
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) {
|
|
u, err := url.Parse(endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if u.Host == "" {
|
|
u.Host = "localhost:0"
|
|
}
|
|
if u.Path == "" {
|
|
u.Path = oconf.DefaultMetricsPath
|
|
}
|
|
|
|
c := &HTTPCollector{
|
|
headers: http.Header{},
|
|
storage: NewStorage(),
|
|
resultCh: resultCh,
|
|
}
|
|
for _, opt := range opts {
|
|
opt(c)
|
|
}
|
|
|
|
c.listener, err = net.Listen("tcp", u.Host)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.Handle(u.Path, http.HandlerFunc(c.handler))
|
|
c.srv = &http.Server{
|
|
Handler: mux,
|
|
ReadTimeout: 10 * time.Second,
|
|
WriteTimeout: 10 * time.Second,
|
|
}
|
|
if u.Scheme == "https" {
|
|
cert, err := weakCertificate()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.srv.TLSConfig = &tls.Config{
|
|
Certificates: []tls.Certificate{cert},
|
|
}
|
|
go func() { _ = c.srv.ServeTLS(c.listener, "", "") }()
|
|
} else {
|
|
go func() { _ = c.srv.Serve(c.listener) }()
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return
|
|
// a plaintext, instead of protobuf, response.
|
|
func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) {
|
|
return func(s *HTTPCollector) {
|
|
s.plainTextResponse = true
|
|
}
|
|
}
|
|
|
|
// Shutdown shuts down the HTTP server closing all open connections and
|
|
// listeners.
|
|
func (c *HTTPCollector) Shutdown(ctx context.Context) error {
|
|
return c.srv.Shutdown(ctx)
|
|
}
|
|
|
|
// Addr returns the net.Addr c is listening at.
|
|
func (c *HTTPCollector) Addr() net.Addr {
|
|
return c.listener.Addr()
|
|
}
|
|
|
|
// Collect returns the Storage holding all collected requests.
|
|
func (c *HTTPCollector) Collect() *Storage {
|
|
return c.storage
|
|
}
|
|
|
|
// Headers returns the headers received for all requests.
|
|
func (c *HTTPCollector) Headers() map[string][]string {
|
|
// Makes a copy.
|
|
c.headersMu.Lock()
|
|
defer c.headersMu.Unlock()
|
|
return c.headers.Clone()
|
|
}
|
|
|
|
func (c *HTTPCollector) handler(w http.ResponseWriter, r *http.Request) {
|
|
c.respond(w, c.record(r))
|
|
}
|
|
|
|
func (c *HTTPCollector) record(r *http.Request) ExportResult {
|
|
// Currently only supports protobuf.
|
|
if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" {
|
|
err := fmt.Errorf("content-type not supported: %s", v)
|
|
return ExportResult{Err: err}
|
|
}
|
|
|
|
body, err := c.readBody(r)
|
|
if err != nil {
|
|
return ExportResult{Err: err}
|
|
}
|
|
pbRequest := &collpb.ExportMetricsServiceRequest{}
|
|
err = proto.Unmarshal(body, pbRequest)
|
|
if err != nil {
|
|
return ExportResult{
|
|
Err: &HTTPResponseError{
|
|
Err: err,
|
|
Status: http.StatusInternalServerError,
|
|
},
|
|
}
|
|
}
|
|
c.storage.Add(pbRequest)
|
|
|
|
c.headersMu.Lock()
|
|
for k, vals := range r.Header {
|
|
for _, v := range vals {
|
|
c.headers.Add(k, v)
|
|
}
|
|
}
|
|
c.headersMu.Unlock()
|
|
|
|
if c.resultCh != nil {
|
|
return <-c.resultCh
|
|
}
|
|
return ExportResult{Err: err}
|
|
}
|
|
|
|
func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
|
|
var reader io.ReadCloser
|
|
switch r.Header.Get("Content-Encoding") {
|
|
case "gzip":
|
|
reader, err = gzip.NewReader(r.Body)
|
|
if err != nil {
|
|
_ = reader.Close()
|
|
return nil, &HTTPResponseError{
|
|
Err: err,
|
|
Status: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
default:
|
|
reader = r.Body
|
|
}
|
|
|
|
defer func() {
|
|
cErr := reader.Close()
|
|
if err == nil && cErr != nil {
|
|
err = &HTTPResponseError{
|
|
Err: cErr,
|
|
Status: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
}()
|
|
body, err = io.ReadAll(reader)
|
|
if err != nil {
|
|
err = &HTTPResponseError{
|
|
Err: err,
|
|
Status: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
return body, err
|
|
}
|
|
|
|
func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
|
|
if resp.Err != nil {
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
w.Header().Set("X-Content-Type-Options", "nosniff")
|
|
var e *HTTPResponseError
|
|
if errors.As(resp.Err, &e) {
|
|
for k, vals := range e.Header {
|
|
for _, v := range vals {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
w.WriteHeader(e.Status)
|
|
fmt.Fprintln(w, e.Error())
|
|
} else {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintln(w, resp.Err.Error())
|
|
}
|
|
return
|
|
}
|
|
|
|
if c.plainTextResponse {
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("OK"))
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/x-protobuf")
|
|
w.WriteHeader(http.StatusOK)
|
|
if resp.Response == nil {
|
|
_, _ = w.Write(emptyExportMetricsServiceResponse)
|
|
} else {
|
|
r, err := proto.Marshal(resp.Response)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
_, _ = w.Write(r)
|
|
}
|
|
}
|
|
|
|
// weakCertificate generates a self-signed server certificate for "localhost"
// and the loopback addresses, backed by a fresh P-256 key and valid for one
// hour from now.
//
// Based on https://golang.org/src/crypto/tls/generate_cert.go,
// simplified and weakened.
func weakCertificate() (tls.Certificate, error) {
	var none tls.Certificate

	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return none, err
	}

	validFrom := time.Now()
	validUntil := validFrom.Add(time.Hour)

	// The serial number is drawn uniformly from [0, 2^128).
	serialLimit := new(big.Int).Lsh(big.NewInt(1), 128)
	serial, err := rand.Int(rand.Reader, serialLimit)
	if err != nil {
		return none, err
	}

	template := x509.Certificate{
		SerialNumber:          serial,
		Subject:               pkix.Name{Organization: []string{"otel-go"}},
		NotBefore:             validFrom,
		NotAfter:              validUntil,
		KeyUsage:              x509.KeyUsageDigitalSignature,
		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		BasicConstraintsValid: true,
		DNSNames:              []string{"localhost"},
		IPAddresses:           []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)},
	}

	// Self-signed: the template is its own parent.
	der, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
	if err != nil {
		return none, err
	}
	var certPEM bytes.Buffer
	if err = pem.Encode(&certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: der}); err != nil {
		return none, err
	}

	keyDER, err := x509.MarshalPKCS8PrivateKey(key)
	if err != nil {
		return none, err
	}
	var keyPEM bytes.Buffer
	if err = pem.Encode(&keyPEM, &pem.Block{Type: "PRIVATE KEY", Bytes: keyDER}); err != nil {
		return none, err
	}

	return tls.X509KeyPair(certPEM.Bytes(), keyPEM.Bytes())
}
|