-
Notifications
You must be signed in to change notification settings - Fork 11
/
example.sh
executable file
·280 lines (243 loc) · 9.63 KB
/
example.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#!/bin/bash
# This is a full example of configuring and running a Kafka scheduler. It
# uses docker compose to create a kafka and a vertica service, then it
# configures a vkconfig scheduler, launches it, writes test messages to Kafka,
# checks that they show up in Vertica, and finally stops the scheduler and
# removes the services. Individual phases can be selected with the "steps"
# variable.
set -o xtrace
# NOTE: errexit alone does not catch a failure on the left side of a pipeline
# such as "cmd | $green", because pipefail is not enabled for the whole script.
set -o errexit
# First choose a unique project name for docker compose: COMPOSE_PROJECT_NAME
# is presumably set in ./.env (it is used below to derive the network name).
# Run from the script's own directory so ./.env and the other relative paths
# resolve no matter where the script is invoked from.
cd "$(dirname "${BASH_SOURCE[0]}")" || exit $?
source ./.env || exit $?
# Name of the docker network the scheduler containers join; assumed to match
# the network docker compose creates for this project — TODO confirm.
NETWORK=${COMPOSE_PROJECT_NAME}_example
: "${VERTICA_VERSION:=v12.0.3}"
# Exported so child processes (e.g. docker compose) can see it;
# v12.0.3 becomes 12.0.3-0-minimal.
export VERTICA_K8S_VERSION=${VERTICA_VERSION#v}-0-minimal
########################
# Debugging and Colors #
########################
# To run only some of the steps, export "steps" before running, e.g.:
#   steps="start setup run write" ./example.sh
# When "steps" is unset, every step runs.
: "${steps=start setup run write stop clean}"
# Colorize output with sed so the different kinds of output are easy to tell
# apart: green for normal output, red for errors, blue for log output.
# Each variable holds a whole command line that callers expand unquoted
# ("cmd | $green"), so the sed script itself must not contain spaces.
esc=$'\e'
for color_spec in green:32 red:31 blue:34; do
  printf -v "${color_spec%%:*}" 'sed -u -e s/\\(.*\\)/%s[%sm\\1%s[39m/' \
    "$esc" "${color_spec#*:}" "$esc"
done
unset color_spec
# If this sed can't run that command (e.g. no -u option), just pass output
# through uncolored.
echo | $green >/dev/null 2>&1 || { green=cat; red=cat; blue=cat; }
##########################
# SETUP TEST ENVIRONMENT #
##########################
if [[ $steps =~ start ]]; then
  # Without pipefail, "cmd | $green || exit $?" only sees the colorizer's exit
  # status, so a failing docker command would go unnoticed. Enable it for this
  # step so each "|| exit $?" below reacts to the docker command's failure.
  set -o pipefail
  # make sure containers have been cleaned up properly
  docker compose rm -svf >/dev/null 2>&1 || exit $?
  # start servers
  # docker compose uses colors, so don't override
  docker compose up -d --force-recreate
  # create a directory for log output
  mkdir -p log
  # create and start a database
  # The macOS version of bash calls the M1 chip "arm64", but if someone
  # updates /bin/bash, then it may use "aarch64" in $MACHTYPE instead.
  if [[ $MACHTYPE =~ ^(aarch64|arm64) ]]; then
    # Arm based macs crash on a memory check unless this is added
    VERTICA_ENV+=(-e VERTICA_MEMDEBUG=2)
  fi
  # NOTE(review): VERTICA_ENV is left unquoted as in the original in case it is
  # also supplied as a whitespace-separated string (e.g. from ./.env); quote it
  # as "${VERTICA_ENV[@]}" if it is only ever an array.
  docker compose exec ${VERTICA_ENV[@]} vertica /opt/vertica/bin/admintools -t create_db --database=example --password= --hosts=localhost | $green || exit $?
  # create a simple table to store messages
  docker compose exec vertica vsql -c 'create flex table KafkaFlex()' | $green || exit $?
  # create an operator
  docker compose exec vertica vsql -c 'create user JimmyKafka' | $green || exit $?
  # create a resource pool
  docker compose exec vertica vsql -c 'create resource pool Scheduler_pool plannedconcurrency 1' | $green || exit $?
  # create a couple topics
  docker compose exec kafka kafka-run-class.sh kafka.admin.TopicCommand --create --partitions 10 --replication-factor 1 --topic KafkaTopic1 --bootstrap-server kafka:9092 | $green || exit $?
  docker compose exec kafka kafka-run-class.sh kafka.admin.TopicCommand --create --partitions 10 --replication-factor 1 --topic KafkaTopic2 --bootstrap-server kafka:9092 | $green || exit $?
  # The other steps keep the original pipeline semantics.
  set +o pipefail
fi
###################
# SETUP SCHEDULER #
###################
if [[ $steps =~ setup ]]; then
  # In a single "docker run" (the container startup costs add up), this:
  # creates the scheduler, sets the target table, sets the parser, sets the
  # kafka server, defines a couple sources (topics) and a couple microbatches.
  #
  # The inner script is in double quotes, so its "exit \$?" must be escaped:
  # an unescaped $? is expanded by THIS shell while the string is built (to
  # the status of an earlier command, normally 0), which made every failing
  # vkconfig call exit the container successfully.
  docker run \
    --rm \
    -v "$PWD/example.conf:/etc/vkconfig.conf" \
    -v "$PWD/vkafka-log-config-debug.xml:/opt/vertica/packages/kafka/config/vkafka-log-config.xml" \
    -v "$PWD/log:/opt/vertica/log" \
    --user "$(id -u):$(id -g)" \
    --network "$NETWORK" \
    "vertica/kafka-scheduler:$VERTICA_VERSION" bash -c "
      echo 'Creating scheduler schema' ; \
      vkconfig scheduler \
        --conf /etc/vkconfig.conf \
        --frame-duration 00:00:10 \
        --create \
        --operator JimmyKafka \
        --eof-timeout-ms 2000 \
        --config-refresh 00:01:00 \
        --new-source-policy START \
        --resource-pool Scheduler_pool || exit \$? ; \
      echo 'Creating target table KafkaFlex' ; \
      vkconfig target --add \
        --conf /etc/vkconfig.conf \
        --target-schema public \
        --target-table KafkaFlex || exit \$? ; \
      echo 'Setting parser to kafkajsonparser' ; \
      vkconfig load-spec --add \
        --conf /etc/vkconfig.conf \
        --load-spec KafkaSpec \
        --parser kafkajsonparser \
        --load-method DIRECT \
        --message-max-bytes 1000000 || exit \$? ; \
      echo 'Configuring kafka cluster as kafka:9092' ; \
      vkconfig cluster --add \
        --conf /etc/vkconfig.conf \
        --cluster KafkaCluster \
        --hosts kafka:9092 || exit \$? ; \
      echo 'Configuring 10 partions from topic KafkaTopic1 on kafka:9092' ; \
      vkconfig source --add \
        --conf /etc/vkconfig.conf \
        --source KafkaTopic1 \
        --cluster KafkaCluster \
        --partitions 10 || exit \$? ; \
      echo 'Configuring 10 partions from topic KafkaTopic2 on kafka:9092' ; \
      vkconfig source --add \
        --conf /etc/vkconfig.conf \
        --source KafkaTopic2 \
        --cluster KafkaCluster \
        --partitions 10 || exit \$? ; \
      echo 'Connecting KafkaTopic1 to table KafkaFlex' ; \
      vkconfig microbatch --add \
        --conf /etc/vkconfig.conf \
        --microbatch KafkaBatch1 \
        --add-source KafkaTopic1 \
        --add-source-cluster KafkaCluster \
        --target-schema public \
        --target-table KafkaFlex \
        --rejection-schema public \
        --rejection-table KafkaFlex_rej \
        --load-spec KafkaSpec || exit \$? ; \
      echo 'Connecting KafkaTopic2 to table KafkaFlex' ; \
      vkconfig microbatch --add \
        --conf /etc/vkconfig.conf \
        --microbatch KafkaBatch2 \
        --add-source KafkaTopic2 \
        --add-source-cluster KafkaCluster \
        --target-schema public \
        --target-table KafkaFlex \
        --rejection-schema public \
        --rejection-table KafkaFlex_rej \
        --load-spec KafkaSpec || exit \$? ; \
    " | $green
  # $? would be the colorizer's status; PIPESTATUS[0] is docker run's, i.e.
  # the exit status of the script above. It must be read right away, before
  # any other command resets PIPESTATUS.
  setup_status=${PIPESTATUS[0]}
  if ((setup_status)); then
    echo "Kafka Scheduler setup failed" | $red >&2
  fi
fi
#####################
# RUN THE SCHEDULER #
#####################
if [[ $steps =~ run ]]; then
  # Make sure it's not already running. Plain "docker rm" refuses to remove a
  # running container, which would leave the name taken and make the
  # "--name kafka_scheduler" run below fail, so force the removal.
  docker rm -f kafka_scheduler 2>/dev/null | $green
  # Run this in the background. The scheduler's console output is shown via
  # $blue; the files it writes under ./log come straight from the container
  # and are never passed through the colorizer.
  docker run \
    --rm \
    -v "$PWD/example.conf:/etc/vkconfig.conf" \
    -v "$PWD/vkafka-log-config-debug.xml:/opt/vertica/packages/kafka/config/vkafka-log-config.xml" \
    -v "$PWD/log:/opt/vertica/log" \
    --network "$NETWORK" \
    --user "$(id -u):$(id -g)" \
    --name kafka_scheduler \
    "vertica/kafka-scheduler:$VERTICA_VERSION" \
    vkconfig launch \
    --conf /etc/vkconfig.conf | $blue &
  # For a background pipeline, $! is the PID of its last command ($blue),
  # which keeps running until "docker run" closes its output. The stop step
  # polls this PID to tell when the scheduler has exited.
  SCHEDULER_PID=$!
fi
#####################
# SEND TO KAFKA AND #
# SEE IT IN VERTICA #
#####################
if [[ $steps =~ write ]]; then
  # Fake loop so a timeout below can 'break 2' past all remaining checks.
  while true; do
    # write a test subject with a caffeine addiction
    docker compose exec kafka bash -c 'echo "{\"Test Subject\":\"98101\", \"Diagnosis\":\"Caffine Addiction\"}" | kafka-console-producer.sh \
      --topic KafkaTopic1 \
      --bootstrap-server localhost:9092' | grep . | $green
    # Make sure it's there.
    # This produces an erroneous error message, so grep is used to only print
    # the (JSON) messages.
    docker compose exec kafka kafka-console-consumer.sh --topic KafkaTopic1 --bootstrap-server localhost:9092 --from-beginning --timeout-ms 1000 | grep '^{' | $green
    # Wait for it to appear in Vertica. The view is rebuilt on every poll
    # (presumably so newly seen flex keys become queryable); give up after
    # about 20 attempts.
    delay=0
    while ! docker compose exec vertica vsql -t -c "SELECT compute_flextable_keys_and_build_view('KafkaFlex'); SELECT Diagnosis FROM KafkaFlex_view WHERE \"Test Subject\" = '98101'" | grep Caffine >/dev/null 2>&1; do
      if ((delay++ > 20)); then
        echo "ERROR: Should have appeared within the ~10 second frame duration." | $red >&2
        break 2
      fi
      echo "Waiting ($delay) for Kafka test message containing 'Caffine'..." | $green
      sleep 1;
    done
    docker compose exec vertica vsql -c "SELECT * FROM KafkaFlex_view" | $green
    # write a test subject with a cold feet problem
    docker compose exec kafka bash -c 'echo "{\"Test Subject\":\"99782\", \"Diagnosis\":\"Cold Feet\"}" | kafka-console-producer.sh \
      --topic KafkaTopic2 \
      --bootstrap-server localhost:9092' | grep . | $green
    # Make sure it's there
    docker compose exec kafka kafka-console-consumer.sh --topic KafkaTopic2 --bootstrap-server localhost:9092 --from-beginning --timeout-ms 1000 | grep '^{' | $green
    delay=0
    while ! docker compose exec vertica vsql -t -c "SELECT compute_flextable_keys_and_build_view('KafkaFlex'); SELECT Diagnosis FROM KafkaFlex_view WHERE \"Test Subject\" = '99782'" | grep Cold >/dev/null 2>&1; do
      if ((delay++ > 20)); then
        echo "ERROR: Should have appeared within the ~10 second frame duration." | $red >&2
        break 2
      fi
      echo "Waiting ($delay) for Kafka test message containing 'Cold Feet'..." | $green
      sleep 1;
    done
    break
  done
  docker compose exec vertica vsql -c "SELECT * FROM KafkaFlex_view" | $green
  # Show rejected messages, if any. All whitespace (including a trailing
  # carriage return, if present) is stripped with tr's POSIX [:space:] class
  # rather than sed's non-POSIX \s. The count is used only if it is a plain
  # number, so something like an error message on the first line is never
  # evaluated as an arithmetic expression.
  rejected=$(docker compose exec vertica vsql -t -c "SELECT count(*) FROM KafkaFlex_rej" | head -n 1 | tr -d '[:space:]')
  if [[ $rejected =~ ^[0-9]+$ ]] && ((rejected > 0)); then
    docker compose exec vertica vsql -c "SELECT * FROM KafkaFlex_rej" | $red
  fi
fi
######################
# STOP THE SCHEDULER #
######################
if [[ $steps =~ stop ]]; then
  echo SHUTTING DOWN... | $green
  # Graceful request first: kill the java process(es) inside the container
  # and let the scheduler exit on its own.
  docker exec --user "$(id -u):$(id -g)" kafka_scheduler killall java 2>&1 \
    | $red >&2
  # SCHEDULER_PID comes from the "run" step; when that step was skipped, look
  # the process up by its command line instead. The lookup may yield several
  # PIDs (or none), which is why the variable is expanded unquoted below.
  : ${SCHEDULER_PID=$(ps -ef | grep 'vkconfig_scheduler\ .*vkconfig launch' | awk '{ print $2 }')}
  waited=0
  while kill -0 $SCHEDULER_PID >/dev/null 2>&1; do
    sleep 1
    # Keep waiting until the counter passes 20, then stop it the hard way.
    ((waited++ > 20)) || continue
    echo "Scheduler didn't stop gracefully" | $red >&2
    docker stop kafka_scheduler 2>&1 | $red >&2
    break
  done
  # Removing the container isn't necessary because --rm is used in 'docker run':
  # docker rm kafka_scheduler 2>&1 | $green
  # Here's how to prune old unused containers if you forget to use --rm:
  # docker container rm $(docker container ls -a --filter=ancestor=vertica/kafka-scheduler | tail -n +2 | awk '{ print $NF }')
fi
###############################
# DELETE THE TEST ENVIRONMENT #
###############################
case $steps in
  *clean*)
    # Stop (-s) and remove the compose services' containers without a
    # confirmation prompt (-f), along with their anonymous volumes (-v).
    # docker compose uses colors, so its output is not piped through $green.
    # (An alternative would be: docker compose down)
    docker compose rm -svf
    ;;
esac