1
0
mirror of https://github.com/imgproxy/imgproxy.git synced 2025-02-02 11:34:20 +02:00
2019-01-14 16:50:28 +06:00

904 lines
22 KiB
Go

package newrelic
import (
"errors"
"fmt"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
"github.com/newrelic/go-agent/internal"
)
// txnInput bundles the values handed to newTxn when a transaction is created.
type txnInput struct {
// W is the response writer that the transaction's Header, Write and
// WriteHeader methods delegate to.
W http.ResponseWriter
// Config is the agent configuration consulted throughout the transaction.
Config Config
// Reply is the connect reply whose settings are read by the transaction.
Reply *internal.ConnectReply
// Consumer receives the transaction from End() unless it is ignored.
Consumer dataConsumer
// attrConfig is passed to internal.NewAttributes by newTxn.
attrConfig *internal.AttributeConfig
}
// txn is the internal transaction implementation. It embeds the creation
// inputs (txnInput), a mutex guarding its state, and the recorded
// internal.TxnData.
type txn struct {
txnInput
// This mutex is required since the consumer may call the public API
// interface functions from different routines.
sync.Mutex
// finished indicates whether or not End() has been called. After
// finished has been set to true, no recording should occur.
finished bool
// numPayloadsCreated counts the distributed trace payloads created by
// CreateDistributedTracePayload.
numPayloadsCreated uint32
// ignore marks the transaction as one that End() must not hand to the
// consumer. It is set by Ignore() and by freezeName() when the final name
// is empty.
ignore bool
// wroteHeader prevents capturing multiple response code errors if the
// user erroneously calls WriteHeader multiple times.
wroteHeader bool
internal.TxnData
}
// newTxn builds a transaction from the given inputs and initial name. The
// start time is taken immediately, and the tracing related settings are
// derived from the local configuration combined with the connect reply.
func newTxn(input txnInput, name string) *txn {
	t := &txn{txnInput: input}

	t.Start = time.Now()
	t.Name = name
	t.Attrs = internal.NewAttributes(input.attrConfig)

	if input.Config.DistributedTracer.Enabled {
		cat := &t.BetterCAT
		cat.Enabled = true
		cat.Priority = internal.NewPriority()
		cat.ID = internal.NewSpanID()

		// The sampling decision is made up front instead of lazily when a
		// payload is created, because span event creation depends on it.
		cat.Sampled = t.Reply.AdaptiveSampler.ComputeSampled(cat.Priority.Float32(), t.Start)
		if cat.Sampled {
			cat.Priority += 1.0
		}

		t.SpanEventsEnabled = t.Config.SpanEvents.Enabled && t.Reply.CollectSpanEvents
	}

	t.Attrs.Agent.Add(internal.AttributeHostDisplayName, t.Config.HostDisplayName, nil)

	t.TxnTrace.Enabled = t.txnTracesEnabled()
	t.TxnTrace.SegmentThreshold = t.Config.TransactionTracer.SegmentThreshold
	t.StackTraceThreshold = t.Config.TransactionTracer.StackTraceThreshold

	t.SlowQueriesEnabled = t.slowQueriesEnabled()
	t.SlowQueryThreshold = t.Config.DatastoreTracer.SlowQuery.Threshold

	// Synthetics support lives in the Old CAT field, CrossProcess. So that
	// Synthetics works with either BetterCAT or Old CAT, CrossProcess is
	// always initialised from the top-level configuration: the first
	// argument says whether Old CAT is enabled, the second whether
	// distributed tracing is enabled.
	t.CrossProcess.Init(
		t.Config.CrossApplicationTracer.Enabled,
		t.Config.DistributedTracer.Enabled,
		input.Reply,
	)

	return t
}
// requestWrap wraps an *http.Request and exposes the accessor methods that
// SetWebRequest calls on a WebRequest.
type requestWrap struct{ request *http.Request }

// Header returns the headers of the wrapped request.
func (r requestWrap) Header() http.Header {
	return r.request.Header
}

// URL returns the URL of the wrapped request.
func (r requestWrap) URL() *url.URL {
	return r.request.URL
}

// Method returns the HTTP method of the wrapped request.
func (r requestWrap) Method() string {
	return r.request.Method
}

// Transport reports the transport type of the wrapped request: HTTPS when
// the protocol starts with "HTTP" and TLS state is present, HTTP when the
// protocol starts with "HTTP" without TLS, and unknown otherwise.
func (r requestWrap) Transport() TransportType {
	switch {
	case !strings.HasPrefix(r.request.Proto, "HTTP"):
		return TransportUnknown
	case r.request.TLS != nil:
		return TransportHTTPS
	default:
		return TransportHTTP
	}
}
// SetWebRequest marks the transaction as a web transaction and, when r is
// non-nil, records request information: queue time, an inbound distributed
// trace payload (if the header is present), cross process request headers,
// request attributes and the sanitised URL.
//
// It returns errAlreadyEnded if the transaction has finished.
func (txn *txn) SetWebRequest(r WebRequest) error {
	txn.Lock()
	defer txn.Unlock()

	if txn.finished {
		return errAlreadyEnded
	}

	// Calling SetWebRequest at all, even with a nil request, makes this a
	// web transaction.
	txn.IsWeb = true

	if nil == r {
		return nil
	}

	hdr := r.Header()
	if nil != hdr {
		txn.Queuing = internal.QueueDuration(hdr, txn.Start)

		payload := hdr.Get(DistributedTracePayloadHeader)
		if payload != "" {
			// The returned error is not inspected here.
			txn.acceptDistributedTracePayloadLocked(r.Transport(), payload)
		}

		txn.CrossProcess.InboundHTTPRequest(hdr)
	}

	internal.RequestAgentAttributes(txn.Attrs, r.Method(), r.Header())

	reqURL := r.URL()
	if nil != reqURL {
		txn.CleanURL = internal.SafeURL(reqURL)
	}

	return nil
}
// slowQueriesEnabled reports whether slow queries are enabled both in the
// local datastore tracer configuration and by the connect reply's
// CollectTraces setting.
func (txn *txn) slowQueriesEnabled() bool {
	if !txn.Config.DatastoreTracer.SlowQuery.Enabled {
		return false
	}
	return txn.Reply.CollectTraces
}
// txnTracesEnabled reports whether transaction traces are enabled both in the
// local transaction tracer configuration and by the connect reply's
// CollectTraces setting.
func (txn *txn) txnTracesEnabled() bool {
	if !txn.Config.TransactionTracer.Enabled {
		return false
	}
	return txn.Reply.CollectTraces
}
// txnEventsEnabled reports whether transaction events are enabled both in
// the local configuration and by the connect reply's CollectAnalyticsEvents
// setting.
func (txn *txn) txnEventsEnabled() bool {
	if !txn.Config.TransactionEvents.Enabled {
		return false
	}
	return txn.Reply.CollectAnalyticsEvents
}
// errorEventsEnabled reports whether error events are enabled both in the
// local error collector configuration and by the connect reply's
// CollectErrorEvents setting.
func (txn *txn) errorEventsEnabled() bool {
	if !txn.Config.ErrorCollector.CaptureEvents {
		return false
	}
	return txn.Reply.CollectErrorEvents
}
// freezeName computes FinalName from the current name exactly once. It does
// nothing when the transaction is ignored or the final name is already set.
// If the computed final name is empty, the transaction becomes ignored.
func (txn *txn) freezeName() {
	if txn.ignore {
		return
	}
	if txn.FinalName != "" {
		return
	}

	txn.FinalName = internal.CreateFullTxnName(txn.Name, txn.Reply, txn.IsWeb)

	// ignore is known to be false here, so it only flips when the name
	// came back empty.
	txn.ignore = txn.FinalName == ""
}
// getsApdex reports whether an apdex zone is calculated for the transaction;
// this is the case exactly when it is a web transaction.
func (txn *txn) getsApdex() bool {
return txn.IsWeb
}
// txnTraceThreshold returns the duration a transaction must reach before its
// trace is saved. When the configured threshold is flagged IsApdexFailing,
// the value is derived from the transaction's apdex threshold; otherwise the
// configured fixed duration is returned.
func (txn *txn) txnTraceThreshold() time.Duration {
	if !txn.Config.TransactionTracer.Threshold.IsApdexFailing {
		return txn.Config.TransactionTracer.Threshold.Duration
	}
	return internal.ApdexFailingThreshold(txn.ApdexThreshold)
}
// shouldSaveTrace reports whether the transaction trace should be offered to
// the harvest: always for Synthetics transactions, otherwise only when
// traces are enabled and the duration reaches the trace threshold.
func (txn *txn) shouldSaveTrace() bool {
	if txn.CrossProcess.IsSynthetics() {
		return true
	}
	if !txn.txnTracesEnabled() {
		return false
	}
	return txn.Duration >= txn.txnTraceThreshold()
}
// MergeIntoHarvest copies the finished transaction's data into the harvest:
// metrics, the transaction event, error traces and error events, the
// transaction trace, slow queries and span events, each subject to its own
// enablement check.
func (txn *txn) MergeIntoHarvest(h *internal.Harvest) {
	// Use the distributed tracing priority when available; otherwise draw
	// a fresh one for this merge.
	var priority internal.Priority
	switch {
	case txn.BetterCAT.Enabled:
		priority = txn.BetterCAT.Priority
	default:
		priority = internal.NewPriority()
	}

	data := &txn.TxnData
	internal.CreateTxnMetrics(data, h.Metrics)
	internal.MergeBreakdownMetrics(data, h.Metrics)

	if txn.txnEventsEnabled() {
		// A separate copy of the event is stored so that the harvest does
		// not hold a reference into the large transaction.
		eventCopy := txn.TxnData.TxnEvent
		h.TxnEvents.AddTxnEvent(&eventCopy, priority)
	}

	internal.MergeTxnErrors(&h.ErrorTraces, txn.Errors, txn.TxnEvent)

	if txn.errorEventsEnabled() {
		for _, errData := range txn.Errors {
			event := new(internal.ErrorEvent)
			event.ErrorData = *errData
			event.TxnEvent = txn.TxnEvent
			// Error events do not use the stack trace; drop the
			// reference to keep memory usage down.
			event.Stack = nil
			h.ErrorEvents.Add(event, priority)
		}
	}

	if txn.shouldSaveTrace() {
		candidate := internal.HarvestTrace{
			TxnEvent: txn.TxnEvent,
			Trace:    txn.TxnTrace,
		}
		h.TxnTraces.Witness(candidate)
	}

	if txn.SlowQueries != nil {
		h.SlowSQLs.Merge(txn.SlowQueries, txn.TxnEvent)
	}

	if txn.BetterCAT.Sampled && txn.SpanEventsEnabled {
		h.SpanEvents.MergeFromTransaction(data)
	}
}
// getTransport maps a transport name onto the name of the matching
// TransportType. Any name that does not match a known transport, including
// the empty string, yields TransportUnknown's name.
//
// TransportType's name field is not mutable outside of its package; however,
// it still periodically needs to be used and assigned within this package.
// For testing purposes only.
func getTransport(transport string) string {
	switch transport {
	case TransportHTTP.name,
		TransportHTTPS.name,
		TransportKafka.name,
		TransportJMS.name,
		TransportIronMQ.name,
		TransportAMQP.name,
		TransportQueue.name,
		TransportOther.name,
		// The unknown transport is listed explicitly. Go switch cases do
		// not fall through, so an empty case for it would return "".
		TransportUnknown.name:
		return transport
	default:
		return TransportUnknown.name
	}
}
// responseCodeIsError reports whether an HTTP response code should be
// recorded as an error: it must be 400 or above and must not appear in the
// error collector's list of ignored status codes.
func responseCodeIsError(cfg *Config, code int) bool {
	if code >= http.StatusBadRequest { // 400
		ignored := cfg.ErrorCollector.IgnoreStatusCodes
		for i := range ignored {
			if ignored[i] == code {
				return false
			}
		}
		return true
	}
	return false
}
// headersJustWritten records response header and response code attributes
// the first time response headers are written on an unfinished transaction.
// If the code counts as an error (see responseCodeIsError), an error with a
// stack trace is noticed on the transaction as well.
func headersJustWritten(txn *txn, code int) {
	txn.Lock()
	defer txn.Unlock()

	// Only the first header write on a live transaction is recorded.
	if txn.finished || txn.wroteHeader {
		return
	}
	txn.wroteHeader = true

	internal.ResponseHeaderAttributes(txn.Attrs, txn.W.Header())
	internal.ResponseCodeAttribute(txn.Attrs, code)

	if !responseCodeIsError(&txn.Config, code) {
		return
	}

	errData := internal.TxnErrorFromResponseCode(time.Now(), code)
	errData.Stack = internal.GetStackTrace(1)
	txn.noticeErrorInternal(errData)
}
// responseHeader builds the cross process response headers for the
// transaction. It returns nil when the transaction has finished, when
// headers were already written, when cross process tracing is disabled or
// not inbound, or when the application data cannot be generated (which is
// logged at debug level). Calling it freezes the transaction name.
func (txn *txn) responseHeader() http.Header {
	txn.Lock()
	defer txn.Unlock()

	if txn.finished || txn.wroteHeader {
		return nil
	}
	if !txn.CrossProcess.Enabled || !txn.CrossProcess.IsInbound() {
		return nil
	}

	txn.freezeName()

	length := internal.GetContentLengthFromHeader(txn.W.Header())
	appData, appErr := txn.CrossProcess.CreateAppData(txn.FinalName, txn.Queuing, time.Since(txn.Start), length)
	if appErr == nil {
		return internal.AppDataToHTTPHeader(appData)
	}

	txn.Config.Logger.Debug("error generating outbound response header", map[string]interface{}{
		"error": appErr,
	})
	return nil
}
// addCrossProcessHeaders copies every cross process response header onto the
// wrapped response writer's header map.
func addCrossProcessHeaders(txn *txn) {
	// No wroteHeader check is needed here: responseHeader() performs it and
	// returns a nil map once headers have been written, so the loop below
	// simply does nothing.
	extra := txn.responseHeader()
	for name, vals := range extra {
		for i := range vals {
			txn.W.Header().Add(name, vals[i])
		}
	}
}
// Header returns the header map of the wrapped response writer.
func (txn *txn) Header() http.Header { return txn.W.Header() }
// Write adds any cross process response headers, writes b to the wrapped
// response writer, and then records the response as having been sent with
// status 200. It returns the wrapped writer's result unchanged.
func (txn *txn) Write(b []byte) (int, error) {
	// Calling this on every Write is safe: once the headers have been
	// written, responseHeader() yields nothing and no headers are added.
	addCrossProcessHeaders(txn)

	written, writeErr := txn.W.Write(b)

	headersJustWritten(txn, http.StatusOK)

	return written, writeErr
}
// WriteHeader adds any cross process response headers, forwards the status
// code to the wrapped response writer, and then records the response
// attributes (plus a response code error if the code qualifies as one).
func (txn *txn) WriteHeader(code int) {
// The extra headers have to be added before the wrapped writer sends them.
addCrossProcessHeaders(txn)
txn.W.WriteHeader(code)
headersJustWritten(txn, code)
}
// End finishes the transaction: it records the stop time and duration,
// freezes the name, finalises the cross process state, computes the apdex
// zone, and hands the transaction to the consumer unless it is ignored. It
// returns errAlreadyEnded if the transaction has already been ended.
//
// recover() is called directly in this method, so a panic is only captured
// when End itself is the deferred call (e.g. `defer txn.End()`). A captured
// panic is recorded as a transaction error and then re-raised.
func (txn *txn) End() error {
txn.Lock()
defer txn.Unlock()
if txn.finished {
return errAlreadyEnded
}
txn.finished = true
// Capture any in-flight panic so that it can be recorded as an error
// before being re-raised at the end of this method.
r := recover()
if nil != r {
e := internal.TxnErrorFromPanic(time.Now(), r)
e.Stack = internal.GetStackTrace(0)
txn.noticeErrorInternal(e)
}
txn.Stop = time.Now()
txn.Duration = txn.Stop.Sub(txn.Start)
// Exclusive time is the duration not covered by root-level children; it
// is only assigned when the duration exceeds the children's total.
if children := internal.TracerRootChildren(&txn.TxnData); txn.Duration > children {
txn.Exclusive = txn.Duration - children
}
txn.freezeName()
// Finalise the CAT state.
if err := txn.CrossProcess.Finalise(txn.Name, txn.Config.AppName); err != nil {
txn.Config.Logger.Debug("error finalising the cross process state", map[string]interface{}{
"error": err,
})
}
// Assign apdexThreshold regardless of whether or not the transaction
// gets apdex since it may be used to calculate the trace threshold.
txn.ApdexThreshold = internal.CalculateApdexThreshold(txn.Reply, txn.FinalName)
if txn.getsApdex() {
// A transaction with errors is always in the failing zone.
if txn.HasErrors() {
txn.Zone = internal.ApdexFailing
} else {
txn.Zone = internal.CalculateApdexZone(txn.ApdexThreshold, txn.Duration)
}
} else {
txn.Zone = internal.ApdexNone
}
// Guard the log call so the field map is only built when debug logging
// is enabled.
if txn.Config.Logger.DebugEnabled() {
txn.Config.Logger.Debug("transaction ended", map[string]interface{}{
"name": txn.FinalName,
"duration_ms": txn.Duration.Seconds() * 1000.0,
"ignored": txn.ignore,
"app_connected": "" != txn.Reply.RunID,
})
}
if !txn.ignore {
txn.Consumer.Consume(txn.Reply.RunID, txn)
}
// Note that if a consumer uses `panic(nil)`, the panic will not
// propagate.
if nil != r {
panic(r)
}
return nil
}
// AddAttribute adds a custom attribute to the transaction for all
// destinations. It is rejected with errHighSecurityEnabled under high
// security, with errSecurityPolicy when custom parameters are disabled by
// security policy, and with errAlreadyEnded once the transaction has
// finished; the checks are made in that order.
func (txn *txn) AddAttribute(name string, value interface{}) error {
	txn.Lock()
	defer txn.Unlock()

	switch {
	case txn.Config.HighSecurity:
		return errHighSecurityEnabled
	case !txn.Reply.SecurityPolicies.CustomParameters.Enabled():
		return errSecurityPolicy
	case txn.finished:
		return errAlreadyEnded
	}

	return internal.AddUserAttribute(txn.Attrs, name, value, internal.DestAll)
}
// Sentinel errors returned by the transaction methods in this file.
var (
// errorsLocallyDisabled is returned when the local error collector
// configuration is disabled.
errorsLocallyDisabled = errors.New("errors locally disabled")
// errorsRemotelyDisabled is returned when the connect reply disables
// error collection.
errorsRemotelyDisabled = errors.New("errors remotely disabled")
// errNilError is returned by NoticeError when given a nil error.
errNilError = errors.New("nil error")
// errAlreadyEnded is returned when a method is called on a transaction
// that has already finished.
errAlreadyEnded = errors.New("transaction has already ended")
// errSecurityPolicy is returned when a security policy disables the
// requested operation.
errSecurityPolicy = errors.New("disabled by security policy")
)
// Replacement error messages used by noticeErrorInternal when the original
// message must not be recorded.
const (
// highSecurityErrorMsg replaces the message when high security is on.
highSecurityErrorMsg = "message removed by high security setting"
// securityPolicyErrorMsg replaces the message when the security policy
// does not allow raw exception messages.
securityPolicyErrorMsg = "message removed by security policy"
)
// noticeErrorInternal stores err on the transaction and marks the
// transaction event as having an error. It returns errorsLocallyDisabled or
// errorsRemotelyDisabled, without recording anything, when error collection
// is turned off. The message is replaced when high security is enabled or
// when the security policy disallows raw exception messages; the security
// policy replacement wins if both apply.
//
// This method does not take the transaction lock itself.
func (txn *txn) noticeErrorInternal(err internal.ErrorData) error {
	switch {
	case !txn.Config.ErrorCollector.Enabled:
		return errorsLocallyDisabled
	case !txn.Reply.CollectErrors:
		return errorsRemotelyDisabled
	}

	// The error store is created on first use.
	if txn.Errors == nil {
		txn.Errors = internal.NewTxnErrors(internal.MaxTxnErrors)
	}

	if txn.Config.HighSecurity {
		err.Msg = highSecurityErrorMsg
	}
	if rawAllowed := txn.Reply.SecurityPolicies.AllowRawExceptionMessages.Enabled(); !rawAllowed {
		err.Msg = securityPolicyErrorMsg
	}

	txn.Errors.Add(err)
	// Flag the transaction event as having an error.
	txn.TxnData.TxnEvent.HasError = true

	return nil
}
var (
// errTooManyErrorAttributes is returned by NoticeError when an error
// supplies more than internal.AttributeErrorLimit extra attributes.
errTooManyErrorAttributes = fmt.Errorf("too many extra attributes: limit is %d",
internal.AttributeErrorLimit)
)
// NoticeError records err on the transaction.
//
// The error class comes from ErrorClasser when implemented and non-empty,
// otherwise from the error's dynamic type. The stack trace comes from
// StackTracer when implemented and non-nil, otherwise it is captured here.
// Extra attributes from ErrorAttributer are validated and attached only when
// high security is off and custom parameters are allowed by security policy.
//
// It returns errAlreadyEnded for a finished transaction, errNilError for a
// nil err, errTooManyErrorAttributes or a validation error for bad extra
// attributes, and otherwise whatever noticeErrorInternal returns.
func (txn *txn) NoticeError(err error) error {
	txn.Lock()
	defer txn.Unlock()

	switch {
	case txn.finished:
		return errAlreadyEnded
	case nil == err:
		return errNilError
	}

	data := internal.ErrorData{
		When: time.Now(),
		Msg:  err.Error(),
	}

	if classer, ok := err.(ErrorClasser); ok {
		data.Klass = classer.ErrorClass()
	}
	if data.Klass == "" {
		data.Klass = reflect.TypeOf(err).String()
	}

	if tracer, ok := err.(StackTracer); ok {
		// An excessively long user-provided stack trace is truncated
		// later, during JSON creation.
		data.Stack = tracer.StackTrace()
	}
	if data.Stack == nil {
		data.Stack = internal.GetStackTrace(2)
	}

	attributer, hasAttrs := err.(ErrorAttributer)
	if hasAttrs && !txn.Config.HighSecurity && txn.Reply.SecurityPolicies.CustomParameters.Enabled() {
		raw := attributer.ErrorAttributes()
		if len(raw) > internal.AttributeErrorLimit {
			return errTooManyErrorAttributes
		}

		vetted := make(map[string]interface{})
		for key, val := range raw {
			checked, invalid := internal.ValidateUserAttribute(key, val)
			if invalid != nil {
				return invalid
			}
			vetted[key] = checked
		}
		data.ExtraAttributes = vetted
	}

	return txn.noticeErrorInternal(data)
}
// SetName replaces the transaction's name. It returns errAlreadyEnded, and
// leaves the name untouched, if the transaction has finished.
func (txn *txn) SetName(name string) error {
	txn.Lock()
	defer txn.Unlock()

	if !txn.finished {
		txn.Name = name
		return nil
	}
	return errAlreadyEnded
}
// Ignore marks the transaction so that End() does not hand it to the
// consumer. It returns errAlreadyEnded if the transaction has finished.
func (txn *txn) Ignore() error {
	txn.Lock()
	defer txn.Unlock()

	if !txn.finished {
		txn.ignore = true
		return nil
	}
	return errAlreadyEnded
}
// StartSegmentNow starts a segment at the current time and returns a
// SegmentStartTime tied to this transaction. If the transaction has already
// finished, no segment is started and the returned value carries a zero
// internal start time.
func (txn *txn) StartSegmentNow() SegmentStartTime {
	txn.Lock()
	// Unlock via defer so that the mutex is released even if starting the
	// segment panics; otherwise a deferred End() would block forever on the
	// still-held lock.
	defer txn.Unlock()

	var s internal.SegmentStartTime
	if !txn.finished {
		s = internal.StartSegment(&txn.TxnData, time.Now())
	}

	return SegmentStartTime{
		segment: segment{
			start: s,
			txn:   txn,
		},
	}
}
// segment pairs an internal segment start time with the transaction it was
// started on.
type segment struct {
// start is the value returned by internal.StartSegment (zero if the
// transaction had already finished when the segment was started).
start internal.SegmentStartTime
// txn is the owning transaction; the end* helpers treat a nil txn as a
// no-op.
txn *txn
}
// endSegment ends a basic segment at the current time. It is a no-op
// returning nil when the segment has no transaction, and returns
// errAlreadyEnded when the transaction has finished.
func endSegment(s *Segment) error {
	txn := s.StartTime.txn
	if nil == txn {
		return nil
	}

	txn.Lock()
	// Unlock via defer, as endDatastore and endExternal do, so that the
	// mutex is released even if ending the segment panics.
	defer txn.Unlock()

	if txn.finished {
		return errAlreadyEnded
	}
	return internal.EndBasicSegment(&txn.TxnData, s.StartTime.start, time.Now(), s.Name)
}
// endDatastore ends a datastore segment at the current time. Before handing
// the segment to the internal package it scrubs fields on s according to
// configuration and security policy: query parameters, the parameterized
// query, the database name, and the host/port instance details.
//
// It is a no-op returning nil when the segment has no transaction, and
// returns errAlreadyEnded when the transaction has finished.
func endDatastore(s *DatastoreSegment) error {
	t := s.StartTime.txn
	if t == nil {
		return nil
	}

	t.Lock()
	defer t.Unlock()

	if t.finished {
		return errAlreadyEnded
	}

	tracerCfg := &t.Config.DatastoreTracer

	// Query parameters are dropped under high security or when disabled
	// in the configuration.
	if t.Config.HighSecurity || !tracerCfg.QueryParameters.Enabled {
		s.QueryParameters = nil
	}

	// When the RecordSQL policy is set, query parameters are always
	// dropped, and the query itself is dropped too if the policy is not
	// enabled.
	if t.Reply.SecurityPolicies.RecordSQL.IsSet() {
		s.QueryParameters = nil
		if !t.Reply.SecurityPolicies.RecordSQL.Enabled() {
			s.ParameterizedQuery = ""
		}
	}

	if !tracerCfg.DatabaseNameReporting.Enabled {
		s.DatabaseName = ""
	}
	if !tracerCfg.InstanceReporting.Enabled {
		s.Host = ""
		s.PortPathOrID = ""
	}

	var params internal.EndDatastoreParams
	params.Tracer = &t.TxnData
	params.Start = s.StartTime.start
	params.Now = time.Now()
	params.Product = string(s.Product)
	params.Collection = s.Collection
	params.Operation = s.Operation
	params.ParameterizedQuery = s.ParameterizedQuery
	params.QueryParameters = s.QueryParameters
	params.Host = s.Host
	params.PortPathOrID = s.PortPathOrID
	params.Database = s.DatabaseName

	return internal.EndDatastoreSegment(params)
}
// externalSegmentMethod returns the HTTP method for an external segment. The
// request attached to the response is preferred over the segment's own
// request. It returns "" when no request is available.
func externalSegmentMethod(s *ExternalSegment) string {
	// A response carrying its request means this was a client request.
	if resp := s.Response; resp != nil && resp.Request != nil {
		if method := resp.Request.Method; method != "" {
			return method
		}
		// Go's http package documents that an empty Method on a client
		// request means GET.
		return "GET"
	}

	if req := s.Request; req != nil {
		return req.Method
	}
	return ""
}
// externalSegmentURL returns the URL for an external segment. An explicit
// s.URL string is parsed and takes precedence; otherwise the URL of the
// request attached to the response is used, then the URL of the segment's
// own request. It returns (nil, nil) when none of these is available, and a
// non-nil error only when parsing s.URL fails.
func externalSegmentURL(s *ExternalSegment) (*url.URL, error) {
	switch {
	case s.URL != "":
		return url.Parse(s.URL)
	case s.Response != nil && s.Response.Request != nil:
		return s.Response.Request.URL, nil
	case s.Request != nil:
		return s.Request.URL, nil
	}
	return nil, nil
}
// endExternal ends an external segment at the current time, using the
// method and URL derived from the segment's request/response. It is a no-op
// returning nil when the segment has no transaction, returns errAlreadyEnded
// when the transaction has finished, and returns the parse error when the
// segment's URL cannot be determined.
func endExternal(s *ExternalSegment) error {
	t := s.StartTime.txn
	if t == nil {
		return nil
	}

	t.Lock()
	defer t.Unlock()

	if t.finished {
		return errAlreadyEnded
	}

	method := externalSegmentMethod(s)
	target, urlErr := externalSegmentURL(s)
	if urlErr != nil {
		return urlErr
	}

	return internal.EndExternalSegment(&t.TxnData, s.StartTime.start, time.Now(), target, method, s.Response)
}
// oldCATOutboundHeaders builds the outbound Old CAT and Synthetics headers.
// Which headers are present depends on whether Old CAT is enabled and
// whether any Synthetics functionality has been triggered in the agent. An
// empty header map is returned for a finished transaction.
func oldCATOutboundHeaders(txn *txn) http.Header {
	txn.Lock()
	defer txn.Unlock()

	if txn.finished {
		return http.Header{}
	}

	md, mdErr := txn.CrossProcess.CreateCrossProcessMetadata(txn.Name, txn.Config.AppName)
	if nil != mdErr {
		// The failure is only logged. CreateCrossProcessMetadata() can
		// fail and still produce a Synthetics header, so the headers are
		// built from whatever metadata came back.
		txn.Config.Logger.Debug("error generating outbound headers", map[string]interface{}{
			"error": mdErr,
		})
	}

	return internal.MetadataToHTTPHeader(md)
}
// outboundHeaders returns the headers to attach to an outbound request made
// within the segment's transaction: the Old CAT / Synthetics headers plus,
// when a non-empty distributed trace payload can be created, the distributed
// trace payload header. An empty header map is returned when the segment has
// no transaction.
func outboundHeaders(s *ExternalSegment) http.Header {
	t := s.StartTime.txn
	if t == nil {
		return http.Header{}
	}

	// headers may be empty at this point or already contain entries.
	headers := oldCATOutboundHeaders(t)

	// The payload renders as "" when no real payload could be created, in
	// which case no header is added.
	encoded := t.CreateDistributedTracePayload().HTTPSafe()
	if encoded != "" {
		headers.Add(DistributedTracePayloadHeader, encoded)
	}

	return headers
}
const (
// maxSampledDistributedPayloads bounds how many outbound payloads a single
// transaction may mark as sampled: CreateDistributedTracePayload only
// propagates the sampled flag while its payload counter is below this
// value.
maxSampledDistributedPayloads = 35
)
// shimPayload is an empty stand-in payload whose textual forms are both the
// empty string. CreateDistributedTracePayload returns it whenever no real
// payload is created.
type shimPayload struct{}

// Text always returns the empty string.
func (shimPayload) Text() string {
	return ""
}

// HTTPSafe always returns the empty string.
func (shimPayload) HTTPSafe() string {
	return ""
}
// CreateDistributedTracePayload creates an outbound distributed trace
// payload for the transaction. An empty shimPayload is returned instead when
// distributed tracing is disabled, when the transaction has finished (which
// is also flagged as a create exception), or when the application has no
// primary app ID yet.
func (txn *txn) CreateDistributedTracePayload() (payload DistributedTracePayload) {
	payload = shimPayload{}

	txn.Lock()
	defer txn.Unlock()

	switch {
	case !txn.BetterCAT.Enabled:
		return payload
	case txn.finished:
		txn.CreatePayloadException = true
		return payload
	case txn.Reply.PrimaryAppID == "":
		// The application is not connected yet; keep the shim.
		return payload
	}

	txn.numPayloadsCreated++

	var p internal.Payload
	p.Type = internal.CallerType
	p.Account = txn.Reply.AccountID
	p.App = txn.Reply.PrimaryAppID
	p.TracedID = txn.BetterCAT.TraceID()
	p.Priority = txn.BetterCAT.Priority
	p.Timestamp.Set(time.Now())
	// The transaction ID carried in the payload is the transaction guid.
	p.TransactionID = txn.BetterCAT.ID

	// The trusted account key is only included when it differs from the
	// account ID.
	if trusted := txn.Reply.TrustedAccountKey; trusted != txn.Reply.AccountID {
		p.TrustedAccountKey = trusted
	}

	if txn.BetterCAT.Sampled && txn.SpanEventsEnabled {
		p.ID = txn.CurrentSpanIdentifier()
	}

	// Cap the number of outbound sampled=true payloads so that downstream
	// services do not produce too many sampled events.
	p.SetSampled(false)
	if txn.numPayloadsCreated < maxSampledDistributedPayloads {
		p.SetSampled(txn.BetterCAT.Sampled)
	}

	txn.CreatePayloadSuccess = true

	payload = p
	return payload
}
// Errors returned when accepting an inbound distributed trace payload.
var (
// errOutboundPayloadCreated is returned when an outbound payload was
// created before the inbound payload was accepted.
errOutboundPayloadCreated = errors.New("outbound payload already created")
// errAlreadyAccepted is returned when an inbound payload has already been
// accepted on the transaction.
errAlreadyAccepted = errors.New("AcceptDistributedTracePayload has already been called")
// errInboundPayloadDTDisabled is returned when distributed tracing is not
// enabled on the transaction.
errInboundPayloadDTDisabled = errors.New("DistributedTracer must be enabled to accept an inbound payload")
// errTrustedAccountKey is returned when the payload's trust key does not
// match the connect reply's trusted account key.
errTrustedAccountKey = errors.New("trusted account key missing or does not match")
)
// AcceptDistributedTracePayload records an inbound distributed trace payload
// on the transaction. It takes the transaction lock and delegates to
// acceptDistributedTracePayloadLocked.
func (txn *txn) AcceptDistributedTracePayload(t TransportType, p interface{}) error {
txn.Lock()
defer txn.Unlock()
return txn.acceptDistributedTracePayloadLocked(t, p)
}
// acceptDistributedTracePayloadLocked validates and records an inbound
// distributed trace payload. The caller must already hold the transaction
// lock; this method does not lock.
//
// Each rejection reason sets a matching Accept* flag on the transaction
// (except when distributed tracing is disabled). A nil p, or a nil parsed
// payload, is accepted silently with a nil error. On success the payload's
// priority and sampled decision are adopted, the transport type and
// transport duration are recorded, and AcceptPayloadSuccess is set.
func (txn *txn) acceptDistributedTracePayloadLocked(t TransportType, p interface{}) error {
	switch {
	case !txn.BetterCAT.Enabled:
		return errInboundPayloadDTDisabled
	case txn.finished:
		txn.AcceptPayloadException = true
		return errAlreadyEnded
	case txn.numPayloadsCreated > 0:
		txn.AcceptPayloadCreateBeforeAccept = true
		return errOutboundPayloadCreated
	case txn.BetterCAT.Inbound != nil:
		txn.AcceptPayloadIgnoredMultiple = true
		return errAlreadyAccepted
	case nil == p:
		txn.AcceptPayloadNullPayload = true
		return nil
	}

	payload, acceptErr := internal.AcceptPayload(p)
	if acceptErr != nil {
		// Classify the failure for the supportability flags.
		switch acceptErr.(type) {
		case internal.ErrPayloadParse:
			txn.AcceptPayloadParseException = true
		case internal.ErrUnsupportedPayloadVersion:
			txn.AcceptPayloadIgnoredVersion = true
		case internal.ErrPayloadMissingField:
			txn.AcceptPayloadParseException = true
		default:
			txn.AcceptPayloadException = true
		}
		return acceptErr
	}
	if payload == nil {
		return nil
	}

	// The payload parsed; now check that it carries the required fields.
	if invalid := payload.IsValid(); invalid != nil {
		txn.AcceptPayloadParseException = true
		return invalid
	}

	// Trust check: fall back to the account ID when the payload has no
	// explicit trusted account key.
	trustKey := payload.TrustedAccountKey
	if trustKey == "" {
		trustKey = payload.Account
	}
	if trustKey != txn.Reply.TrustedAccountKey {
		txn.AcceptPayloadUntrustedAccount = true
		return errTrustedAccountKey
	}

	if payload.Priority != 0 {
		txn.BetterCAT.Priority = payload.Priority
	}

	// A nil payload.Sampled means the field wasn't provided, so the
	// sampling decision is computed locally instead.
	if payload.Sampled == nil {
		txn.BetterCAT.Sampled = txn.Reply.AdaptiveSampler.ComputeSampled(txn.BetterCAT.Priority.Float32(), time.Now())
	} else {
		txn.BetterCAT.Sampled = *payload.Sampled
	}

	txn.BetterCAT.Inbound = payload

	// TransportType's name field is not mutable outside of its package, so
	// the only invalid value a caller can pass is the empty TransportType.
	txn.BetterCAT.Inbound.TransportType = t.name
	if t.name == "" {
		txn.BetterCAT.Inbound.TransportType = TransportUnknown.name
		txn.Config.Logger.Debug("Invalid transport type, defaulting to Unknown", map[string]interface{}{})
	}

	// Transport duration is only recorded when the payload was sent before
	// this transaction started.
	sentAt := payload.Timestamp.Time()
	if txn.Start.After(sentAt) {
		txn.BetterCAT.Inbound.TransportDuration = txn.Start.Sub(sentAt)
	}

	txn.AcceptPayloadSuccess = true
	return nil
}