Confusing Statuses for Pipelines that Use NF Tower #503
Comments
Technical refinement:
|
To Work With the Swagger and Stage Tower
To create a bearer token (to be used for testing), go to:

Newly Started Runs - What is the response?
Using GET - /workflow/{workflowId}/progress on a freshly started run, cheeky_engelbart, with no submitted, running, or pending jobs. What does it give?
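For reference, a minimal sketch of how such a request could be issued manually with a bearer token. The token, workflow id, and workspace id below are placeholders (and passing the workspace as a query parameter is an assumption), not values taken from this issue:

import requests

BASE_URL = "https://cg-tower-stage.scilifelab.se/api"
TOKEN = "<bearer-token-created-in-tower>"      # placeholder
WORKFLOW_ID = "<workflowId>"                   # placeholder

response = requests.get(
    f"{BASE_URL}/workflow/{WORKFLOW_ID}/progress",
    headers={"Authorization": f"Bearer {TOKEN}"},
    params={"workspaceId": "<workspaceId>"},   # assumption: workspace passed as query parameter
)
response.raise_for_status()
print(response.json())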
Let's find out what request is sent to Tower from TB. The request is:

@handle_client_errors
def get_workflow(self, workflow_id: str) -> TowerWorkflowResponse:
    url = f"{self.base_url}workflow/{workflow_id}"
    response = requests.get(url=url, headers=self.headers, params=self.request_params)
    response.raise_for_status()
    json = response.json()
    return TowerWorkflowResponse.model_validate(json)

This means that TB uses GET /workflow/{workflowId}. The response to that request would be, for a new analysis:

{
"workflow": {
"id": "1ZpuYlcBHTAols",
"submit": "2024-11-14T13:47:08Z",
"start": null,
"complete": null,
"dateCreated": "2024-11-14T13:47:09Z",
"lastUpdated": "2024-11-14T13:47:09Z",
"runName": "sick_becquerel",
"sessionId": "185712be-0fc2-449e-b7ba-8f6407a8d5ff",
"profile": "",
"workDir": "/home/proj/stage/nf-tower-work",
"commitId": null,
"userName": "hiseq-clinical",
"scriptId": null,
"revision": null,
"commandLine": "nextflow run https://github.com/nextflow-io/hello -name sick_becquerel -with-tower https://cg-tower-stage.scilifelab.se/api",
"projectName": "nextflow-io/hello",
"scriptName": null,
"launchId": "1vjucQOVoMogL6Orf70WnC",
"status": "SUBMITTED",
"configFiles": null,
"params": null,
"configText": null,
"manifest": null,
"nextflow": null,
"stats": null,
"errorMessage": null,
"errorReport": null,
"deleted": null,
"peakLoadCpus": null,
"peakLoadTasks": null,
"peakLoadMemory": null,
"projectDir": null,
"homeDir": null,
"container": null,
"repository": null,
"containerEngine": null,
"scriptFile": null,
"launchDir": null,
"duration": null,
"exitStatus": null,
"resume": false,
"success": null,
"logFile": null,
"outFile": null,
"operationId": null,
"ownerId": 9
},
"progress": {
"workflowProgress": {
"cpus": 0,
"cpuTime": 0,
"cpuLoad": 0,
"memoryRss": 0,
"memoryReq": 0,
"readBytes": 0,
"writeBytes": 0,
"volCtxSwitch": 0,
"invCtxSwitch": 0,
"cost": null,
"loadTasks": 0,
"loadCpus": 0,
"loadMemory": 0,
"peakCpus": 0,
"peakTasks": 0,
"peakMemory": 0,
"executors": null,
"dateCreated": "2024-11-14T13:47:16.476000794Z",
"lastUpdated": "2024-11-14T13:47:16.476001336Z",
"submitted": 0,
"succeeded": 0,
"running": 0,
"failed": 0,
"cached": 0,
"pending": 0,
"memoryEfficiency": 0,
"cpuEfficiency": 0
},
"processesProgress": []
},
"platform": {
"id": "slurm-platform",
"name": "Slurm Workload Manager"
},
"jobInfo": {
"id": 3950,
"operationId": null,
"message": null,
"status": "CREATED",
"exitCode": null
},
"orgId": 174400828620502,
"orgName": "clinical_genomics",
"workspaceId": 44622973173606,
"workspaceName": "stage",
"labels": null,
"optimized": null
}

TB then maps the Tower workflow status onto a Trailblazer status:

@handle_errors
def get_status(self, analysis_id: int) -> TrailblazerStatus:
    analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
    response = self.client.get_workflow(analysis.tower_workflow_id)
    status = TOWER_WORKFLOW_STATUS.get(response.workflow.status, TrailblazerStatus.ERROR)
    if status == TrailblazerStatus.COMPLETED:
        return TrailblazerStatus.QC
    return status

I'm very curious to know if [...]. However, status [...]
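For context on the mapping step, a sketch of the shape such a map could have. This is an assumption for illustration only, not the actual TOWER_WORKFLOW_STATUS constant in TB, and the import path is assumed as well:

from trailblazer.constants import TrailblazerStatus  # import path assumed

# Assumed shape only: Tower's workflow.status string -> Trailblazer status.
TOWER_WORKFLOW_STATUS = {
    "SUBMITTED": TrailblazerStatus.PENDING,
    "RUNNING": TrailblazerStatus.RUNNING,
    "SUCCEEDED": TrailblazerStatus.COMPLETED,  # later translated to QC in get_status
    "FAILED": TrailblazerStatus.FAILED,
    "CANCELLED": TrailblazerStatus.CANCELLED,
}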
|
There are no jobs for a TOWER analysis with the status SUBMITTED.

def update_ongoing_analyses(self) -> None:
    analyses: list[Analysis] = self.store.get_ongoing_analyses()
    for analysis in analyses:
        try:
            self.update_analysis_meta_data(analysis.id)
        except Exception as error:
            self.store.update_analysis_status(analysis.id, TrailblazerStatus.ERROR)
            LOG.error(f"Failed to update analysis {analysis.id}: {error}")

One can see in the logs that we are getting this error very frequently:

def update_analysis_meta_data(self, analysis_id: int) -> None:
    """Update the jobs, progress and status of an analysis."""
    self.job_service.update_jobs(analysis_id)
    self._update_progress(analysis_id)
    self._update_status(analysis_id)  # TODO: Issue

def _update_status(self, analysis_id: int) -> None:
    status: TrailblazerStatus = self.job_service.get_analysis_status(analysis_id)
    self.store.update_analysis_status(analysis_id=analysis_id, status=status)

def get_analysis_status(self, analysis_id: int) -> TrailblazerStatus:
    analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
    if analysis.status == TrailblazerStatus.CANCELLED:
        return TrailblazerStatus.CANCELLED
    if not analysis.jobs:
        raise NoJobsError(f"No jobs found for analysis {analysis_id}")  # TODO: This is the thing that causes the error
    if analysis.workflow_manager == WorkflowManager.TOWER:
        return self.tower_service.get_status(analysis_id)
    return get_status(analysis.jobs)
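To make the failure chain concrete, here is a minimal, self-contained sketch with stand-in objects. The stubs below are illustrative only and are not Trailblazer's real Store, JobService, or Analysis classes:

from dataclasses import dataclass, field

class NoJobsError(Exception):
    """Stand-in for Trailblazer's NoJobsError."""

@dataclass
class FakeAnalysis:
    id: int
    workflow_manager: str = "nf_tower"
    jobs: list = field(default_factory=list)  # fresh Tower run: Tower says SUBMITTED, no jobs yet

def get_analysis_status(analysis: FakeAnalysis) -> str:
    if not analysis.jobs:
        # This is the branch hit for every freshly submitted Tower analysis.
        raise NoJobsError(f"No jobs found for analysis {analysis.id}")
    return "running"

analysis = FakeAnalysis(id=3950)
try:
    get_analysis_status(analysis)
except NoJobsError as error:
    # update_ongoing_analyses() catches this, flips the analysis to ERROR,
    # and this is why the logs fill up with "Failed to update analysis ..." lines.
    print(f"Failed to update analysis {analysis.id}: {error}")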
|
Progress bar will not work - Why?
First off, a progress bar will not be implemented. Even the UI provided by the Seqera platform does not include one. While it is not impossible to create one, it would require predicting the number of tasks needed to complete the job. That is a promising idea, but the main issue is that the required tasks vary with the number of samples within a case (and possibly other situations that affect the number of tasks). For example, tomte requires 48 tasks to complete a job for a case with one sample. For RAREDISEASE, however, we would need to calculate the number of tasks required for each situation, which could vary significantly. If we make any updates, we would also need to update the task count somewhere, which is an impractical and unreliable solution. An alternative could be to estimate progress based on time, using the average duration of a run, but this would likely be misleading, as run times vary depending on the specific case. There may be more variants than the examples I have given, but the fundamental issue still stands: we have a hard time finding a practical and reliable basis for a progress bar.

Let SLURM be SLURM and TOWER be TOWER
The underlying issue is that we are trying to make TOWER behave like SLURM, which is not feasible. My suggested solution is to rely solely on the statuses provided by TOWER. No fancy calculations to estimate the remaining time: just take the response from TOWER, extract the status, and display it in TB. We can make this happen in a couple of ways:

Option 1
Add multiple if analysis.workflow_manager == WorkflowManager.TOWER: conditions to ensure the behavior is correct. This means that for every function that updates the status of an analysis, we adjust the behavior if the WorkflowManager is TOWER. This solution is time-efficient and straightforward, requiring minimal effort to implement the changes. A rough sketch is shown below.
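A minimal sketch of what such a condition could look like in get_analysis_status (shown earlier in this issue). This is a hypothetical change, assuming TB can fall back to the Tower status even before any jobs exist, not an agreed implementation:

def get_analysis_status(self, analysis_id: int) -> TrailblazerStatus:
    analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
    if analysis.status == TrailblazerStatus.CANCELLED:
        return TrailblazerStatus.CANCELLED
    if analysis.workflow_manager == WorkflowManager.TOWER:
        # Ask Tower directly: a fresh run with zero jobs is then simply
        # reported with whatever status Tower returns instead of raising.
        return self.tower_service.get_status(analysis_id)
    if not analysis.jobs:
        raise NoJobsError(f"No jobs found for analysis {analysis_id}")
    return get_status(analysis.jobs)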
Option 2
Introduce WorkflowManager classes. While this approach takes more time to implement, it offers greater benefits. Functions would no longer alter their behavior based on the WorkflowManager type, making them easier to maintain and troubleshoot. For example, if there's an issue with SLURM, we wouldn't need to worry about how a fix might affect TOWER. Additionally, introducing a new WorkflowManager in the future would be more seamless: just add a new class.

from abc import ABC, abstractmethod


class WorkflowManagerBase(ABC):
    """Base class for workflow-specific logic."""

    @abstractmethod
    def update_meta_data(self, analysis_id: int, job_service, store) -> None:
        pass


class TowerWorkflowManager(WorkflowManagerBase):
    def update_meta_data(self, analysis_id: int, job_service, store) -> None:
        job_service.update_jobs(analysis_id)
        progress = job_service.get_analysis_progression(analysis_id)
        store.update_tower_analysis_progress(analysis_id, progress)  # <-----
        status = job_service.get_analysis_status(analysis_id)
        store.update_analysis_status(analysis_id, status)


class SlurmWorkflowManager(WorkflowManagerBase):
    def update_meta_data(self, analysis_id: int, job_service, store) -> None:
        job_service.update_jobs(analysis_id)
        progress = job_service.get_analysis_progression(analysis_id)
        store.update_slurm_analysis_progress(analysis_id, progress)  # <-----
        status = job_service.get_analysis_status(analysis_id)
        store.update_analysis_status(analysis_id, status)
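For completeness, a hypothetical sketch of how the analysis service could dispatch to these classes. The WORKFLOW_MANAGERS registry and the exact call sites are assumptions for illustration, not existing TB code:

# Hypothetical glue code: pick the manager implementation from the analysis record.
WORKFLOW_MANAGERS: dict[WorkflowManager, WorkflowManagerBase] = {
    WorkflowManager.TOWER: TowerWorkflowManager(),
    WorkflowManager.SLURM: SlurmWorkflowManager(),
}

def update_analysis_meta_data(self, analysis_id: int) -> None:
    """Delegate all workflow-manager-specific behaviour to one class."""
    analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
    manager: WorkflowManagerBase = WORKFLOW_MANAGERS[analysis.workflow_manager]
    manager.update_meta_data(analysis_id, self.job_service, self.store)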
|
As a TB user,
I want accurate job status updates for NF Tower pipelines,
So that I can clearly understand the progress of each job
Clarification:
Pipelines using NF Tower (e.g., RNAFUSION, TOMTE, TAXPROFILER) do not operate with SLURM in the same way as other pipelines. Specifically, not all jobs are submitted simultaneously. This causes issues in the ‘Status’ field, as the analysis often appears to be 100% complete, even though NF Tower shows that the analysis is still in progress.
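To illustrate why the field can read 100% too early, assume progress is derived as completed jobs over known jobs. This formula is a simplification for illustration, not necessarily TB's exact calculation:

# Fresh Tower analysis: only the first two tasks have been submitted so far.
known_jobs = ["task_1", "task_2"]
completed_jobs = ["task_1", "task_2"]

progress = len(completed_jobs) / len(known_jobs)
print(f"{progress:.0%}")  # -> 100%, even though Tower will still submit many more tasks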
Acceptance Criteria
Running
Alt Criteria
Notes
Implementation plan