Skip to content

Commit

Permalink
Merge pull request #1005 from CityofToronto/i980_bash_logical
Browse files Browse the repository at this point in the history
#980 update logical date and switch to task bash
  • Loading branch information
chmnata authored Jun 28, 2024
2 parents bd83be0 + 9c1a7d5 commit b80a161
Showing 1 changed file with 9 additions and 19 deletions.
28 changes: 9 additions & 19 deletions dags/pull_here.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,13 @@ def get_download_link(request_id: str, access_token: str):
request_id = get_request_id(access_token)
download_url = get_download_link(request_id, access_token)

load_data_run = BashOperator(
task_id = "load_data",
bash_command = '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $USER -d bigdata -c "\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" ''',
env = {"DOWNLOAD_URL": download_url,
"HOST": BaseHook.get_connection("here_bot").host,
"USER" : BaseHook.get_connection("here_bot").login,
"PGPASSWORD": BaseHook.get_connection("here_bot").password},
append_env=True
)

#Can implement the following when we upgrade to 2.9.1
#@task.bash(env={"DOWNLOAD_URL": download_url,
# "HOST": BaseHook.get_connection("here_bot").host,
# "USER" : BaseHook.get_connection("here_bot").login,
# " PGPASSWORD": BaseHook.get_connection("here_bot").password})
#def load_data_run()->str:
# return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $USER -d bigdata -c "\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''
@task.bash(env={"DOWNLOAD_URL": download_url,
"HOST": BaseHook.get_connection("here_bot").host,
"USER" : BaseHook.get_connection("here_bot").login,
"PGPASSWORD": BaseHook.get_connection("here_bot").password})
def load_data_run()->str:
return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $USER -d bigdata -c "\COPY here.ta_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''

# Create a task group for triggering the DAGs
@task_group(group_id='trigger_dags_tasks')
def trigger_dags(**kwargs):
Expand All @@ -113,11 +103,11 @@ def trigger_dags(**kwargs):
trigger_operator = TriggerDagRunOperator(
task_id=f'trigger_{dag_id}',
trigger_dag_id=dag_id,
execution_date = '{{ ds }}',
logical_date = '{{ ds }}',
reset_dag_run = True # Clear existing dag if already exists (for backfilling), old runs will not be in the logs
)
trigger_operators.append(trigger_operator)

load_data_run >> trigger_dags()
load_data_run() >> trigger_dags()

pull_here()

0 comments on commit b80a161

Please sign in to comment.