-
Notifications
You must be signed in to change notification settings - Fork 0
/
xcom_dag.py
71 lines (55 loc) · 1.79 KB
/
xcom_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.bash_operator import BashOperator
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(1),
}
def push_xcom_with_return():
return 'my_returned_xcom'
def get_pushed_xcom_with_return(**context):
print(context['ti'].xcom_pull(task_ids='t0'))
def push_next_task(**context):
context['ti'].xcom_push(key='next_task', value='t3')
def get_next_task(**context):
return context['ti'].xcom_pull(key='next_task')
def get_multiple_xcoms(**context):
print(context['ti'].xcom_pull(key=None, task_ids=['t0', 't2']))
with DAG(dag_id='xcom_dag', default_args=args, schedule_interval="@once") as dag:
t0 = PythonOperator(
task_id='t0',
python_callable=push_xcom_with_return
)
t1 = PythonOperator(
task_id='t1',
provide_context=True,
python_callable=get_pushed_xcom_with_return
)
t2 = PythonOperator(
task_id='t2',
provide_context=True,
python_callable=push_next_task
)
branching = BranchPythonOperator(
task_id='branching',
provide_context=True,
python_callable=get_next_task,
)
t3 = DummyOperator(task_id='t3')
t4 = DummyOperator(task_id='t4')
t5 = PythonOperator(
task_id='t5',
trigger_rule='one_success',
provide_context=True,
python_callable=get_multiple_xcoms
)
t6 = BashOperator(
task_id='t6',
bash_command="echo value from xcom: {{ ti.xcom_pull(key='next_task') }}"
)
t0 >> t1
t1 >> t2 >> branching
branching >> t3 >> t5 >> t6
branching >> t4 >> t5 >> t6