-
Notifications
You must be signed in to change notification settings - Fork 0
/
zenduty_centreon.lua
487 lines (400 loc) · 18.5 KB
/
zenduty_centreon.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
#!/usr/bin/lua
--------------------------------------------------------------------------------
-- Centreon Broker zenduty Connector Events
--------------------------------------------------------------------------------
-- Libraries
local curl = require "cURL"
local new_from_timestamp = require "luatz.timetable".new_from_timestamp
local sc_common = require("centreon-stream-connectors-lib.sc_common")
local sc_logger = require("centreon-stream-connectors-lib.sc_logger")
local sc_broker = require("centreon-stream-connectors-lib.sc_broker")
local sc_event = require("centreon-stream-connectors-lib.sc_event")
local sc_params = require("centreon-stream-connectors-lib.sc_params")
local sc_macros = require("centreon-stream-connectors-lib.sc_macros")
local sc_flush = require("centreon-stream-connectors-lib.sc_flush")
--------------------------------------------------------------------------------
-- Class event_queue
--------------------------------------------------------------------------------
local EventQueue = {}
EventQueue.__index = EventQueue
--------------------------------------------------------------------------------
---- Constructor
---- @param conf The table given by the init() function and returned from the GUI
---- @return the new EventQueue
----------------------------------------------------------------------------------
function EventQueue.new(params)
local self = {}
local mandatory_parameters = {
"zenduty_integration_url"
}
self.fail = false
-- set up log configuration
local logfile = params.logfile or "/var/log/centreon-broker/zenduty-events.log"
local log_level = params.log_level or 1
-- initiate mandatory objects
self.sc_logger = sc_logger.new(logfile, log_level)
self.sc_common = sc_common.new(self.sc_logger)
self.sc_broker = sc_broker.new(self.sc_logger)
self.sc_params = sc_params.new(self.sc_common, self.sc_logger)
-- checking mandatory parameters and setting a fail flag
if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then
self.fail = true
end
-- force buffer size to 1 to avoid breaking the communication with zenduty (can't send more than one event at once)
params.max_buffer_size = 1
-- overriding default parameters for this stream connector if the default values doesn't suit the basic needs
self.sc_params.params.centreon_url = params.centreon_url or "http://set.centreon_url.parameter"
self.sc_params.params.http_server_url = self.sc_params.params.zenduty_integration_url
self.sc_params.params.client = params.client or "Centreon Stream Connector"
self.sc_params.params.accepted_categories = params.accepted_categories or "neb"
self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status"
self.sc_params.params.zenduty_source = params.zenduty_source or nil
-- apply users params and check syntax of standard ones
self.sc_params:param_override(params)
self.sc_params:check_params()
self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger)
self.format_template = self.sc_params:load_event_format_file(true)
-- only load the custom code file, not executed yet
if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then
self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file))
end
self.sc_params:build_accepted_elements_info()
self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger)
local categories = self.sc_params.params.bbdo.categories
local elements = self.sc_params.params.bbdo.elements
self.format_event = {
[categories.neb.id] = {
[elements.host_status.id] = function () return self:format_event_host() end,
[elements.service_status.id] = function () return self:format_event_service() end
},
[categories.bam.id] = {}
}
self.send_data_method = {
[1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end
}
self.build_payload_method = {
[1] = function (payload, event) return self:build_payload(payload, event) end
}
self.state_to_severity_mapping = {
[0] = {
severity = "info",
action = "resolve"
},
[1] = {
severity = "warning",
action = "trigger"
},
[2] = {
severity = "critical",
action = "trigger"
},
[3] = {
severity = "error",
action = "trigger"
}
}
-- return EventQueue object
setmetatable(self, { __index = EventQueue })
return self
end
--------------------------------------------------------------------------------
---- EventQueue:format_event method
----------------------------------------------------------------------------------
function EventQueue:format_accepted_event()
local category = self.sc_event.event.category
local element = self.sc_event.event.element
local template = self.sc_params.params.format_template[category][element]
self.sc_logger:debug("[EventQueue:format_event]: starting format event")
self.sc_event.event.formated_event = {}
if self.format_template and template ~= nil and template ~= "" then
self.sc_event.event.formated_event = self.sc_macros:replace_sc_macro(template, self.sc_event.event, true)
else
-- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file
if not self.format_event[category][element] then
self.sc_logger:error("[format_event]: You are trying to format an event with category: "
.. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: "
.. tostring(self.sc_params.params.reverse_element_mapping[category][element])
.. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element")
else
self.format_event[category][element]()
end
end
self:add()
self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished")
end
function EventQueue:format_event_host()
local event = self.sc_event.event
local zenduty_custom_details = {}
-- handle hostgroup
local hostgroups = self.sc_broker:get_hostgroups(event.host_id)
local zenduty_hostgroups = ""
-- retrieve hostgroups and store them in zenduty_custom_details["Hostgroups"]
if not hostgroups then
zenduty_hostgroups = "empty host group"
else
for index, hg_data in ipairs(hostgroups) do
if zenduty_hostgroups ~= "" then
zenduty_hostgroups = zenduty_hostgroups .. ", " .. hg_data.group_name
else
zenduty_hostgroups = hg_data.group_name
end
end
zenduty_custom_details["Hostgroups"] = zenduty_hostgroups
end
-- handle severity
local host_severity = self.sc_broker:get_severity(event.host_id)
if host_severity then
zenduty_custom_details['Hostseverity'] = host_severity
end
zenduty_custom_details["Output"] = self.sc_common:ifnil_or_empty(event.output, "no output")
self.sc_event.event.formated_event = {
payload = {
summary = tostring(event.cache.host.name) .. ": " .. self.sc_params.params.status_mapping[event.category][event.element][event.state],
timestamp = new_from_timestamp(event.last_update):rfc_3339(),
severity = self.state_to_severity_mapping[event.state].severity,
source = self.sc_params.params.zenduty_source or tostring(event.cache.host.name),
component = tostring(event.cache.host.name),
group = zenduty_hostgroups,
class = "host",
custom_details = zenduty_custom_details,
},
event_action = self.state_to_severity_mapping[event.state].action,
entity_id = event.host_id .. "_H",
client = self.sc_params.params.client,
client_url = self.sc_params.params.client_url,
--following api integration payload format
message = tostring(event.cache.host.name) .. "/" .. tostring(event.cache.service.description) .. ": " .. self.sc_params.params.status_mapping[event.category][event.element][event.state],
summary = zenduty_custom_details["Output"],
alert_type = self.state_to_severity_mapping[event.state].severity,
links = {
{
-- should think about using the new resources page but keep it as is for compatibility reasons
link_url = self.sc_params.params.centreon_url .. "/centreon/monitoring/resources",
link_text = "Link to Centreon host summary"
}
}
}
end
function EventQueue:format_event_service()
local event = self.sc_event.event
local zenduty_custom_details = {}
-- handle hostgroup
local hostgroups = self.sc_broker:get_hostgroups(event.host_id)
local zenduty_hostgroups = ""
-- retrieve hostgroups and store them in zenduty_custom_details["Hostgroups"]
if not hostgroups then
zenduty_hostgroups = "empty host group"
else
for index, hg_data in ipairs(hostgroups) do
if zenduty_hostgroups ~= "" then
zenduty_hostgroups = zenduty_hostgroups .. ", " .. hg_data.group_name
else
zenduty_hostgroups = hg_data.group_name
end
end
zenduty_custom_details["Hostgroups"] = zenduty_hostgroups
end
-- handle servicegroups
local servicegroups = self.sc_broker:get_servicegroups(event.host_id, event.service_id)
local zenduty_servicegroups = ""
-- retrieve servicegroups and store them in zenduty_custom_details["Servicegroups"]
if not servicegroups then
zenduty_servicegroups = "empty service group"
else
for index, sg_data in ipairs(servicegroups) do
if zenduty_servicegroups ~= "" then
zenduty_servicegroups = zenduty_servicegroups .. ", " .. sg_data.group_name
else
zenduty_servicegroups = sg_data.group_name
end
end
zenduty_custom_details["Servicegroups"] = zenduty_servicegroups
end
-- handle host severity
local host_severity = self.sc_broker:get_severity(event.host_id)
if host_severity then
zenduty_custom_details["Hostseverity"] = host_severity
end
-- handle service severity
local service_severity = self.sc_broker:get_severity(event.host_id, event.service_id)
if service_severity then
zenduty_custom_details["Serviceseverity"] = service_severity
end
zenduty_custom_details["Output"] = self.sc_common:ifnil_or_empty(event.output, "no output")
self.sc_event.event.formated_event = {
payload = {
summary = tostring(event.cache.host.name) .. "/" .. tostring(event.cache.service.description) .. ": " .. self.sc_params.params.status_mapping[event.category][event.element][event.state],
timestamp = new_from_timestamp(event.last_update):rfc_3339(),
severity = self.state_to_severity_mapping[event.state].severity,
source = self.sc_params.params.zenduty_source or tostring(event.cache.host.name),
component = tostring(event.cache.service.description),
group = zenduty_hostgroups,
class = "service",
custom_details = zenduty_custom_details,
},
event_action = self.state_to_severity_mapping[event.state].action,
entity_id = event.host_id .. "_" .. event.service_id,
client = self.sc_params.params.client,
client_url = self.sc_params.params.client_url,
--following api integration payload format
message = tostring(event.cache.host.name) .. "/" .. tostring(event.cache.service.description) .. ": " .. self.sc_params.params.status_mapping[event.category][event.element][event.state],
summary = zenduty_custom_details["Output"],
alert_type = self.state_to_severity_mapping[event.state].severity,
links = {
{
-- should think about using the new resources page but keep it as is for compatibility reasons
link_url = self.sc_params.params.centreon_url .. "/centreon/monitoring/resources",
link_text = "Link to Centreon host summary"
}
}
}
end
--------------------------------------------------------------------------------
-- EventQueue:add, add an event to the sending queue
--------------------------------------------------------------------------------
function EventQueue:add()
-- store event in self.events lists
local category = self.sc_event.event.category
local element = self.sc_event.event.element
self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category])
.. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element]))
self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events))
self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event
self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events)
.. ", max is: " .. tostring(self.sc_params.params.max_buffer_size))
end
--------------------------------------------------------------------------------
-- EventQueue:build_payload, concatenate data so it is ready to be sent
-- @param payload {string} json encoded string
-- @param event {table} the event that is going to be added to the payload
-- @return payload {string} json encoded string
--------------------------------------------------------------------------------
function EventQueue:build_payload(payload, event)
if not payload then
payload = broker.json_encode(event)
else
payload = payload .. broker.json_encode(event)
end
return payload
end
function EventQueue:send_data(payload, queue_metadata)
self.sc_logger:debug("[EventQueue:send_data]: Starting to send data")
local url = self.sc_params.params.http_server_url
queue_metadata.headers = {
"content-type: application/json",
"content-length:" .. string.len(payload),
}
self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload)
-- write payload in the logfile for test purpose
if self.sc_params.params.send_data_test == 1 then
self.sc_logger:notice("[send_data]: " .. tostring(payload))
return true
end
self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload))
self.sc_logger:info("[EventQueue:send_data]: zenduty address is: " .. tostring(url))
local http_response_body = ""
local http_request = curl.easy()
:setopt_url(url)
:setopt_writefunction(
function (response)
http_response_body = http_response_body .. tostring(response)
end
)
:setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout)
:setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection)
:setopt(curl.OPT_HTTPHEADER, queue_metadata.headers)
-- set proxy address configuration
if (self.sc_params.params.proxy_address ~= '') then
if (self.sc_params.params.proxy_port ~= '') then
http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port)
else
self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used")
end
end
-- set proxy user configuration
if (self.sc_params.params.proxy_username ~= '') then
if (self.sc_params.params.proxy_password ~= '') then
http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password)
else
self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used")
end
end
-- adding the HTTP POST data
http_request:setopt_postfields(payload)
-- performing the HTTP request
http_request:perform()
-- collecting results
http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE)
http_request:close()
-- Handling the return code
local retval = false
if http_response_code == 200 or http_response_code == 400 or http_response_code == 201 then
self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code))
retval = true
else
self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body))
end
return retval
end
--------------------------------------------------------------------------------
-- Required functions for Broker StreamConnector
--------------------------------------------------------------------------------
local queue
-- Fonction init()
function init(conf)
queue = EventQueue.new(conf)
end
-- --------------------------------------------------------------------------------
-- write,
-- @param {table} event, the event from broker
-- @return {boolean}
--------------------------------------------------------------------------------
function write (event)
-- skip event if a mandatory parameter is missing
if queue.fail then
queue.sc_logger:error("Skipping event because a mandatory parameter is not set")
return false
end
-- initiate event object
queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker)
if queue.sc_event:is_valid_category() then
if queue.sc_event:is_valid_element() then
-- format event if it is validated
if queue.sc_event:is_valid_event() then
queue:format_accepted_event()
end
--- log why the event has been dropped
else
queue.sc_logger:debug("dropping event because element is not valid. Event element is: "
.. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element]))
end
else
queue.sc_logger:debug("dropping event because category is not valid. Event category is: "
.. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category]))
end
return flush()
end
-- flush method is called by broker every now and then (more often when broker has nothing else to do)
function flush()
local queues_size = queue.sc_flush:get_queues_size()
-- nothing to flush
if queues_size == 0 then
return true
end
-- flush all queues because last global flush is too old
if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then
if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then
return false
end
return true
end
-- flush queues because too many events are stored in them
if queues_size > queue.sc_params.params.max_buffer_size then
if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then
return false
end
return true
end
-- there are events in the queue but they were not ready to be send
return false
end