Skip to content

Commit

Permalink
Merge PR #65 (Timeline) into vita-next
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Mar 8, 2019
2 parents e4edfc9 + c7a2106 commit a72c364
Show file tree
Hide file tree
Showing 18 changed files with 760 additions and 51 deletions.
12 changes: 11 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ RMSRC = $(shell find $(INCLUDE) -name '*.md' -not -regex './obj.*' -printf '%p
PROGRAM = $(shell find program -regex '^[^/]+/[^/]+' -type d -printf '%p ')
# sort to eliminate potential duplicate of programs.inc
INCSRC = $(sort $(shell find $(INCLUDE) -regex '[^\#]*\.inc' -printf '%p ') programs.inc)
EVTSRC= $(shell find $(INCLUDE) -regex '[^\#]*\.events' -printf '%p ')
YANGSRC= $(shell find $(INCLUDE) -regex '[^\#]*\.yang' -printf '%p ')

LUAOBJ := $(patsubst %.lua,obj/%_lua.o,$(LUASRC))
Expand All @@ -40,6 +41,7 @@ JITOBJS:= $(patsubst %,obj/jit_%.o,$(JITSRC))
EXTRAOBJS := obj/jit_tprof.o obj/jit_vmprof.o obj/strict.o
RMOBJS := $(patsubst %,obj/%,$(RMSRC))
INCOBJ := $(patsubst %.inc,obj/%_inc.o, $(INCSRC))
EVTOBJ := $(patsubst %.events,obj/%_events.o, $(EVTSRC))
YANGOBJ:= $(patsubst %.yang,obj/%_yang.o, $(YANGSRC))
EXE := bin/snabb $(patsubst %,bin/%,$(PROGRAM))

Expand All @@ -59,7 +61,7 @@ TESTSCRIPTS = $(shell find $(INCLUDE) -name "selftest.*" -executable | xargs)

PATH := ../lib/luajit/src:$(PATH)

snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ARCHOBJ) $(ASMOBJ) $(PFLUAASMOBJ) $(INCOBJ) $(YANGOBJ) $(LUAJIT_A)
snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ARCHOBJ) $(ASMOBJ) $(PFLUAASMOBJ) $(INCOBJ) $(EVTOBJ) $(YANGOBJ) $(LUAJIT_A)
$(E) "GEN obj/version.lua.gen"
$(Q) ../generate-version-lua.sh > obj/version.lua.gen
$(E) "LUA obj/version.lua"
Expand All @@ -83,6 +85,7 @@ $(EXE): snabb bin
$(Q) upx -f --brute -o$@ snabb
@echo -n "BINARY "
@ls -sh $@

markdown: $(RMOBJS)

test: $(TESTMODS) $(TESTSCRIPTS)
Expand Down Expand Up @@ -182,6 +185,13 @@ $(INCOBJ): obj/%_inc.o: %.inc Makefile | $(OBJDIR)
echo "]=============]") > $(basename $@).luainc
$(Q) raptorjit -bg -n $(subst /,.,$*)_inc $(basename $@).luainc $@

$(EVTOBJ): obj/%_events.o: %.events Makefile | $(OBJDIR)
$(E) "EVENTS $@"
@(echo -n "return [=============["; \
cat $<; \
echo "]=============]") > $(basename $@).luainc
$(Q) raptorjit -bg -n $(subst /,.,$*)_events $(basename $@).luainc $@

$(YANGOBJ): obj/%_yang.o: %.yang Makefile | $(OBJDIR)
$(E) "YANG $@"
@(echo -n "return [=============["; \
Expand Down
25 changes: 25 additions & 0 deletions src/core/app.events
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
1,9|started:
The app has been started. (Returned from new() callback.)

1,9|linked:
The app has been linked. (Returned from link() callback.)

1,9|reconfigured:
The app has been reconfigured. (Returned from reconfig() callback.)

1,9|stopped:
The app has been stopped. (Returned from stop() callback.)


3,3|pull: inpackets inbytes outpackets outbytes droppackets dropbytes
Entering app pull() callback.

3,3|pulled: inpackets inbytes outpackets outbytes droppackets dropbytes
Returned from app pull() callback.


3,3|push: inpackets inbytes outpackets outbytes droppackets dropbytes
Entering app push() callback.

3,3|pushed: inpackets inbytes outpackets outbytes droppackets dropbytes
Returned from app push() callback.
149 changes: 122 additions & 27 deletions src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

module(...,package.seeall)

local packet = require("core.packet")
local lib = require("core.lib")
local link = require("core.link")
local config = require("core.config")
local timer = require("core.timer")
local shm = require("core.shm")
local histogram = require('core.histogram')
local counter = require("core.counter")
local jit = require("jit")
local S = require("syscall")
local ffi = require("ffi")
local C = ffi.C
local S = require("syscall")
local packet = require("core.packet")
local lib = require("core.lib")
local link = require("core.link")
local config = require("core.config")
local timer = require("core.timer")
local shm = require("core.shm")
local histogram = require('core.histogram')
local counter = require("core.counter")
local timeline_mod = require("core.timeline") -- avoid collision with timeline
local jit = require("jit")
local S = require("syscall")
local ffi = require("ffi")
local C = ffi.C

require("core.packet_h")

-- Packet per pull
Expand All @@ -33,14 +34,48 @@ local named_program_root = shm.root .. "/" .. "by-name"
program_name = false

-- Auditlog state
auditlog_enabled = false
local auditlog_enabled = false
function enable_auditlog ()
jit.auditlog(shm.path("audit.log"))
auditlog_enabled = true
end

-- Timeline event log
local timeline_log, events -- initialized on demand
function timeline ()
if timeline_log == nil then
timeline_log = timeline_mod.new("engine/timeline")
timeline_mod.rate(timeline_log, 9) -- initially log events with rate >= 9
events = timeline_mod.load_events(timeline_log, "core.engine")
end
return timeline_log
end

function randomize_log_rate ()
-- Randomize the log rate. Enable each rate in 5x more breaths
-- than the rate below by randomly picking from log5() distribution.
-- Goal is ballpark 1000 messages per second (~15min for 1M entries.)
--
-- Could be better to reduce the log rate over time to "stretch"
-- logs for long running processes? Improvements possible :-).
--
-- We use rates 0-9 where 9 means "log always", and 0 means "log never."
local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5)))
timeline_mod.rate(timeline_log, rate)
end

-- Breath latency histogram
local latency -- initialized on demand
function enable_latency_histogram ()
if latency == nil then
latency = histogram.create('engine/latency.histogram', 1e-6, 1e0)
end
end

-- The set of all active apps and links in the system, indexed by name.
app_table, link_table = {}, {}
-- Timeline events specific to app instances
app_events = setmetatable({}, { __mode = 'k' })

configuration = config.new()

Expand Down Expand Up @@ -311,6 +346,7 @@ function compute_config_actions (old, new)
end
end

events.config_actions_computed()
return actions
end

Expand All @@ -336,42 +372,44 @@ function apply_config_actions (actions)
local link = app.output[linkname]
app.output[linkname] = nil
remove_link_from_array(app.output, link)
if app.link then app:link() end
if app.link then app:link() app_events[app].linked() end
end
function ops.unlink_input (appname, linkname)
local app = app_table[appname]
local link = app.input[linkname]
app.input[linkname] = nil
remove_link_from_array(app.input, link)
if app.link then app:link() end
if app.link then app:link() app_events[app].linked() end
end
function ops.free_link (linkspec)
link.free(link_table[linkspec], linkspec)
link_table[linkspec] = nil
configuration.links[linkspec] = nil
end
function ops.new_link (linkspec)
link_table[linkspec] = link.new(linkspec)
local link = link.new(linkspec)
link_table[linkspec] = link
configuration.links[linkspec] = true
end
function ops.link_output (appname, linkname, linkspec)
local app = app_table[appname]
local link = assert(link_table[linkspec])
app.output[linkname] = link
table.insert(app.output, link)
if app.link then app:link() end
if app.link then app:link() app_events[app].linked() end
end
function ops.link_input (appname, linkname, linkspec)
local app = app_table[appname]
local link = assert(link_table[linkspec])
app.input[linkname] = link
table.insert(app.input, link)
if app.link then app:link() end
if app.link then app:link() app_events[app].linked() end
end
function ops.stop_app (name)
local app = app_table[name]
if app.stop then app:stop() end
if app.stop then app:stop() app_events[app].stopped() end
if app.shm then shm.delete_frame(app.shm) end
app_events[app] = nil
app_table[name] = nil
configuration.apps[name] = nil
end
Expand All @@ -382,6 +420,8 @@ function apply_config_actions (actions)
name, tostring(app)))
end
local zone = app.zone or (type(class.name) == 'string' and class.name) or getfenv(class.new)._NAME or name
app_events[app] =
timeline_mod.load_events(timeline(), "core.app", {app=name})
app.appname = name
app.output = {}
app.input = {}
Expand All @@ -392,21 +432,25 @@ function apply_config_actions (actions)
app.shm = shm.create_frame("apps/"..name, app.shm)
end
configuration.apps[name] = { class = class, arg = arg }
app_events[app].started()
end
function ops.reconfig_app (name, class, arg)
local app = app_table[name]
app:reconfig(arg)
app:reconfig(arg) app_events[app].reconfigured()
configuration.apps[name].arg = arg
end

events.configure(counter.read(configs) + 1)
-- Dispatch actions.
for _, action in ipairs(actions) do
local name, args = unpack(action)
if log then io.write("engine: ", name, " ", args[1], "\n") end
assert(ops[name], name)(unpack(args))
end
events.config_applied()

compute_breathe_order ()
events.breathe_order_computed()
end

-- Sort the NODES topologically according to SUCCESSORS via
Expand Down Expand Up @@ -510,23 +554,30 @@ function main (options)
enable_auditlog()
end

-- Setup vmprofile
setvmprofile("engine")
-- Ensure timeline is created and initialized
timeline()

-- Enable latency histogram unless explicitly disabled
local breathe = breathe
if options.measure_latency or options.measure_latency == nil then
local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0)
enable_latency_histogram()
breathe = latency:wrap_thunk(breathe, now)
end

-- Setup vmprofile
setvmprofile("engine")

events.engine_started()
monotonic_now = C.get_monotonic_time()
repeat
breathe()
if not no_timers then timer.run() end
if not no_timers then timer.run() events.polled_timers() end
if not busywait then pace_breathing() end
randomize_log_rate() -- roll random log rate
until done and done()
counter.commit()
if not options.no_report then report(options.report) end
events.engine_stopped()

-- Switch to catch-all profile
setvmprofile("program")
Expand All @@ -542,14 +593,18 @@ function pace_breathing ()
nextbreath = nextbreath or monotonic_now
local sleep = tonumber(nextbreath - monotonic_now)
if sleep > 1e-6 then
events.sleep_Hz(Hz, math.round(sleep*1e6))
C.usleep(sleep * 1e6)
monotonic_now = C.get_monotonic_time()
events.wakeup_from_sleep()
end
nextbreath = math.max(nextbreath + 1/Hz, monotonic_now)
else
if lastfrees == counter.read(frees) then
sleep = math.min(sleep + 1, maxsleep)
events.sleep_on_idle(sleep)
C.usleep(sleep)
events.wakeup_from_sleep()
else
sleep = math.floor(sleep/2)
end
Expand All @@ -560,33 +615,73 @@ function pace_breathing ()
end

function breathe ()
local freed_packets0 = counter.read(frees)
local freed_bytes0 = counter.read(freebytes)
events.breath_start(counter.read(breaths), freed_packets0, freed_bytes0,
counter.read(freebits))
running = true
monotonic_now = C.get_monotonic_time()
events.got_monotonic_time(C.get_time_ns())
-- Restart: restart dead apps
restart_dead_apps()
-- Inhale: pull work into the app network
for i = 1, #breathe_pull_order do
local app = breathe_pull_order[i]
if app.pull and not app.dead then
with_restart(app, app.pull)
if timeline_mod.rate(timeline_log) <= 3 then
app_events[app].pull(linkstats(app))
with_restart(app, app.pull)
app_events[app].pulled(linkstats(app))
else
with_restart(app, app.pull)
end
end
end
events.breath_pulled()
-- Exhale: push work out through the app network
for i = 1, #breathe_push_order do
local app = breathe_push_order[i]
if app.push and not app.dead then
with_restart(app, app.push)
if timeline_mod.rate(timeline_log) <= 3 then
app_events[app].push(linkstats(app))
with_restart(app, app.push)
app_events[app].pushed(linkstats(app))
else
with_restart(app, app.push)
end
end
end
events.breath_pushed()
local freed
local freed_packets = counter.read(frees) - freed_packets0
local freed_bytes = (counter.read(freebytes) - freed_bytes0)
local freed_bytes_per_packet = freed_bytes / math.max(tonumber(freed_packets), 1)
events.breath_end(counter.read(breaths), freed_packets, freed_bytes_per_packet)
counter.add(breaths)
-- Commit counters and rebalance freelists at a reasonable frequency
if counter.read(breaths) % 100 == 0 then
counter.commit()
events.commited_counters()
packet.rebalance_freelists()
end
running = false
end

function linkstats (app)
local inp, inb, outp, outb, dropp, dropb = 0, 0, 0, 0, 0, 0
for i = 1, #app.input do
inp = inp + tonumber(counter.read(app.input[i].stats.rxpackets))
inb = inb + tonumber(counter.read(app.input[i].stats.rxbytes))
end
for i = 1, #app.output do
outp = outp + tonumber(counter.read(app.output[i].stats.txpackets))
outb = outb + tonumber(counter.read(app.output[i].stats.txbytes))
dropp = dropp + tonumber(counter.read(app.output[i].stats.txdrop))
dropb = dropb + tonumber(counter.read(app.output[i].stats.txdropbytes))
end
return inp, inb, outp, outb, dropp, dropb
end

function report (options)
if not options or options.showload then
report_load()
Expand Down
Loading

0 comments on commit a72c364

Please sign in to comment.