Allow optional end-date in get_execution_stats() (#155)
* max end date

* add test

* fix test

* black

* make cov fixed

* contributors update

---------

Co-authored-by: imeer1 <[email protected]>
IMC07 and imeer1 authored Sep 17, 2024
1 parent ef2a5e0 commit 922953d
Showing 3 changed files with 57 additions and 7 deletions.
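In short, get_execution_stats() gains an optional max_end_date argument that appends an end_date_lte filter to the Airflow dagRuns query. A minimal self-contained sketch of that filter logic (the helper name build_dag_runs_query and the sample values are illustrative, not part of the commit):

from datetime import datetime, timedelta


def build_dag_runs_query(
    execution_date: datetime,
    execution_delta: timedelta,
    max_end_date: datetime = None,
) -> str:
    # Lower bound: execution_date shifted by execution_delta, as in the sensor.
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    query = f"execution_date_gte={(execution_date + execution_delta).strftime(fmt)}"
    # New in this commit: optional upper bound on the upstream run's end date.
    if max_end_date:
        query += f"&end_date_lte={max_end_date.strftime(fmt)}"
    return query


print(build_dag_runs_query(datetime(2024, 1, 1, 3), timedelta(hours=-3)))
# execution_date_gte=2024-01-01T00:00:00Z
print(
    build_dag_runs_query(
        datetime(2024, 1, 1, 3), timedelta(hours=-3), datetime(2024, 1, 1, 1, 20)
    )
)
# execution_date_gte=2024-01-01T00:00:00Z&end_date_lte=2024-01-01T01:20:00Z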
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
@@ -16,6 +16,7 @@ Thanks to the contributors who helped on this project apart from the authors
* [Raju Gujjalapati](https://in.linkedin.com/in/raju-gujjalapati-470a88171)
* [Madhusudan Koukutla](https://www.linkedin.com/in/madhusudan-reddy/)
* [Surya Teja Jagatha](https://www.linkedin.com/in/surya-teja-jagatha/)
* [Iris Meerman](https://www.linkedin.com/in/iris-meerman-92694675/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
20 changes: 15 additions & 5 deletions brickflow_plugins/airflow/operators/external_tasks.py
@@ -220,8 +220,11 @@ def __init__(
self._poke_count = 0
self._start_time = time.time()

-    def get_execution_stats(self, execution_date: datetime):
-        """Function to get the execution stats for task_id within a execution delta window
+    def get_execution_stats(
+        self, execution_date: datetime, max_end_date: datetime = None
+    ):
+        """Function to get the execution stats for task_id within an execution start time and
+        (optionally) an allowed job end time.
Returns:
string: state of the desired task id and dag_run_id (success/failure/running)
@@ -236,6 +239,11 @@ def get_execution_stats(self, execution_date: datetime):
execution_window_tz = (execution_date + execution_delta).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
max_end_date_filter = (
f"&end_date_lte={max_end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
if max_end_date
else ""
)
headers = {
"Content-Type": "application/json",
"cache-control": "no-cache",
@@ -256,7 +264,7 @@ def get_execution_stats(self, execution_date: datetime):
api_url
+ "/api/v1/dags/"
+ external_dag_id
+ f"/dagRuns?execution_date_gte={execution_window_tz}"
+ f"/dagRuns?execution_date_gte={execution_window_tz}{max_end_date_filter}"
)
log.info(f"URL to poke for dag runs {url}")
response = requests.request("GET", url, headers=headers, verify=False)
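For illustration, the URL this concatenation produces when the optional filter is present (base URL, dag id, and timestamps are sample values):

api_url = "https://airflow.example.com"  # hypothetical base URL
external_dag_id = "test-dag"
execution_window_tz = "2024-01-01T00:00:00Z"
max_end_date_filter = "&end_date_lte=2024-01-01T01:20:00Z"  # "" when max_end_date is None

url = (
    api_url
    + "/api/v1/dags/"
    + external_dag_id
    + f"/dagRuns?execution_date_gte={execution_window_tz}{max_end_date_filter}"
)
print(url)
# https://airflow.example.com/api/v1/dags/test-dag/dagRuns?execution_date_gte=2024-01-01T00:00:00Z&end_date_lte=2024-01-01T01:20:00Z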
@@ -269,7 +277,9 @@ def get_execution_stats(self, execution_date: datetime):

if len(list_of_dictionaries) == 0:
log.info(
f"No Runs found for {external_dag_id} dag after {execution_window_tz}, please check upstream dag"
f"No Runs found for {external_dag_id} dag in time window: {execution_window_tz} - "
f"{max_end_date.strftime('%Y-%m-%dT%H:%M:%SZ') if max_end_date else 'now'}, "
f"please check upstream dag"
)
return "none"

@@ -282,7 +292,7 @@ def get_execution_stats(self, execution_date: datetime):
if latest:
# Only picking the latest run id if latest flag is True
dag_run_id = list_of_dictionaries[0]["dag_run_id"]
log.info(f"Latest run for the dag is with execution date of {dag_run_id}")
log.info(f"Latest run for the dag is with execution date of {dag_run_id}")
log.info(
f"Poking {external_dag_id} dag for {dag_run_id} run_id status as latest flag is set to {latest} "
)
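The unchanged logic above takes the first entry of the API response (list_of_dictionaries[0]) as the latest run. Purely as an illustration, and not part of this change, a defensive variant could make that ordering explicit by sorting on execution_date:

# Illustrative only: make "latest" explicit by sorting on execution_date
# (ISO-8601 strings with the same offset sort chronologically).
dag_runs = [
    {"dag_run_id": "manual__2024-01-01T01:00:00+00:00", "execution_date": "2024-01-01T01:00:00+00:00"},
    {"dag_run_id": "manual__2024-01-01T02:00:00+00:00", "execution_date": "2024-01-01T02:00:00+00:00"},
]
latest_run = max(dag_runs, key=lambda run: run["execution_date"])
print(latest_run["dag_run_id"])  # manual__2024-01-01T02:00:00+00:00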
43 changes: 41 additions & 2 deletions tests/airflow_plugins/test_task_dependency.py
@@ -1,4 +1,4 @@
-from datetime import timedelta
+from datetime import timedelta, datetime

import pytest
from requests.exceptions import HTTPError
@@ -66,6 +66,36 @@ def mock_api(self):
{"json": {"state": "success"}, "status_code": int(200)},
],
)
# DAG Run Endpoint with max end date
rm.register_uri(
method="GET",
url=(
f"{BASE_URL}/api/v1/dags/test-dag"
f"/dagRuns?execution_date_gte=2024-01-01T00:00:00Z&end_date_lte=2024-01-01T01:20:00Z"
),
response_list=[
# Test 3: max end date specified
{
"json": {
"dag_runs": [
{
"conf": {},
"dag_id": "test-dag",
"dag_run_id": "manual__2024-01-01T01:00:00.000000+00:00",
"end_date": "2024-01-01T01:10:00.000000+00:00",
"execution_date": "2024-01-01T01:00:00.000000+00:00",
"external_trigger": True,
"logical_date": "2024-01-01T01:00:00.000000+00:00",
"start_date": "2024-01-01T01:00:00.000000+00:00",
"state": "success",
},
],
"total_entries": 1,
},
"status_code": int(200),
},
],
)
yield rm

@pytest.fixture()
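For readers unfamiliar with requests_mock: when a registered URL contains a query string, the mock only matches requests whose query parameters include those values, which is how the fixture above separates the end-date-filtered call from the unfiltered one. A minimal self-contained sketch (host and payload are made up):

import requests
import requests_mock

with requests_mock.Mocker() as rm:
    rm.register_uri(
        method="GET",
        url=(
            "http://localhost/api/v1/dags/test-dag/dagRuns"
            "?execution_date_gte=2024-01-01T00:00:00Z&end_date_lte=2024-01-01T01:20:00Z"
        ),
        json={"dag_runs": [{"state": "success"}], "total_entries": 1},
        status_code=200,
    )
    response = requests.get(
        "http://localhost/api/v1/dags/test-dag/dagRuns"
        "?execution_date_gte=2024-01-01T00:00:00Z&end_date_lte=2024-01-01T01:20:00Z"
    )
    assert response.json()["dag_runs"][0]["state"] == "success"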
@@ -96,7 +126,7 @@ def test_api_airflow_v2(self, api, caplog, sensor):
sensor.execute(context={"execution_date": "2024-01-01T03:00:00Z"})

assert (
"No Runs found for test-dag dag after 2024-01-01T00:00:00Z, please check upstream dag"
"No Runs found for test-dag dag in time window: 2024-01-01T00:00:00Z - now, please check upstream dag"
in caplog.text
)
assert "task_status=running" in caplog.text
@@ -128,3 +158,12 @@ def test_timeout(self, sensor):
with pytest.raises(AirflowSensorTimeout):
with rm:
sensor.execute(context={"execution_date": "2024-01-01T03:00:00Z"})

def test_end_date(self, api, sensor):
execution_date = datetime.strptime("2024-01-01T03:00:00Z", "%Y-%m-%dT%H:%M:%SZ")
max_end_date = datetime.strptime("2024-01-01T01:20:00Z", "%Y-%m-%dT%H:%M:%SZ")
with api:
task_status = sensor.get_execution_stats(
execution_date=execution_date, max_end_date=max_end_date
)
assert task_status == "success"
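Outside the test harness, the query exercised here corresponds to a plain Airflow 2.x stable REST API call with both date filters passed as query parameters. A hedged sketch of the equivalent direct request (host, headers, and timestamps are placeholders; authentication is omitted):

import requests

AIRFLOW_URL = "https://airflow.example.com"  # placeholder base URL

response = requests.get(
    f"{AIRFLOW_URL}/api/v1/dags/test-dag/dagRuns",
    params={
        "execution_date_gte": "2024-01-01T00:00:00Z",
        "end_date_lte": "2024-01-01T01:20:00Z",
    },
    headers={"Content-Type": "application/json", "cache-control": "no-cache"},
    timeout=30,  # authentication headers omitted; the sensor supplies its own
)
for dag_run in response.json().get("dag_runs", []):
    print(dag_run["dag_run_id"], dag_run["state"])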
