diff --git a/explorer/tasks.py b/explorer/tasks.py
index a1eb6289..cceddcbb 100644
--- a/explorer/tasks.py
+++ b/explorer/tasks.py
@@ -18,28 +18,28 @@
     from celery.utils.log import get_task_logger
 
     from explorer.utils import s3_upload
+
     logger = get_task_logger(__name__)
 else:
     import logging
 
     from explorer.utils import noop_decorator as shared_task
+
     logger = logging.getLogger(__name__)
 
 
 @shared_task
 def execute_query(query_id, email_address):
     q = Query.objects.get(pk=query_id)
-    send_mail("[SQL Explorer] Your query is running...",
-              f"{q.title} is running and should be in your inbox soon!",
-              app_settings.FROM_EMAIL,
-              [email_address])
+    send_mail(
+        "[SQL Explorer] Your query is running...",
+        f"{q.title} is running and should be in your inbox soon!",
+        app_settings.FROM_EMAIL,
+        [email_address],
+    )
 
     exporter = get_exporter_class("csv")(q)
-    random_part = "".join(
-        random.choice(
-            string.ascii_uppercase + string.digits
-        ) for _ in range(20)
-    )
+    random_part = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(20))
     try:
         url = s3_upload(f"{random_part}.csv", convert_csv_to_bytesio(exporter))
         subj = f'[SQL Explorer] Report "{q.title}" is ready'
@@ -66,19 +66,12 @@ def snapshot_query(query_id):
         logger.info(f"Starting snapshot for query {query_id}...")
         q = Query.objects.get(pk=query_id)
         exporter = get_exporter_class("csv")(q)
-        k = "query-{}/snap-{}.csv".format(
-            q.id,
-            date.today().strftime("%Y%m%d-%H:%M:%S")
-        )
+        k = "query-{}/snap-{}.csv".format(q.id, date.today().strftime("%Y%m%d-%H:%M:%S"))
         logger.info(f"Uploading snapshot for query {query_id} as {k}...")
         url = s3_upload(k, convert_csv_to_bytesio(exporter))
-        logger.info(
-            f"Done uploading snapshot for query {query_id}. URL: {url}"
-        )
+        logger.info(f"Done uploading snapshot for query {query_id}. URL: {url}")
     except Exception as e:
-        logger.warning(
-            f"Failed to snapshot query {query_id} ({e}). Retrying..."
-        )
+        logger.warning(f"Failed to snapshot query {query_id} ({e}). Retrying...")
         snapshot_query.retry()
 
 
@@ -86,9 +79,7 @@ def snapshot_query(query_id):
 def snapshot_queries():
     logger.info("Starting query snapshots...")
     qs = Query.objects.filter(snapshot=True).values_list("id", flat=True)
-    logger.info(
-        f"Found {len(qs)} queries to snapshot. Creating snapshot tasks..."
-    )
+    logger.info(f"Found {len(qs)} queries to snapshot. Creating snapshot tasks...")
     for qid in qs:
         snapshot_query.delay(qid)
     logger.info("Done creating tasks.")
@@ -96,12 +87,8 @@ def snapshot_queries():
 
 @shared_task
 def truncate_querylogs(days):
-    qs = QueryLog.objects.filter(
-        run_at__lt=datetime.now() - timedelta(days=days)
-    )
-    logger.info(
-        f"Deleting {qs.count} QueryLog objects older than {days} days."
-    )
+    qs = QueryLog.objects.filter(run_at__lt=datetime.now() - timedelta(days=days))
+    logger.info(f"Deleting {qs.count()} QueryLog objects older than {days} days.")
     qs.delete()
     logger.info("Done deleting QueryLog objects.")
 
@@ -109,19 +96,21 @@ def truncate_querylogs(days):
 @shared_task
 def build_schema_cache_async(connection_alias):
     from .schema import (
-        build_schema_info, connection_schema_cache_key, connection_schema_json_cache_key, transform_to_json_schema,
+        build_schema_info,
+        connection_schema_cache_key,
+        connection_schema_json_cache_key,
+        transform_to_json_schema,
     )
+
     ret = build_schema_info(connection_alias)
     cache.set(connection_schema_cache_key(connection_alias), ret)
-    cache.set(connection_schema_json_cache_key(connection_alias),
-              transform_to_json_schema(ret))
+    cache.set(connection_schema_json_cache_key(connection_alias), transform_to_json_schema(ret))
     return ret
 
 
 @shared_task
 def remove_unused_sqlite_dbs():
-    uploaded_dbs = DatabaseConnection.objects.filter(engine=DatabaseConnection.SQLITE,
-                                                     host__isnull=False)
+    uploaded_dbs = DatabaseConnection.objects.filter(engine=DatabaseConnection.SQLITE, host__isnull=False)
     for db in uploaded_dbs:
         if os.path.exists(db.local_name):
             recent_run = QueryLog.objects.filter(connection=db.alias).first()