diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 325c92dd..3cc7750d 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -16,6 +16,7 @@ Thanks to the contributors who helped on this project apart from the authors * [Raju Gujjalapati](https://in.linkedin.com/in/raju-gujjalapati-470a88171) * [Madhusudan Koukutla](https://www.linkedin.com/in/madhusudan-reddy/) * [Surya Teja Jagatha](https://www.linkedin.com/in/surya-teja-jagatha/) +* [Iris Meerman](https://www.linkedin.com/in/iris-meerman-92694675/) # Honorary Mentions Thanks to the team below for invaluable insights and support throughout the initial release of this project diff --git a/brickflow_plugins/airflow/operators/external_tasks.py b/brickflow_plugins/airflow/operators/external_tasks.py index 30385f6e..cdd70af8 100644 --- a/brickflow_plugins/airflow/operators/external_tasks.py +++ b/brickflow_plugins/airflow/operators/external_tasks.py @@ -220,8 +220,11 @@ def __init__( self._poke_count = 0 self._start_time = time.time() - def get_execution_stats(self, execution_date: datetime): - """Function to get the execution stats for task_id within a execution delta window + def get_execution_stats( + self, execution_date: datetime, max_end_date: datetime = None + ): + """Function to get the execution stats for task_id within an execution start time and + (optionally) allowed job end time. 
Returns: string: state of the desired task id and dag_run_id (success/failure/running) @@ -236,6 +239,11 @@ def get_execution_stats(self, execution_date: datetime): execution_window_tz = (execution_date + execution_delta).strftime( "%Y-%m-%dT%H:%M:%SZ" ) + max_end_date_filter = ( + f"&end_date_lte={max_end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" + if max_end_date + else "" + ) headers = { "Content-Type": "application/json", "cache-control": "no-cache", @@ -256,7 +264,7 @@ def get_execution_stats(self, execution_date: datetime): api_url + "/api/v1/dags/" + external_dag_id - + f"/dagRuns?execution_date_gte={execution_window_tz}" + + f"/dagRuns?execution_date_gte={execution_window_tz}{max_end_date_filter}" ) log.info(f"URL to poke for dag runs {url}") response = requests.request("GET", url, headers=headers, verify=False) @@ -269,7 +277,9 @@ def get_execution_stats(self, execution_date: datetime): if len(list_of_dictionaries) == 0: log.info( - f"No Runs found for {external_dag_id} dag after {execution_window_tz}, please check upstream dag" + f"No Runs found for {external_dag_id} dag in time window: {execution_window_tz} - " + f"{max_end_date.strftime('%Y-%m-%dT%H:%M:%SZ') if max_end_date else 'now'}, " + f"please check upstream dag" ) return "none" @@ -282,7 +292,7 @@ def get_execution_stats(self, execution_date: datetime): if latest: # Only picking the latest run id if latest flag is True dag_run_id = list_of_dictionaries[0]["dag_run_id"] - log.info(f"Latest run for the dag is with execution date of {dag_run_id}") + log.info(f"Latest run for the dag is with execution date of {dag_run_id}") log.info( f"Poking {external_dag_id} dag for {dag_run_id} run_id status as latest flag is set to {latest} " ) diff --git a/tests/airflow_plugins/test_task_dependency.py b/tests/airflow_plugins/test_task_dependency.py index 9c176581..f46741cb 100644 --- a/tests/airflow_plugins/test_task_dependency.py +++ b/tests/airflow_plugins/test_task_dependency.py @@ -1,4 +1,4 @@ -from datetime 
import timedelta +from datetime import timedelta, datetime import pytest from requests.exceptions import HTTPError @@ -66,6 +66,36 @@ def mock_api(self): {"json": {"state": "success"}, "status_code": int(200)}, ], ) + # DAG Run Endpoint with max end date + rm.register_uri( + method="GET", + url=( + f"{BASE_URL}/api/v1/dags/test-dag" + f"/dagRuns?execution_date_gte=2024-01-01T00:00:00Z&end_date_lte=2024-01-01T01:20:00Z" + ), + response_list=[ + # Test 3: max end date specified + { + "json": { + "dag_runs": [ + { + "conf": {}, + "dag_id": "test-dag", + "dag_run_id": "manual__2024-01-01T01:00:00.000000+00:00", + "end_date": "2024-01-01T01:10:00.000000+00:00", + "execution_date": "2024-01-01T01:00:00.000000+00:00", + "external_trigger": True, + "logical_date": "2024-01-01T01:00:00.000000+00:00", + "start_date": "2024-01-01T01:00:00.000000+00:00", + "state": "success", + }, + ], + "total_entries": 1, + }, + "status_code": int(200), + }, + ], + ) yield rm @pytest.fixture() @@ -96,7 +126,7 @@ def test_api_airflow_v2(self, api, caplog, sensor): sensor.execute(context={"execution_date": "2024-01-01T03:00:00Z"}) assert ( - "No Runs found for test-dag dag after 2024-01-01T00:00:00Z, please check upstream dag" + "No Runs found for test-dag dag in time window: 2024-01-01T00:00:00Z - now, please check upstream dag" in caplog.text ) assert "task_status=running" in caplog.text @@ -128,3 +158,12 @@ def test_timeout(self, sensor): with pytest.raises(AirflowSensorTimeout): with rm: sensor.execute(context={"execution_date": "2024-01-01T03:00:00Z"}) + + def test_end_date(self, api, sensor): + execution_date = datetime.strptime("2024-01-01T03:00:00Z", "%Y-%m-%dT%H:%M:%SZ") + max_end_date = datetime.strptime("2024-01-01T01:20:00Z", "%Y-%m-%dT%H:%M:%SZ") + with api: + task_status = sensor.get_execution_stats( + execution_date=execution_date, max_end_date=max_end_date + ) + assert task_status == "success"