mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-14 10:13:10 +02:00
204 lines
5.6 KiB
Go
204 lines
5.6 KiB
Go
|
// Copyright The OpenTelemetry Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"net"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// resolveFunc resolves hostPort on the given network into a UDP address.
// The signature matches net.ResolveUDPAddr.
type resolveFunc func(network, hostPort string) (*net.UDPAddr, error)

// dialFunc opens a UDP connection from laddr to raddr on the given network.
// The signature matches net.DialUDP.
type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error)

// reconnectingUDPConn is an implementation of udpConn that periodically
// re-resolves hostPort. When the resolved address differs from the one the
// current connection was dialed with, the new address is dialed and the
// connection is swapped.
type reconnectingUDPConn struct {
	// bufferBytes is the write buffer size applied to newly dialed
	// connections; zero means no size is applied. It is accessed through
	// sync/atomic and therefore has to remain the first field: 64-bit atomic
	// operations need a 64-bit aligned word on ARM and x86-32, and only the
	// first word of an allocated struct is guaranteed to be aligned (see the
	// "Bugs" section of the sync/atomic package documentation).
	bufferBytes int64

	// hostPort is the destination handed to resolveFunc on every attempt.
	hostPort string
	// resolveFunc and dialFunc perform the address lookup and the dial.
	resolveFunc resolveFunc
	dialFunc    dialFunc
	// logger receives diagnostics; a nil logger disables logging.
	logger *log.Logger

	// connMtx guards conn and destAddr.
	connMtx sync.RWMutex
	// conn is the active connection, nil until a dial has succeeded.
	conn *net.UDPConn
	// destAddr is the address conn was dialed with.
	destAddr *net.UDPAddr
	// closeChan is closed by Close to stop the reconnect loop.
	closeChan chan struct{}
}
|
||
|
|
||
|
// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is
|
||
|
// different than the current conn then the new address is dialed and the conn is swapped.
|
||
|
func newReconnectingUDPConn(hostPort string, bufferBytes int, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger *log.Logger) (*reconnectingUDPConn, error) {
|
||
|
conn := &reconnectingUDPConn{
|
||
|
hostPort: hostPort,
|
||
|
resolveFunc: resolveFunc,
|
||
|
dialFunc: dialFunc,
|
||
|
logger: logger,
|
||
|
closeChan: make(chan struct{}),
|
||
|
bufferBytes: int64(bufferBytes),
|
||
|
}
|
||
|
|
||
|
if err := conn.attemptResolveAndDial(); err != nil {
|
||
|
conn.logf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)
|
||
|
}
|
||
|
|
||
|
go conn.reconnectLoop(resolveTimeout)
|
||
|
|
||
|
return conn, nil
|
||
|
}
|
||
|
|
||
|
func (c *reconnectingUDPConn) logf(format string, args ...interface{}) {
|
||
|
if c.logger != nil {
|
||
|
c.logger.Printf(format, args...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) {
|
||
|
ticker := time.NewTicker(resolveTimeout)
|
||
|
defer ticker.Stop()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-c.closeChan:
|
||
|
return
|
||
|
case <-ticker.C:
|
||
|
if err := c.attemptResolveAndDial(); err != nil {
|
||
|
c.logf("%s", err.Error())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *reconnectingUDPConn) attemptResolveAndDial() error {
|
||
|
newAddr, err := c.resolveFunc("udp", c.hostPort)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err)
|
||
|
}
|
||
|
|
||
|
c.connMtx.RLock()
|
||
|
curAddr := c.destAddr
|
||
|
c.connMtx.RUnlock()
|
||
|
|
||
|
// dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn
|
||
|
if curAddr != nil && newAddr.String() == curAddr.String() {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if err := c.attemptDialNewAddr(newAddr); err != nil {
|
||
|
return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error {
|
||
|
connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 {
|
||
|
if err = connUDP.SetWriteBuffer(bufferBytes); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
c.connMtx.Lock()
|
||
|
c.destAddr = newAddr
|
||
|
// store prev to close later
|
||
|
prevConn := c.conn
|
||
|
c.conn = connUDP
|
||
|
c.connMtx.Unlock()
|
||
|
|
||
|
if prevConn != nil {
|
||
|
return prevConn.Close()
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning
|
||
|
func (c *reconnectingUDPConn) Write(b []byte) (int, error) {
|
||
|
var bytesWritten int
|
||
|
var err error
|
||
|
|
||
|
c.connMtx.RLock()
|
||
|
conn := c.conn
|
||
|
c.connMtx.RUnlock()
|
||
|
|
||
|
if conn == nil {
|
||
|
// if connection is not initialized indicate this with err in order to hook into retry logic
|
||
|
err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved")
|
||
|
} else {
|
||
|
bytesWritten, err = conn.Write(b)
|
||
|
}
|
||
|
|
||
|
if err == nil {
|
||
|
return bytesWritten, nil
|
||
|
}
|
||
|
|
||
|
// attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again
|
||
|
if reconnErr := c.attemptResolveAndDial(); reconnErr == nil {
|
||
|
c.connMtx.RLock()
|
||
|
conn := c.conn
|
||
|
c.connMtx.RUnlock()
|
||
|
|
||
|
return conn.Write(b)
|
||
|
}
|
||
|
|
||
|
// return original error if reconn fails
|
||
|
return bytesWritten, err
|
||
|
}
|
||
|
|
||
|
// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation
|
||
|
func (c *reconnectingUDPConn) Close() error {
|
||
|
close(c.closeChan)
|
||
|
|
||
|
// acquire rw lock before closing conn to ensure calls to Write drain
|
||
|
c.connMtx.Lock()
|
||
|
defer c.connMtx.Unlock()
|
||
|
|
||
|
if c.conn != nil {
|
||
|
return c.conn.Close()
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held
|
||
|
// and SetWriteBuffer is called store bufferBytes to be set for new conns
|
||
|
func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error {
|
||
|
var err error
|
||
|
|
||
|
c.connMtx.RLock()
|
||
|
conn := c.conn
|
||
|
c.connMtx.RUnlock()
|
||
|
|
||
|
if conn != nil {
|
||
|
err = c.conn.SetWriteBuffer(bytes)
|
||
|
}
|
||
|
|
||
|
if err == nil {
|
||
|
atomic.StoreInt64(&c.bufferBytes, int64(bytes))
|
||
|
}
|
||
|
|
||
|
return err
|
||
|
}
|