-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathempty_topic.sh
149 lines (124 loc) · 3.83 KB
/
empty_topic.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/bin/bash
# Strict mode, one option per line:
#   errexit   - abort when a command fails
#   pipefail  - a pipeline fails if any of its stages fails
#   noclobber - '>' refuses to overwrite an existing file
#   nounset   - expanding an unset variable is an error
set -o errexit
set -o pipefail
set -o noclobber
set -o nounset
# Trace every command as it is executed.
set -x
# Program identity shown in the usage text and the start-up banner.
declare -r progversion="v2.3"
declare -r progname="Kafka Topic Cleaner"
# Exit codes
declare -r INVALID_ARGUMENT_ERROR=1
declare -r MISSING_ARGUMENT_ERROR=2
########################################
# Configuration
########################################
# Environment settings come from kafka_tool.cfg. The rest of this script reads
# kafka_dir, zookeeper and broker without ever assigning them, so the
# configuration file is expected to define them.
# The path has no directory component, so bash resolves it through its
# `source` lookup (PATH, then the current directory).
source kafka_tool.cfg
# Seconds to pause after each round of configuration changes.
declare -r timeout=300
########################################
# Print the help text (name, synopsis, options, examples, configuration)
# to stdout. The text starts and ends with an empty line.
# Globals:
#   progname, progversion (read)
########################################
function usage() {
  printf '%s\n' \
    "" \
    "NAME" \
    "${progname} ${progversion} -- Empty the contents of one or more Kafka topics" \
    "SYNOPSIS" \
    "${0} [-f] -t TOPIC_NAME_1 [-t TOPIC_NAME_N]" \
    "OPTIONS" \
    "The following options are available:" \
    "-t Topic name" \
    "-f Force run without prompting for confirmation" \
    "-h Print this help message and exit." \
    "EXAMPLES" \
    "${0} -t \"test-topic\"" \
    "${0} -t \"test-topic-1\" -t \"test-topic-2\" -t \"test-topic-3\"" \
    "CONFIGURATION" \
    "This script relies on kafka_tool.cfg for environment configuration." \
    ""
}
# Topic names collected from every -t option, in command-line order.
# (-a declares an indexed array; `declare` has no -d option.)
declare -a topic_names=()
# Parse arguments
#   -t NAME  add NAME to topic_names (may be repeated)
#   -f       skip the confirmation prompt
#   -h       print the help text and exit successfully
force=false
while getopts "t:fh" opt; do
  case "${opt}" in
    t)
      topic_names+=("${OPTARG}")
      ;;
    f)
      force=true
      ;;
    h)
      usage
      exit 0
      ;;
    *)
      # Unknown option or missing option argument; getopts has already
      # printed its own diagnostic, so show the help text and stop.
      usage
      exit "${INVALID_ARGUMENT_ERROR}"
      ;;
  esac
done
########################################
# Set the topic-level override retention.ms=1000 on one topic by calling
# kafka-configs.sh with --alter --add-config.
# NOTE(review): presumably this makes the brokers expire the topic's existing
# records shortly afterwards -- confirm against the Kafka version in use.
# Globals:
#   kafka_dir (read) directory that contains kafka-configs.sh
#   zookeeper (read) value passed to --zookeeper
# @param string topic_name
########################################
function empty_topic() {
  local topic_name="${1}"
  # Every expansion is quoted so paths or values containing whitespace or
  # glob characters reach kafka-configs.sh as single arguments.
  "${kafka_dir}/kafka-configs.sh" \
    --zookeeper "${zookeeper}" \
    --alter \
    --entity-type topics \
    --entity-name "${topic_name}" \
    --add-config retention.ms=1000
}
########################################
# Wait
# Announce the pause on stdout, then sleep for ${timeout} seconds.
# NOTE: this function has the same name as bash's `wait` builtin and takes
# precedence over it for the rest of the script.
# Globals:
#   timeout (read) pause length in seconds
########################################
function wait() {
  echo "Waiting ${timeout} seconds to allow configuration change to be applied."
  sleep "${timeout}"
}
########################################
# Remove the topic-level retention.ms override from one topic by calling
# kafka-configs.sh with --alter --delete-config.
# The override is only deleted; a retention.ms value that was set on the
# topic before this script ran is not re-applied.
# Globals:
#   kafka_dir (read) directory that contains kafka-configs.sh
#   zookeeper (read) value passed to --zookeeper
# @param string topic_name
########################################
function restore_configuration() {
  local topic_name="${1}"
  # Every expansion is quoted so paths or values containing whitespace or
  # glob characters reach kafka-configs.sh as single arguments.
  "${kafka_dir}/kafka-configs.sh" \
    --zookeeper "${zookeeper}" \
    --alter \
    --entity-type topics \
    --entity-name "${topic_name}" \
    --delete-config retention.ms
}
########################################
# Print the offsets of one topic by running kafka.tools.GetOffsetShell through
# kafka-run-class.sh with --time -1.
# NOTE(review): -1 is presumably GetOffsetShell's "latest offset" selector --
# confirm against the Kafka version in use.
# Globals:
#   kafka_dir (read) directory that contains kafka-run-class.sh
#   broker    (read) value passed to --broker-list
# @param string topic_name
########################################
function get_offsets() {
  local topic_name="${1}"
  # Every expansion is quoted so paths or values containing whitespace or
  # glob characters reach the tool as single arguments.
  "${kafka_dir}/kafka-run-class.sh" kafka.tools.GetOffsetShell \
    --broker-list "${broker}" \
    --time -1 \
    --topic "${topic_name}"
}
########################################
# Empty the given topics and report their offsets afterwards.
# Steps, each applied to every topic before the next step starts:
#   1. empty_topic            (set retention.ms=1000)
#   2. wait                   (pause ${timeout} seconds)
#   3. restore_configuration  (delete the retention.ms override)
#   4. wait
#   5. get_offsets
# Unless the global `force` is "true", the user must answer "y" at a
# confirmation prompt first; any other answer prints "Cancelled." and exits 0.
# Exits with MISSING_ARGUMENT_ERROR when called without topic names.
# NOTE(review): there is no cleanup trap. If a step fails and the script
# aborts, topics already altered in step 1 keep retention.ms=1000 until
# restored by hand.
# Globals:
#   progname, zookeeper, broker, force, MISSING_ARGUMENT_ERROR (read)
# @param array topic_names
########################################
function main() {
  local topic_names=("$@")
  local topic_name answer
  if (( ${#topic_names[@]} == 0 )); then
    echo "Error: A minimum of one TOPIC_NAME is required" >&2
    usage
    exit "${MISSING_ARGUMENT_ERROR}"
  fi
  echo "${progname}"
  echo ""
  echo "Zookeeper: ${zookeeper}"
  echo "Kafka Broker: ${broker}"
  # [*] joins the names into one string; [@] inside a string is ambiguous.
  echo "Topic(s): ${topic_names[*]}"
  # `force` holds the word "true" or "false", so compare the string; a bare
  # [[ ! ${force} ]] only tests for emptiness and would never prompt.
  if [[ "${force}" != "true" ]]; then
    # Prompt user before proceeding
    echo "WARNING: Emptying a topic is irreversible. "
    read -r -p "Are you sure that you want to proceed? [y/n]: " answer
    if [[ "${answer}" != "y" ]]; then
      echo "Cancelled."
      exit 0
    fi
  fi
  echo "Starting cleaning."
  for topic_name in "${topic_names[@]}"; do
    empty_topic "${topic_name}"
  done
  wait
  echo "Cleaning completed."
  echo "Restoring configuration."
  for topic_name in "${topic_names[@]}"; do
    restore_configuration "${topic_name}"
  done
  wait
  echo "Topic count and offset: "
  for topic_name in "${topic_names[@]}"; do
    get_offsets "${topic_name}"
  done
  echo "Done."
}
# Run the tool with the topics collected from the -t options.
# The ${array[@]+...} guard expands to nothing when no topic was given; a plain
# "${topic_names[@]}" on an empty array aborts with "unbound variable" under
# `set -o nounset` on bash releases older than 4.4, which would hide main's
# own missing-argument message.
main ${topic_names[@]+"${topic_names[@]}"}
# TODO:
# ${kafka_dir}/kafka-delete-records.sh --bootstrap-server ${broker} --offset-json-file offsets.json
# offsets.json :
# cat > offsets.json <<HEREDOC
# {"partitions": [{"topic": "${topic_name}", "partition": 0, "offset": -1}], "version": 1}
# HEREDOC
# rm offsets.json