Skip to content

Commit

Permalink
Support the conversion of tags to labels
Browse files Browse the repository at this point in the history
  • Loading branch information
Limess committed Apr 27, 2021
1 parent 6e81c08 commit 208dcce
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 18 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ format:

.PHONY: dev-start
dev-start:
rm -rf ./targets
@mkdir -p ./targets
poetry run python discoverecs.py --directory $$PWD/targets --default-scrape-interval-prefix default
poetry run python discoverecs.py --directory $$PWD/targets --default-scrape-interval-prefix default --tags-to-labels "*"
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ e.g. if `default` is used:

then `/opt/prometheus-ecs/default-tasks.json` will be written. This can be useful to allow configuration of a default scrape interval in your Prometheus config, rather than needing to update the config and then redeploying this discovery service.

### Tags to labels

If `--tags-to-labels` is set, the given tags will be added to the service discovery entry as `tag_<tag>` where `<tag>` is the given tag formatted to allowed label characters if the tag exists on either the task definition or task. Task tags override the task definition tags.

If `---tags-to-labels` is set to `*` then _all_ non-aws tags will be added.

### Configuration yaml

The following Prometheus configuration should be used to support all available intervals:
Expand Down
92 changes: 75 additions & 17 deletions discoverecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ def valid(self):


class TaskInfoDiscoverer:
def __init__(self):
def __init__(self, fetch_tags=True):
self.ec2_client = boto3.client("ec2")
self.ecs_client = boto3.client("ecs")
self.task_cache = FlipCache()
self.task_definition_cache = FlipCache()
self.container_instance_cache = FlipCache()
self.ec2_instance_cache = FlipCache()
self.fetch_tags = fetch_tags

def flip_caches(self):
self.task_cache.flip()
Expand All @@ -131,14 +132,27 @@ def flip_caches(self):

def describe_tasks(self, cluster_arn, task_arns):
def fetcher_task_definition(arn):
return self.ecs_client.describe_task_definition(taskDefinition=arn)[
"taskDefinition"
]
response = self.ecs_client.describe_task_definition(
taskDefinition=arn,
include=[
"TAGS",
]
if self.fetch_tags
else None,
)

return {**response["taskDefinition"], "tags": response.get("tags", [])}

def fetcher(fetch_task_arns):
tasks = {}
result = self.ecs_client.describe_tasks(
cluster=cluster_arn, tasks=fetch_task_arns
cluster=cluster_arn,
tasks=fetch_task_arns,
include=[
"TAGS",
]
if self.fetch_tags
else None,
)
if "tasks" in result:
for task in result["tasks"]:
Expand Down Expand Up @@ -304,6 +318,7 @@ def __init__(
ecs_container_id,
ecs_cluster_name,
ec2_instance_id,
tags,
):
self.ip = ip
self.port = port
Expand All @@ -315,6 +330,7 @@ def __init__(
self.ecs_container_id = ecs_container_id
self.ecs_cluster_name = ecs_cluster_name
self.ec2_instance_id = ec2_instance_id
self.tags = tags


def get_environment_var(environment, name):
Expand Down Expand Up @@ -380,9 +396,16 @@ def task_info_to_targets(task_info):
lambda container: container["name"] == container_definition["name"],
task["containers"],
)

if not prometheus_enabled:
continue

# get tags from the task definition, and merge/override any tags specifically set on the task
tags = {
**{tag["key"]: tag["value"] for tag in task_definition.get("tags", [])},
**{tag["key"]: tag["value"] for tag in task.get("tags", [])},
}

for container in running_containers:
ecs_task_name = extract_name_from_arn(task["taskDefinitionArn"])
has_host_port_mapping = (
Expand Down Expand Up @@ -451,17 +474,21 @@ def task_info_to_targets(task_info):
ecs_container_id=ecs_container_id,
ecs_cluster_name=ecs_cluster_name,
ec2_instance_id=ec2_instance_id,
tags=tags,
)
]
return targets


class Main:
def __init__(self, directory, interval, default_scrape_interval_prefix):
def __init__(
self, directory, interval, default_scrape_interval_prefix, tags_to_labels
):
self.directory = directory
self.interval = interval
self.default_scrape_interval_prefix = default_scrape_interval_prefix
self.discoverer = TaskInfoDiscoverer()
self.discoverer = TaskInfoDiscoverer(fetch_tags=len(tags_to_labels) > 0)
self.tags_to_labels = tags_to_labels

def write_jobs(self, jobs):
for prefix, j in jobs.items():
Expand Down Expand Up @@ -510,10 +537,20 @@ def discover_tasks(self):
"metrics_path": path,
},
}
for tag_name, tag_value in target.tags.items():
if not tag_name.lower().startswith("aws:") and (
not tag_name in self.tags_to_labels
or self.tags_to_labels == ["*"]
):
# prometheus labels match [a-zA-Z_][a-zA-Z0-9_]*
# with leading __ reserved for internal use
tag_name = re.sub(r"[^a-zA-Z0-9_]", "_", tag_name).lstrip("_")
if tag_name != "" and not re.match(r"^[0-9]", tag_name):
job["labels"]["tag_" + tag_name] = tag_value
if labels:
job["labels"].update(labels)
jobs[interval or self.default_scrape_interval_prefix].append(job)
log(job)
log("Discovered Job: " + str(job))
self.write_jobs(jobs)

def loop(self):
Expand All @@ -524,21 +561,42 @@ def loop(self):

def main():
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--directory", required=True)
arg_parser.add_argument("--interval", default=60)
arg_parser.add_argument("--default-scrape-interval-prefix", default="1m")
arg_parser.add_argument(
"--directory",
required=True,
help="The output directory for service discovery configs.",
)
arg_parser.add_argument(
"--interval",
type=float,
default=60,
help="The interval to refresh targetes from AWS APIs.",
)
arg_parser.add_argument(
"--default-scrape-interval-prefix",
default="1m",
help="The default prefix to write the service discovery file for if no explicit prefix is specified in the discovered service.",
)
arg_parser.add_argument(
"--tags-to-labels",
nargs="+",
help="Task definition tags to convert to labels. Case sensitive.",
)
args = arg_parser.parse_args()
log(
'Starting...\nDirectory: "{}"\nRefresh interval: "{}s"\nDefault scrape interval prefix: "{}"\n'.format(
args.directory,
str(args.interval),
args.default_scrape_interval_prefix,
)
f"""
Starting...
Directory: "{args.directory}"
Refresh interval: "{str(args.interval)}s"
Default scrape interval prefix: "{args.default_scrape_interval_prefix}"
Tags to convert to labels: "{args.tags_to_labels}"
"""
)
Main(
directory=args.directory,
interval=float(args.interval),
interval=args.interval,
default_scrape_interval_prefix=args.default_scrape_interval_prefix,
tags_to_labels=args.tags_to_labels,
).loop()


Expand Down

0 comments on commit 208dcce

Please sign in to comment.