Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ refactor: CLIN-3267 Code review suggestion #408

Open
wants to merge 2 commits into
base: feat/CLIN-3267-align-nextflow-operator-with-cqdg-k8s-operators-structure
Choose a base branch
from

Conversation

laurabegin
Copy link
Member

No description provided.

…ture

This code works locally with minikube for test_nextflow_operator. It is not tested on etl.py.

It reuse a configuration mechanism similar as in cqdg
Key modifications in comparison to cqdg:
- service_account_name specified in BaseConfig instead KubeConfig
- shallow copy in build_operator and partial methods in BaseConfig
- BaseConfig.args renamed to BaseConfig.append_args
@@ -82,14 +82,14 @@ def __init__(
)
self.image_pull_secrets_name = image_pull_secrets_name

def execute(self, context: Context, **kwargs):
def execute(self, context: Context):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ça c'est bizarre ...Je pense que ça peut peut-être créer des bugs non?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non pas à ce que je sache. C'est bien la signature de la fonction dans la classe parente.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dans le KubernetesPodOperator. Le BaseKubernetesPodOperator c'est notre class qui extend KubernetesPodOperator. J'ai changé le BaseKubernetesPodOperator dans cette PR pour qu'il s'aligne sur la signature dans KubernetesPodOperator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok! Très bien alors. Merci!

# Prepare nextflow arguments
nextflow_revision_option = ['-r', self.nextflow_pipeline_revision] if self.nextflow_pipeline_revision else []
nextflow_config_file_options = [arg for file in self.nextflow_config_files for arg in ['-c', file] if file]
nextflow_params_file_options = [arg for file in self.nextflow_params_files for arg in ['-params-file', file] if file]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Je suis pas sûre que nextflow supporte plusieurs params file ... Mais bon à ce point c'est un détail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oupsi, corrigé


self.arguments = ['nextflow', 'run', self.nextflow_pipeline, *nextflow_revision_option,
*nextflow_config_file_options, *nextflow_params_file_options, *arguments]

Copy link
Contributor

@LysianeBouchard LysianeBouchard Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personnellement, je ne recommande pas de stéréotyper le comportement ici. Je pense que ça rend le composant moins flexible et que ça pourrait poser problème. Je pense que c'est plus safe de juste donner des méthodes utilitaires sur le côté pour construire les arguments.
Mais bon, si tu y tiens, je suis prête à te le concéder.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ça va éviter de dupliquer du code. C'est ce qui a été fait par Jérémy pour l'opérateur Spark dans CQDG parce qu'on sait qu'on va toujours rouler la même chose... https://github.com/Ferlab-Ste-Justine/cqdg-dags/blob/1528d23cc4cd32c3982bca94de34e615b4b79b33/dags/lib/operators/spark.py#L101
De cette façon, pas besoin de copier les mêmes arguments entre toutes nos tasks Nextflow et on peut laisser la logique de création des arguments dans l'opérateur car ce sera utilisé dans 100% de nos utilisations de l'opérateur. Donc sert à rien de découpler ces logiques.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Je comprends le point sur la duplication de code. Je crois qu'une méthode utilitaire sur le côté est tout aussi valable pour éviter la duplication de code que de mettre de la logique dans une classe.

Mon petit doigt me dit que nextflow est un monde qu'on connaît encore mal. Aussi, nextflow est un outil un peu immature. Des fois, il y a des bugs et il faut les contourner ... Je serais pas du tout surpris qu'une commande forcée soit trop rigide pour un cas d'utilisation.

Mais bon, c'est plus comme une petite préférence. C'est pas la fin du monde pour moi de faire un compromis et de stéréotyper la commande comme tu le suggère ici. Une autre alternative, peut-être, serait de mettre la logique qui ajoute les arguments dans une fonction dans NextflowBaseConfig. Ça pourrait être une solution un peu entre les deux ... Peut-être aussi que de bouger la logique dans le constructeur pourrait donner plus de flexibilité parce qu'à ce moment là, c'est possible de juste overwriter l'attribut arguments par la suite.

Tu me diras ce que tu en penses.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ça serait vraiment idéal de le faire dans NextflowBaseConfig mais le souci c'est que vu qu'on a des arguments qui sont templated, il faut attendre qu'ils soient rendered pour pouvoir les parser... C'est justement pour ça que c'est fait dans le execute() et pas dans le init() dans l'opérateur. Le seul contournement que je connaisse est de render d'avance les arguments en faisant des tasks comme ça a été fait ici par exemple :

@task(task_id='get_batch_ids')
def get_batch_ids(ti=None) -> List[str]:
dag_run: DagRun = ti.dag_run
ids = dag_run.conf['batch_ids'] if dag_run.conf['batch_ids'] is not None else []
return list(set(ids))

Si on croit que le seul DAG qui aura besoin d'avoir des arguments qui se render est le DAG test_nextflow_operator, alors oui on pourrait mettre ça dans NextflowBaseConfig. La raison pourquoi test_nextflow_operator doit render ses args est parce qu'ils sont passés par paramètre quand on lance le DAG. Et à bien y penser, je ne crois pas qu'on va mettre des args Nextflow en paramètres pour d'autres DAGs... Bref, on pourrait faire une task dans test_nextflow_operator qui s'occupe de render les arguments pour aider l'utilisateur OU on remet test_nextflow_operator comme avant pour que les arguments soient passés au complet via Airflow. Pis pour tous les autres DAGs, ils sont préparés dans le NextflowBaseConfig. Tu en penses quoi?

Copy link
Contributor

@LysianeBouchard LysianeBouchard Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, je pense comprendre ce que tu veux dire. Le problème vient du contenu de arguments, qui contient une liste d'arguments supplémentaires à passer. Lorsqu'il est encore templated, c'est juste une variable de type string, ce qui empêche la concaténation avec les autres arguments nextflow pour préparer la commande.

Je suis ouverte à différentes solutions. Je pense qu'un petit hack dans test_nextflow_operator pourrait être acceptable. Si ça fonctionne, je suis d'accord pour cette approche.

Sinon, pour plus de flexibilité, on pourrait ajouter un attribut dédié (par exemple, nextflow_options) et l'ajouter dynamiquement dans execute. Le hic, c'est que ça peut rendre le design un peu confus, avec la logique de construction de la commande éparpillée. Peut-être que ce serait moins problématique d'exploiter le constructeur du NextflowOperator au lieu du NextflowBaseConfig avec une approche comme ça, afin de garder la logique dans le même composant. C'est juste une intuition ; parfois c'est difficile d'anticiper ce genre de choses à l'avance.

Comme je l'ai dit, je suis aussi ouverte à garder la logique dans execute, comme tu l'avais fait initialement. Les autres pistes que j'ai proposées sont des compromis potentiels, mais si elles ne sont pas fonctionnelles ou élégantes, nous pouvons revenir à ta solution. C'est peut-être un peu moins flexible, mais ça a une simplicité et ça c'est important aussi.

name="test_nextflow_operator",
on_execute_callback=Slack.notify_dag_start,
on_success_callback=Slack.notify_dag_completion,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merci d'avoir pris le temps de le ré-écrire de façon cohérente avec ton changement. Ce n'était pas nécessaire.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C'était pour tester que tout fonctionne

arguments = [arg for arg in self.arguments if arg] if self.arguments else [] # Remove empty strings

self.arguments = ['nextflow', 'run', self.nextflow_pipeline, *nextflow_revision_option,
*nextflow_config_file_options, *nextflow_params_file_options, *arguments]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pour notre version de nextflow, les fichiers de config doivent être passés avant le mot-clé run. Sinon, pas garanti que ça fonctionne toujours. Certains features vont marcher quand même, d'autres non.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

En tout cas, un détail encore une fois.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrigé

@laurabegin laurabegin force-pushed the feat/CLIN-3267-refactoring-suggestion branch from a26c26f to c4492b9 Compare October 11, 2024 20:35
@LysianeBouchard LysianeBouchard force-pushed the feat/CLIN-3267-align-nextflow-operator-with-cqdg-k8s-operators-structure branch from e01e6cc to d9d5a34 Compare October 15, 2024 17:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants