Skip to content

Commit

Permalink
Merge pull request snabbco#1076 from dpino/aggregate-counters
Browse files Browse the repository at this point in the history
Aggregate counters
  • Loading branch information
dpino authored Jun 1, 2018
2 parents f310d4e + 5e1fced commit 4d9f344
Showing 1 changed file with 87 additions and 2 deletions.
89 changes: 87 additions & 2 deletions src/lib/ptree/ptree.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ local alarms = require("lib.yang.alarms")
local json = require("lib.ptree.json")
local queue = require('lib.fibers.queue')
local fiber_sleep = require('lib.fibers.sleep').sleep
local inotify = require("lib.ptree.inotify")
local counter = require("core.counter")
local cond = require("lib.fibers.cond")

local call_with_output_string = mem.call_with_output_string

Expand Down Expand Up @@ -254,6 +257,7 @@ function Manager:stop_worker(id)
self:enqueue_config_actions_for_worker(id, stop_actions)
self:send_messages_to_workers()
self.workers[id].shutting_down = true
self.workers[id].cancel:signal()
end

function Manager:remove_stale_workers()
Expand Down Expand Up @@ -297,17 +301,89 @@ function Manager:compute_scheduling_for_worker(id, app_graph)
return ret
end

-- Bookkeeping for counter aggregation, shared by the helpers in this file:
--   active[k][name]  handle returned by counter.open for the worker counter
--                    `name`, grouped under its aggregated counter name `k`;
--   archived[k]      uint64_t[1] holding the summed last values of counters
--                    that were removed from `active[k]`;
--   aggregated[k]    counter created with counter.create under name `k`.
local counters = {active={}, archived={}, aggregated={}}

-- Tells whether `name` refers to a counter file.  Yields the matched
-- ".counter" suffix of the basename (a truthy string), or nil when the
-- basename does not end in ".counter".
local function is_counter (name)
   local base = lib.basename(name)
   return base:match("%.counter$")
end
-- Registers a worker counter for aggregation.
--
--   worker_pid  PID of the worker owning the counter.
--   name        name of the worker counter; the caller in this file passes
--               names found under the worker's shm directory, i.e. names
--               that start with "/<worker_pid>/" (assumed -- confirm if new
--               callers are added).
--
-- The aggregated counter name `k` is `name` with the worker PID replaced by
-- the PID of the current process.  The aggregated counter is created the
-- first time `k` is seen; the worker counter is opened once and remembered
-- in counters.active[k][name].
local function create_counter (worker_pid, name)
   -- Substitute only the first occurrence: the PID digits may legitimately
   -- appear again later in the name (e.g. inside an app or link name) and
   -- those occurrences must be left untouched.
   local k = name:gsub(tostring(worker_pid), tostring(S.getpid()), 1)
   if not counters.aggregated[k] then
      counters.aggregated[k] = counter.create(k)
   end
   local active = counters.active[k]
   if not active then
      active = {}
      counters.active[k] = active
   end
   if not active[name] then
      active[name] = counter.open(name)
   end
end
-- Retires a worker counter: reads its current value, deletes it, removes it
-- from the active table and adds the value to the archived sum kept for its
-- aggregated counter name.
--
--   name  name of the worker counter, as previously given to create_counter.
--
-- Raises an error if `name` is not currently registered as active.
local function archive_counter (name)
   -- The worker PID is not available here, so the aggregated name cannot be
   -- recomputed from `name`.  Look it up instead: a worker counter name is
   -- stored under exactly one aggregated name in counters.active.
   local k, c
   for key, workers in pairs(counters.active) do
      if workers[name] then
         k, c = key, workers[name]
         break
      end
   end
   assert(c, "archive_counter: not an active counter: "..tostring(name))
   local val = counter.read(c)
   counter.delete(name)
   counters.active[k][name] = nil
   if not counters.archived[k] then
      -- Freshly allocated cdata is zero-filled, so the sum starts at 0.
      counters.archived[k] = ffi.new("uint64_t[1]")
   end
   counters.archived[k][0] = counters.archived[k][0] + val
end
-- Computes the value of the aggregated counter named `k`: the archived sum
-- for `k` (0 when nothing has been archived yet) plus the current value of
-- every active worker counter registered under `k`.
--
-- Returns the plain number 0 when there is neither archived nor active data;
-- otherwise the result of adding the values returned by counter.read.
local function compute_aggregated_value (k)
   -- counters.archived[k] only exists once a counter has been archived for
   -- `k`, so it must be checked before being indexed.  The and/or form is
   -- safe here because cdata values are always truthy, even when zero.
   local archived = counters.archived[k]
   local ret = archived and archived[0] or 0
   local active = counters.active[k]
   if active then
      for _, c in pairs(active) do
         ret = ret + counter.read(c)
      end
   end
   return ret
end

-- Starts a worker for `graph`, records it in self.workers[id] and queues the
-- configuration actions needed to load the graph into it.  Also spawns the
-- fibers that maintain the aggregated counters for the worker.
--
--   id     key under which the worker is stored in self.workers.
--   graph  app graph the worker should run.
--
-- Returns the worker record (fields: scheduling, pid, queue, graph, cancel).
function Manager:start_worker_for_graph(id, graph)
   local scheduling = self:compute_scheduling_for_worker(id, graph)
   self:info('Starting worker %s.', id)
   -- `cancel` is the condition signalled by Manager:stop_worker; it is used
   -- below to terminate the counter-watching fiber.
   local worker = { scheduling=scheduling,
                    pid=self:start_worker(scheduling),
                    queue={}, graph=graph,
                    cancel = cond.new() }
   self.workers[id] = worker
   self:state_change_event('worker_starting', id)
   self:debug('Worker %s has PID %s.', id, worker.pid)
   local actions = self.support.compute_config_actions(
      app_graph.new(), worker.graph, {}, 'load')
   self:enqueue_config_actions_for_worker(id, actions)

   -- Manage aggregated counters (creation and removal).
   -- The fiber closes over `worker` rather than re-reading self.workers[id]
   -- when it first runs, so it always watches the worker started by this
   -- call even if the table entry has changed in the meantime.
   fiber.spawn(function ()
      local dir = shm.root..'/'..worker.pid
      local rx = inotify.recursive_directory_inventory_events(
         dir, worker.cancel:wait_operation())
      -- Generic-for over rx.get(rx): loops until it returns nil.
      for event in rx.get, rx do
         if is_counter(event.name) then
            -- Counter names are relative to the shm root.
            local name = event.name:gsub(shm.root, '')
            if event.kind == 'creat' then
               create_counter(worker.pid, name)
            elseif event.kind == 'rm' then
               -- Move counter from active to archive.
               archive_counter(name)
            end
         end
      end
   end)

   -- Update aggregated counters once per second.
   -- NOTE(review): one such loop is spawned per started worker and never
   -- exits on its own; every loop walks the same module-level table, so the
   -- extra loops are redundant -- consider spawning it only once.
   fiber.spawn(function ()
      while true do
         for k,c in pairs(counters.aggregated) do
            counter.set(c, compute_aggregated_value(k))
         end
         counter.commit()
         fiber_sleep(1)
      end
   end)

   return worker
end

Expand Down Expand Up @@ -696,7 +772,16 @@ function Manager:handle_alarm (worker, alarm)
end

function Manager:stop ()
assert(self.sched:shutdown())
-- Call shutdown for 0.1s or until it returns true (all tasks cancelled).
local now = C.get_monotonic_time()
local threshold = now + 0.1
while now < threshold do
if self.sched:shutdown() then break end
end
if now >= threshold then
io.stderr:write("Warning: there are still tasks pending\n")
end

require('lib.fibers.file').uninstall_poll_io_handler()

for id, worker in pairs(self.workers) do
Expand Down

0 comments on commit 4d9f344

Please sign in to comment.