diff --git a/horde_worker_regen/process_management/process_manager.py b/horde_worker_regen/process_management/process_manager.py
index 0bd843fc..b0598a17 100644
--- a/horde_worker_regen/process_management/process_manager.py
+++ b/horde_worker_regen/process_management/process_manager.py
@@ -1376,6 +1376,7 @@ async def api_submit_job(self) -> None:
             async with self._completed_jobs_lock:
                 self.completed_jobs.remove(completed_job_info)
                 self._consecutive_failed_job_submits = 0
+                self._consecutive_failed_jobs += 1
                 return
 
         if completed_job_info.job_result_images_base64 is not None:
@@ -1404,6 +1405,7 @@ async def api_submit_job(self) -> None:
                     yarl.URL(job_info.r2_upload, encoded=True),
                     data=image_in_buffer.getvalue(),
                     skip_auto_headers=["content-type"],
+                    timeout=aiohttp.ClientTimeout(total=10),
                 ) as response:
                     if response.status != 200:
                         logger.error(f"Failed to upload image to R2: {response}")
@@ -1414,6 +1416,7 @@ async def api_submit_job(self) -> None:
                 logger.error(
                     f"Job {job_info.id_} faulted, removing from completed jobs",
                 )
+                self._consecutive_failed_jobs += 1
 
         submit_job_request = submit_job_request_type(
             apikey=self.bridge_data.api_key,
@@ -1523,6 +1526,8 @@ async def api_submit_job(self) -> None:
     _triggered_max_pending_megapixelsteps = False
     """Whether the number of megapixelsteps in the job deque exceeded the limit."""
 
+    _consecutive_failed_jobs = 0
+
     def get_pending_megapixelsteps(self) -> int:
         """Get the number of megapixelsteps that are pending in the job deque."""
         job_deque_mps = sum(job.payload.width * job.payload.height * job.payload.ddim_steps for job in self.job_deque)
@@ -1549,7 +1554,10 @@ async def _get_source_images(self, job_pop_response: ImageGenerateJobPopResponse
             if fail_count >= 10:
                 logger.error(f"Failed to download {field_name} after {fail_count} attempts")
                 break
-            response = await self._aiohttp_session.get(field_value)
+            response = await self._aiohttp_session.get(
+                field_value,
+                timeout=aiohttp.ClientTimeout(total=10),
+            )
             response.raise_for_status()
 
             new_response_dict = job_pop_response.model_dump(by_alias=True)
@@ -1571,6 +1579,13 @@ async def api_job_pop(self) -> None:
         if self._shutting_down:
             return
 
+        if self._consecutive_failed_jobs >= 3:
+            logger.error(
+                "Too many consecutive failed jobs, pausing job pops. "
+                "Please look into what happened and let the devs know.",
+            )
+            return
+
         if len(self.job_deque) >= self.bridge_data.queue_size + 1:  # FIXME?
             return
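
The new `_consecutive_failed_jobs` counter acts as a simple circuit breaker: it is incremented when a job faults or is dropped after exhausting its submit retries, and `api_job_pop` stops requesting new work once it reaches 3. Below is a reduced sketch of that pattern for context; the class and method names are illustrative, and the reset-on-success step is an assumption that does not appear in the hunks above.

```python
class FailureGate:
    """Minimal sketch of the consecutive-failure gate this diff introduces.

    The threshold of 3 matches the new check in api_job_pop(); resetting the
    streak on success is an assumption, not part of the patch shown above.
    """

    def __init__(self) -> None:
        self._consecutive_failed_jobs = 0

    def record_failure(self) -> None:
        # Mirrors the two new increments: a job that faulted, or one removed
        # from completed_jobs after too many failed submit attempts.
        self._consecutive_failed_jobs += 1

    def record_success(self) -> None:
        # Assumed behaviour: a successful submit clears the streak so a single
        # transient failure cannot eventually pause the worker.
        self._consecutive_failed_jobs = 0

    def may_pop_jobs(self) -> bool:
        # Mirrors the new guard in api_job_pop(): stop pulling work after three
        # failures in a row and leave it to the operator to investigate.
        return self._consecutive_failed_jobs < 3
```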
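
The other recurring change is the per-request `aiohttp.ClientTimeout(total=10)` passed to the R2 upload and the source-image download. A minimal, self-contained sketch of that usage pattern follows; the URL and helper name are illustrative and not taken from the worker code.

```python
import asyncio

import aiohttp


async def fetch_with_timeout(url: str) -> bytes:
    """Download a URL, giving up after 10 seconds total (connect + transfer)."""
    async with aiohttp.ClientSession() as session:
        # Per-request timeout, as in the diff's aiohttp.ClientTimeout(total=10);
        # it overrides the session-wide default for this call only.
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            response.raise_for_status()
            return await response.read()


async def main() -> None:
    try:
        await fetch_with_timeout("https://example.com")
    except asyncio.TimeoutError:
        # A stalled transfer now surfaces as a timeout the caller can count as
        # a failed attempt instead of hanging the submit/pop loop indefinitely.
        print("request timed out")


if __name__ == "__main__":
    asyncio.run(main())
```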