Skip to content

Commit

Permalink
fix: adds a timeout on job upload; halt job pop on 3 consecutive fail…
Browse files Browse the repository at this point in the history
…ures
  • Loading branch information
tazlin committed Oct 10, 2023
1 parent 7ae0e02 commit a7495d6
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion horde_worker_regen/process_management/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,7 @@ async def api_submit_job(self) -> None:
async with self._completed_jobs_lock:
self.completed_jobs.remove(completed_job_info)
self._consecutive_failed_job_submits = 0
self._consecutive_failed_jobs += 1
return

if completed_job_info.job_result_images_base64 is not None:
Expand Down Expand Up @@ -1404,6 +1405,7 @@ async def api_submit_job(self) -> None:
yarl.URL(job_info.r2_upload, encoded=True),
data=image_in_buffer.getvalue(),
skip_auto_headers=["content-type"],
timeout=aiohttp.ClientTimeout(total=10),
) as response:
if response.status != 200:
logger.error(f"Failed to upload image to R2: {response}")
Expand All @@ -1414,6 +1416,7 @@ async def api_submit_job(self) -> None:
logger.error(
f"Job {job_info.id_} faulted, removing from completed jobs",
)
self._consecutive_failed_jobs += 1

submit_job_request = submit_job_request_type(
apikey=self.bridge_data.api_key,
Expand Down Expand Up @@ -1523,6 +1526,8 @@ async def api_submit_job(self) -> None:
_triggered_max_pending_megapixelsteps = False
"""Whether the number of megapixelsteps in the job deque exceeded the limit."""

_consecutive_failed_jobs = 0

def get_pending_megapixelsteps(self) -> int:
"""Get the number of megapixelsteps that are pending in the job deque."""
job_deque_mps = sum(job.payload.width * job.payload.height * job.payload.ddim_steps for job in self.job_deque)
Expand All @@ -1549,7 +1554,10 @@ async def _get_source_images(self, job_pop_response: ImageGenerateJobPopResponse
if fail_count >= 10:
logger.error(f"Failed to download {field_name} after {fail_count} attempts")
break
response = await self._aiohttp_session.get(field_value)
response = await self._aiohttp_session.get(
field_value,
timeout=aiohttp.ClientTimeout(total=10),
)
response.raise_for_status()
new_response_dict = job_pop_response.model_dump(by_alias=True)

Expand All @@ -1571,6 +1579,13 @@ async def api_job_pop(self) -> None:
if self._shutting_down:
return

if self._consecutive_failed_jobs >= 3:
logger.error(
"Too many consecutive failed jobs, pausing job pops. "
"Please look into what happened and let the devs know.",
)
return

if len(self.job_deque) >= self.bridge_data.queue_size + 1: # FIXME?
return

Expand Down

0 comments on commit a7495d6

Please sign in to comment.