From 208dcceee32b9b166fd6a7084974721b676aa18f Mon Sep 17 00:00:00 2001 From: Charlie Briggs Date: Tue, 27 Apr 2021 12:43:30 +0100 Subject: [PATCH] Support the conversion of tags to labels --- Makefile | 3 +- README.md | 6 ++++ discoverecs.py | 92 ++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 83 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 7fa06e8..636e1f4 100644 --- a/Makefile +++ b/Makefile @@ -4,5 +4,6 @@ format: .PHONY: dev-start dev-start: + rm -rf ./targets @mkdir -p ./targets - poetry run python discoverecs.py --directory $$PWD/targets --default-scrape-interval-prefix default + poetry run python discoverecs.py --directory $$PWD/targets --default-scrape-interval-prefix default --tags-to-labels "*" diff --git a/README.md b/README.md index bdabe59..ae89ca9 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,12 @@ e.g. if `default` is used: then `/opt/prometheus-ecs/default-tasks.json` will be written. This can be useful to allow configuration of a default scrape interval in your Prometheus config, rather than needing to update the config and then redeploying this discovery service. +### Tags to labels + +If `--tags-to-labels` is set, the given tags will be added to the service discovery entry as `tag_` where `` is the given tag formatted to allowed label characters if the tag exists on either the task definition or task. Task tags override the task definition tags. + +If `---tags-to-labels` is set to `*` then _all_ non-aws tags will be added. + ### Configuration yaml The following Prometheus configuration should be used to support all available intervals: diff --git a/discoverecs.py b/discoverecs.py index c5c3551..8003fdf 100644 --- a/discoverecs.py +++ b/discoverecs.py @@ -115,13 +115,14 @@ def valid(self): class TaskInfoDiscoverer: - def __init__(self): + def __init__(self, fetch_tags=True): self.ec2_client = boto3.client("ec2") self.ecs_client = boto3.client("ecs") self.task_cache = FlipCache() self.task_definition_cache = FlipCache() self.container_instance_cache = FlipCache() self.ec2_instance_cache = FlipCache() + self.fetch_tags = fetch_tags def flip_caches(self): self.task_cache.flip() @@ -131,14 +132,27 @@ def flip_caches(self): def describe_tasks(self, cluster_arn, task_arns): def fetcher_task_definition(arn): - return self.ecs_client.describe_task_definition(taskDefinition=arn)[ - "taskDefinition" - ] + response = self.ecs_client.describe_task_definition( + taskDefinition=arn, + include=[ + "TAGS", + ] + if self.fetch_tags + else None, + ) + + return {**response["taskDefinition"], "tags": response.get("tags", [])} def fetcher(fetch_task_arns): tasks = {} result = self.ecs_client.describe_tasks( - cluster=cluster_arn, tasks=fetch_task_arns + cluster=cluster_arn, + tasks=fetch_task_arns, + include=[ + "TAGS", + ] + if self.fetch_tags + else None, ) if "tasks" in result: for task in result["tasks"]: @@ -304,6 +318,7 @@ def __init__( ecs_container_id, ecs_cluster_name, ec2_instance_id, + tags, ): self.ip = ip self.port = port @@ -315,6 +330,7 @@ def __init__( self.ecs_container_id = ecs_container_id self.ecs_cluster_name = ecs_cluster_name self.ec2_instance_id = ec2_instance_id + self.tags = tags def get_environment_var(environment, name): @@ -380,9 +396,16 @@ def task_info_to_targets(task_info): lambda container: container["name"] == container_definition["name"], task["containers"], ) + if not prometheus_enabled: continue + # get tags from the task definition, and merge/override any tags specifically set on the task + tags = { + **{tag["key"]: tag["value"] for tag in task_definition.get("tags", [])}, + **{tag["key"]: tag["value"] for tag in task.get("tags", [])}, + } + for container in running_containers: ecs_task_name = extract_name_from_arn(task["taskDefinitionArn"]) has_host_port_mapping = ( @@ -451,17 +474,21 @@ def task_info_to_targets(task_info): ecs_container_id=ecs_container_id, ecs_cluster_name=ecs_cluster_name, ec2_instance_id=ec2_instance_id, + tags=tags, ) ] return targets class Main: - def __init__(self, directory, interval, default_scrape_interval_prefix): + def __init__( + self, directory, interval, default_scrape_interval_prefix, tags_to_labels + ): self.directory = directory self.interval = interval self.default_scrape_interval_prefix = default_scrape_interval_prefix - self.discoverer = TaskInfoDiscoverer() + self.discoverer = TaskInfoDiscoverer(fetch_tags=len(tags_to_labels) > 0) + self.tags_to_labels = tags_to_labels def write_jobs(self, jobs): for prefix, j in jobs.items(): @@ -510,10 +537,20 @@ def discover_tasks(self): "metrics_path": path, }, } + for tag_name, tag_value in target.tags.items(): + if not tag_name.lower().startswith("aws:") and ( + not tag_name in self.tags_to_labels + or self.tags_to_labels == ["*"] + ): + # prometheus labels match [a-zA-Z_][a-zA-Z0-9_]* + # with leading __ reserved for internal use + tag_name = re.sub(r"[^a-zA-Z0-9_]", "_", tag_name).lstrip("_") + if tag_name != "" and not re.match(r"^[0-9]", tag_name): + job["labels"]["tag_" + tag_name] = tag_value if labels: job["labels"].update(labels) jobs[interval or self.default_scrape_interval_prefix].append(job) - log(job) + log("Discovered Job: " + str(job)) self.write_jobs(jobs) def loop(self): @@ -524,21 +561,42 @@ def loop(self): def main(): arg_parser = argparse.ArgumentParser() - arg_parser.add_argument("--directory", required=True) - arg_parser.add_argument("--interval", default=60) - arg_parser.add_argument("--default-scrape-interval-prefix", default="1m") + arg_parser.add_argument( + "--directory", + required=True, + help="The output directory for service discovery configs.", + ) + arg_parser.add_argument( + "--interval", + type=float, + default=60, + help="The interval to refresh targetes from AWS APIs.", + ) + arg_parser.add_argument( + "--default-scrape-interval-prefix", + default="1m", + help="The default prefix to write the service discovery file for if no explicit prefix is specified in the discovered service.", + ) + arg_parser.add_argument( + "--tags-to-labels", + nargs="+", + help="Task definition tags to convert to labels. Case sensitive.", + ) args = arg_parser.parse_args() log( - 'Starting...\nDirectory: "{}"\nRefresh interval: "{}s"\nDefault scrape interval prefix: "{}"\n'.format( - args.directory, - str(args.interval), - args.default_scrape_interval_prefix, - ) + f""" +Starting... +Directory: "{args.directory}" +Refresh interval: "{str(args.interval)}s" +Default scrape interval prefix: "{args.default_scrape_interval_prefix}" +Tags to convert to labels: "{args.tags_to_labels}" + """ ) Main( directory=args.directory, - interval=float(args.interval), + interval=args.interval, default_scrape_interval_prefix=args.default_scrape_interval_prefix, + tags_to_labels=args.tags_to_labels, ).loop()