Mirror of https://github.com/SAP/jenkins-library.git
Splunk reporting; Sending messages in batches (#3611)
* Refactors the logfile sending logic, renames fields, and adds a proper piper sourcetype
* Sets the maximum retries to three and the transport timeout to 10 seconds for the Azure and Jenkins clients
Parent: 5821a311cc
Commit: 6398e61995
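The batching mentioned in the title works by splitting the pipeline log into individual messages and posting them to Splunk in chunks of postMessagesBatchSize (set to 6000 in this commit, see the Initialize diff below). A minimal, self-contained sketch of that slicing pattern; the log content, the small batch size, and the print statement are placeholders for illustration, and the clamping of the last batch follows the loop shown in the SendPipelineStatus diff:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-in for the pipeline log; in the real code this is the Jenkins/Azure log file content.
	readLogFile := "line 1\nline 2\nline 3\nline 4\nline 5\nline 6\nline 7"
	splitted := strings.Split(readLogFile, "\n")
	messagesLen := len(splitted)

	postMessagesBatchSize := 3 // the commit uses 6000; a small value keeps the example readable

	for i := 0; i < messagesLen; i += postMessagesBatchSize {
		upperBound := i + postMessagesBatchSize
		if upperBound > messagesLen {
			upperBound = messagesLen // the last batch may be smaller than the batch size
		}
		batch := splitted[i:upperBound]
		fmt.Printf("batch %d..%d: %v\n", i, upperBound, batch)
		// Each batch of messages is then posted to Splunk (the call itself is outside the visible hunk).
	}
}
```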
```diff
@@ -20,8 +20,10 @@ type AzureDevOpsConfigProvider struct {
 func (a *AzureDevOpsConfigProvider) InitOrchestratorProvider(settings *OrchestratorSettings) {
 	a.client = piperHttp.Client{}
 	a.options = piperHttp.ClientOptions{
-		Username: "",
-		Password: settings.AzureToken,
+		Username:         "",
+		Password:         settings.AzureToken,
+		MaxRetries:       3,
+		TransportTimeout: time.Second * 10,
 	}
 	a.client.SetOptions(a.options)
 	log.Entry().Debug("Successfully initialized Azure config provider")
```
```diff
@@ -16,8 +16,10 @@ type JenkinsConfigProvider struct {
 func (j *JenkinsConfigProvider) InitOrchestratorProvider(settings *OrchestratorSettings) {
 	j.client = piperHttp.Client{}
 	j.options = piperHttp.ClientOptions{
-		Username: settings.JenkinsUser,
-		Password: settings.JenkinsToken,
+		Username:         settings.JenkinsUser,
+		Password:         settings.JenkinsToken,
+		MaxRetries:       3,
+		TransportTimeout: time.Second * 10,
 	}
 	j.client.SetOptions(j.options)
 	log.Entry().Debug("Successfully initialized Jenkins config provider")
```
```diff
@@ -24,23 +24,20 @@ type MonitoringData struct {
 	Duration      string `json:"Duration,omitempty"`
 	ErrorCode     string `json:"ErrorCode,omitempty"`
 	ErrorCategory string `json:"ErrorCategory,omitempty"`
-	CorrelationID string `json:"CorrelationID,omitempty"`
+	CorrelationID string `json:"CorrelationId,omitempty"`
 	CommitHash    string `json:"CommitHash,omitempty"`
 	Branch        string `json:"Branch,omitempty"`
 	GitOwner      string `json:"GitOwner,omitempty"`
 	GitRepository string `json:"GitRepository,omitempty"`
 }
 
-type LogFileEvents struct {
-	Messages  []string               `json:"messages,omitempty"`  // messages
-	Telemetry map[string]interface{} `json:"telemetry,omitempty"` // telemetryData
-}
-type DetailsLog struct {
-	Host       string        `json:"host"`                 // hostname
-	Source     string        `json:"source,omitempty"`     // optional description of the source of the event; typically the app's name
-	SourceType string        `json:"sourcetype,omitempty"` // optional name of a Splunk parsing configuration; this is usually inferred by Splunk
-	Index      string        `json:"index,omitempty"`      // optional name of the Splunk index to store the event in; not required if the token has a default index set in Splunk
-	Event      LogFileEvents `json:"event,omitempty"`      // throw any useful key/val pairs here
-}
+type LogFileEvent struct {
+	Event      string `json:"event"`      // messages
+	Host       string `json:"host"`       // hostname
+	Source     string `json:"source"`     // optional description of the source of the event; typically the app's name
+	SourceType string `json:"sourcetype"` // optional name of a Splunk parsing configuration; this is usually inferred by Splunk
+	Index      string `json:"index"`      // optional name of the Splunk index to store the event in; not required if the token has a default index set in Splunk
+}
 
 type DetailsTelemetry struct {
```
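With the new LogFileEvent struct, every log line becomes its own Splunk HEC event. A small sketch of what one marshalled event looks like; the struct definition is taken from the diff above, while the host, source, and index values are invented placeholders:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogFileEvent as introduced in this commit.
type LogFileEvent struct {
	Event      string `json:"event"`      // the log message
	Host       string `json:"host"`       // hostname
	Source     string `json:"source"`     // in the commit: the pipeline correlation ID
	SourceType string `json:"sourcetype"` // Splunk parsing configuration
	Index      string `json:"index"`      // target Splunk index
}

func main() {
	e := LogFileEvent{
		Event:      "[INFO] step finished successfully",
		Host:       "jenkins-agent-01",         // placeholder
		Source:     "https://my.jenkins/job/1", // placeholder correlation ID
		SourceType: "piper:pipeline:logfile",
		Index:      "piper", // placeholder
	}
	b, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"event":"[INFO] step finished successfully","host":"jenkins-agent-01","source":"https://my.jenkins/job/1","sourcetype":"piper:pipeline:logfile","index":"piper"}
}
```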
```diff
@@ -6,6 +6,7 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
+	"os"
 	"strings"
 	"time"
```
```diff
@@ -25,6 +26,7 @@ type Splunk struct {
 	tags          map[string]string
 	splunkClient  piperhttp.Client
 	correlationID string
+	hostName      string
 	splunkDsn     string
 	splunkIndex   string
```
```diff
@@ -49,14 +51,20 @@ func (s *Splunk) Initialize(correlationID, dsn, token, index string, sendLogs bo
 		MaxRequestDuration:        5 * time.Second,
 		Token:                     token,
 		TransportSkipVerification: true,
-		MaxRetries:                -1,
+		MaxRetries:                5,
 	})
 
+	hostName, err := os.Hostname()
+	if err != nil {
+		log.Entry().WithError(err).Debug("Could not get hostName.")
+		hostName = "n/a"
+	}
+	s.hostName = hostName
 	s.splunkClient = client
 	s.splunkDsn = dsn
 	s.splunkIndex = index
 	s.correlationID = correlationID
-	s.postMessagesBatchSize = 20000
+	s.postMessagesBatchSize = 6000
 	s.sendLogs = sendLogs
 
 	return nil
```
```diff
@@ -138,11 +146,11 @@ func (s *Splunk) SendPipelineStatus(pipelineTelemetryData map[string]interface{}
 	splitted := strings.Split(readLogFile, "\n")
 	messagesLen := len(splitted)
 
-	log.Entry().Debugf("Sending %v messages to Splunk.", messagesLen)
 	log.Entry().Debugf("Sending pipeline telemetry data to Splunk: %v", pipelineTelemetryData)
 	s.postTelemetry(pipelineTelemetryData)
 
 	if s.sendLogs {
+		log.Entry().Debugf("Sending %v messages to Splunk.", messagesLen)
 		for i := 0; i < messagesLen; i += s.postMessagesBatchSize {
 			upperBound := i + s.postMessagesBatchSize
 			if upperBound > messagesLen {
```
```diff
@@ -163,23 +171,24 @@ func (s *Splunk) postTelemetry(telemetryData map[string]interface{}) error {
 	}
 	details := DetailsTelemetry{
 		Host:       s.correlationID,
-		SourceType: "_json",
+		SourceType: "piper:pipeline:telemetry",
 		Index:      s.splunkIndex,
 		Event:      telemetryData,
 	}
 
+	payload, err := json.Marshal(details)
+	if err != nil {
+		return errors.Wrap(err, "error while marshalling Splunk message details")
+	}
+
-	prettyPayload, err := json.MarshalIndent(payload, "", " ")
+	prettyPayload, err := json.MarshalIndent(details, "", " ")
 	if err != nil {
 		log.Entry().WithError(err).Warn("Failed to generate pretty payload json")
 		prettyPayload = nil
 	}
 	log.Entry().Debugf("Sending the follwing payload to Splunk HEC: %s", string(prettyPayload))
 
-	if err != nil {
-		return errors.Wrap(err, "error while marshalling Splunk message details")
-	}
 
 	resp, err := s.splunkClient.SendRequest(http.MethodPost, s.splunkDsn, bytes.NewBuffer(payload), nil, nil)
 
 	if resp != nil {
```
```diff
@@ -211,23 +220,26 @@ func (s *Splunk) postTelemetry(telemetryData map[string]interface{}
 
 func (s *Splunk) postLogFile(telemetryData map[string]interface{}, messages []string) error {
 
-	event := LogFileEvents{
-		Messages:  messages,
-		Telemetry: telemetryData,
-	}
-	details := DetailsLog{
-		Host:       s.correlationID,
-		SourceType: "txt",
-		Index:      s.splunkIndex,
-		Event:      event,
-	}
+	var logfileEvents []string
+	for _, message := range messages {
+		logMessage := LogFileEvent{
+			Event:      message,
+			Host:       s.hostName,
+			Source:     s.correlationID,
+			SourceType: "piper:pipeline:logfile",
+			Index:      s.splunkIndex,
+		}
+		marshalledLogMessage, err := json.Marshal(logMessage)
+		if err != nil {
+			return errors.Wrap(err, "error while marshalling Splunk messages")
+		}
+		logfileEvents = append(logfileEvents, string(marshalledLogMessage))
+	}
+	// creates payload {"event":"this is a sample event ", "Host":"myHost", "Source":"mySource", "SourceType":"valueA", "Index":"valueB"}{"event":"this is a sample event ", "Host":"myHost", "Source":"mySource", "SourceType":"valueA", "Index":"valueB"}..
+	strout := strings.Join(logfileEvents, ",")
+	payload := strings.NewReader(strout)
 
-	payload, err := json.Marshal(details)
-	if err != nil {
-		return errors.Wrap(err, "error while marshalling Splunk message details")
-	}
-
-	resp, err := s.splunkClient.SendRequest(http.MethodPost, s.splunkDsn, bytes.NewBuffer(payload), nil, nil)
+	resp, err := s.splunkClient.SendRequest(http.MethodPost, s.splunkDsn, payload, nil, nil)
 
 	if resp != nil {
 		if resp.StatusCode != http.StatusOK {
```
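For each batch, postLogFile now marshals every message into its own event object and joins them into one request body, which is then sent with a single splunkClient.SendRequest call. A sketch of that assembly step, without the HTTP call so it runs standalone; the helper function, its parameters, and the sample values are hypothetical stand-ins for the code in the diff:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type LogFileEvent struct {
	Event      string `json:"event"`
	Host       string `json:"host"`
	Source     string `json:"source"`
	SourceType string `json:"sourcetype"`
	Index      string `json:"index"`
}

// buildBatchPayload mimics the assembly loop in postLogFile: one JSON object per message,
// joined into a single body for one HTTP Event Collector request.
func buildBatchPayload(messages []string, host, correlationID, index string) (string, error) {
	var logfileEvents []string
	for _, message := range messages {
		b, err := json.Marshal(LogFileEvent{
			Event:      message,
			Host:       host,
			Source:     correlationID,
			SourceType: "piper:pipeline:logfile",
			Index:      index,
		})
		if err != nil {
			return "", err
		}
		logfileEvents = append(logfileEvents, string(b))
	}
	return strings.Join(logfileEvents, ","), nil
}

func main() {
	body, err := buildBatchPayload(
		[]string{"first log line", "second log line"},
		"jenkins-agent-01", "https://my.jenkins/job/1", "piper", // placeholders
	)
	if err != nil {
		panic(err)
	}
	// In the commit, strings.NewReader(body) is what gets passed to s.splunkClient.SendRequest.
	fmt.Println(body)
}
```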