forked from nsqio/libnsq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nsqlookupd.c
102 lines (85 loc) · 3.13 KB
/
nsqlookupd.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include <json-c/json.h>
#include "nsq.h"
#include "http.h"
/*
 * Debug tracing helper.
 *
 * When DEBUG is defined at compile time, _DEBUG(...) forwards its
 * printf-style arguments to fprintf() on stdout. Otherwise it expands to an
 * empty do/while statement, so its arguments are not evaluated.
 */
#ifdef DEBUG
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
#else
#define _DEBUG(...) do {;} while (0)
#endif
void nsq_lookupd_request_cb(struct HttpRequest *req, struct HttpResponse *resp, void *arg)
{
struct NSQReader *rdr = (struct NSQReader *)arg;
struct json_object *jsobj, *data, *producers, *producer, *broadcast_address_obj, *tcp_port_obj;
struct json_tokener *jstok;
struct NSQDConnection *conn;
const char *broadcast_address;
int i, found, tcp_port;
_DEBUG("%s: status_code %d, body %.*s\n", __FUNCTION__, resp->status_code,
(int)BUFFER_HAS_DATA(resp->data), resp->data->data);
if (resp->status_code != 200) {
free_http_response(resp);
free_http_request(req);
return;
}
jstok = json_tokener_new();
jsobj = json_tokener_parse_ex(jstok, resp->data->data, (int)BUFFER_HAS_DATA(resp->data));
if (!jsobj) {
_DEBUG("%s: error parsing JSON\n", __FUNCTION__);
json_tokener_free(jstok);
return;
}
data = json_object_object_get(jsobj, "data");
if (!jsobj) {
_DEBUG("%s: error getting 'data' key\n", __FUNCTION__);
json_object_put(jsobj);
json_tokener_free(jstok);
return;
}
producers = json_object_object_get(data, "producers");
if (!producers) {
_DEBUG("%s: error getting 'producers' key\n", __FUNCTION__);
json_object_put(jsobj);
json_tokener_free(jstok);
return;
}
_DEBUG("%s: num producers %d\n", __FUNCTION__, json_object_array_length(producers));
for (i = 0; i < json_object_array_length(producers); i++) {
producer = json_object_array_get_idx(producers, i);
broadcast_address_obj = json_object_object_get(producer, "broadcast_address");
tcp_port_obj = json_object_object_get(producer, "tcp_port");
broadcast_address = json_object_get_string(broadcast_address_obj);
tcp_port = json_object_get_int(tcp_port_obj);
_DEBUG("%s: broadcast_address %s, port %d\n", __FUNCTION__, broadcast_address, tcp_port);
found = 0;
LL_FOREACH(rdr->conns, conn) {
if (strcmp(conn->bs->address, broadcast_address) == 0
&& conn->bs->port == tcp_port) {
found = 1;
break;
}
}
if (!found) {
nsq_reader_connect_to_nsqd(rdr, broadcast_address, tcp_port);
}
}
json_object_put(jsobj);
json_tokener_free(jstok);
free_http_response(resp);
free_http_request(req);
}
/*
 * Allocate a new lookupd endpoint record.
 *
 * Parameters:
 *   address - host string; a private copy is made with strdup().
 *   port    - port number stored as given.
 *
 * Returns a heap-allocated endpoint with next set to NULL, or NULL if
 * address is NULL or an allocation fails. Release the result with
 * free_nsqlookupd_endpoint().
 */
struct NSQLookupdEndpoint *new_nsqlookupd_endpoint(const char *address, int port)
{
    struct NSQLookupdEndpoint *nsqlookupd_endpoint;

    if (address == NULL) {
        return NULL;
    }

    nsqlookupd_endpoint = (struct NSQLookupdEndpoint *)malloc(sizeof(struct NSQLookupdEndpoint));
    if (nsqlookupd_endpoint == NULL) {
        return NULL;
    }

    nsqlookupd_endpoint->address = strdup(address);
    if (nsqlookupd_endpoint->address == NULL) {
        /* Do not hand back a half-initialised endpoint. */
        free(nsqlookupd_endpoint);
        return NULL;
    }

    nsqlookupd_endpoint->port = port;
    nsqlookupd_endpoint->next = NULL;

    return nsqlookupd_endpoint;
}
/*
 * Release a lookupd endpoint record: frees the address string it holds and
 * then the record itself. A NULL argument is ignored.
 */
void free_nsqlookupd_endpoint(struct NSQLookupdEndpoint *nsqlookupd_endpoint)
{
    if (nsqlookupd_endpoint == NULL) {
        return;
    }

    free(nsqlookupd_endpoint->address);
    free(nsqlookupd_endpoint);
}