Skip to content

Commit

Permalink
support airflow dag restructure
Browse files (browse the repository at this point in the history)
  • Loading branch information
HaoWuPeloton committed Dec 3, 2023
1 parent 3a04750 commit b29bd45
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
2 changes: 1 addition & 1 deletion dagfactory/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Module contains the version of dag-factory"""
__version__ = "0.17.1.post9"
__version__ = "0.17.1.post9.dev1"
47 changes: 23 additions & 24 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

# Config keys treated specially — presumably not DAG ids; the code that
# consumes this list is not shown here, TODO confirm against the rest of the file.
SYSTEM_PARAMS: List[str] = ["default", "task_groups"]
# File extensions (text after the last '.') that from_directory accepts as
# DAG config files.
ALLOWED_CONFIG_FILE_SUFFIX: List[str] = ["yaml", "yml"]
# Pattern tested with .match() against a config file's base name, so it is
# anchored at the start of the name only.
# NOTE(review): this view is a rendered diff without +/- markers; the two
# assignments below appear to be the pre-commit and post-commit versions of
# the same line — confirm against the repository. The first pattern matches
# names starting with "_jc_" (followed by zero or more "_"); the second
# requires the prefix "_jc__" followed by anything.
CONFIG_FILENAME_REGEX = re.compile(r"_jc__*", flags=re.IGNORECASE)
CONFIG_FILENAME_REGEX = re.compile(r"_jc__.*", flags=re.IGNORECASE)

# Module logger; note it is named after this file's path (__file__).
logger = logging.getLogger(__file__)

Expand Down Expand Up @@ -110,37 +110,36 @@ def from_directory(cls, config_dir, globals: Dict[str, Any], parent_default_conf
cls.from_directory(sub_fpath, globals, default_config)
elif os.path.isfile(sub_fpath) and sub_fpath.split('.')[-1] in ALLOWED_CONFIG_FILE_SUFFIX:
if 'git/repo/dags/data_engineering' in sub_fpath:
print("sub_fpath=" + sub_fpath)
if CONFIG_FILENAME_REGEX.match(sub_fpath.split("/")[-1]):
print("config_filename=" + sub_fpath.split("/")[-1])
if 'owner' not in default_config['default_args']:
print("owner="+sub_fpath.split("/")[4])
print("tag="+sub_fpath.split("/")[5:7])
default_config['default_args']['owner'] = sub_fpath.split("/")[4]
default_config['tags'] = sub_fpath.split("/")[5:7]
# catch the errors so the rest of the dags can still be imported
try:
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
dag_factory.generate_dags(globals)
except Exception as e:
if cls.DAGBAG_IMPORT_ERROR_TRACEBACKS:
import_failures[sub_fpath] = traceback.format_exc(
limit=-cls.DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH
)
else:
import_failures[sub_fpath] = str(e)
else:
# catch the errors so the rest of the dags can still be imported
try:
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
dag_factory.generate_dags(globals)
except Exception as e:
if cls.DAGBAG_IMPORT_ERROR_TRACEBACKS:
import_failures[sub_fpath] = traceback.format_exc(
limit=-cls.DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH
)
else:
import_failures[sub_fpath] = str(e)
else:
print("ignored_file="+sub_fpath)
continue

# catch the errors so the rest of the dags can still be imported
try:
print("yaml_filepath="+sub_fpath)
print("config_info="+default_config)
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
dag_factory.generate_dags(globals)
except Exception as e:
if cls.DAGBAG_IMPORT_ERROR_TRACEBACKS:
import_failures[sub_fpath] = traceback.format_exc(
limit=-cls.DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH
)
else:
import_failures[sub_fpath] = str(e)


# in the end we want to surface the error messages if there's any
if import_failures:
print("import_failure="+import_failures)
# reformat import_failures so they are reader friendly
import_failures_reformatted = ''
for import_loc, import_trc in import_failures.items():
Expand Down

0 comments on commit b29bd45

Please sign in to comment.