From a7495d6d20a495d26fc027e7a8e5c6f25337c3be Mon Sep 17 00:00:00 2001 From: tazlin Date: Tue, 10 Oct 2023 18:25:16 -0400 Subject: [PATCH 1/3] fix: adds a timeout on job upload; halt job pop on 3 consecutive failures --- .../process_management/process_manager.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/horde_worker_regen/process_management/process_manager.py b/horde_worker_regen/process_management/process_manager.py index 0bd843fc..b0598a17 100644 --- a/horde_worker_regen/process_management/process_manager.py +++ b/horde_worker_regen/process_management/process_manager.py @@ -1376,6 +1376,7 @@ async def api_submit_job(self) -> None: async with self._completed_jobs_lock: self.completed_jobs.remove(completed_job_info) self._consecutive_failed_job_submits = 0 + self._consecutive_failed_jobs += 1 return if completed_job_info.job_result_images_base64 is not None: @@ -1404,6 +1405,7 @@ async def api_submit_job(self) -> None: yarl.URL(job_info.r2_upload, encoded=True), data=image_in_buffer.getvalue(), skip_auto_headers=["content-type"], + timeout=aiohttp.ClientTimeout(total=10), ) as response: if response.status != 200: logger.error(f"Failed to upload image to R2: {response}") @@ -1414,6 +1416,7 @@ async def api_submit_job(self) -> None: logger.error( f"Job {job_info.id_} faulted, removing from completed jobs", ) + self._consecutive_failed_jobs += 1 submit_job_request = submit_job_request_type( apikey=self.bridge_data.api_key, @@ -1523,6 +1526,8 @@ async def api_submit_job(self) -> None: _triggered_max_pending_megapixelsteps = False """Whether the number of megapixelsteps in the job deque exceeded the limit.""" + _consecutive_failed_jobs = 0 + def get_pending_megapixelsteps(self) -> int: """Get the number of megapixelsteps that are pending in the job deque.""" job_deque_mps = sum(job.payload.width * job.payload.height * job.payload.ddim_steps for job in self.job_deque) @@ -1549,7 +1554,10 @@ async def _get_source_images(self, job_pop_response: ImageGenerateJobPopResponse if fail_count >= 10: logger.error(f"Failed to download {field_name} after {fail_count} attempts") break - response = await self._aiohttp_session.get(field_value) + response = await self._aiohttp_session.get( + field_value, + timeout=aiohttp.ClientTimeout(total=10), + ) response.raise_for_status() new_response_dict = job_pop_response.model_dump(by_alias=True) @@ -1571,6 +1579,13 @@ async def api_job_pop(self) -> None: if self._shutting_down: return + if self._consecutive_failed_jobs >= 3: + logger.error( + "Too many consecutive failed jobs, pausing job pops. " + "Please look into what happened and let the devs know.", + ) + return + if len(self.job_deque) >= self.bridge_data.queue_size + 1: # FIXME? return From 8ccf834d1ee48657b4b4d30301e24d3b5c5cfc63 Mon Sep 17 00:00:00 2001 From: tazlin Date: Fri, 20 Oct 2023 22:08:40 -0400 Subject: [PATCH 2/3] fix: don't flail repeatedly on low worker speed API response --- horde_worker_regen/process_management/process_manager.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/horde_worker_regen/process_management/process_manager.py b/horde_worker_regen/process_management/process_manager.py index b0598a17..2b8381f6 100644 --- a/horde_worker_regen/process_management/process_manager.py +++ b/horde_worker_regen/process_management/process_manager.py @@ -1439,7 +1439,6 @@ async def api_submit_job(self) -> None: async with self._completed_jobs_lock: self.completed_jobs.remove(completed_job_info) self._consecutive_failed_job_submits = 0 - return if "already submitted" in job_submit_response.message: @@ -1447,7 +1446,13 @@ async def api_submit_job(self) -> None: async with self._completed_jobs_lock: self.completed_jobs.remove(completed_job_info) self._consecutive_failed_job_submits = 0 + return + if "Please check your worker speed" in job_submit_response.message: + logger.error(job_submit_response.message) + async with self._completed_jobs_lock: + self.completed_jobs.remove(completed_job_info) + self._consecutive_failed_job_submits = 0 return error_string = "Failed to submit job (API Error)" From 50e84dc31d4bde6c46b9bc34ec47b5293bfc6d4b Mon Sep 17 00:00:00 2001 From: tazlin Date: Fri, 20 Oct 2023 22:24:06 -0400 Subject: [PATCH 3/3] fix: give more time until jobpop for high MPS job queues --- horde_worker_regen/process_management/process_manager.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/horde_worker_regen/process_management/process_manager.py b/horde_worker_regen/process_management/process_manager.py index 2b8381f6..90085d2b 100644 --- a/horde_worker_regen/process_management/process_manager.py +++ b/horde_worker_regen/process_management/process_manager.py @@ -1623,9 +1623,12 @@ async def api_job_pop(self) -> None: # Assuming a megapixelstep takes 0.75 seconds, if 2/3 of the time has passed since the limit was triggered, # we can assume that the pending megapixelsteps will be below the limit soon. Otherwise we continue to wait - if not (time.time() - self._triggered_max_pending_megapixelsteps_time) > ( - (self._max_pending_megapixelsteps * 0.75) * (2 / 3) - ): + seconds_to_wait = (self._max_pending_megapixelsteps * 0.75) * (2 / 3) + + if self.get_pending_megapixelsteps() > 200: + seconds_to_wait = self._max_pending_megapixelsteps * 0.75 + + if not (time.time() - self._triggered_max_pending_megapixelsteps_time) > seconds_to_wait: return self._triggered_max_pending_megapixelsteps = False