From 28144332f10ab7f2f040d305fa6644dfaec92379 Mon Sep 17 00:00:00 2001
From: Lee Brown
Date: Mon, 15 Jul 2019 12:55:56 -0800
Subject: [PATCH] issue#8: Added example code for datadog log collector

This code was copied from another project and should be used as a
reference. I'm not sure whether the functions directory is the right
location, and the task definitions include incorrect account
information. The task definition should be dynamically generated by the
deploy script; see the sketches appended after the patch for one way to
render the definition and to exercise the handler locally.
---
 .../lambda-func-definition-dev.json          |  18 +
 .../lambda-func-definition-prod.json         |  18 +
 .../lambda-func-definition-stage.json        |  18 +
 functions/ddlogscollector/lambda_function.py | 505 ++++++++++++++++++
 4 files changed, 559 insertions(+)
 create mode 100644 functions/ddlogscollector/lambda-func-definition-dev.json
 create mode 100644 functions/ddlogscollector/lambda-func-definition-prod.json
 create mode 100644 functions/ddlogscollector/lambda-func-definition-stage.json
 create mode 100644 functions/ddlogscollector/lambda_function.py

diff --git a/functions/ddlogscollector/lambda-func-definition-dev.json b/functions/ddlogscollector/lambda-func-definition-dev.json
new file mode 100644
index 0000000..2bcc91b
--- /dev/null
+++ b/functions/ddlogscollector/lambda-func-definition-dev.json
@@ -0,0 +1,18 @@
+{
+    "FunctionName": "{LAMBDA_FUNC}",
+    "MemorySize": 512,
+    "Role": "arn:aws:iam::074586779200:role/DatadogAWSIntegrationLambdaRole",
+    "Timeout": 300,
+    "Runtime": "python2.7",
+    "TracingConfig": {
+        "Mode": "PassThrough"
+    },
+    "Description": "Ship logs from cloudwatch to datadog",
+    "Handler": "lambda_function.lambda_handler",
+    "Environment": {
+        "Variables": {
+            "DD_API_KEY": "{DATADOG_APIKEY}",
+            "LAMBDA_FUNC": "{LAMBDA_FUNC}"
+        }
+    }
+}
diff --git a/functions/ddlogscollector/lambda-func-definition-prod.json b/functions/ddlogscollector/lambda-func-definition-prod.json
new file mode 100644
index 0000000..d1c875b
--- /dev/null
+++ b/functions/ddlogscollector/lambda-func-definition-prod.json
@@ -0,0 +1,18 @@
+{
+    "FunctionName": "{LAMBDA_FUNC}",
+    "MemorySize": 1024,
+    "Role": "arn:aws:iam::992202127086:role/DatadogAWSIntegrationLambdaRole",
+    "Timeout": 300,
+    "Runtime": "python2.7",
+    "TracingConfig": {
+        "Mode": "PassThrough"
+    },
+    "Description": "Ship logs from cloudwatch to datadog",
+    "Handler": "lambda_function.lambda_handler",
+    "Environment": {
+        "Variables": {
+            "DD_API_KEY": "{DATADOG_APIKEY}",
+            "LAMBDA_FUNC": "{LAMBDA_FUNC}"
+        }
+    }
+}
\ No newline at end of file
diff --git a/functions/ddlogscollector/lambda-func-definition-stage.json b/functions/ddlogscollector/lambda-func-definition-stage.json
new file mode 100644
index 0000000..1f106d8
--- /dev/null
+++ b/functions/ddlogscollector/lambda-func-definition-stage.json
@@ -0,0 +1,18 @@
+{
+    "FunctionName": "{LAMBDA_FUNC}",
+    "MemorySize": 1024,
+    "Role": "arn:aws:iam::421734222910:role/DatadogAWSIntegrationLambdaRole",
+    "Timeout": 300,
+    "Runtime": "python2.7",
+    "TracingConfig": {
+        "Mode": "PassThrough"
+    },
+    "Description": "Ship logs from cloudwatch to datadog",
+    "Handler": "lambda_function.lambda_handler",
+    "Environment": {
+        "Variables": {
+            "DD_API_KEY": "{DATADOG_APIKEY}",
+            "LAMBDA_FUNC": "{LAMBDA_FUNC}"
+        }
+    }
+}
diff --git a/functions/ddlogscollector/lambda_function.py b/functions/ddlogscollector/lambda_function.py
new file mode 100644
index 0000000..8cf98c8
--- /dev/null
+++ b/functions/ddlogscollector/lambda_function.py
@@ -0,0 +1,505 @@
+# Unless explicitly stated otherwise all files in this repository are licensed
+# under the Apache License Version 2.0.
+# This product includes software developed at Datadog (https://www.datadoghq.com/).
+# Copyright 2018 Datadog, Inc.
+
+from __future__ import print_function
+
+import base64
+import gzip
+import json
+import os
+import re
+import socket
+import ssl
+import urllib
+from io import BytesIO, BufferedReader
+
+import boto3
+
+# Proxy
+# Define the proxy endpoint to forward the logs to
+DD_SITE = os.getenv("DD_SITE", default="datadoghq.com")
+DD_URL = os.getenv("DD_URL", default="lambda-intake.logs." + DD_SITE)
+
+# Define the proxy port to forward the logs to
+try:
+    if "DD_SITE" in os.environ and DD_SITE == "datadoghq.eu":
+        DD_PORT = int(os.environ.get("DD_PORT", 443))
+    else:
+        DD_PORT = int(os.environ.get("DD_PORT", 10516))
+except Exception:
+    DD_PORT = 10516
+
+# Scrubbing sensitive data
+# Option to redact all pattern that looks like an ip address / email address
+try:
+    is_ipscrubbing = os.environ["REDACT_IP"]
+except Exception:
+    is_ipscrubbing = False
+try:
+    is_emailscrubbing = os.environ["REDACT_EMAIL"]
+except Exception:
+    is_emailscrubbing = False
+
+# DD_API_KEY: Datadog API Key
+DD_API_KEY = ""
+if "DD_KMS_API_KEY" in os.environ:
+    ENCRYPTED = os.environ["DD_KMS_API_KEY"]
+    DD_API_KEY = boto3.client("kms").decrypt(
+        CiphertextBlob=base64.b64decode(ENCRYPTED)
+    )["Plaintext"]
+elif "DD_API_KEY" in os.environ:
+    DD_API_KEY = os.environ["DD_API_KEY"]
+
+# Strip any trailing and leading whitespace from the API key
+DD_API_KEY = DD_API_KEY.strip()
+
+cloudtrail_regex = re.compile(
+    "\d+_CloudTrail_\w{2}-\w{4,9}-\d_\d{8}T\d{4}Z.+.json.gz$", re.I
+)
+ip_regex = re.compile("\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", re.I)
+email_regex = re.compile("[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", re.I)
+
+DD_SOURCE = "ddsource"
+DD_CUSTOM_TAGS = "ddtags"
+DD_SERVICE = "service"
+DD_HOST = "host"
+DD_FORWARDER_VERSION = "1.2.3"
+
+# Pass custom tags as environment variable, ensure comma separated, no trailing comma in envvar!
+DD_TAGS = os.environ.get("DD_TAGS", "")
+
+class DatadogConnection(object):
+    def __init__(self, host, port, ddApiKey):
+        self.host = host
+        self.port = port
+        self.api_key = ddApiKey
+        self._sock = None
+
+    def _connect(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s = ssl.wrap_socket(s)
+        s.connect((self.host, self.port))
+        return s
+
+    def safe_submit_log(self, log, metadata):
+        try:
+            self.send_entry(log, metadata)
+        except Exception as e:
+            # retry once
+            if self._sock:
+                # make sure we don't keep old connections open
+                self._sock.close()
+            self._sock = self._connect()
+            self.send_entry(log, metadata)
+        return self
+
+    def send_entry(self, log_entry, metadata):
+        # The log_entry can only be a string or a dict
+        if isinstance(log_entry, str):
+            log_entry = {"message": log_entry}
+        elif not isinstance(log_entry, dict):
+            raise Exception(
+                "Cannot send the entry as it must be either a string or a dict. Provided entry: "
+                + str(log_entry)
+            )
+
+        # Merge with metadata
+        log_entry = merge_dicts(log_entry, metadata)
+
+        # Send to Datadog
+        str_entry = json.dumps(log_entry)
+
+        # Scrub ip addresses if activated
+        if is_ipscrubbing:
+            try:
+                str_entry = ip_regex.sub("xxx.xxx.xxx.xx", str_entry)
+            except Exception as e:
+                print(
+                    "Unexpected exception while scrubbing logs: {} for event {}".format(
+                        str(e), str_entry
+                    )
+                )
+        # Scrub email addresses if activated
+        if is_emailscrubbing:
+            try:
+                str_entry = email_regex.sub("xxxxx@xxxxx.com", str_entry)
+            except Exception as e:
+                print(
+                    "Unexpected exception while scrubbing logs: {} for event {}".format(
+                        str(e), str_entry
+                    )
+                )
+
+        # For debugging purpose uncomment the following line
+        # print(str_entry)
+        prefix = "%s " % self.api_key
+        return self._sock.send((prefix + str_entry + "\n").encode("UTF-8"))
+
+    def __enter__(self):
+        self._sock = self._connect()
+        return self
+
+    def __exit__(self, ex_type, ex_value, traceback):
+        if self._sock:
+            self._sock.close()
+        if ex_type is not None:
+            print("DatadogConnection exit: ", ex_type, ex_value, traceback)
+
+
+def lambda_handler(event, context):
+    # Check prerequisites
+    if DD_API_KEY == "":
+        raise Exception(
+            "You must configure your API key before starting this lambda function (see #Parameters section)"
+        )
+    # Check if the API key is the correct number of characters
+    if len(DD_API_KEY) != 32:
+        raise Exception(
+            "The API key is not the expected length. Please confirm that your API key is correct"
+        )
+
+    metadata = {"ddsourcecategory": "aws"}
+
+    # create socket
+    with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as con:
+        # Add the context to meta
+        if "aws" not in metadata:
+            metadata["aws"] = {}
+        aws_meta = metadata["aws"]
+        aws_meta["function_version"] = context.function_version
+        aws_meta["invoked_function_arn"] = context.invoked_function_arn
+        # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
+        dd_custom_tags_data = {
+            "forwardername": context.function_name.lower(),
+            "memorysize": context.memory_limit_in_mb,
+            "forwarder_version": DD_FORWARDER_VERSION,
+        }
+        metadata[DD_CUSTOM_TAGS] = ",".join(
+            filter(
+                None,
+                [
+                    DD_TAGS,
+                    ",".join(
+                        [
+                            "{}:{}".format(k, v)
+                            for k, v in dd_custom_tags_data.iteritems()
+                        ]
+                    ),
+                ],
+            )
+        )
+
+        try:
+            logs = generate_logs(event, context, metadata)
+            for log in logs:
+                con = con.safe_submit_log(log, metadata)
+        except Exception as e:
+            print("Unexpected exception: {} for event {}".format(str(e), event))
+
+
+def generate_logs(event, context, metadata):
+    try:
+        # Route to the corresponding parser
+        event_type = parse_event_type(event)
+        if event_type == "s3":
+            logs = s3_handler(event, context, metadata)
+        elif event_type == "awslogs":
+            logs = awslogs_handler(event, context, metadata)
+        elif event_type == "events":
+            logs = cwevent_handler(event, metadata)
+        elif event_type == "sns":
+            logs = sns_handler(event, metadata)
+    except Exception as e:
+        # Logs through the socket the error
+        err_message = "Error parsing the object. Exception: {} for event {}".format(
+            str(e), event
+        )
+        logs = [err_message]
+    return logs
+
+
+# Utility functions
+
+
+def parse_event_type(event):
+    if "Records" in event and len(event["Records"]) > 0:
+        if "s3" in event["Records"][0]:
+            return "s3"
+        elif "Sns" in event["Records"][0]:
+            return "sns"
+
+    elif "awslogs" in event:
+        return "awslogs"
+
+    elif "detail" in event:
+        return "events"
+    raise Exception("Event type not supported (see #Event supported section)")
+
+
+# Handle S3 events
+def s3_handler(event, context, metadata):
+    s3 = boto3.client("s3")
+
+    # Get the object from the event and show its content type
+    bucket = event["Records"][0]["s3"]["bucket"]["name"]
+    key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"]).decode("utf8")
+
+    keyMetadata = parse_key_metadata(key)
+    for k in keyMetadata:
+        metadata[k] = keyMetadata[k]
+
+    source = parse_event_source(event, key)
+    metadata[DD_SOURCE] = source
+
+    ## Get the ARN of the service and set it as the hostname
+    if DD_HOST not in metadata.keys():
+        hostname = parse_service_arn(source, key, bucket, context)
+        if hostname:
+            metadata[DD_HOST] = hostname
+
+    ## default service to source value
+    if DD_SERVICE not in metadata.keys():
+        metadata[DD_SERVICE] = source
+
+    # Extract the S3 object
+    response = s3.get_object(Bucket=bucket, Key=key)
+    body = response["Body"]
+    data = body.read()
+
+    # If the name has a .gz extension, then decompress the data
+    if key[-3:] == ".gz":
+        with gzip.GzipFile(fileobj=BytesIO(data)) as decompress_stream:
+            # Reading line by line avoid a bug where gzip would take a very long time (>5min) for
+            # file around 60MB gzipped
+            data = "".join(BufferedReader(decompress_stream))
+
+    if is_cloudtrail(str(key)):
+        cloud_trail = json.loads(data)
+        for event in cloud_trail["Records"]:
+            # Create structured object and send it
+            structured_line = merge_dicts(
+                event, {"aws": {"s3": {"bucket": bucket, "key": key}}}
+            )
+            yield structured_line
+    else:
+        # Send lines to Datadog
+        for line in data.splitlines():
+            # Create structured object and send it
+            structured_line = {
+                "aws": {"s3": {"bucket": bucket, "key": key}},
+                "message": line,
+            }
+            yield structured_line
+
+
+# Handle CloudWatch logs
+def awslogs_handler(event, context, metadata):
+    # Get logs
+    with gzip.GzipFile(
+        fileobj=BytesIO(base64.b64decode(event["awslogs"]["data"]))
+    ) as decompress_stream:
+        # Reading line by line avoid a bug where gzip would take a very long
+        # time (>5min) for file around 60MB gzipped
+        data = "".join(BufferedReader(decompress_stream))
+    logs = json.loads(str(data))
+
+    # Set the source on the logs
+    source = logs.get("logGroup", "cloudwatch")
+    metadata[DD_SOURCE] = parse_event_source(event, source)
+
+    # Default service to source value
+    metadata[DD_SERVICE] = metadata[DD_SOURCE]
+
+    # Build aws attributes
+    aws_attributes = {
+        "aws": {
+            "awslogs": {
+                "logGroup": logs["logGroup"],
+                "logStream": logs["logStream"],
+                "owner": logs["owner"],
+            }
+        }
+    }
+
+    # For Lambda logs we want to extract the function name,
+    # then rebuild the arn of the monitored lambda using that name.
+    # Start by splitting the log group to get the function name
+    if metadata[DD_SOURCE] == "lambda":
+        log_group_parts = logs["logGroup"].split("/lambda/")
+        if len(log_group_parts) > 0:
+            function_name = log_group_parts[1].lower()
+            # Split the arn of the forwarder to extract the prefix
+            arn_parts = context.invoked_function_arn.split("function:")
+            if len(arn_parts) > 0:
+                arn_prefix = arn_parts[0]
+                # Rebuild the arn by replacing the function name
+                arn = arn_prefix + "function:" + function_name
+                # Add the arn as a log attribute
+                arn_attributes = {"lambda": {"arn": arn}}
+                aws_attributes = merge_dicts(aws_attributes, arn_attributes)
+                # Add the function name as tag
+                metadata[DD_CUSTOM_TAGS] += ",functionname:" + function_name
+                # Set the arn as the hostname
+                metadata[DD_HOST] = arn
+
+    # Create and send structured logs to Datadog
+    for log in logs["logEvents"]:
+        yield merge_dicts(log, aws_attributes)
+
+
+# Handle Cloudwatch Events
+def cwevent_handler(event, metadata):
+
+    data = event
+
+    # Set the source on the log
+    source = data.get("source", "cloudwatch")
+    service = source.split(".")
+    if len(service) > 1:
+        metadata[DD_SOURCE] = service[1]
+    else:
+        metadata[DD_SOURCE] = "cloudwatch"
+    ## default service to source value
+    metadata[DD_SERVICE] = metadata[DD_SOURCE]
+
+    yield data
+
+
+# Handle Sns events
+def sns_handler(event, metadata):
+
+    data = event
+    # Set the source on the log
+    metadata[DD_SOURCE] = parse_event_source(event, "sns")
+
+    for ev in data["Records"]:
+        # Create structured object and send it
+        structured_line = ev
+        yield structured_line
+
+
+def merge_dicts(a, b, path=None):
+    if path is None:
+        path = []
+    for key in b:
+        if key in a:
+            if isinstance(a[key], dict) and isinstance(b[key], dict):
+                merge_dicts(a[key], b[key], path + [str(key)])
+            elif a[key] == b[key]:
+                pass  # same leaf value
+            else:
+                raise Exception(
+                    "Conflict while merging metadatas and the log entry at %s"
+                    % ".".join(path + [str(key)])
+                )
+        else:
+            a[key] = b[key]
+    return a
+
+
+def is_cloudtrail(key):
+    match = cloudtrail_regex.search(key)
+    return bool(match)
+
+
+def parse_event_source(event, key):
+    if "elasticloadbalancing" in key:
+        return "elb"
+    for source in [
+        "lambda",
+        "redshift",
+        "cloudfront",
+        "kinesis",
+        "mariadb",
+        "mysql",
+        "apigateway",
+        "route53",
+        "vpc",
+        "rds",
+        "sns",
+        "waf",
+        "docdb",
+        "ecs",
+    ]:
+        if source in key:
+            return source
+    if "API-Gateway" in key:
+        return "apigateway"
+    if is_cloudtrail(str(key)):
+        return "cloudtrail"
+    if "awslogs" in event:
+        return "cloudwatch"
+    if "Records" in event and len(event["Records"]) > 0:
+        if "s3" in event["Records"][0]:
+            return "s3"
+    return "aws"
+
+def parse_service_arn(source, key, bucket, context):
+    if source == "elb":
+        # For ELB logs we parse the filename to extract parameters in order to rebuild the ARN
+        # 1. We extract the region from the filename
+        # 2. We extract the loadbalancer name and replace the "." by "/" to match the ARN format
+        # 3. We extract the id of the loadbalancer
+        # 4. We build the arn
+        keysplit = key.split("_")
+        idsplit = key.split("/")
+        if len(keysplit) > 3:
+            region = keysplit[2].lower()
+            name = keysplit[3]
+            elbname = name.replace(".", "/")
+            if len(idsplit) > 1:
+                idvalue = idsplit[1]
+                return "arn:aws:elasticloadbalancing:" + region + ":" + idvalue + ":loadbalancer/" + elbname
+    if source == "s3":
+        # For S3 access logs we use the bucket name to rebuild the arn
+        if bucket:
+            return "arn:aws:s3:::" + bucket
+    if source == "cloudfront":
+        # For Cloudfront logs we need to get the account and distribution id from the lambda arn and the filename
+        # 1. We extract the cloudfront id from the filename
+        # 2. We extract the AWS account id from the lambda arn
+        # 3. We build the arn
+        namesplit = key.split("/")
+        if len(namesplit) > 0:
+            filename = namesplit[len(namesplit) - 1]
+            # (distribution-ID.YYYY-MM-DD-HH.unique-ID.gz)
+            filenamesplit = filename.split(".")
+            if len(filenamesplit) > 3:
+                distributionID = filenamesplit[len(filenamesplit) - 4].lower()
+                arn = context.invoked_function_arn
+                arnsplit = arn.split(":")
+                if len(arnsplit) == 7:
+                    awsaccountID = arnsplit[4].lower()
+                    return "arn:aws:cloudfront::" + awsaccountID + ":distribution/" + distributionID
+    if source == "redshift":
+        # For redshift logs we leverage the filename to extract the relevant information
+        # 1. We extract the region from the filename
+        # 2. We extract the account-id from the filename
+        # 3. We extract the name of the cluster
+        # 4. We build the arn: arn:aws:redshift:region:account-id:cluster:cluster-name
+        namesplit = key.split("/")
+        if len(namesplit) == 8:
+            region = namesplit[3].lower()
+            accountID = namesplit[1].lower()
+            filename = namesplit[7]
+            filesplit = filename.split("_")
+            if len(filesplit) == 6:
+                clustername = filesplit[3]
+                return "arn:aws:redshift:" + region + ":" + accountID + ":cluster:" + clustername
+    return
+
+def parse_key_metadata(key):
+    metadata = {}
+    keysplit = key.split("/")
+    for k in keysplit:
+        if "=" in k:
+            # path components of the form name=value, e.g. env=dev
+            prt = k.split("=")
+            metadata[prt[0]] = prt[1]
+        elif "_" in k:
+            # path components of the form name_value, e.g. env_dev
+            kn = k.split("_")[0]
+            if kn in ["source", "cluster", "service", "env", "region", "host"]:
+                metadata[kn] = k.replace(kn + "_", "")
+
+    return metadata
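
Note (not part of the patch): the three definition files above differ only in
memory size and in the account ID hard-coded into the Role ARN, which is the
incorrect account information mentioned in the commit message. Below is a
minimal sketch of how the deploy script could render a single template
instead; it assumes a hypothetical template, lambda-func-definition.json.tmpl,
that keeps the {LAMBDA_FUNC} and {DATADOG_APIKEY} placeholders and adds a
{ROLE_ARN} placeholder, and it uses boto3's Lambda client to create or update
the function.

# deploy_ddlogscollector.py -- hypothetical deploy helper, not part of this patch.
import json

import boto3


def render_definition(template_path, lambda_func, dd_api_key, role_arn):
    # Fill the {LAMBDA_FUNC}, {DATADOG_APIKEY} and {ROLE_ARN} placeholders.
    with open(template_path) as f:
        raw = f.read()
    raw = raw.replace("{LAMBDA_FUNC}", lambda_func)
    raw = raw.replace("{DATADOG_APIKEY}", dd_api_key)
    raw = raw.replace("{ROLE_ARN}", role_arn)
    return json.loads(raw)


def deploy(definition, zip_path):
    # Create the function if it does not exist yet, otherwise update it.
    client = boto3.client("lambda")
    with open(zip_path, "rb") as f:
        code = f.read()
    try:
        client.get_function(FunctionName=definition["FunctionName"])
        exists = True
    except client.exceptions.ResourceNotFoundException:
        exists = False
    if exists:
        client.update_function_configuration(**definition)
        client.update_function_code(
            FunctionName=definition["FunctionName"], ZipFile=code
        )
    else:
        client.create_function(Code={"ZipFile": code}, **definition)


if __name__ == "__main__":
    definition = render_definition(
        "functions/ddlogscollector/lambda-func-definition.json.tmpl",  # hypothetical
        lambda_func="ddlogscollector-dev",
        dd_api_key="REPLACE_ME",  # ideally pulled from a secret store, not a literal
        role_arn="arn:aws:iam::ACCOUNT_ID:role/DatadogAWSIntegrationLambdaRole",
    )
    deploy(definition, "ddlogscollector.zip")

A real deploy step would also need to zip lambda_function.py into
ddlogscollector.zip and supply the environment-specific role ARN and API key
from configuration rather than literals.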
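A second sketch, also not part of the patch: since this collector is meant to
be used as a reference, it helps to be able to invoke the handler locally.
CloudWatch Logs subscription events arrive as gzip-compressed, base64-encoded
JSON under the "awslogs" key, which is what awslogs_handler decodes. The
FakeContext class, account ID and log group below are made up, the placeholder
key only satisfies the 32-character length check in lambda_handler, and
running this really opens a TLS connection to Datadog and ships the test line.

# local_invoke.py -- hypothetical harness for lambda_function.py (Python 2.7,
# matching the "Runtime" in the definitions above).
import base64
import gzip
import json
import os
from io import BytesIO

# lambda_function reads DD_API_KEY at import time, so set it before importing.
os.environ.setdefault("DD_API_KEY", "0" * 32)  # replace with a real key

import lambda_function


class FakeContext(object):
    # Minimal stand-in for the attributes lambda_handler reads off the context.
    function_name = "ddlogscollector-dev"
    function_version = "$LATEST"
    memory_limit_in_mb = 512
    invoked_function_arn = (
        "arn:aws:lambda:us-west-2:000000000000:function:ddlogscollector-dev"
    )


def make_awslogs_event(log_group, log_stream, messages):
    # CloudWatch Logs subscriptions deliver gzip-compressed, base64-encoded JSON.
    payload = {
        "owner": "000000000000",
        "logGroup": log_group,
        "logStream": log_stream,
        "logEvents": [
            {"id": str(i), "timestamp": 0, "message": m}
            for i, m in enumerate(messages)
        ],
    }
    buf = BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        gz.write(json.dumps(payload).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(buf.getvalue())}}


if __name__ == "__main__":
    event = make_awslogs_event(
        "/aws/lambda/some-monitored-function",  # made-up log group
        "2019/07/15/[$LATEST]abcdef",
        ["hello from the monitored function"],
    )
    lambda_function.lambda_handler(event, FakeContext())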