Skip to content

Commit

Permalink
support airflow dag restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoWuPeloton committed Nov 7, 2023
1 parent db35e83 commit cd25bdf
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
2 changes: 1 addition & 1 deletion dagfactory/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Module contains the version of dag-factory"""
__version__ = "0.17.1.post7"
__version__ = "0.17.1.peloton-rev-8-dev"
20 changes: 17 additions & 3 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def __init__(
self.config: Dict[str, Any] = DagFactory._load_config(
config_filepath=config_filepath
)
dag_id = list(self.config.keys())[0]
if '/git/repo' in config_filepath:
git_filepath = config_filepath.replace('/git/repo', 'https://github.com/pelotoncycle/data-engineering-airflow-dags/tree/master')
file_loc = f'file loc: [Git Link]({git_filepath}).'
if 'doc_md' in self.config[dag_id]:
self.config[dag_id]['doc_md'] = ''.join([self.config[dag_id]['doc_md'], file_loc])
else:
self.config[dag_id]['doc_md'] = file_loc
if config:
self.config: Dict[str, Any] = config

Expand Down Expand Up @@ -91,6 +99,9 @@ def from_directory(cls, config_dir, globals: Dict[str, Any], parent_default_conf
if os.path.isdir(sub_fpath):
cls.from_directory(sub_fpath, globals, default_config)
elif os.path.isfile(sub_fpath) and sub_fpath.split('.')[-1] in ALLOWED_CONFIG_FILE_SUFFIX:
if 'owner' not in default_config['default_args']:
default_config['default_args']['owner'] = sub_fpath.split("/")[4]
default_config['tags'] = sub_fpath.split("/")[5:7]
# catch the errors so the rest of the dags can still be imported
try:
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
Expand Down Expand Up @@ -259,9 +270,9 @@ def clean_dags(self, globals: Dict[str, Any]) -> None:


def load_yaml_dags(
globals_dict: Dict[str, Any],
dags_folder: str = airflow_conf.get("core", "dags_folder"),
suffix=None,
globals_dict: Dict[str, Any],
dags_folder: str = airflow_conf.get("core", "dags_folder"),
suffix=None,
):
"""
Loads all the yaml/yml files in the dags folder
Expand All @@ -288,3 +299,6 @@ def load_yaml_dags(
config_file_abs_path = str(config_file_path.absolute())
DagFactory(config_file_abs_path).generate_dags(globals_dict)
logging.info("DAG loaded: %s", config_file_path)

new_dag = DagFactory()
new_dag.from_directory("/git/repo/dags/data_engineering/ingest", globals())

0 comments on commit cd25bdf

Please sign in to comment.