forked from etcd-cpp-apiv3/etcd-cpp-apiv3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRewatchTest.cpp
104 lines (87 loc) · 3.13 KB
/
RewatchTest.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <chrono>
#include <thread>
#include "etcd/Client.hpp"
#include "etcd/SyncClient.hpp"
#include "etcd/Watcher.hpp"
static const std::string etcd_url("http://127.0.0.1:2379");
static int watcher_called = 0;
void print_response(etcd::Response const & resp)
{
++watcher_called;
std::cout << "print response called" << std::endl;
if (resp.error_code()) {
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
}
else {
std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;
std::cout << "Events size: " << resp.events().size() << std::endl;
for (auto const &ev: resp.events()) {
std::cout << "Value change in events: " << static_cast<int>(ev.event_type())
<< ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string()
<< ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string()
<< std::endl;
}
}
}
void wait_for_connection(etcd::Client &client) {
// wait until the client connects to etcd server
while (!client.head().get().is_ok()) {
sleep(1);
}
}
void initialize_watcher(const std::string& endpoints,
const std::string& prefix,
std::function<void(etcd::Response)> callback,
std::shared_ptr<etcd::Watcher>& watcher) {
etcd::Client client(endpoints);
wait_for_connection(client);
// Check if the failed one has been cancelled first
if (watcher && watcher->Cancelled()) {
std::cout << "watcher's reconnect loop been cancelled" << std::endl;
return;
}
watcher.reset(new etcd::Watcher(client, prefix, callback, true));
// Note that lambda requires `mutable`qualifier.
watcher->Wait([endpoints, prefix, callback,
/* By reference for renewing */ &watcher](bool cancelled) mutable {
if (cancelled) {
std::cout << "watcher's reconnect loop stopped as been cancelled" << std::endl;
return;
}
initialize_watcher(endpoints, prefix, callback, watcher);
});
}
TEST_CASE("watch should can be re-established")
{
const std::string my_prefix = "/test";
// the watcher initialized in this way will auto re-connect to etcd
std::shared_ptr<etcd::Watcher> watcher;
initialize_watcher(etcd_url, my_prefix, print_response, watcher);
// issue some changes to see if the watcher works
for (int round = 0; round < 100000; ++round) {
try {
etcd::Client client(etcd_url);
auto response = client.set(
my_prefix + "/foo", "bar-" + std::to_string(round)).get();
} catch (...) {
// pass
}
std::this_thread::sleep_for(std::chrono::seconds(2));
}
// cancel the worker
watcher->Cancel();
// the watcher has been cancelled and shouldn't work anymore
for (int round = 10; round < 20; ++round) {
try {
etcd::Client client(etcd_url);
auto response = client.set(
my_prefix + "/foo", "bar-" + std::to_string(round)).get();
} catch (...) {
// pass
}
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}