diff --git a/src/lib/ptree/ptree.lua b/src/lib/ptree/ptree.lua
index ebc9cd4709..51ee8ac433 100644
--- a/src/lib/ptree/ptree.lua
+++ b/src/lib/ptree/ptree.lua
@@ -33,6 +33,9 @@ local alarms = require("lib.yang.alarms")
 local json = require("lib.ptree.json")
 local queue = require('lib.fibers.queue')
 local fiber_sleep = require('lib.fibers.sleep').sleep
+local inotify = require("lib.ptree.inotify")
+local counter = require("core.counter")
+local cond = require("lib.fibers.cond")
 
 local call_with_output_string = mem.call_with_output_string
 
@@ -254,6 +257,7 @@ function Manager:stop_worker(id)
    self:enqueue_config_actions_for_worker(id, stop_actions)
    self:send_messages_to_workers()
    self.workers[id].shutting_down = true
+   self.workers[id].cancel:signal()
 end
 
 function Manager:remove_stale_workers()
@@ -297,17 +301,104 @@ function Manager:compute_scheduling_for_worker(id, app_graph)
    return ret
 end
 
+-- Bookkeeping for aggregated counters.  Each table is keyed by aggregated
+-- counter name (the worker counter name with the current process's PID
+-- substituted for the worker's PID):
+--   active:     table mapping worker counter name to the opened counter
+--   archived:   uint64_t[1] accumulating the values of removed counters
+--   aggregated: counter created under the aggregated name
+local counters = {active={}, archived={}, aggregated={}}
+
+-- Returns a true value if the basename of NAME ends in ".counter".
+local function is_counter (name)
+   return lib.basename(name):match("%.counter$")
+end
+-- Creates an aggregated counter and saves worker counter in active table.
+-- Active table is indexed by aggregated counter name.
+local function create_counter (worker_pid, name)
+   local k = name:gsub(worker_pid, S.getpid())
+   if not counters.aggregated[k] then
+      counters.aggregated[k] = counter.create(k)
+   end
+   if not counters.active[k] then counters.active[k] = {} end
+   if not counters.active[k][name] then
+      counters.active[k][name] = counter.open(name)
+   end
+end
+-- Removes a worker counter from the active table and adds its value to
+-- archived. Archived table is indexed by aggregated counter name.
+-- WORKER_PID is required to derive that name, as in create_counter.
+local function archive_counter (worker_pid, name)
+   local k = name:gsub(worker_pid, S.getpid())
+   local c = assert(counters.active[k] and counters.active[k][name])
+   local val = counter.read(c)
+   counter.delete(name)
+   counters.active[k][name] = nil
+   if not counters.archived[k] then
+      counters.archived[k] = ffi.new("uint64_t[1]")
+   end
+   counters.archived[k][0] = counters.archived[k][0] + val
+end
+-- Sums all active worker counters that belong to the aggregated name K.
+-- The sum starts from the archived value for K, if any, else from zero.
+local function compute_aggregated_value (k)
+   local ret = 0
+   -- No archive entry exists until a worker counter has been removed.
+   local archived = counters.archived[k]
+   if archived then ret = archived[0] end
+   for _,c in pairs(counters.active[k]) do
+      ret = ret + counter.read(c)
+   end
+   return ret
+end
+
 function Manager:start_worker_for_graph(id, graph)
    local scheduling = self:compute_scheduling_for_worker(id, graph)
    self:info('Starting worker %s.', id)
    self.workers[id] = { scheduling=scheduling,
                         pid=self:start_worker(scheduling),
-                        queue={}, graph=graph }
+                        queue={}, graph=graph,
+                        cancel = cond.new() }
    self:state_change_event('worker_starting', id)
    self:debug('Worker %s has PID %s.', id, self.workers[id].pid)
    local actions = self.support.compute_config_actions(
       app_graph.new(), self.workers[id].graph, {}, 'load')
    self:enqueue_config_actions_for_worker(id, actions)
+
+   -- Manage aggregated counters (creation and removal).  The event stream is
+   -- passed the wait operation of worker.cancel, which stop_worker signals.
+   fiber.spawn(function ()
+      local worker = self.workers[id]
+      local dir = shm.root..'/'..worker.pid
+      local rx = inotify.recursive_directory_inventory_events(dir, worker.cancel:wait_operation())
+      for event in rx.get, rx do
+         if is_counter(event.name) then
+            local name = event.name:gsub(shm.root, '')
+            if event.kind == 'creat' then
+               create_counter(worker.pid, name)
+            elseif event.kind == 'rm' then
+               -- Move counter from active to archive.
+               archive_counter(worker.pid, name)
+            end
+         end
+      end
+   end)
+
+   -- Update aggregated counters.  A single updater covers every worker,
+   -- so spawn it only the first time a worker is started.
+   if not self.counter_aggregator_started then
+      self.counter_aggregator_started = true
+      fiber.spawn(function ()
+         while true do
+            for k,c in pairs(counters.aggregated) do
+               counter.set(c, compute_aggregated_value(k))
+            end
+            counter.commit()
+            fiber_sleep(1)
+         end
+      end)
+   end
+
    return self.workers[id]
 end
 
@@ -696,7 +787,17 @@ function Manager:handle_alarm (worker, alarm)
 end
 
 function Manager:stop ()
-   assert(self.sched:shutdown())
+   -- Retry shutdown for up to 0.1s or until it returns true (all tasks
+   -- cancelled).  The clock is re-read on every pass so the loop terminates.
+   local threshold = C.get_monotonic_time() + 0.1
+   local done = self.sched:shutdown()
+   while not done and C.get_monotonic_time() < threshold do
+      done = self.sched:shutdown()
+   end
+   if not done then
+      io.stderr:write("Warning: there are still tasks pending\n")
+   end
+
    require('lib.fibers.file').uninstall_poll_io_handler()
 
    for id, worker in pairs(self.workers) do