"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the task logs to avoid those getting too big.
airflow trigger_dag --conf '[curly-braces]"maxLogAgeInDays":30[curly-braces]' airflow-log-cleanup
--conf options:
maxLogAgeInDays:<INT> - Optional
"""
import logging
import os
from datetime import timedelta
import airflow
import jinja2
from airflow.configuration import conf
from airflow.models import DAG, Variable
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
# airflow-log-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
try:
    BASE_LOG_FOLDER = conf.get("core", "BASE_LOG_FOLDER").rstrip("/")
except Exception:
    BASE_LOG_FOLDER = conf.get("logging", "BASE_LOG_FOLDER").rstrip("/")
# How often to run. "@daily" - once a day at midnight
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email addresses to send alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Number of days to retain the log files if maxLogAgeInDays is not provided in
# the dag_run conf. If this is set to 30, the job will remove files that are
# 30 days old or older.
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
    "airflow_log_cleanup__max_log_age_in_days", 30
)
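# The Variable can be pre-set from the CLI if desired, e.g. (assuming the
# Airflow 2.x CLI):
#   airflow variables set airflow_log_cleanup__max_log_age_in_days 30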
# Whether the job should actually delete the logs. Set this to False if you
# want to temporarily skip deletion.
ENABLE_DELETE = True
# The number of worker nodes you have in Airflow. Will attempt to run this
# process for however many workers there are so that each worker gets its
# logs cleared.
NUMBER_OF_WORKERS = 1
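# Note: this relies on the scheduler spreading the N parallel copies of the
# task across the N workers; the per-copy sleep staggering below, plus the
# lock file, make a duplicate copy landing on the same node exit early
# instead of deleting twice.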
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER]
ENABLE_DELETE_CHILD_LOG = Variable.get(
    "airflow_log_cleanup__enable_delete_child_log", "False"
)
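# This Variable can likewise be pre-set from the CLI, e.g. (assuming the
# Airflow 2.x CLI):
#   airflow variables set airflow_log_cleanup__enable_delete_child_log True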
LOG_CLEANUP_PROCESS_LOCK_FILE = "/tmp/airflow_log_cleanup_worker.lock"
logging.info("ENABLE_DELETE_CHILD_LOG " + ENABLE_DELETE_CHILD_LOG)
if not BASE_LOG_FOLDER or BASE_LOG_FOLDER.strip() == "":
    raise ValueError(
        "BASE_LOG_FOLDER is empty in airflow.cfg. It can be found under "
        "the [core] section (Airflow <2.0.0) or the [logging] section "
        "(Airflow >=2.0.0). Please provide a valid directory path."
    )
if ENABLE_DELETE_CHILD_LOG.lower() == "true":
    try:
        CHILD_PROCESS_LOG_DIRECTORY = conf.get(
            "scheduler", "CHILD_PROCESS_LOG_DIRECTORY"
        )
        if CHILD_PROCESS_LOG_DIRECTORY.strip():
            DIRECTORIES_TO_DELETE.append(CHILD_PROCESS_LOG_DIRECTORY)
    except Exception as e:
        logging.exception(
            "Could not obtain CHILD_PROCESS_LOG_DIRECTORY from "
            "Airflow configuration: %s", e
        )
default_args = {
    'owner': DAG_OWNER_NAME,
    'depends_on_past': False,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    tags=['teamclairvoyant', 'airflow-maintenance-dags'],
    template_undefined=jinja2.Undefined
)
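# template_undefined=jinja2.Undefined makes unresolved template variables
# render as empty strings, so "{{dag_run.conf.maxLogAgeInDays}}" in the bash
# script below falls back to the default instead of failing the task when the
# DAG run was scheduled without a conf payload.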
if hasattr(dag, 'doc_md'):
    dag.doc_md = __doc__
if hasattr(dag, 'catchup'):
    dag.catchup = False
start = DummyOperator(
    task_id='start',
    dag=dag)
log_cleanup = """
echo "Getting Configurations..."
BASE_LOG_FOLDER="{{params.directory}}"
WORKER_SLEEP_TIME="{{params.sleep_time}}"
sleep ${WORKER_SLEEP_TIME}s
MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
MAX_LOG_AGE_IN_DAYS='""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'
fi
ENABLE_DELETE=""" + str("true" if ENABLE_DELETE else "false") + """
echo "Finished Getting Configurations"
echo ""
echo "Configurations:"
echo "BASE_LOG_FOLDER: '${BASE_LOG_FOLDER}'"
echo "MAX_LOG_AGE_IN_DAYS: '${MAX_LOG_AGE_IN_DAYS}'"
echo "ENABLE_DELETE: '${ENABLE_DELETE}'"
cleanup() {
    echo "Executing Find Statement: $1"
    FILES_MARKED_FOR_DELETE=`eval $1`
    echo "Process will be Deleting the following File(s)/Directory(s):"
    echo "${FILES_MARKED_FOR_DELETE}"
    # "grep -v '^$'" removes empty lines; "wc -l" counts the remaining lines.
    echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | grep -v '^$' | wc -l` File(s)/Directory(s)"
    echo ""
    if [ "${ENABLE_DELETE}" == "true" ]; then
        if [ "${FILES_MARKED_FOR_DELETE}" != "" ]; then
            echo "Executing Delete Statement: $2"
            eval $2
            DELETE_STMT_EXIT_CODE=$?
            if [ "${DELETE_STMT_EXIT_CODE}" != "0" ]; then
                echo "Delete process failed with exit code '${DELETE_STMT_EXIT_CODE}'"
                echo "Removing lock file..."
                rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
                REMOVE_LOCK_FILE_EXIT_CODE=$?
                if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
                    echo "Error removing the lock file. Check file permissions. To re-run the DAG, ensure that the lock file has been deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
                    exit ${REMOVE_LOCK_FILE_EXIT_CODE}
                fi
                exit ${DELETE_STMT_EXIT_CODE}
            fi
        else
            echo "WARN: No File(s)/Directory(s) to Delete"
        fi
    else
        echo "WARN: You have opted to skip deleting the File(s)/Directory(s)!"
    fi
}
if [ ! -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ ]; then
    echo "Lock file not found on this node! Creating it to prevent collisions..."
    touch """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
    CREATE_LOCK_FILE_EXIT_CODE=$?
    if [ "${CREATE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
        echo "Error creating the lock file. Check if the airflow user can create files under the tmp directory. Exiting..."
        exit ${CREATE_LOCK_FILE_EXIT_CODE}
    fi
    echo ""
    echo "Running Cleanup Process..."
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime \
+${MAX_LOG_AGE_IN_DAYS}"
DELETE_STMT="${FIND_STATEMENT} -exec rm -f {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?
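    # The next two passes prune directories left empty by the file deletion:
    # first at the run/task level (BASE_LOG_FOLDER/*/*), then one level up at
    # the DAG level (BASE_LOG_FOLDER/*).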
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?
echo "Finished Running Cleanup Process"
echo "Deleting lock file..."
rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
REMOVE_LOCK_FILE_EXIT_CODE=$?
if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error removing the lock file. Check file permissions. To re-run the DAG, ensure that the lock file has been deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
exit ${REMOVE_LOCK_FILE_EXIT_CODE}
fi
else
echo "Another task is already deleting logs on this worker node. \
Skipping it!"
echo "If you believe you're receiving this message in error, kindly check \
if """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ exists and delete it."
exit 0
fi
"""
for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1):
    for dir_id, directory in enumerate(DIRECTORIES_TO_DELETE):
        log_cleanup_op = BashOperator(
            task_id='log_cleanup_worker_num_' +
                    str(log_cleanup_id) + '_dir_' + str(dir_id),
            bash_command=log_cleanup,
            params={
                "directory": str(directory),
                "sleep_time": int(log_cleanup_id) * 3},
            dag=dag)

        log_cleanup_op.set_upstream(start)
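# With NUMBER_OF_WORKERS = 1 and only BASE_LOG_FOLDER to clean, this creates a
# single task, log_cleanup_worker_num_1_dir_0; each extra worker/directory adds
# a task, staggered by 3 seconds per worker slot via the sleep_time param.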