Skip to content

Commit

Permalink
Rework aggregate counters logic
Browse files Browse the repository at this point in the history
Before the aggregated counters were stored in active and its value was
updated without taking into account the archived value.

Now the active table only stores the workers counter indexed by
aggregated counter name.  There is a new table for aggregated counters.
A fiber calculates the accumulative sum of all workers counter than
belong to an aggregated counter and sum the archived value to the
aggregated counter if any.
  • Loading branch information
dpino committed Jun 1, 2018
1 parent c7388a2 commit 5e7793a
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions src/lib/ptree/ptree.lua
Original file line number Diff line number Diff line change
Expand Up @@ -301,40 +301,45 @@ function Manager:compute_scheduling_for_worker(id, app_graph)
return ret
end

local counters = {active={}, archived={}}
local counters = {active={}, archived={}, aggregated={}}

local function is_counter (name)
return lib.basename(name):match("%.counter$")
end
local function read_counter (name)
if not counters.active[name] then counters.active[name] = counter.open(name) end
return counter.read(counters.active[name])
end
local function update_counter (name, val)
local c = assert(counters.active[name])
counter.add(c, val)
end
local function create_counter (name, val)
counters.active[name] = counter.create(name, val)
-- Creates an aggregated counter and saves worker counter in active table.
-- Active table is indexed by aggregated counter name.
local function create_counter (worker_pid, name)
local k = name:gsub(worker_pid, S.getpid())
if not counters.aggregated[k] then
counters.aggregated[k] = counter.create(k)
end
if not counters.active[k] then counters.active[k] = {} end
if not counters.active[k][name] then
counters.active[k][name] = counter.open(name)
end
end
-- Removes a worker counter from the active table and stores its value to
-- archived. Archived table is indexed by aggregated counter name.
local function archive_counter (name)
local c = assert(counters.active[name])
local k = name:gsub(worker_pid, S.getpid())
local c = assert(counters.active[k] and counters.active[k][name])
local val = counter.read(c)
counter.delete(name)
counters.active[name] = nil
if not counters.archived[name] then
counters.archived[name] = ffi.new("uint64_t[1]")
counters.active[k][name] = nil
if not counters.archived[k] then
counters.archived[k] = ffi.new("uint64_t[1]")
end
counters.archived[name][0] = counters.archived[name][0] + val
counters.archived[k][0] = counters.archived[k][0] + val
end
local function create_or_update_counter (worker_pid, name)
local aggregated = name:gsub(worker_pid, S.getpid())
local val = read_counter(name)
if not counters.active[aggregated] then
create_counter(aggregated, val)
else
update_counter(aggregated, val)
-- For all workers counter that belong to the same aggregated name, computed
-- accumulative sum. Accumulative sum is initialized to aggregated counter
-- value if any.
local function compute_aggregated_value (k)
local ret = counters.archived[k] or 0
for _,c in pairs(counters.active[k]) do
ret = ret + counter.read(c)
end
return ret
end

function Manager:start_worker_for_graph(id, graph)
Expand All @@ -357,12 +362,12 @@ function Manager:start_worker_for_graph(id, graph)
local rx = inotify.recursive_directory_inventory_events(dir, worker.cancel:wait_operation())
for event in rx.get, rx do
if is_counter(event.name) then
local cname = event.name:gsub(shm.root, '')
local name = event.name:gsub(shm.root, '')
if event.kind == 'creat' then
create_or_update_counter(worker.pid, cname)
create_counter(worker.pid, name)
elseif event.kind == 'rm' then
-- Move counter from active to archive.
archive_counter(cname)
archive_counter(name)
end
end
end
Expand All @@ -371,11 +376,8 @@ function Manager:start_worker_for_graph(id, graph)
-- Update aggregated counters.
fiber.spawn(function ()
while true do
for cname, c in pairs(counters.active) do
local pid = tonumber(cname:match("/(%d+)"))
if pid ~= S.getpid() then
create_or_update_counter(pid, cname)
end
for k,c in pairs(counters.aggregated) do
counter.set(c, compute_aggregated_value(k))
end
counter.commit()
fiber_sleep(1)
Expand Down

0 comments on commit 5e7793a

Please sign in to comment.