diff --git a/fixbackend/collect/collect_queue.py b/fixbackend/collect/collect_queue.py index da891438..833a35fd 100644 --- a/fixbackend/collect/collect_queue.py +++ b/fixbackend/collect/collect_queue.py @@ -199,7 +199,7 @@ async def enqueue_post_collect( job = await self.arq.enqueue_job("post-collect", collect_job, _job_id=job_id, _defer_by=defer_by) if job is None: raise JobAlreadyEnqueued(f"Failed to enqueue collect job {job_id}") - log.info(f"Enqueuing collect job {job.job_id} for tenant={db.workspace_id}") + log.info(f"Enqueuing post-collect job {job.job_id} for tenant={db.workspace_id}") if wait_until_done: # this will either return none or throw an exception (reraised from the worker) log.debug("Waiting for collect job to finish.") diff --git a/fixbackend/dispatcher/dispatcher_service.py b/fixbackend/dispatcher/dispatcher_service.py index 8fe4f200..7d312126 100644 --- a/fixbackend/dispatcher/dispatcher_service.py +++ b/fixbackend/dispatcher/dispatcher_service.py @@ -470,7 +470,12 @@ async def process_aws_account_configured(self, event: CloudAccountConfigured) -> # The first time we collect this account with this role. # Defer the collect process and retry in case of failure. # This is required, since AWS needs some time to propagate the role into all regions. - await self.trigger_collect(account, defer_by=timedelta(minutes=3), retry_failed_for=timedelta(minutes=15)) + await self.trigger_collect( + account, + reason="aws_account_configured", + defer_by=timedelta(minutes=3), + retry_failed_for=timedelta(minutes=15), + ) else: log.error( f"Received cloud account {cloud_account_id} configured message, but it does not exist in the database" @@ -480,6 +485,7 @@ async def trigger_collect( self, account: CloudAccount, *, + reason: str, defer_by: Optional[timedelta] = None, retry_failed_for: Optional[timedelta] = None, **kwargs: Any, @@ -487,7 +493,9 @@ async def trigger_collect( set_cloud_account_id(account.account_id) set_fix_cloud_account_id(account.id) if await self.collect_progress.account_collection_ongoing(account.workspace_id, account.id): - log.info(f"Collect for tenant: {account.workspace_id} and account: {account.id} is already in progress.") + log.info( + f"Collect for tenant: {account.workspace_id} and account: {account.id} (reason: {reason}) is already in progress." # noqa: E501 + ) return async def account_information() -> Optional[AccountInformation]: @@ -534,7 +542,7 @@ async def account_information() -> Optional[AccountInformation]: ): job_id = uuid.uuid4() log.info( - f"Trigger collect for tenant: {account.workspace_id} and account: {account.id} with job_id: {job_id}" + f"Trigger collect for tenant: {account.workspace_id} and account: {account.id} with job_id: {job_id}. Reason: {reason}" ) await self.collect_progress.track_account_collection_progress( account.workspace_id, account.id, ai, job_id, utc() @@ -564,11 +572,16 @@ async def schedule_next_runs(self) -> None: product_tier = await self.workspace_repository.get_product_tier(workspace_id) log.info(f"scheduling next run for workspace {workspace_id}, {len(accounts)} accounts") for account in accounts: + reason = "regular_collect" + + if isinstance(account.state, CloudAccountStates.Degraded): + reason = "degraded_account_ping" + if account.cloud == CloudNames.Azure and not azure_graph_scheduled: azure_graph_scheduled = True - await self.trigger_collect(account, collect_microsoft_graph=True) + await self.trigger_collect(account, reason=reason, collect_microsoft_graph=True) else: - await self.trigger_collect(account) + await self.trigger_collect(account, reason=reason) for account in degraded_accounts: await self.cloud_account_repo.update( @@ -580,4 +593,4 @@ async def schedule_next_runs(self) -> None: failed_accounts = await self.cloud_account_repo.list_non_hourly_failed_scans_accounts(now) for account in failed_accounts: - await self.trigger_collect(account) + await self.trigger_collect(account, reason="failed_account_scan")