From 7cc694ed7f67cc8ba9aeb0e303623fc263cee6a0 Mon Sep 17 00:00:00 2001
From: Trevor Bonas
Date: Thu, 9 May 2024 11:48:00 -0700
Subject: [PATCH] Improve method to search orgs for buckets

---
 .../influx-migration/influx_migration.py | 96 ++++++-------------
 1 file changed, 30 insertions(+), 66 deletions(-)

diff --git a/tools/python/influx-migration/influx_migration.py b/tools/python/influx-migration/influx_migration.py
index 1e0c8b84..29585b4c 100644
--- a/tools/python/influx-migration/influx_migration.py
+++ b/tools/python/influx-migration/influx_migration.py
@@ -92,7 +92,7 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip
     if full:
         report_all_bucket_series_count(host=src_host, token=root_token)
     else:
-        report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token)
+        report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org)
 
     start_time = time.time()
     bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token,
@@ -110,15 +110,15 @@
     duration = time.time() - start_time
     log_performance_metrics("backup", start_time, duration)
 
-def report_all_bucket_series_count(host, token, org=None):
+def report_all_bucket_series_count(host, token, org_name=None):
     with InfluxDBClient(url=host, token=token) as client:
         # CSV migration may use an all access token, meaning buckets will be scoped to an organization
-        if org is not None:
-            client.org = org
+        if org_name is not None:
+            client.org = org_name
         buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT)
         for bucket in buckets.buckets:
             if not bucket.name.startswith("_"):
-                report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org=org)
+                report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name)
         # Handle pagination
         offset = 0
         while buckets.links.next is not None:
@@ -127,25 +127,28 @@
                                                         offset=offset)
             for bucket in buckets.buckets:
                 if not bucket.name.startswith("_"):
-                    report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org=org)
+                    report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name)
 
-def report_bucket_series_count(bucket_name, host, token, org=None):
+def report_bucket_series_count(bucket_name, host, token, org_name=None):
     try:
         with InfluxDBClient(url=host, token=token) as client:
-            if org is not None:
-                client.org = org
-            bucket = client.buckets_api().find_bucket_by_name(bucket_name=bucket_name)
-            metrics_service = MetricsService(client.api_client)
-            metrics = metrics_service.get_metrics()
-            for line in metrics.split("\n"):
-                if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
-                    line = line.split(" ")
-                    if len(line) < 2:
-                        raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
-                                         "tracked in storage_bucket_series_num but its series count is missing. "
-                                         f"Check the {host}/metrics endpoint for more details")
-                    logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {line[1]} series")
-                    return
+            if org_name is not None:
+                client.org = org_name
+            buckets = client.buckets_api().find_buckets(name=bucket_name) \
+                if org_name is None else \
+                client.buckets_api().find_buckets(name=bucket_name, org=org_name)
+            for bucket in buckets.buckets:
+                metrics_service = MetricsService(client.api_client)
+                metrics = metrics_service.get_metrics()
+                for line in metrics.split("\n"):
+                    if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
+                        line = line.split(" ")
+                        if len(line) < 2:
+                            raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
+                                             "tracked in storage_bucket_series_num but its series count is missing. "
+                                             f"Check the {host}/metrics endpoint for more details")
+                        logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {line[1]} series")
+                        return
             raise ValueError(f"Bucket series count could not be found in {host}/metrics")
     except (ApiException, ValueError) as error:
         logging.error(repr(error))
@@ -251,19 +254,16 @@ def bucket_exists(host, token, bucket_name, org_name=None, skip_verify=False):
             if org_name is not None:
                 client.org = org_name
             # Buckets may have the same name in multiple organizations
-            buckets = client.buckets_api().find_buckets(name=bucket_name)
+            buckets = client.buckets_api().find_buckets(name=bucket_name) \
+                if org_name is None else \
+                client.buckets_api().find_buckets(name=bucket_name, org=org_name)
             if len(buckets.buckets) <= 0:
                 logging.debug(f"Bucket with name {bucket_name} could not be found "
                               f"in host {host}")
                 return False
-            if org_name is not None:
-                bucket_found_in_org = False
-                for bucket in buckets.buckets:
-                    if bucket_exists_in_org(host=host, token=token, bucket=bucket,
-                                            org_name=org_name, skip_verify=skip_verify):
-                        bucket_found_in_org = True
-                return bucket_found_in_org
             logging.debug(f"Bucket with name {bucket_name} found in host {host}")
+            if org_name is not None:
+                logging.debug(f"Bucket found in org with name {org_name} and ID {buckets.buckets[0].org_id}")
             logging.debug(f"{len(buckets.buckets)} buckets found")
             return True
     except InfluxDBError as error:
@@ -272,42 +272,6 @@
         logging.error(repr(error))
         logging.debug("An unexpected error occurred while checking the existence "
                       f"a bucket with name {bucket_name} in host {host}")
         return False
-def bucket_exists_in_org(host, token, bucket, org_name, skip_verify=False):
-    """
-    Checks for the existence of a bucket in an org.
-
-    :param str host: The host for the InfluxDB instance.
-    :param str token: The token to use for verification.
-    :param influxdb_client.Bucket bucket: The bucket to verify.
-    :param org_name: The name of the org to use for bucket verification.
-    :type org_name: str or None
-    :param bool skip_verify: Whether to skip TLS certificate verification.
-    :returns: Whether the bucket exists in the instance in the specified org.
-    :rtype: bool
-    """
-    try:
-        with InfluxDBClient(url=host, token=token, timeout=MILLISECOND_TIMEOUT, verify_ssl=not skip_verify, org=org_name) as client:
-            org_list = client.organizations_api().find_organizations(org=org_name)
-            if len(org_list) <= 0:
-                logging.debug(f"Org {org_name} could not be found when checking whether bucket {bucket.name} exists")
-                return False
-            else:
-                logging.debug(f"{len(org_list)} orgs matched with name {org_name}")
-            for org in org_list:
-                if org.id == bucket.org_id:
-                    logging.debug(f"Bucket with name {bucket.name} found in org "
-                                  f"{org.name} with org ID {org.id} in host {host}")
-                    return True
-            # Bucket could not be found in any org with matching org_name
-            logging.debug(f"Bucket with name {bucket.name} could not be found "
-                          f"in any org with name {org_name} in host {host}")
-            return False
-    except InfluxDBError as error:
-        logging.error(repr(error))
-        logging.debug("An unexpected error occurred while checking the existence "
-                      f"a bucket with name {bucket.name} in the org with name {org_name} in host {host}")
-        return False
-
 def cleanup(mount_point=None, exec_s3_bucket_mount=None):
     """
     Coordinates unmounting S3 bucket and deleting temporary mount directory. Called at exit.
@@ -1206,7 +1170,7 @@
             report_all_bucket_series_count(host=args.dest_host, token=src_token)
         else:
             report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host,
-                                       token=dest_token, org=args.dest_org)
+                                       token=dest_token, org_name=args.dest_org)
     logging.info("Migration complete")
 
     log_performance_metrics("influx_migration.py", script_start_time, script_duration)