Skip to content

Commit

Permalink
Improve method to search orgs for buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorbonas committed May 9, 2024
1 parent ec0bd2a commit 7cc694e
Showing 1 changed file with 30 additions and 66 deletions.
96 changes: 30 additions & 66 deletions tools/python/influx-migration/influx_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip
if full:
report_all_bucket_series_count(host=src_host, token=root_token)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token)
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org)
start_time = time.time()

bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token,
Expand All @@ -110,15 +110,15 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip
duration = time.time() - start_time
log_performance_metrics("backup", start_time, duration)

def report_all_bucket_series_count(host, token, org=None):
def report_all_bucket_series_count(host, token, org_name=None):
with InfluxDBClient(url=host, token=token) as client:
# CSV migration may use an all access token, meaning buckets will be scoped to an organization
if org is not None:
client.org = org
if org_name is not None:
client.org = org_name
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT)
for bucket in buckets.buckets:
if not bucket.name.startswith("_"):
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org=org)
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name)
# Handle pagination
offset = 0
while buckets.links.next is not None:
Expand All @@ -127,25 +127,28 @@ def report_all_bucket_series_count(host, token, org=None):
offset=offset)
for bucket in buckets.buckets:
if not bucket.name.startswith("_"):
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org=org)
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name)

def report_bucket_series_count(bucket_name, host, token, org=None):
def report_bucket_series_count(bucket_name, host, token, org_name=None):
try:
with InfluxDBClient(url=host, token=token) as client:
if org is not None:
client.org = org
bucket = client.buckets_api().find_bucket_by_name(bucket_name=bucket_name)
metrics_service = MetricsService(client.api_client)
metrics = metrics_service.get_metrics()
for line in metrics.split("\n"):
if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
line = line.split(" ")
if len(line) < 2:
raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
"tracked in storage_bucket_series_num but its series count is missing. "
f"Check the {host}/metrics endpoint for more details")
logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {line[1]} series")
return
if org_name is not None:
client.org = org_name
buckets = client.buckets_api().find_buckets(name=bucket_name) \
if org_name is None else \
client.buckets_api().find_buckets(name=bucket_name, org=org_name)
for bucket in buckets.buckets:
metrics_service = MetricsService(client.api_client)
metrics = metrics_service.get_metrics()
for line in metrics.split("\n"):
if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
line = line.split(" ")
if len(line) < 2:
raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
"tracked in storage_bucket_series_num but its series count is missing. "
f"Check the {host}/metrics endpoint for more details")
logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {line[1]} series")
return
raise ValueError(f"Bucket series count could not be found in {host}/metrics")
except (ApiException, ValueError) as error:
logging.error(repr(error))
Expand Down Expand Up @@ -251,19 +254,16 @@ def bucket_exists(host, token, bucket_name, org_name=None, skip_verify=False):
if org_name is not None:
client.org = org_name
# Buckets may have the same name in multiple organizations
buckets = client.buckets_api().find_buckets(name=bucket_name)
buckets = client.buckets_api().find_buckets(name=bucket_name) \
if org_name is None else \
client.buckets_api().find_buckets(name=bucket_name, org=org_name)
if len(buckets.buckets) <= 0:
logging.debug(f"Bucket with name {bucket_name} could not be found "
f"in host {host}")
return False
if org_name is not None:
bucket_found_in_org = False
for bucket in buckets.buckets:
if bucket_exists_in_org(host=host, token=token, bucket=bucket,
org_name=org_name, skip_verify=skip_verify):
bucket_found_in_org = True
return bucket_found_in_org
logging.debug(f"Bucket with name {bucket_name} found in host {host}")
if org_name is not None:
logging.debug(f"Bucket found in org with name {org_name} and ID {buckets.buckets[0].org_id}")
logging.debug(f"{len(buckets.buckets)} buckets found")
return True
except InfluxDBError as error:
Expand All @@ -272,42 +272,6 @@ def bucket_exists(host, token, bucket_name, org_name=None, skip_verify=False):
f"a bucket with name {bucket_name} in host {host}")
return False

def bucket_exists_in_org(host, token, bucket, org_name, skip_verify=False):
    """
    Reports whether a bucket belongs to an org that matches the given org name.

    The orgs matching ``org_name`` are looked up on the host and each org's ID
    is compared against the ``org_id`` of the supplied bucket.

    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for verification.
    :param influxdb_client.Bucket bucket: The bucket to verify.
    :param org_name: The name of the org to use for bucket verification.
    :type org_name: str or None
    :param bool skip_verify: Whether to skip TLS certificate verification.
    :returns: True if an org named ``org_name`` has the same ID as the bucket's
        ``org_id``; False if no such org matches or an InfluxDBError is raised.
    :rtype: bool
    """
    try:
        with InfluxDBClient(url=host, token=token, timeout=MILLISECOND_TIMEOUT,
                            verify_ssl=not skip_verify, org=org_name) as client:
            matching_orgs = client.organizations_api().find_organizations(org=org_name)

            # Guard clause: nothing to compare against when no org has this name
            if len(matching_orgs) <= 0:
                logging.debug(f"Org {org_name} could not be found when checking whether bucket {bucket.name} exists")
                return False

            logging.debug(f"{len(matching_orgs)} orgs matched with name {org_name}")
            for candidate in matching_orgs:
                if candidate.id != bucket.org_id:
                    continue
                logging.debug(f"Bucket with name {bucket.name} found in org "
                              f"{candidate.name} with org ID {candidate.id} in host {host}")
                return True

            # None of the orgs sharing this name owns the bucket
            logging.debug(f"Bucket with name {bucket.name} could not be found "
                          f"in any org with name {org_name} in host {host}")
            return False
    except InfluxDBError as error:
        # Errors are logged and reported as "not found" rather than propagated
        logging.error(repr(error))
        logging.debug("An unexpected error occurred while checking the existence "
                      f"a bucket with name {bucket.name} in the org with name {org_name} in host {host}")
        return False

def cleanup(mount_point=None, exec_s3_bucket_mount=None):
"""
Coordinates unmounting S3 bucket and deleting temporary mount directory. Called at exit.
Expand Down Expand Up @@ -1206,7 +1170,7 @@ def main(args):
report_all_bucket_series_count(host=args.dest_host, token=src_token)
else:
report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host,
token=dest_token, org=args.dest_org)
token=dest_token, org_name=args.dest_org)

logging.info("Migration complete")
log_performance_metrics("influx_migration.py", script_start_time, script_duration)
Expand Down

0 comments on commit 7cc694e

Please sign in to comment.