From 3e4c4474ed614cf91352eb606df3aa5e659e26f6 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 8 Feb 2017 14:42:23 +0000 Subject: [PATCH 01/27] dasm_x86.lua: Add support for RDTSCP instruction This is a very useful instruction for self-benchmarking programs that want to read the CPU timestamp counter efficiently. See Intel whitepaper for details: http://www.intel.com/content/dam/www/public/us/en/documents/white-papers/ia-32-ia-64-benchmark-code-execution-paper.pdf --- src/dasm_x86.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dasm_x86.lua b/src/dasm_x86.lua index 0c11f020ec..ef4442c89b 100644 --- a/src/dasm_x86.lua +++ b/src/dasm_x86.lua @@ -1230,6 +1230,7 @@ local map_op = { shrd_3 = "mriqdw:0FACRmU|mrC/qq:0FADRm|mrC/dd:|mrC/ww:", rdtsc_0 = "0F31", -- P1+ + rdtscp_0 = "0F01F9",-- P6+ rdpmc_0 = "0F33", -- P6+ cpuid_0 = "0FA2", -- P1+ From 95872b1653c2e58a01389486585edd5662c2e112 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 8 Feb 2017 14:45:10 +0000 Subject: [PATCH 02/27] core.timeline: New module for "timeline" logs --- src/core/main.lua | 3 +- src/core/timeline.dasl | 310 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 src/core/timeline.dasl diff --git a/src/core/main.lua b/src/core/main.lua index 611f167152..b1e46060c1 100644 --- a/src/core/main.lua +++ b/src/core/main.lua @@ -28,7 +28,7 @@ end -- Reserve names that we want to use for global module. -- (This way we avoid errors from the 'strict' module.) 
-_G.config, _G.engine, _G.memory, _G.link, _G.packet, _G.timer, +_G.config, _G.engine, _G.memory, _G.link, _G.packet, _G.timer, _G.timeline, _G.main = nil ffi.cdef[[ @@ -129,6 +129,7 @@ function initialize () _G.link = require("core.link") _G.packet = require("core.packet") _G.timer = require("core.timer") + _G.timeline = require("core.timeline") _G.main = getfenv() end diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl new file mode 100644 index 0000000000..b3a29fdbba --- /dev/null +++ b/src/core/timeline.dasl @@ -0,0 +1,310 @@ +-- timeline: high-resolution event log using in-memory ring buffer +-- Use of this source code is governed by the Apache 2.0 license; see COPYING. + +module(..., package.seeall) + +local dasm = require("dasm") +local ffi = require("ffi") +local C = ffi.C +local S = require("syscall") +local shm = require("core.shm") +local lib = require("core.lib") + +-- Load a set of events for logging onto a timeline. +-- Returns a set of logging functions. +-- +-- For example: +-- e = load_events(engine.timeline, "core.app", {name="myapp", class="intel10g"}) +-- Loads the events defined in src/core/app.events and tags each event +-- with the name of the app and class. Events can then be logged: +-- e:app_pulled(inpackets, inbytes, outpackets, outbytes) +function load_events (tl, eventmodule, extra) + local category = eventmodule:match("[^.]+$") -- "core.engine" -> "engine" + -- Convert extra into " key1=value1 key2=value2 ..." attributes string. + local spec = require(eventmodule.."_events") + return load_events_from_string(tl, spec, category, extra) +end + +-- (Helper function) +function load_events_from_string (tl, spec, category, extra) + local events = {} + -- Insert a delimiter character (\a "alarm") between log messages. 
+ spec = spec:gsub("\n(%d|)", "\n\a%1") + for message in spec:gmatch("[^\a]+") do + message = message:gsub("(.-)%s*$", "%1") -- trim trailing spaces + local event = message:match("([%w_]+):") + events[event] = mkevent(tl, category, message, extra) + end + -- Return the set of functions in an efficient-to-call FFI object. + local mt = {__index = events} + return ffi.new(ffi.metatype(ffi.typeof("struct{}"), mt)) +end + +------------------------------------------------------------ +-- Binary data structures + +ffi.cdef[[ + // 64B file header + struct timeline_header { + uint64_t magic; + uint16_t major_version; + uint16_t minor_version; + uint32_t log_bytes; + uint32_t strings_bytes; + uint16_t priority_mask; + uint8_t reserved[42]; + }; + + // 64B log entry + struct timeline_entry { + uint64_t tsc; // CPU timestamp (note: assumed to be first elem below) + uint16_t msgid; // msgid*16 is index into string table + uint16_t core_numa; // TSC_AUX: core (bits 0-7) + numa (12-15) + uint32_t reserved; // (available for future use) + uint64_t arg0, arg1, arg2, arg3, arg4, arg5; // message arguments + }; + + // Private local state for updating the log + struct timeline_state { + // state for the entries ring buffer + struct timeline_entry *entries; + uint32_t level; + uint32_t next_entry; + uint32_t num_entries; + // state for the string table + char *stringtable; + int stringtable_size; + int next_string; + }; +]] + +-- Header of the log file +local magic = 0xa3ff7223441d0001ULL +local major, minor = 2, 1 + +------------------------------------------------------------ +-- API + +-- Create a new timeline under the given shared memory path. 
+function new (shmpath, num_entries, size_stringtable) + num_entries = num_entries or 1e6 + size_stringtable = size_stringtable or 1e6 + -- Calculate size based on number of log entries + local size_header = ffi.sizeof("struct timeline_header") + local size_entries = num_entries * ffi.sizeof("struct timeline_entry") + local size = size_header + size_entries + size_stringtable + -- Allocate one shm object with memory for all data structures + local memory = shm.create(shmpath, ffi.typeof("char["..size.."]")) + local header = ffi.cast("struct timeline_header *", memory) + local ring = ffi.cast("struct timeline_entry *", memory + size_header) + local stringtable = ffi.cast("char*", memory + size_header + size_entries) + -- Fill in header values + header.magic = 0xa3ff7223441d0001ULL + header.major_version = 2 + header.minor_version = 0 + header.log_bytes = size_entries + header.strings_bytes = size_stringtable + -- Private state + local state = ffi.new("struct timeline_state") + state.entries = ring + state.level = 0 + state.next_entry = 0 + state.num_entries = num_entries + state.stringtable = stringtable + state.stringtable_size = size_stringtable + state.next_string = 0 + -- Return an object + return state +end + +function mkevent (timeline, category, message, attrs) + if not message:match("^%d|([^:]+):") then + error(("event syntax error: %q"):format(message)) + end + -- Extract the log level for the message + local level = tonumber(message:match("^(%d)|")) + -- Insert the category ("0|event:" -> "0|category.event:") + message = message:gsub("^(%d|)", "%1"..category..".") + -- Insert the additional attributes. + -- e.g. "1|foo: arg" with {a1="x",a2="y"} becomes "1|foo a1=x a2=y: arg" + for k,v in pairs(attrs or {}) do + message = message:gsub(":", (" %s=%s:"):format(k, v), 1) + end + -- Count the number of arguments. 
+ -- (See http://stackoverflow.com/a/11158158/1523491) + local _, n = (message:match(":([^\n]*)")):gsub("[^%s]+","") + local id = intern(timeline, message) + local event = event -- move asm function into local scope + local log = timeline + if n==0 then return function () event(log,level,id,0,0,0,0,0,0) end end + if n==1 then return function (a) event(log,level,id,a,0,0,0,0,0) end end + if n==2 then return function (a,b) event(log,level,id,a,b,0,0,0,0) end end + if n==3 then return function (a,b,c) event(log,level,id,a,b,c,0,0,0) end end + if n==4 then return function (a,b,c,d) event(log,level,id,a,b,c,d,0,0) end end + if n==5 then return function (a,b,c,d,e) event(log,level,id,a,b,c,d,e,0) end end + if n==6 then return function (a,b,c,d,e,f) event(log,level,id,a,b,c,d,e,f) end end + error("illegal number of arguments: "..n) +end + +-- Get or set the current timeline log level. +function level (timeline, level) + if level then timeline.level = level end + return timeline.level +end + +------------------------------------------------------------ +-- Defining log message formats + +-- Intern a string in the timeline stringtable. +-- Return a unique ID (16-bit offset in 16-byte words) or 0xFFFF if +-- the table is full. + +-- Cache known strings in a weak table keyed on timeline object. +-- (Timeline object is an FFI struct that can't contain a Lua tables.) 
+local known = setmetatable({}, {__mode='k'}) + +function intern (timeline, str) + known[timeline] = known[timeline] or {} + if known[timeline][str] then + return known[timeline][str] + end + local len = #str+1 -- count null terminator + if timeline.next_string + len >= timeline.stringtable_size then + return 0xFFFF -- overflow + else + local position = timeline.next_string + ffi.copy(timeline.stringtable + position, str) + timeline.next_string = lib.align(position + len, 16) + local id = position/16 + assert(id == math.floor(id), "timeline string alignment error") + known[timeline][str] = id + return id + end +end + +------------------------------------------------------------ +-- Logging messages + +|.arch x64 +|.actionlist actions +|.globalnames globalnames + + +-- Registers holding function parameters for x86-64 calling convention. +|.define p0, rdi +|.define p1, rsi +|.define p2, rdx +|.define p3, rcx +|.define p4, r8 +|.define p5, r9 + +|.type log, struct timeline_state +|.type msg, struct timeline_entry +-- void log(timeline, level, msg, arg0, ..., arg5) +local function asmlog (Dst) + |->log: + -- Check that the enabled log level is >= the event log level + | mov eax, log:p0->level + | cmp p1, rax + | jge >1 + | ret + |1: + -- Load index to write into r11 + | mov r11d, log:p0->next_entry + -- Increment next index and check for wrap-around + | mov eax, r11d + | add eax, 1 + | xor ecx, ecx + | cmp eax, log:p0->num_entries + | cmove eax, ecx + | mov log:p0->next_entry, eax + -- Convert log entry number to pointer + | shl r11, 6 -- 64B element number -> byte index + | mov r10, log:p0->entries + | add r10, r11 + -- Log the arguments from register parameters + | mov msg:r10->msgid, dx -- 16-bit value from p2 + | mov msg:r10->arg0, p3 + | mov msg:r10->arg1, p4 + | mov msg:r10->arg2, p5 + -- Log the arguments from stack parameters + | mov rax, [rsp+16] + | mov msg:r10->arg3, rax + | mov rax, [rsp+24] + | mov msg:r10->arg4, rax + | mov rax, [rsp+32] + | mov 
msg:r10->arg5, rax + -- Log the timestamp and core/numa aux info + | rdtscp + | mov msg:r10->tsc, eax + | mov [r10+4], edx -- assumes tsc is first field of struct timeline_entry + | mov msg:r10->core_numa, cx + + | ret +end + +local Dst, globals = dasm.new(actions, nil, nil, 1 + #globalnames) +asmlog(Dst) +local mcode, size = Dst:build() +local entry = dasm.globals(globals, globalnames) + +event = ffi.cast("uint64_t(*)(struct timeline_state *, int, int, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t)", entry.log) + +_anchor = mcode + +--dasm.dump(mcode, size) + +local test_events = [[ +6|six: +event with level 6 (0 args) + +5|five: a b c +event with level 5 (3 args) + +4|four: a b c d e f +event with level 4 (6 args) + +3|three: +event with level 3 +]] + +-- selftest is designed mostly to check that timeline logging does not +-- crash the snabb process e.g. including overflow of the log entries +-- and the string table. it does not verify the contents of the log +-- messages. +function selftest () + print("selftest: timeline") + local tl = new("selftest/timeline") + local e = load_events_from_string(tl, test_events, "selftest", + {module="timeline", func="selftest"}) + level(tl, 4) -- won't log event three + + print("check logging individual messages") + -- First check that log entries are created + assert(tl.next_entry == 0) + e.six() assert(tl.next_entry == 1) + e.five(1, 2, 3) assert(tl.next_entry == 2) + e.four(1, 2, 3, 4, 5, 6) assert(tl.next_entry == 3) + e.three() assert(tl.next_entry == 3) -- skipped + + local n = tl.num_entries*10 + print("check wrap-around on "..lib.comma_value(n).." 
events") + for i = 1, n do + e.six() + e.five(1, 2, 3) + e.four(1, 2, 3, 4, 5, 6) + e.three() + end + -- overflow the string table + print("overflowing string table") + for i = 1, 1e5 do + mkevent(tl, "selftest", "9|dummy_event_definition:", {i=i}) + end + -- report median logging time + local sample = {} + for i = 1, 1000 do sample[i] = tl.entries[i].tsc - tl.entries[i-1].tsc end + table.sort(sample) + print("median time delta for sample:", tonumber(sample[1]).." cycles") + print("selftest: ok") +end + From 930f294177615fc0a7a012c66bcbd409d17a96fa Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 8 Feb 2017 14:46:15 +0000 Subject: [PATCH 03/27] Makefile: Embed "*.events" timeline specs --- src/Makefile | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 467b88e9eb..e89c3f15f0 100644 --- a/src/Makefile +++ b/src/Makefile @@ -22,6 +22,7 @@ RMSRC = $(shell find . -name '*.md' -not -regex './obj.*' -printf '%P ') PROGRAM = $(shell find program -regex '^[^/]+/[^/]+' -type d -printf '%P ') # sort to eliminate potential duplicate of programs.inc INCSRC = $(sort $(shell find . -regex '[^\#]*\.inc' -printf '%P ') programs.inc) +EVTSRC = $(sort $(shell find . -regex '[^\#]*\.events' -printf '%P ')) LUAOBJ := $(patsubst %.lua,obj/%_lua.o,$(LUASRC)) PFLUAOBJ := $(patsubst %.lua,obj/%_lua.o,$(PFLUASRC)) @@ -33,6 +34,7 @@ JITOBJS:= $(patsubst %,obj/jit_%.o,$(JITSRC)) EXTRAOBJS := obj/jit_tprof.o obj/jit_vmprof.o obj/strict.o RMOBJS := $(patsubst %,obj/%,$(RMSRC)) INCOBJ := $(patsubst %.inc,obj/%_inc.o, $(INCSRC)) +EVTOBJ := $(patsubst %.events,obj/%_events.o, $(EVTSRC)) EXE := bin/snabb $(patsubst %,bin/%,$(PROGRAM)) # TESTMODS expands to: @@ -49,7 +51,7 @@ TESTSCRIPTS = $(shell find . 
-name "selftest.sh" -executable | xargs) PATH := ../lib/luajit/usr/local/bin:$(PATH) -snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ARCHOBJ) $(ASMOBJ) $(INCOBJ) $(LUAJIT_A) +snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ARCHOBJ) $(ASMOBJ) $(INCOBJ) $(EVTOBJ) $(LUAJIT_A) $(E) "LINK $@" $(Q) $(CC) $(DEBUG) -Wl,--no-as-needed -Wl,-E -Werror -Wall -o $@ $^ \ ../lib/luajit/src/libluajit.a \ @@ -162,6 +164,13 @@ $(INCOBJ): obj/%_inc.o: %.inc Makefile | $(OBJDIR) echo "]=============]") > $(basename $@).luainc $(Q) luajit -bg -n $(subst /,.,$*)_inc $(basename $@).luainc $@ +$(EVTOBJ): obj/%_events.o: %.events Makefile | $(OBJDIR) + $(E) "EVENTS $@" + @(echo -n "return [=============["; \ + cat $<; \ + echo "]=============]") > $(basename $@).luainc + $(Q) luajit -bg -n $(subst /,.,$*)_events $(basename $@).luainc $@ + # Create list of programs that exist programs.inc: program @(for d in program/*/; do basename $$d; done) > $@ From aa9980a5b58383b9ff2e63fe848d9f95612c7bae Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 8 Feb 2017 14:47:14 +0000 Subject: [PATCH 04/27] engine: Add engine.events, app.events, link.events --- src/core/app.events | 11 ++++++++ src/core/engine.events | 60 ++++++++++++++++++++++++++++++++++++++++++ src/core/link.events | 10 +++++++ 3 files changed, 81 insertions(+) create mode 100644 src/core/app.events create mode 100644 src/core/engine.events create mode 100644 src/core/link.events diff --git a/src/core/app.events b/src/core/app.events new file mode 100644 index 0000000000..1d77fbe828 --- /dev/null +++ b/src/core/app.events @@ -0,0 +1,11 @@ +3|pull: inpackets inbytes outpackets outbytes droppackets dropbytes +Entering app pull() callback. + +3|pulled: inpackets inbytes outpackets outbytes droppackets dropbytes +Returned from app pull() callback. + +3|push: inpackets inbytes outpackets outbytes droppackets dropbytes +Entering app push() callback. 
+ +3|pushed: inpackets inbytes outpackets outbytes droppackets dropbytes +Returned from app push() callback. diff --git a/src/core/engine.events b/src/core/engine.events new file mode 100644 index 0000000000..4c974a79a9 --- /dev/null +++ b/src/core/engine.events @@ -0,0 +1,60 @@ +4|sleep_Hz: usec Hz +The engine requests that the kernel suspend this process for a period of +microseconds in order to reduce CPU utilization and achieve a fixed +frequency of breaths per second (Hz). + +4|sleep_on_idle: usec +The engine requests that the kernel suspend this process for a period +of microseconds in order to reduce CPU utilization because idleness +has been detected (a breath in which no packets were processed.) + +4|wakeup_from_sleep: +The engine resumes operation after sleeping voluntarily. + + +6|engine_started: +The engine starts the traffic processing loop. + +6|engine_stopped: +The engine stops the traffic processing loop. + + +5|breath_start: breath totalpackets totalbytes totaletherbits +The engine starts an iteration of the packet-processing event loop (a +"breath".) + +The total count of packets, bytes, and bits (including layer-1 +ethernet overhead) that the engine has processed are included. These +can be used to track the rate of traffic. + +3|got_monotonic_time: unixnanos +The engine has completed initialization for the breath: synchronized +the current time and handled any pending error recovery. + +'unixnanos' is the current wall-clock time in nanoseconds since the epoc. +This can be used to synchronize the cycle timestamps with wall-clock time. + +4|breath_pulled: +The engine has "pulled" new packets into the event loop for processing. + +4|breath_pushed: +The engine has "pushed" packets one step through the processing network. + +5|breath_end: breath npackets bpp +The engine completes an iteration of the event loop (a "breath.") + +'packets' is the total number of packets processed during the breath. +'bpp' is the average number bytes per packet. 
+ +Note: 'packets' is an internal measure of how many packets were +deallocated (freed) during processing. This does not necessarily +correspond directly to ingress or egress packets on a given interface. + + +4|polled_timers: + The engine polled its timers and executed any that were expired. + +4|commited_counters: + The engine commits the latest counter values to externally visible + shared memory. + diff --git a/src/core/link.events b/src/core/link.events new file mode 100644 index 0000000000..d6d097c483 --- /dev/null +++ b/src/core/link.events @@ -0,0 +1,10 @@ +2|packet_start: wirelength + Started capturing a packet from a link. + This event is followed by one or more app_pcap_data messages + containing (partial) payload and then app_pcap_end. +2|packet_data: a b c d e f + Payload excerpt from packet being captured. + Each argument is 8 bytes of sequential payload (48 bytes total.) +2|packet_end: wirelength + Finished capturing a packet from a link. The (partial) payload is + logged in the immediately preceeding packet_data events. 
From 8530942a5753cb13905c2a5c815982b60534a723 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 8 Feb 2017 14:47:45 +0000 Subject: [PATCH 05/27] core.link: Add dropped bytes counter --- src/core/link.h | 2 +- src/core/link.lua | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/core/link.h b/src/core/link.h index add1d73bd1..255f6c4549 100644 --- a/src/core/link.h +++ b/src/core/link.h @@ -9,7 +9,7 @@ struct link { // http://en.wikipedia.org/wiki/Circular_buffer struct packet *packets[LINK_RING_SIZE]; struct { - struct counter *dtime, *txbytes, *rxbytes, *txpackets, *rxpackets, *txdrop; + struct counter *dtime, *txbytes, *rxbytes, *txpackets, *rxpackets, *txdrop, *txdropbytes; } stats; // Two cursors: // read: the next element to be read diff --git a/src/core/link.lua b/src/core/link.lua index 6fcc9fb58c..dd2dbdfe00 100644 --- a/src/core/link.lua +++ b/src/core/link.lua @@ -23,7 +23,7 @@ local size = C.LINK_RING_SIZE -- NB: Huge slow-down if this is not local max = C.LINK_MAX_PACKETS local provided_counters = { - "dtime", "rxpackets", "rxbytes", "txpackets", "txbytes", "txdrop" + "dtime", "rxpackets", "rxbytes", "txpackets", "txbytes", "txdrop", "txdropbytes" } function new (name) @@ -60,6 +60,7 @@ function transmit (r, p) -- assert(p) if full(r) then counter.add(r.stats.txdrop) + counter.add(r.stats.txdropbytes, p.length) packet.free(p) else r.packets[r.write] = p From 9b252f0e6f990c963dde1f275840153cb75110b3 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 8 Feb 2017 14:49:00 +0000 Subject: [PATCH 06/27] engine: Instrument with timeline events --- src/core/app.lua | 108 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 10 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index 50f05d991a..d4fa8945e4 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -14,6 +14,8 @@ local zone = require("jit.zone") local jit = require("jit") local ffi = require("ffi") local C = ffi.C +local timeline_mod 
= require("core.timeline") -- avoid collision with timeline() + require("core.packet_h") -- Packet per pull @@ -29,6 +31,9 @@ test_skipped_code = 43 -- Indexed both by name (in a table) and by number (in an array). app_table, app_array = {}, {} link_table, link_array = {}, {} +-- Timeline events specific to app and link instances +app_events = setmetatable({}, { __mode = 'k' }) +link_events = setmetatable({}, { __mode = 'k' }) configuration = config.new() @@ -39,6 +44,17 @@ freebits = counter.create("engine/freebits.counter") -- Total packet bits free freebytes = counter.create("engine/freebytes.counter") -- Total packet bytes freed configs = counter.create("engine/configs.counter") -- Total configurations loaded +-- Timeline event log +local timeline_log, events -- initialized on demand + +function timeline () + if timeline_log == nil then + timeline_log = timeline_mod.new("engine/timeline") + events = timeline_mod.load_events(timeline_log, "core.engine") + end + return timeline_log +end + -- Breathing regluation to reduce CPU usage when idle by calling usleep(3). -- -- There are two modes available: @@ -185,6 +201,7 @@ function apply_config_actions (actions, conf) name, tostring(app))) end local zone = app.zone or getfenv(class.new)._NAME or name + app_events[app] = timeline_mod.load_events(timeline(), "core.app", {app=name}) app.appname = name app.output = {} app.input = {} @@ -229,6 +246,8 @@ function apply_config_actions (actions, conf) if not new_app_table[ta] then error("no such app: " .. ta) end -- Create or reuse a link and assign/update receiving app index local link = link_table[linkspec] or link.new(linkspec) + link_events[link] = + timeline_mod.load_events(timeline(), "core.link", {linkspec=linkspec}) link.receiving_app = app_name_to_index[ta] -- Add link to apps new_app_table[fa].output[fl] = link @@ -254,6 +273,8 @@ end -- Call this to "run snabb switch". 
function main (options) + timeline() -- ensure timeline is created and initialized + events.engine_started() options = options or {} local done = options.done local no_timers = options.no_timers @@ -271,11 +292,12 @@ function main (options) monotonic_now = C.get_monotonic_time() repeat breathe() - if not no_timers then timer.run() end + if not no_timers then timer.run() events.polled_timers() end if not busywait then pace_breathing() end until done and done() counter.commit() if not options.no_report then report(options.report) end + events.engine_stopped() end local nextbreath @@ -288,14 +310,18 @@ function pace_breathing () nextbreath = nextbreath or monotonic_now local sleep = tonumber(nextbreath - monotonic_now) if sleep > 1e-6 then + events.sleep_Hz(Hz, math.round(sleep*1e6)) C.usleep(sleep * 1e6) monotonic_now = C.get_monotonic_time() + events.wakeup_from_sleep() end nextbreath = math.max(nextbreath + 1/Hz, monotonic_now) else if lastfrees == counter.read(frees) then sleep = math.min(sleep + 1, maxsleep) + events.sleep_on_idle(sleep) C.usleep(sleep) + events.wakeup_from_sleep() else sleep = math.floor(sleep/2) end @@ -306,33 +332,62 @@ function pace_breathing () end function breathe () + local freed_packets0 = counter.read(frees) + local freed_bytes0 = counter.read(freebytes) + events.breath_start(counter.read(breaths), freed_packets0, freed_bytes0, + counter.read(freebits)) monotonic_now = C.get_monotonic_time() -- Restart: restart dead apps restart_dead_apps() -- Inhale: pull work into the app network + events.got_monotonic_time(C.get_time_ns()) for i = 1, #app_array do local app = app_array[i] --- if app.pull then --- zone(app.zone) app:pull() zone() if app.pull and not app.dead then zone(app.zone) - with_restart(app, app.pull) + if timeline_mod.level(timeline_log) <= 3 then + app_events[app].pull(linkstats(app)) + with_restart(app, app.pull) + app_events[app].pulled(linkstats(app)) + else + with_restart(app, app.pull) + end zone() end end + 
events.breath_pulled() -- Exhale: push work out through the app network local firstloop = true repeat local progress = false -- For each link that has new data, run the receiving app for i = 1, #link_array do - local link = link_array[i] - if firstloop or link.has_new_data then - link.has_new_data = false - local receiver = app_array[link.receiving_app] + local l = link_array[i] + if firstloop or l.has_new_data then + -- Consider logging a packet + if l.has_new_data and timeline_mod.level(timeline_log) <= 2 then + local p = link.front(l) + if p ~= nil then + link_events[l].packet_start(p.length) + local u64 = ffi.cast("uint64_t*", p.data) + for n = 0, p.length/8, 6 do + link_events[l].packet_data(u64[n+0],u64[n+1],u64[n+2], + u64[n+3],u64[n+4],u64[n+5]) + end + link_events[l].packet_end(p.length) + end + end + l.has_new_data = false + local receiver = app_array[l.receiving_app] if receiver.push and not receiver.dead then zone(receiver.zone) - with_restart(receiver, receiver.push) + if timeline_mod.level(timeline_log) <= 3 then + app_events[receiver].push(linkstats(receiver)) + with_restart(receiver, receiver.push) + app_events[receiver].pushed(linkstats(receiver)) + else + with_restart(receiver, receiver.push) + end zone() progress = true end @@ -340,9 +395,42 @@ function breathe () end firstloop = false until not progress -- Stop after no link had new data + events.breath_pushed() + local freed + local freed_packets = counter.read(frees) - freed_packets0 + local freed_bytes = (counter.read(freebytes) - freed_bytes0) + local freed_bytes_per_packet = freed_bytes / math.max(tonumber(freed_packets), 1) + events.breath_end(counter.read(breaths), freed_packets, freed_bytes_per_packet) counter.add(breaths) -- Commit counters at a reasonable frequency - if counter.read(breaths) % 100 == 0 then counter.commit() end + if counter.read(breaths) % 100 == 0 then + counter.commit() + events.commited_counters() + end + -- Sample events with dynamic priorities. 
+ -- Lower priorities are enabled 1/10th as often as the one above. + local r = math.random() + if r < 0.00001 then timeline_mod.level(timeline_log, 2) + elseif r < 0.00010 then timeline_mod.level(timeline_log, 3) + elseif r < 0.01000 then timeline_mod.level(timeline_log, 3) + elseif r < 0.10000 then timeline_mod.level(timeline_log, 5) + else timeline_mod.level(timeline_log, 6) + end +end + +function linkstats (app) + local inp, inb, outp, outb, dropp, dropb = 0, 0, 0, 0, 0, 0 + for i = 1, #app.input do + inp = inp + tonumber(counter.read(app.input[i].stats.rxpackets)) + inb = inb + tonumber(counter.read(app.input[i].stats.rxbytes)) + end + for i = 1, #app.output do + outp = outp + tonumber(counter.read(app.output[i].stats.txpackets)) + outb = outb + tonumber(counter.read(app.output[i].stats.txbytes)) + dropp = dropp + tonumber(counter.read(app.output[i].stats.txdrop)) + dropb = dropb + tonumber(counter.read(app.output[i].stats.txdropbytes)) + end + return inp, inb, outp, outb, dropp, dropb end function report (options) From 40ceef6032efdcb52d59bba71353b3f9f93ff882 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Mon, 20 Feb 2017 09:34:40 +0000 Subject: [PATCH 07/27] core.timeline: Switch to double-float on disk Use 'double' instead of 'uint64_t' for values in the timeline file. This change is motivated by making timeline files easier to process by R. In the future we may switch back to uint64_t for the TSC counter and/or argument values for improved precision. The major_version file header field can be used to avoid confusion. The obvious downside to using doubles is that the TSC value will lose precision as the server uptime increases (the TSC starts at zero and increases at the base frequency of the CPU e.g. 2GHz.) The impact seems to be modest though. For example a 2GHz CPU would start rounding TSC values to the nearest 128 (likely quite acceptable in practice) after approximately 2 years of operation (2^53 * 128 cycles.) 
So - storing the TSC as a double-float is definitely a kludge - but unlikely to cause any real harm and expedient for the short-term goal of putting this code to use without getting blocked due to e.g. my lack of sophisticated as an R hacker. --- src/core/timeline.dasl | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl index b3a29fdbba..47208453ea 100644 --- a/src/core/timeline.dasl +++ b/src/core/timeline.dasl @@ -51,17 +51,16 @@ ffi.cdef[[ uint16_t minor_version; uint32_t log_bytes; uint32_t strings_bytes; - uint16_t priority_mask; - uint8_t reserved[42]; + uint8_t reserved[44]; }; // 64B log entry struct timeline_entry { - uint64_t tsc; // CPU timestamp (note: assumed to be first elem below) + double tsc; // CPU timestamp (note: assumed to be first elem below) uint16_t msgid; // msgid*16 is index into string table uint16_t core_numa; // TSC_AUX: core (bits 0-7) + numa (12-15) uint32_t reserved; // (available for future use) - uint64_t arg0, arg1, arg2, arg3, arg4, arg5; // message arguments + double arg0, arg1, arg2, arg3, arg4, arg5; // message arguments }; // Private local state for updating the log @@ -100,7 +99,7 @@ function new (shmpath, num_entries, size_stringtable) local stringtable = ffi.cast("char*", memory + size_header + size_entries) -- Fill in header values header.magic = 0xa3ff7223441d0001ULL - header.major_version = 2 + header.major_version = 3 header.minor_version = 0 header.log_bytes = size_entries header.strings_bytes = size_stringtable @@ -223,22 +222,21 @@ local function asmlog (Dst) | mov r10, log:p0->entries | add r10, r11 -- Log the arguments from register parameters - | mov msg:r10->msgid, dx -- 16-bit value from p2 - | mov msg:r10->arg0, p3 - | mov msg:r10->arg1, p4 - | mov msg:r10->arg2, p5 - -- Log the arguments from stack parameters - | mov rax, [rsp+16] - | mov msg:r10->arg3, rax - | mov rax, [rsp+24] - | mov msg:r10->arg4, rax - | 
mov rax, [rsp+32] - | mov msg:r10->arg5, rax + | mov msg:r10->msgid, dx + | movsd qword msg:r10->arg0, xmm0 + | movsd qword msg:r10->arg1, xmm1 + | movsd qword msg:r10->arg2, xmm2 + | movsd qword msg:r10->arg3, xmm3 + | movsd qword msg:r10->arg4, xmm4 + | movsd qword msg:r10->arg5, xmm5 -- Log the timestamp and core/numa aux info | rdtscp - | mov msg:r10->tsc, eax - | mov [r10+4], edx -- assumes tsc is first field of struct timeline_entry | mov msg:r10->core_numa, cx + -- Convert TSC in EAX:EDX to double + | shl rdx, 32 + | or rax, rdx + | cvtsi2sd xmm0, rax + | movsd qword msg:r10->tsc, xmm0 | ret end @@ -248,7 +246,7 @@ asmlog(Dst) local mcode, size = Dst:build() local entry = dasm.globals(globals, globalnames) -event = ffi.cast("uint64_t(*)(struct timeline_state *, int, int, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t)", entry.log) +event = ffi.cast("void(*)(struct timeline_state *, int, int, double, double, double, double, double, double)", entry.log) _anchor = mcode From 563ae7ae32ac59bc1999a305fa0ee884527243b3 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Mon, 20 Feb 2017 17:20:58 +0000 Subject: [PATCH 08/27] engine: update timeline level probabilities --- src/core/app.lua | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index ddf83620bb..15a573db50 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -433,11 +433,13 @@ function breathe () -- Sample events with dynamic priorities. -- Lower priorities are enabled 1/10th as often as the one above. 
local r = math.random() - if r < 0.00001 then timeline_mod.level(timeline_log, 2) - elseif r < 0.00010 then timeline_mod.level(timeline_log, 3) - elseif r < 0.01000 then timeline_mod.level(timeline_log, 4) - elseif r < 0.10000 then timeline_mod.level(timeline_log, 5) - else timeline_mod.level(timeline_log, 6) + if r < 0.000001 then timeline_mod.level(timeline_log, 1) + elseif r < 0.000010 then timeline_mod.level(timeline_log, 2) + elseif r < 0.000100 then timeline_mod.level(timeline_log, 3) + elseif r < 0.001000 then timeline_mod.level(timeline_log, 4) + elseif r < 0.010000 then timeline_mod.level(timeline_log, 5) + elseif r < 0.100000 then timeline_mod.level(timeline_log, 6) + else timeline_mod.level(timeline_log, 7) end running = false end From ffb7379c7e5463084da817d3a1e4976e51c0088b Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Tue, 21 Feb 2017 08:50:08 +0000 Subject: [PATCH 09/27] engine: Randomize timeline log level with math Simplify the code and eliminate unwanted branches from the engine loop by drawing a random timeline level from a log-uniform distribution that mathematically favors higher log levels over lower ones. Plucked log5() out of the air i.e. each log level should be enabled for 5x more breaths than the one below. Here is how the distribution of log level choice looks in practice using this algorithm: > t = {0,0,0,0,0,0,0,0,0} > for i = 1, 1e8 do local n = math.max(1,math.ceil(math.log(math.random(5^9))/math.log(5))) t[n] = t[n]+1 end > for i,n in ipairs(t) do print(i,n) end 1 560 2 2151 3 10886 4 55149 5 273376 6 1367410 7 6844261 8 34228143 9 171120244 Note: Lua provides only natural logarithm functions but it is easy to derive other bases from this (google "log change of base formula"). 
--- src/core/app.lua | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index 11a8e5fea3..e07f6e98bc 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -444,17 +444,14 @@ function breathe () counter.commit() events.commited_counters() end - -- Sample events with dynamic priorities. - -- Lower priorities are enabled 1/10th as often as the one above. - local r = math.random() - if r < 0.000001 then timeline_mod.level(timeline_log, 1) - elseif r < 0.000010 then timeline_mod.level(timeline_log, 2) - elseif r < 0.000100 then timeline_mod.level(timeline_log, 3) - elseif r < 0.001000 then timeline_mod.level(timeline_log, 4) - elseif r < 0.010000 then timeline_mod.level(timeline_log, 5) - elseif r < 0.100000 then timeline_mod.level(timeline_log, 6) - else timeline_mod.level(timeline_log, 7) - end + -- Randomize the log level. Enable each level in 5x more breaths + -- than the level below by randomly picking from log5() distribution. + -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) + -- + -- Could be better to reduce the log level over time to "stretch" + -- logs for long running processes? Improvements possible :-). + local level = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) + timeline_mod.level(timeline_log, level) running = false end From 16a6bb21eb5bb303f63fc57bf97b3dbe55382268 Mon Sep 17 00:00:00 2001 From: Luke Gorrie Date: Wed, 12 Apr 2017 11:27:35 +0000 Subject: [PATCH 10/27] engine: Remove timeline packet payload sampling I suspect that it is a misfeature for the timeline to sample the contents of packets. Do we really want user data potentially appearing in debug logs? Removed for now. 
--- src/core/app.lua | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index 57c1fc0435..9a7327d1e2 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -402,7 +402,6 @@ function breathe () for i = 1, #breathe_pull_order do local app = breathe_pull_order[i] if app.pull and not app.dead then - if timeline_mod.level(timeline_log) <= 2 then log_links(app.input) end zone(app.zone) if timeline_mod.level(timeline_log) <= 3 then app_events[app].pull(linkstats(app)) @@ -420,7 +419,6 @@ function breathe () local app = breathe_push_order[i] if app.push and not app.dead then zone(app.zone) - if timeline_mod.level(timeline_log) <= 2 then log_links(app.output) end if timeline_mod.level(timeline_log) <= 3 then app_events[app].push(linkstats(app)) with_restart(app, app.push) @@ -428,7 +426,6 @@ function breathe () else with_restart(app, app.push) end - if timeline_mod.level(timeline_log) <= 2 then log_links(app.output) end zone() end end @@ -470,22 +467,6 @@ function linkstats (app) return inp, inb, outp, outb, dropp, dropb end --- Log packet payload from links. 
-function log_links (links) - for _, l in ipairs(links) do - local p = link.front(l) - if p ~= nil then - link_events[l].packet_start(p.length) - local u64 = ffi.cast("uint64_t*", p.data) - for n = 0, p.length/8, 6 do - link_events[l].packet_data(u64[n+0],u64[n+1],u64[n+2], - u64[n+3],u64[n+4],u64[n+5]) - end - link_events[l].packet_end(p.length) - end - end -end - function report (options) if not options or options.showload then report_load() From 1bb4108a3e2a0763412138e58e41ca8be337e522 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Tue, 30 Oct 2018 17:26:39 +0100 Subject: [PATCH 11/27] core.app: remove remains from 16a6bb2 (timeline packet sampling) --- src/core/app.lua | 5 +---- src/core/link.events | 10 ---------- 2 files changed, 1 insertion(+), 14 deletions(-) delete mode 100644 src/core/link.events diff --git a/src/core/app.lua b/src/core/app.lua index 980e5918b5..ad2d34ce24 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -52,9 +52,8 @@ end -- The set of all active apps and links in the system, indexed by name. app_table, link_table = {}, {} --- Timeline events specific to app and link instances +-- Timeline events specific to app instances app_events = setmetatable({}, { __mode = 'k' }) -link_events = setmetatable({}, { __mode = 'k' }) configuration = config.new() @@ -368,8 +367,6 @@ function apply_config_actions (actions) local link = link.new(linkspec) link_table[linkspec] = link configuration.links[linkspec] = true - link_events[link] = - timeline_mod.load_events(timeline(), "core.link", {linkspec=linkspec}) end function ops.link_output (appname, linkname, linkspec) local app = app_table[appname] diff --git a/src/core/link.events b/src/core/link.events deleted file mode 100644 index d6d097c483..0000000000 --- a/src/core/link.events +++ /dev/null @@ -1,10 +0,0 @@ -2|packet_start: wirelength - Started capturing a packet from a link. 
- This event is followed by one or more app_pcap_data messages - containing (partial) payload and then app_pcap_end. -2|packet_data: a b c d e f - Payload excerpt from packet being captured. - Each argument is 8 bytes of sequential payload (48 bytes total.) -2|packet_end: wirelength - Finished capturing a packet from a link. The (partial) payload is - logged in the immediately preceeding packet_data events. From 0e52b2406ae3c4ee55786e5f6432d8870d7d4fb7 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Wed, 31 Oct 2018 17:28:30 +0100 Subject: [PATCH 12/27] core.timeline: make timeline log available to other core modules --- src/core/app.lua | 4 ++-- src/core/timeline.dasl | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index ad2d34ce24..52df1cbced 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -43,8 +43,8 @@ end -- Timeline event log local timeline_log, events -- initialized on demand function timeline () - if timeline_log == nil then - timeline_log = timeline_mod.new("engine/timeline") + if not timeline_log then + timeline_log = timeline_mod.log() events = timeline_mod.load_events(timeline_log, "core.engine") end return timeline_log diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl index 47208453ea..64e4c40eae 100644 --- a/src/core/timeline.dasl +++ b/src/core/timeline.dasl @@ -10,6 +10,15 @@ local S = require("syscall") local shm = require("core.shm") local lib = require("core.lib") +-- Initialize and return the timeline log. +local timeline_log +function log () + if timeline_log == nil then + timeline_log = new("engine/timeline") + end + return timeline_log +end + -- Load a set of events for logging onto a timeline. -- Returns a set of logging functions. 
-- From e8348451ece53bc4a451227d02b2d6b567eeb5fa Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Wed, 31 Oct 2018 17:29:02 +0100 Subject: [PATCH 13/27] core.packet: record packet allocation events to timeline --- src/core/app.lua | 2 +- src/core/packet.events | 31 +++++++++++++++++++++++++++++++ src/core/packet.lua | 17 ++++++++++++++++- 3 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 src/core/packet.events diff --git a/src/core/app.lua b/src/core/app.lua index 52df1cbced..266cc0b957 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -632,8 +632,8 @@ function breathe () -- Commit counters and rebalance freelists at a reasonable frequency if counter.read(breaths) % 100 == 0 then counter.commit() - packet.rebalance_freelists() events.commited_counters() + packet.rebalance_freelists() end -- Randomize the log level. Enable each level in 5x more breaths -- than the level below by randomly picking from log5() distribution. diff --git a/src/core/packet.events b/src/core/packet.events new file mode 100644 index 0000000000..682eda4355 --- /dev/null +++ b/src/core/packet.events @@ -0,0 +1,31 @@ +2|packet_allocated: + A packet has been allocated from the packet freelist. + +2|packet_freed: length + A packet has been freed to the packet freelist. + + 'length' is the byte size of the packet. + +6|packets_preallocated: packets + DMA memory for packets had been preallocated from the operating system. + + 'packets' is the number of packets for which space has been reserved. + +4|group_freelist_wait: + The process is waiting to acquire the group freelist’s lock. + +4|group_freelist_locked: + The process has acquired the group freelist’s lock. + +4|group_freelist_unlocked: + The process has released the group freelist’s lock. + +4|group_freelist_released: packets + The packet freelist was rebalanced with the group freelist. + + 'packets' is the number of packets released to the group freelist. 
+ +4|group_freelist_reclaimed: packets + The packet freelist was refilled from the group freelist. + + 'packets' is the number of packets reclaimed from the group freelist. diff --git a/src/core/packet.lua b/src/core/packet.lua index 130a585e08..2e01fb1036 100644 --- a/src/core/packet.lua +++ b/src/core/packet.lua @@ -13,6 +13,9 @@ local memory = require("core.memory") local shm = require("core.shm") local counter = require("core.counter") local sync = require("core.sync") +local timeline = require("core.timeline") + +local events = timeline.load_events(timeline.log(), "core.packet") require("core.packet_h") @@ -107,13 +110,18 @@ end -- Return borrowed packets to group freelist. function rebalance_freelists () - if group_fl and freelist_nfree(packets_fl) > packets_allocated then + local free_packets = freelist_nfree(packets_fl) + if group_fl and free_packets > packets_allocated then + events.group_freelist_wait() freelist_lock(group_fl) + events.group_freelist_locked() while freelist_nfree(packets_fl) > packets_allocated and not freelist_full(group_fl) do freelist_add(group_fl, freelist_remove(packets_fl)) end freelist_unlock(group_fl) + events.group_freelist_unlocked() + events.group_freelist_released(free_packets - freelist_nfree(packets_fl)) end end @@ -121,17 +129,22 @@ end function allocate () if freelist_nfree(packets_fl) == 0 then if group_fl then + events.group_freelist_wait() freelist_lock(group_fl) + events.group_freelist_locked() while freelist_nfree(group_fl) > 0 and freelist_nfree(packets_fl) < packets_allocated do freelist_add(packets_fl, freelist_remove(group_fl)) end freelist_unlock(group_fl) + events.group_freelist_unlocked() + events.group_freelist_reclaimed(freelist_nfree(packets_fl)) end if freelist_nfree(packets_fl) == 0 then preallocate_step() end end + events.packet_allocated() return freelist_remove(packets_fl) end @@ -229,6 +242,7 @@ function account_free (p) end function free (p) + events.packet_freed(p.length) account_free(p) 
free_internal(p) end @@ -250,6 +264,7 @@ function preallocate_step() end packets_allocated = packets_allocated + packet_allocation_step packet_allocation_step = 2 * packet_allocation_step + events.packets_preallocated(packet_allocation_step) end function selftest () From a6def7b56681c75be0efad492dcad0613de8df7d Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 2 Nov 2018 15:27:29 +0100 Subject: [PATCH 14/27] Revert "core.timeline: make timeline log available to other core modules" This reverts commit 0e52b2406ae3c4ee55786e5f6432d8870d7d4fb7. --- src/core/app.lua | 4 ++-- src/core/packet.lua | 5 ++--- src/core/timeline.dasl | 9 --------- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index 266cc0b957..4cbcb6291a 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -43,8 +43,8 @@ end -- Timeline event log local timeline_log, events -- initialized on demand function timeline () - if not timeline_log then - timeline_log = timeline_mod.log() + if timeline_log == nil then + timeline_log = timeline_mod.new("engine/timeline") events = timeline_mod.load_events(timeline_log, "core.engine") end return timeline_log diff --git a/src/core/packet.lua b/src/core/packet.lua index aaab27e122..d9a18b3393 100644 --- a/src/core/packet.lua +++ b/src/core/packet.lua @@ -15,8 +15,6 @@ local counter = require("core.counter") local sync = require("core.sync") local timeline = require("core.timeline") -local events = timeline.load_events(timeline.log(), "core.packet") - require("core.packet_h") local packet_t = ffi.typeof("struct packet") @@ -108,11 +106,12 @@ end local packet_allocation_step = 1000 local packets_allocated = 0 -- Initialized on demand. -local packets_fl, group_fl +local packets_fl, group_fl, events -- Call to ensure packet freelist is enabled. 
function initialize () packets_fl = freelist_create("engine/packets.freelist") + events = timeline.load_events(engine.timeline(), "core.packet") end -- Call to ensure group freelist is enabled. diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl index 64e4c40eae..47208453ea 100644 --- a/src/core/timeline.dasl +++ b/src/core/timeline.dasl @@ -10,15 +10,6 @@ local S = require("syscall") local shm = require("core.shm") local lib = require("core.lib") --- Initialize and return the timeline log. -local timeline_log -function log () - if timeline_log == nil then - timeline_log = new("engine/timeline") - end - return timeline_log -end - -- Load a set of events for logging onto a timeline. -- Returns a set of logging functions. -- From 8201d1d7990cf5eb7af756accc12a8faa0f43c8d Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 2 Nov 2018 15:35:10 +0100 Subject: [PATCH 15/27] lib.ptree.worker: reformulate to use engine.main with done func --- src/lib/ptree/worker.lua | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/lib/ptree/worker.lua b/src/lib/ptree/worker.lua index 24db0eaa5a..64b78e1047 100644 --- a/src/lib/ptree/worker.lua +++ b/src/lib/ptree/worker.lua @@ -92,27 +92,22 @@ function Worker:handle_actions_from_manager() end function Worker:main () - local vmprofile = require("jit.vmprofile") local stop = engine.now() + self.duration local next_time = engine.now() - -- Setup vmprofile. 
- engine.setvmprofile("engine") - vmprofile.start() - - if not engine.auditlog_enabled then engine.enable_auditlog() end - - repeat - self.breathe() + local function control () if next_time < engine.now() then next_time = engine.now() + self.period + engine.setvmprofile("worker") self:handle_actions_from_manager() - timer.run() + engine.setvmprofile("engine") + end + if stop < engine.now() then + return true -- done end - if not engine.busywait then engine.pace_breathing() end - until stop < engine.now() - counter.commit() - if not self.no_report then engine.report(self.report) end + end + + engine.main{done=control} end function main (opts) From debd26cfea3a77690f66144ee9ff62e4f5166cb8 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Mon, 8 Oct 2018 11:08:25 +0200 Subject: [PATCH 16/27] engine.main: fix memory leak due to latency histogram creation --- src/core/app.lua | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index 4cbcb6291a..5b547fd789 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -50,6 +50,14 @@ function timeline () return timeline_log end +-- Breath latency histogram +local latency -- initialized on demand +function enable_latency_histogram () + if latency == nil then + latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) + end +end + -- The set of all active apps and links in the system, indexed by name. 
 app_table, link_table = {}, {}
 
 -- Timeline events specific to app instances
 app_events = setmetatable({}, { __mode = 'k' })
 
 configuration = config.new()
 
@@ -529,10 +537,10 @@ function main (options)
 
    -- Ensure timeline is created and initialized
    timeline()
-   -- Enable latency histogram if requested
+   -- Enable latency histogram unless explicitly disabled
    local breathe = breathe
    if options.measure_latency or options.measure_latency == nil then
-      local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0)
+      enable_latency_histogram()
       breathe = latency:wrap_thunk(breathe, now)
    end
 

From edaad6c82177089626cbc6334047a5d2771bd1de Mon Sep 17 00:00:00 2001
From: Max Rottenkolber
Date: Wed, 7 Nov 2018 15:23:39 +0100
Subject: [PATCH 17/27] core.app: set timeline log level at the very end of
 breathe loop

This fixes a bug where the timeline log level was rerolled between the end
of a breath but before post-breath events, causing sampling to affect
the event lag of the polled_timers event.

---
 src/core/app.lua | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/src/core/app.lua b/src/core/app.lua
index 5b547fd789..2c95c5c6a6 100644
--- a/src/core/app.lua
+++ b/src/core/app.lua
@@ -553,6 +553,7 @@ function main (options)
       breathe()
       if not no_timers then timer.run() events.polled_timers() end
       if not busywait then pace_breathing() end
+      set_log_level() -- roll random log level
    until done and done()
    counter.commit()
    if not options.no_report then report(options.report) end
@@ -562,6 +563,19 @@ function main (options)
    setvmprofile("program")
 end
 
+function set_log_level (level)
+   if not level then
+      -- Randomize the log level. Enable each level in 5x more breaths
+      -- than the level below by randomly picking from log5() distribution.
+      -- Goal is ballpark 1000 messages per second (~15min for 1M entries.)
+      --
+      -- Could be better to reduce the log level over time to "stretch"
+      -- logs for long running processes? Improvements possible :-).
+ level = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) + end + timeline_mod.level(timeline_log, level) +end + local nextbreath local lastfrees = 0 local lastfreebits = 0 @@ -643,14 +657,6 @@ function breathe () events.commited_counters() packet.rebalance_freelists() end - -- Randomize the log level. Enable each level in 5x more breaths - -- than the level below by randomly picking from log5() distribution. - -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) - -- - -- Could be better to reduce the log level over time to "stretch" - -- logs for long running processes? Improvements possible :-). - local level = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) - timeline_mod.level(timeline_log, level) running = false end From 9c4c804cbafc45c9abe2a06c1352314dec5ee4e3 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Thu, 8 Nov 2018 11:19:35 +0100 Subject: [PATCH 18/27] core.packet: reorder group_fl released/reclaimed/unlocked events --- src/core/packet.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/packet.lua b/src/core/packet.lua index d9a18b3393..a592b6aa1d 100644 --- a/src/core/packet.lua +++ b/src/core/packet.lua @@ -123,18 +123,18 @@ end -- Return borrowed packets to group freelist. 
 function rebalance_freelists ()
-   local free_packets = freelist_nfree(packets_fl)
-   if group_fl and free_packets > packets_allocated then
+   if group_fl and freelist_nfree(packets_fl) > packets_allocated then
       events.group_freelist_wait()
       freelist_lock(group_fl)
       events.group_freelist_locked()
+      local nfree0 = freelist_nfree(packets_fl)
       while freelist_nfree(packets_fl) > packets_allocated
       and not freelist_full(group_fl) do
          freelist_add(group_fl, freelist_remove(packets_fl))
       end
+      events.group_freelist_released(nfree0 - freelist_nfree(packets_fl))
       freelist_unlock(group_fl)
       events.group_freelist_unlocked()
-      events.group_freelist_released(free_packets - freelist_nfree(packets_fl))
    end
 end
 

From 004f5bb71848cb2d530deea113d411168cf2a754 Mon Sep 17 00:00:00 2001
From: Max Rottenkolber
Date: Thu, 8 Nov 2018 11:21:05 +0100
Subject: [PATCH 19/27] core.timeline: decouple log level from event rate
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Changes the syntax of event specs to

  <level>,<rate>|<event>: <args> ...

The previous level digit becomes the event’s "rate" and retains its
semantics with regard to the logging frequency of the specified event.
The "stack depth" of the event is now decoupled as the new, leading
level digit and specified independently.

The new level semantics are as follows:

 - level ranges from 0-9 (10 levels in total)
 - 0 is the top most level while 9 is the lowest
 - levels 0-4 are reserved for use by the engine
 - user applications can use levels 5-9 to create hierarchy in their
   events

Caveat: users should avoid defining events with a higher level and a
lower event rate than an enclosed event if the higher level event is
supposed to serve as a latency anchor for the lower level event.

  RIGHT           WRONG
  5,3|op_start:   5,2|op_start:
  6,2|op_iter:    6,3|op_iter:
  5,3|op_end:     5,2|op_end:

In the WRONG example (right-hand column), the anchor of the op_iter
event depends on the log rate at the time of sampling.
--- src/core/app.events | 9 +++--- src/core/app.lua | 26 ++++++++--------- src/core/engine.events | 59 ++++++++++++++++++++------------------ src/core/packet.events | 41 ++++++++++++++------------- src/core/timeline.dasl | 64 +++++++++++++++++++++--------------------- 5 files changed, 101 insertions(+), 98 deletions(-) diff --git a/src/core/app.events b/src/core/app.events index 1d77fbe828..dd7ac9daad 100644 --- a/src/core/app.events +++ b/src/core/app.events @@ -1,11 +1,12 @@ -3|pull: inpackets inbytes outpackets outbytes droppackets dropbytes +3,3|pull: inpackets inbytes outpackets outbytes droppackets dropbytes Entering app pull() callback. -3|pulled: inpackets inbytes outpackets outbytes droppackets dropbytes +3,3|pulled: inpackets inbytes outpackets outbytes droppackets dropbytes Returned from app pull() callback. -3|push: inpackets inbytes outpackets outbytes droppackets dropbytes + +3,3|push: inpackets inbytes outpackets outbytes droppackets dropbytes Entering app push() callback. -3|pushed: inpackets inbytes outpackets outbytes droppackets dropbytes +3,3|pushed: inpackets inbytes outpackets outbytes droppackets dropbytes Returned from app push() callback. diff --git a/src/core/app.lua b/src/core/app.lua index 2c95c5c6a6..f131ce6818 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -553,7 +553,7 @@ function main (options) breathe() if not no_timers then timer.run() events.polled_timers() end if not busywait then pace_breathing() end - set_log_level() -- roll random log level + randomize_log_rate() -- roll random log rate until done and done() counter.commit() if not options.no_report then report(options.report) end @@ -563,17 +563,15 @@ function main (options) setvmprofile("program") end -function set_log_level (level) - if not level then - -- Randomize the log level. Enable each level in 5x more breaths - -- than the level below by randomly picking from log5() distribution. - -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) 
- -- - -- Could be better to reduce the log level over time to "stretch" - -- logs for long running processes? Improvements possible :-). - level = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) - end - timeline_mod.level(timeline_log, level) +function randomize_log_rate () + -- Randomize the log rate. Enable each rate in 5x more breaths + -- than the rate below by randomly picking from log5() distribution. + -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) + -- + -- Could be better to reduce the log rate over time to "stretch" + -- logs for long running processes? Improvements possible :-). + local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) + timeline_mod.rate(timeline_log, rate) end local nextbreath @@ -621,7 +619,7 @@ function breathe () for i = 1, #breathe_pull_order do local app = breathe_pull_order[i] if app.pull and not app.dead then - if timeline_mod.level(timeline_log) <= 3 then + if timeline_mod.rate(timeline_log) <= 3 then app_events[app].pull(linkstats(app)) with_restart(app, app.pull) app_events[app].pulled(linkstats(app)) @@ -635,7 +633,7 @@ function breathe () for i = 1, #breathe_push_order do local app = breathe_push_order[i] if app.push and not app.dead then - if timeline_mod.level(timeline_log) <= 3 then + if timeline_mod.rate(timeline_log) <= 3 then app_events[app].push(linkstats(app)) with_restart(app, app.push) app_events[app].pushed(linkstats(app)) diff --git a/src/core/engine.events b/src/core/engine.events index 4c974a79a9..79e1978e21 100644 --- a/src/core/engine.events +++ b/src/core/engine.events @@ -1,25 +1,8 @@ -4|sleep_Hz: usec Hz -The engine requests that the kernel suspend this process for a period of -microseconds in order to reduce CPU utilization and achieve a fixed -frequency of breaths per second (Hz). 
- -4|sleep_on_idle: usec -The engine requests that the kernel suspend this process for a period -of microseconds in order to reduce CPU utilization because idleness -has been detected (a breath in which no packets were processed.) - -4|wakeup_from_sleep: -The engine resumes operation after sleeping voluntarily. - - -6|engine_started: +0,6|engine_started: The engine starts the traffic processing loop. -6|engine_stopped: -The engine stops the traffic processing loop. - -5|breath_start: breath totalpackets totalbytes totaletherbits +1,5|breath_start: breath totalpackets totalbytes totaletherbits The engine starts an iteration of the packet-processing event loop (a "breath".) @@ -27,20 +10,23 @@ The total count of packets, bytes, and bits (including layer-1 ethernet overhead) that the engine has processed are included. These can be used to track the rate of traffic. -3|got_monotonic_time: unixnanos + +2,3|got_monotonic_time: unixnanos The engine has completed initialization for the breath: synchronized the current time and handled any pending error recovery. 'unixnanos' is the current wall-clock time in nanoseconds since the epoc. This can be used to synchronize the cycle timestamps with wall-clock time. -4|breath_pulled: + +2,4|breath_pulled: The engine has "pulled" new packets into the event loop for processing. -4|breath_pushed: +2,4|breath_pushed: The engine has "pushed" packets one step through the processing network. -5|breath_end: breath npackets bpp + +1,5|breath_end: breath npackets bpp The engine completes an iteration of the event loop (a "breath.") 'packets' is the total number of packets processed during the breath. @@ -51,10 +37,27 @@ deallocated (freed) during processing. This does not necessarily correspond directly to ingress or egress packets on a given interface. -4|polled_timers: - The engine polled its timers and executed any that were expired. 
+1,4|commited_counters: +The engine commits the latest counter values to externally visible shared +memory. + +1,4|polled_timers: +The engine polled its timers and executed any that were expired. + + +1,4|sleep_Hz: usec Hz +The engine requests that the kernel suspend this process for a period of +microseconds in order to reduce CPU utilization and achieve a fixed +frequency of breaths per second (Hz). + +1,4|sleep_on_idle: usec +The engine requests that the kernel suspend this process for a period +of microseconds in order to reduce CPU utilization because idleness +has been detected (a breath in which no packets were processed.) + +1,4|wakeup_from_sleep: +The engine resumes operation after sleeping voluntarily. -4|commited_counters: - The engine commits the latest counter values to externally visible - shared memory. +0,6|engine_stopped: +The engine stops the traffic processing loop. diff --git a/src/core/packet.events b/src/core/packet.events index 682eda4355..e004a6da52 100644 --- a/src/core/packet.events +++ b/src/core/packet.events @@ -1,31 +1,32 @@ -2|packet_allocated: - A packet has been allocated from the packet freelist. +9,6|packets_preallocated: packets +DMA memory for packets had been preallocated from the operating system. -2|packet_freed: length - A packet has been freed to the packet freelist. +'packets' is the number of packets for which space has been reserved. - 'length' is the byte size of the packet. +9,1|packet_allocated: +A packet has been allocated from the packet freelist. -6|packets_preallocated: packets - DMA memory for packets had been preallocated from the operating system. +9,1|packet_freed: length +A packet has been freed to the packet freelist. - 'packets' is the number of packets for which space has been reserved. +'length' is the byte size of the packet. -4|group_freelist_wait: - The process is waiting to acquire the group freelist’s lock. -4|group_freelist_locked: - The process has acquired the group freelist’s lock. 
+9,4|group_freelist_wait: +The process is waiting to acquire the group freelist’s lock. -4|group_freelist_unlocked: - The process has released the group freelist’s lock. +9,4|group_freelist_locked: +The process has acquired the group freelist’s lock. -4|group_freelist_released: packets - The packet freelist was rebalanced with the group freelist. +9,4|group_freelist_released: packets +The packet freelist was rebalanced with the group freelist. - 'packets' is the number of packets released to the group freelist. +'packets' is the number of packets released to the group freelist. -4|group_freelist_reclaimed: packets - The packet freelist was refilled from the group freelist. +9,4|group_freelist_reclaimed: packets +The packet freelist was refilled from the group freelist. - 'packets' is the number of packets reclaimed from the group freelist. +'packets' is the number of packets reclaimed from the group freelist. + +9,4|group_freelist_unlocked: +The process has released the group freelist’s lock. diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl index 47208453ea..f676db9b8b 100644 --- a/src/core/timeline.dasl +++ b/src/core/timeline.dasl @@ -29,7 +29,7 @@ end function load_events_from_string (tl, spec, category, extra) local events = {} -- Insert a delimiter character (\a "alarm") between log messages. 
- spec = spec:gsub("\n(%d|)", "\n\a%1") + spec = spec:gsub("\n(%d,%d|)", "\n\a%1") for message in spec:gmatch("[^\a]+") do message = message:gsub("(.-)%s*$", "%1") -- trim trailing spaces local event = message:match("([%w_]+):") @@ -67,7 +67,7 @@ ffi.cdef[[ struct timeline_state { // state for the entries ring buffer struct timeline_entry *entries; - uint32_t level; + uint32_t rate; uint32_t next_entry; uint32_t num_entries; // state for the string table @@ -106,7 +106,7 @@ function new (shmpath, num_entries, size_stringtable) -- Private state local state = ffi.new("struct timeline_state") state.entries = ring - state.level = 0 + state.rate = 0 state.next_entry = 0 state.num_entries = num_entries state.stringtable = stringtable @@ -117,13 +117,13 @@ function new (shmpath, num_entries, size_stringtable) end function mkevent (timeline, category, message, attrs) - if not message:match("^%d|([^:]+):") then + if not message:match("^%d,%d|([^:]+):") then error(("event syntax error: %q"):format(message)) end - -- Extract the log level for the message - local level = tonumber(message:match("^(%d)|")) - -- Insert the category ("0|event:" -> "0|category.event:") - message = message:gsub("^(%d|)", "%1"..category..".") + -- Extract the sampling rate for the message + local rate = tonumber(message:match("^%d,(%d)|")) + -- Insert the category ("0,3|event:" -> "0,3|category.event:") + message = message:gsub("|", "|"..category..".", 1) -- Insert the additional attributes. -- e.g. 
"1|foo: arg" with {a1="x",a2="y"} becomes "1|foo a1=x a2=y: arg" for k,v in pairs(attrs or {}) do @@ -135,20 +135,20 @@ function mkevent (timeline, category, message, attrs) local id = intern(timeline, message) local event = event -- move asm function into local scope local log = timeline - if n==0 then return function () event(log,level,id,0,0,0,0,0,0) end end - if n==1 then return function (a) event(log,level,id,a,0,0,0,0,0) end end - if n==2 then return function (a,b) event(log,level,id,a,b,0,0,0,0) end end - if n==3 then return function (a,b,c) event(log,level,id,a,b,c,0,0,0) end end - if n==4 then return function (a,b,c,d) event(log,level,id,a,b,c,d,0,0) end end - if n==5 then return function (a,b,c,d,e) event(log,level,id,a,b,c,d,e,0) end end - if n==6 then return function (a,b,c,d,e,f) event(log,level,id,a,b,c,d,e,f) end end + if n==0 then return function () event(log,rate,id,0,0,0,0,0,0) end end + if n==1 then return function (a) event(log,rate,id,a,0,0,0,0,0) end end + if n==2 then return function (a,b) event(log,rate,id,a,b,0,0,0,0) end end + if n==3 then return function (a,b,c) event(log,rate,id,a,b,c,0,0,0) end end + if n==4 then return function (a,b,c,d) event(log,rate,id,a,b,c,d,0,0) end end + if n==5 then return function (a,b,c,d,e) event(log,rate,id,a,b,c,d,e,0) end end + if n==6 then return function (a,b,c,d,e,f) event(log,rate,id,a,b,c,d,e,f) end end error("illegal number of arguments: "..n) end --- Get or set the current timeline log level. -function level (timeline, level) - if level then timeline.level = level end - return timeline.level +-- Get or set the current timeline log rate. 
+function rate (timeline, rate) + if rate then timeline.rate = rate end + return timeline.rate end ------------------------------------------------------------ @@ -199,11 +199,11 @@ end |.type log, struct timeline_state |.type msg, struct timeline_entry --- void log(timeline, level, msg, arg0, ..., arg5) +-- void log(timeline, rate, msg, arg0, ..., arg5) local function asmlog (Dst) |->log: - -- Check that the enabled log level is >= the event log level - | mov eax, log:p0->level + -- Check that the enabled log rate is >= the event log rate + | mov eax, log:p0->rate | cmp p1, rax | jge >1 | ret @@ -253,17 +253,17 @@ _anchor = mcode --dasm.dump(mcode, size) local test_events = [[ -6|six: -event with level 6 (0 args) +0,6|six: +event with rate 6 (0 args) -5|five: a b c -event with level 5 (3 args) +0,5|five: a b c +event with rate 5 (3 args) -4|four: a b c d e f -event with level 4 (6 args) +0,4|four: a b c d e f +event with rate 4 (6 args) -3|three: -event with level 3 +0,3|three: +event with rate 3 (0 args) ]] -- selftest is designed mostly to check that timeline logging does not @@ -275,7 +275,7 @@ function selftest () local tl = new("selftest/timeline") local e = load_events_from_string(tl, test_events, "selftest", {module="timeline", func="selftest"}) - level(tl, 4) -- won't log event three + rate(tl, 4) -- won't log event three print("check logging individual messages") -- First check that log entries are created @@ -296,7 +296,7 @@ function selftest () -- overflow the string table print("overflowing string table") for i = 1, 1e5 do - mkevent(tl, "selftest", "9|dummy_event_definition:", {i=i}) + mkevent(tl, "selftest", "0,9|dummy_event_definition:", {i=i}) end -- report median logging time local sample = {} From 4166c0e9634e72d6869d6619565d86e2d37409db Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Tue, 13 Nov 2018 11:59:10 +0100 Subject: [PATCH 20/27] lib.ipsec.esp: added low-level timeline events --- src/lib/ipsec/esp.events | 70 
++++++++++++++++++++++++++++++++++++++++ src/lib/ipsec/esp.lua | 18 +++++++++++ 2 files changed, 88 insertions(+) create mode 100644 src/lib/ipsec/esp.events diff --git a/src/lib/ipsec/esp.events b/src/lib/ipsec/esp.events new file mode 100644 index 0000000000..0580e67f03 --- /dev/null +++ b/src/lib/ipsec/esp.events @@ -0,0 +1,70 @@ +8,2|encapsulate_transport6_start: + +ESP encapsulation for a packet in transport-IPv6 mode has started. + +8,2|encapsulate_tunnel_start: + +ESP encapsulation for a packet in tunnel mode has started. + +9,1|trailer_encoded: + +The ESP trailer has been encoded. + +9,1|payload_encrypted: length + +The ESP payload has been encrypted. + +'length' is the byte length of the payload. + +9,1|header_encoded: + +The ESP header has been encoded. + +8,2|encapsulate_transport6_end: + +ESP encapsulation for a packet in transport-IPv6 mode has completed. + +8,2|encapsulate_tunnel_end: + +ESP encapsulation for a packet in tunnel mode has completed. + + +8,2|decapsulate_transport6_start: + +ESP decapsulation for a packet in transport-IPv6 mode has started. + +8,2|decapsulate_tunnel_start: + +ESP decapsulation for a packet in tunnel mode has started. + +9,1|sequence_number_checked: seq_low + +A ESP sequence number has been checked. + +'seq_low' is the lower 32 bits of the sequence number. + +9,1|payload_decrypted: length + +ESP payload has been decrypted. + +'length' is the byte length of the ciphertext. + +9,6|resync_attempted: spi + +ESP resynchronization has been attempted. + +'spi' is the "Security Parameter Index" of the ESP "Security Association" (SA). + +9,1|sequence_number_tracked: seq + +A new ESP sequence number has been tracked. + +'seq' is the 64-bit extended sequence number. + +8,2|decapsulate_transport6_end: + +ESP decapsulation for a packet in transport-IPv6 mode has completed. + +8,2|decapsulate_tunnel_end: + +ESP decapsulation for a packet in tunnel mode has completed. 
diff --git a/src/lib/ipsec/esp.lua b/src/lib/ipsec/esp.lua index 7a55596049..3142249503 100644 --- a/src/lib/ipsec/esp.lua +++ b/src/lib/ipsec/esp.lua @@ -34,6 +34,8 @@ local band = bit.band local htons, htonl, ntohl = lib.htons, lib.htonl, lib.ntohl +local events = timeline.load_events(engine.timeline(), ...) + require("lib.ipsec.track_seq_no_h") local window_t = ffi.typeof("uint8_t[?]") @@ -95,12 +97,14 @@ function encrypt:encode_esp_trailer (ptr, next_header, pad_length) local esp_trailer = ffi.cast(esp_trailer_ptr_t, ptr) esp_trailer.next_header = next_header esp_trailer.pad_length = pad_length + events.trailer_encoded() end function encrypt:encrypt_payload (ptr, length) self:next_seq_no() local seq, low, high = self.seq, self.seq:low(), self.seq:high() self.cipher:encrypt(ptr, seq, low, high, ptr, length, ptr + length) + events.payload_encrypted(length) end function encrypt:encode_esp_header (ptr) @@ -108,6 +112,7 @@ function encrypt:encode_esp_header (ptr) esp_header.spi = htonl(self.spi) esp_header.seq_no = htonl(self.seq:low()) ffi.copy(ptr + ESP_SIZE, self.seq, self.cipher.IV_SIZE) + events.header_encoded() end -- Encapsulation in transport mode is performed as follows: @@ -117,6 +122,7 @@ end -- 4. Move resulting ciphertext to make room for ESP header -- 5. Write ESP header function encrypt:encapsulate_transport6 (p) + events.encapsulate_transport6_start() if p.length < TRANSPORT6_PAYLOAD_OFFSET then return nil end local ip = ffi.cast(ipv6_ptr_t, p.data + ETHERNET_SIZE) @@ -141,6 +147,7 @@ function encrypt:encapsulate_transport6 (p) ip.next_header = PROTOCOL ip.payload_length = htons(payload_length + overhead) + events.encapsulate_transport6_end() return p end @@ -154,6 +161,7 @@ end -- (The resulting packet contains the raw ESP frame, without IP or Ethernet -- headers.) 
function encrypt:encapsulate_tunnel (p, next_header) + events.encapsulate_tunnel_start() local pad_length = self:padding(p.length) local trailer_overhead = pad_length + ESP_TAIL_SIZE + self.cipher.AUTH_SIZE local orig_length = p.length @@ -170,6 +178,7 @@ function encrypt:encapsulate_tunnel (p, next_header) self:encode_esp_header(p.data) + events.encapsulate_tunnel_end() return p end @@ -210,6 +219,7 @@ function decrypt:decrypt_payload (ptr, length, ip) local seq_high = tonumber( C.check_seq_no(seq_low, self.seq.no, self.window, self.window_size) ) + events.sequence_number_checked(seq_low) local error = nil if seq_high < 0 or not self.cipher:decrypt( @@ -221,8 +231,11 @@ function decrypt:decrypt_payload (ptr, length, ip) self.decap_fail = self.decap_fail + 1 if self.decap_fail > self.resync_threshold then seq_high = self:resync(ptr, length, seq_low, seq_high) + events.resync_attempted(self.spi) if seq_high then error = nil end end + else + events.payload_decrypted(ctext_length) end if error then @@ -234,6 +247,7 @@ function decrypt:decrypt_payload (ptr, length, ip) self.seq.no = C.track_seq_no( seq_high, seq_low, self.seq.no, self.window, self.window_size ) + events.sequence_number_tracked(self.seq.no) local esp_trailer_start = ctext_start + ctext_length - ESP_TAIL_SIZE local esp_trailer = ffi.cast(esp_trailer_ptr_t, esp_trailer_start) @@ -249,6 +263,7 @@ end -- 4. Move cleartext up to IP payload -- 5. Shrink p by ESP overhead function decrypt:decapsulate_transport6 (p) + events.decapsulate_transport6_start() if p.length - TRANSPORT6_PAYLOAD_OFFSET < self.MIN_SIZE then return nil end local ip = ffi.cast(ipv6_ptr_t, p.data + ETHERNET_SIZE) @@ -267,6 +282,7 @@ function decrypt:decapsulate_transport6 (p) C.memmove(payload, ptext_start, ptext_length) p = packet.resize(p, TRANSPORT6_PAYLOAD_OFFSET + ptext_length) + events.decapsulate_transport6_end() return p end @@ -279,6 +295,7 @@ end -- (The resulting packet contains the raw ESP payload (i.e. 
an IP frame), -- without an Ethernet header.) function decrypt:decapsulate_tunnel (p) + events.decapsulate_tunnel_start() if p.length < self.MIN_SIZE then return nil end local ptext_start, ptext_length, next_header = @@ -289,6 +306,7 @@ function decrypt:decapsulate_tunnel (p) p = packet.shiftleft(p, self.CTEXT_OFFSET) p = packet.resize(p, ptext_length) + events.decapsulate_tunnel_end() return p, next_header end From 198db25771eaa839c6ce3b4862ff8320576f8c3c Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Wed, 14 Nov 2018 16:49:02 +0100 Subject: [PATCH 21/27] vita: added low level timeline events for packet dispatch and routing --- src/program/vita/dispatch.events | 14 ++++++++++++++ src/program/vita/dispatch.lua | 6 ++++++ src/program/vita/route.events | 24 ++++++++++++++++++++++++ src/program/vita/route.lua | 9 +++++++++ 4 files changed, 53 insertions(+) create mode 100644 src/program/vita/dispatch.events create mode 100644 src/program/vita/route.events diff --git a/src/program/vita/dispatch.events b/src/program/vita/dispatch.events new file mode 100644 index 0000000000..9a95fb7c51 --- /dev/null +++ b/src/program/vita/dispatch.events @@ -0,0 +1,14 @@ +5,2|private_dispatch_start: +PrivateDispatch started to process a packet. + +6,1|private_forward4_matched: +PrivateDispatch matched an IPv4 packet to be forwarded. + +6,1|checksum_verification_succeeded: +PrivateDispatch verified a correct IP checksum. + +6,1|checksum_verification_failed: +PrivateDispatch rejected an invalid IP checksum. + +5,2|private_dispatch_end: +PrivateDispatch has processed a packet. diff --git a/src/program/vita/dispatch.lua b/src/program/vita/dispatch.lua index dd82428dda..c07ffe7639 100644 --- a/src/program/vita/dispatch.lua +++ b/src/program/vita/dispatch.lua @@ -10,6 +10,7 @@ local ipv4 = require("lib.protocol.ipv4") local pf_match = require("pf.match") local ffi = require("ffi") +local events = timeline.load_events(engine.timeline(), ...) 
PrivateDispatch = { name = "PrivateDispatch", @@ -39,13 +40,16 @@ function PrivateDispatch:new (conf) end function PrivateDispatch:forward4 () + events.private_forward4_matched() local p = packet.shiftleft(self.p_box[0], ethernet:sizeof()) assert(self.ip4:new_from_mem(p.data, p.length)) if self.ip4:checksum_ok() then + events.checksum_verification_succeeded() -- Strip datagram of any Ethernet frame padding before encapsulation. local d = packet.resize(p, math.min(self.ip4:total_length(), p.length)) link.transmit(self.output.forward4, d) else + events.checksum_verification_failed() packet.free(p) counter.add(self.shm.rxerrors) counter.add(self.shm.checksum_errors) @@ -78,7 +82,9 @@ function PrivateDispatch:push () while not link.empty(input) do local p = link.receive(input) self.p_box[0] = p + events.private_dispatch_start() self:dispatch(p.data, p.length) + events.private_dispatch_end() end end diff --git a/src/program/vita/route.events b/src/program/vita/route.events new file mode 100644 index 0000000000..dab6cd3a7c --- /dev/null +++ b/src/program/vita/route.events @@ -0,0 +1,24 @@ +5,2|private_route4_start: +PrivateRouter has started to process an IPv4 packet. + +6,1|private_route4_lookup_success: +PrivateRouter looked up the route for an IPv4 packet. + +6,1|private_route4_lookup_failure: +PrivateRouter failed to look up a route of an IPv4 packet. + +5,2|private_route4_end: +PrivateRouter has processed an IPv4 packet. + + +5,2|public_route4_start: +PublicRouter has started to process an IPv4 packet. + +6,1|public_route4_lookup_success: +PublicRouter looked up the SA for an IPv4 packet. + +6,1|public_route4_lookup_failure: +PublicRouter failed to look up a SA of an IPv4 packet. + +5,2|public_route4_end: +PublicRouter has processed an IPv4 packet. 
diff --git a/src/program/vita/route.lua b/src/program/vita/route.lua index 7a6909fb27..8015f0f0cb 100644 --- a/src/program/vita/route.lua +++ b/src/program/vita/route.lua @@ -10,6 +10,7 @@ local lpm = require("lib.lpm.lpm4_248").LPM4_248 local ctable = require("lib.ctable") local ffi = require("ffi") +local events = timeline.load_events(engine.timeline(), ...) -- route := { net_cidr4=(CIDR4), gw_ip4=(IPv4), preshared_key=(KEY) } @@ -66,8 +67,10 @@ end function PrivateRouter:route (p) assert(self.ip4:new_from_mem(p.data, p.length)) + events.private_route4_start() local route = self:find_route4(self.ip4:dst()) if route then + events.private_route4_lookup_success() if p.length + ethernet:sizeof() <= self.mtu then link.transmit(route, p) else @@ -80,10 +83,12 @@ function PrivateRouter:route (p) end end else + events.private_route4_lookup_failure() packet.free(p) counter.add(self.shm.rxerrors) counter.add(self.shm.route_errors) end + events.private_route4_end() end function PrivateRouter:push () @@ -158,14 +163,18 @@ function PublicRouter:push () local input = self.input.input while not link.empty(input) do + events.public_route4_start() local p = link.receive(input) assert(self.esp:new_from_mem(p.data, p.length)) local route = self:find_route4(self.esp:spi()) if route then + events.public_route4_lookup_success() link.transmit(route, p) else + events.public_route4_lookup_failure() packet.free(p) counter.add(self.shm.route_errors) end + events.public_route4_end() end end From 217760b871eb847060a54ab369d103f794174f35 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Thu, 15 Nov 2018 16:24:12 +0100 Subject: [PATCH 22/27] lib.ptree.worker: fixup to 8201d1d7990cf5eb7af756accc12a8faa0f43c8d --- src/lib/ptree/worker.lua | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/lib/ptree/worker.lua b/src/lib/ptree/worker.lua index 8d10b756be..c54045b5af 100644 --- a/src/lib/ptree/worker.lua +++ b/src/lib/ptree/worker.lua @@ -31,6 +31,7 @@ function 
new_worker (conf) ret.period = 1/conf.Hz ret.duration = conf.duration or 1/0 ret.no_report = conf.no_report + ret.measure_latency = conf.measure_latency ret.jit_flush = conf.jit_flush ret.channel = channel.create('config-worker-channel', 1e6) alarms.install_alarm_handler(ptree_alarms:alarm_handler()) @@ -38,11 +39,6 @@ function new_worker (conf) require("jit.opt").start('sizemcode=256', 'maxmcode=2048') - ret.breathe = engine.breathe - if conf.measure_latency then - local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) - ret.breathe = latency:wrap_thunk(ret.breathe, engine.now) - end return ret end @@ -115,7 +111,9 @@ function Worker:main () end end - engine.main{done=control} + engine.main{done=control, + report=self.report, no_report=self.no_report, + measure_latency=self.measure_latency} end function main (opts) From 8e681091aef7233d15d9638ad68daa48d52e48e2 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 16 Nov 2018 14:43:27 +0100 Subject: [PATCH 23/27] core.app: initialize timeline log rate to 7 (no logging) randomize_log_rate will then set the rate to values between 1 and 6, enabling the respective events. --- src/core/app.lua | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/core/app.lua b/src/core/app.lua index f131ce6818..357bffcec8 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -45,11 +45,25 @@ local timeline_log, events -- initialized on demand function timeline () if timeline_log == nil then timeline_log = timeline_mod.new("engine/timeline") + timeline_mod.rate(timeline_log, 7) -- initialize rate to "no logging" events = timeline_mod.load_events(timeline_log, "core.engine") end return timeline_log end +function randomize_log_rate () + -- Randomize the log rate. Enable each rate in 5x more breaths + -- than the rate below by randomly picking from log5() distribution. + -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) 
+ -- + -- Could be better to reduce the log rate over time to "stretch" + -- logs for long running processes? Improvements possible :-). + -- + -- We use rates 0-6 where 6 means "log always", and 0 means "log never." + local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) + timeline_mod.rate(timeline_log, rate) +end + -- Breath latency histogram local latency -- initialized on demand function enable_latency_histogram () @@ -563,17 +577,6 @@ function main (options) setvmprofile("program") end -function randomize_log_rate () - -- Randomize the log rate. Enable each rate in 5x more breaths - -- than the rate below by randomly picking from log5() distribution. - -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) - -- - -- Could be better to reduce the log rate over time to "stretch" - -- logs for long running processes? Improvements possible :-). - local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) - timeline_mod.rate(timeline_log, rate) -end - local nextbreath local lastfrees = 0 local lastfreebits = 0 From 8f0c6bf928f5efb88b33a6da2f76395a4177ff0f Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 16 Nov 2018 17:16:59 +0100 Subject: [PATCH 24/27] core.timeline: add switch to disable timeline logging --- src/core/timeline.dasl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl index f676db9b8b..76e4a9b97b 100644 --- a/src/core/timeline.dasl +++ b/src/core/timeline.dasl @@ -10,6 +10,9 @@ local S = require("syscall") local shm = require("core.shm") local lib = require("core.lib") +-- Set to false to disable timeline logging +enabled = true + -- Load a set of events for logging onto a timeline. -- Returns a set of logging functions. -- @@ -86,6 +89,7 @@ local major, minor = 2, 1 -- Create a new timeline under the given shared memory path. 
function new (shmpath, num_entries, size_stringtable) + if not enabled then return false end num_entries = num_entries or 1e6 size_stringtable = size_stringtable or 1e6 -- Calculate size based on number of log entries @@ -132,6 +136,8 @@ function mkevent (timeline, category, message, attrs) -- Count the number of arguments. -- (See http://stackoverflow.com/a/11158158/1523491) local _, n = (message:match(":([^\n]*)")):gsub("[^%s]+","") + assert(n >= 0 and n <= 6, "illegal number of arguments: "..n) + if not enabled then return function () end end local id = intern(timeline, message) local event = event -- move asm function into local scope local log = timeline @@ -142,11 +148,11 @@ function mkevent (timeline, category, message, attrs) if n==4 then return function (a,b,c,d) event(log,rate,id,a,b,c,d,0,0) end end if n==5 then return function (a,b,c,d,e) event(log,rate,id,a,b,c,d,e,0) end end if n==6 then return function (a,b,c,d,e,f) event(log,rate,id,a,b,c,d,e,f) end end - error("illegal number of arguments: "..n) end -- Get or set the current timeline log rate. function rate (timeline, rate) + if not enabled then return 1/0 end if rate then timeline.rate = rate end return timeline.rate end From 30cafdc7ce2b2ca98cbdece4ca893d46fe8c7bd3 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 16 Nov 2018 18:08:34 +0100 Subject: [PATCH 25/27] core.app: add timeline events for app network configuration --- src/core/app.events | 13 +++++++++++++ src/core/app.lua | 18 ++++++++++++------ src/core/engine.events | 15 +++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/src/core/app.events b/src/core/app.events index dd7ac9daad..0fb3611263 100644 --- a/src/core/app.events +++ b/src/core/app.events @@ -1,3 +1,16 @@ +1,6|started: +The app has been started. (Returned from new() callback.) + +1,6|linked: +The app has been linked. (Returned from link() callback.) + +1,6|reconfigured: +The app has been reconfigured. (Returned from reconfig() callback.) 
+ +1,6|stopped: +The app has been stopped. (Returned from stop() callback.) + + 3,3|pull: inpackets inbytes outpackets outbytes droppackets dropbytes Entering app pull() callback. diff --git a/src/core/app.lua b/src/core/app.lua index 357bffcec8..5e39db5c80 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -346,6 +346,7 @@ function compute_config_actions (old, new) end end + events.config_actions_computed() return actions end @@ -371,14 +372,14 @@ function apply_config_actions (actions) local link = app.output[linkname] app.output[linkname] = nil remove_link_from_array(app.output, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.unlink_input (appname, linkname) local app = app_table[appname] local link = app.input[linkname] app.input[linkname] = nil remove_link_from_array(app.input, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.free_link (linkspec) link.free(link_table[linkspec], linkspec) @@ -395,19 +396,20 @@ function apply_config_actions (actions) local link = assert(link_table[linkspec]) app.output[linkname] = link table.insert(app.output, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.link_input (appname, linkname, linkspec) local app = app_table[appname] local link = assert(link_table[linkspec]) app.input[linkname] = link table.insert(app.input, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.stop_app (name) local app = app_table[name] - if app.stop then app:stop() end + if app.stop then app:stop() app_events[app].stopped() end if app.shm then shm.delete_frame(app.shm) end + app_events[app] = nil app_table[name] = nil configuration.apps[name] = nil end @@ -430,21 +432,25 @@ function apply_config_actions (actions) app.shm = shm.create_frame("apps/"..name, app.shm) end 
configuration.apps[name] = { class = class, arg = arg } + app_events[app].started() end function ops.reconfig_app (name, class, arg) local app = app_table[name] - app:reconfig(arg) + app:reconfig(arg) app_events[app].reconfigured() configuration.apps[name].arg = arg end + events.configure(counter.read(configs) + 1) -- Dispatch actions. for _, action in ipairs(actions) do local name, args = unpack(action) if log then io.write("engine: ", name, " ", args[1], "\n") end assert(ops[name], name)(unpack(args)) end + events.config_applied() compute_breathe_order () + events.breath_order_computed() end -- Sort the NODES topologically according to SUCCESSORS via diff --git a/src/core/engine.events b/src/core/engine.events index 79e1978e21..25b275d851 100644 --- a/src/core/engine.events +++ b/src/core/engine.events @@ -61,3 +61,18 @@ The engine resumes operation after sleeping voluntarily. 0,6|engine_stopped: The engine stops the traffic processing loop. + + +0,6|config_actions_computed: +The engine has computed the actions required for applying a new configuration. + +0,6|configure: config +The engine begins to apply a new configuration. + +'config' is the number of this configuration. + +0,6|config_applied: +The engine has applied a new configuration. + +0,6|breath_order_computed: +The engine has computed the breath order of a new configuration. \ No newline at end of file From e5faba0f19d11021f6416ba0884712241835dee0 Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 7 Dec 2018 15:47:10 +0100 Subject: [PATCH 26/27] Fix some wrong assumptions about the event rate Realized: 9 is the rate that means "log always". Elevate some rare but important events to rate 9. 
--- src/core/app.events | 8 ++++---- src/core/app.lua | 6 +++--- src/core/engine.events | 10 +++++----- src/core/packet.events | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/core/app.events b/src/core/app.events index 0fb3611263..e72e76047a 100644 --- a/src/core/app.events +++ b/src/core/app.events @@ -1,13 +1,13 @@ -1,6|started: +1,9|started: The app has been started. (Returned from new() callback.) -1,6|linked: +1,9|linked: The app has been linked. (Returned from link() callback.) -1,6|reconfigured: +1,9|reconfigured: The app has been reconfigured. (Returned from reconfig() callback.) -1,6|stopped: +1,9|stopped: The app has been stopped. (Returned from stop() callback.) diff --git a/src/core/app.lua b/src/core/app.lua index 5e39db5c80..00034e8119 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -45,7 +45,7 @@ local timeline_log, events -- initialized on demand function timeline () if timeline_log == nil then timeline_log = timeline_mod.new("engine/timeline") - timeline_mod.rate(timeline_log, 7) -- initialize rate to "no logging" + timeline_mod.rate(timeline_log, 9) -- initially log events with rate >= 9 events = timeline_mod.load_events(timeline_log, "core.engine") end return timeline_log @@ -59,7 +59,7 @@ function randomize_log_rate () -- Could be better to reduce the log rate over time to "stretch" -- logs for long running processes? Improvements possible :-). -- - -- We use rates 0-6 where 6 means "log always", and 0 means "log never." + -- We use rates 0-9 where 9 means "log always", and 0 means "log never." 
local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) timeline_mod.rate(timeline_log, rate) end @@ -450,7 +450,7 @@ function apply_config_actions (actions) events.config_applied() compute_breathe_order () - events.breath_order_computed() + events.breathe_order_computed() end -- Sort the NODES topologically according to SUCCESSORS via diff --git a/src/core/engine.events b/src/core/engine.events index 25b275d851..10e9e32687 100644 --- a/src/core/engine.events +++ b/src/core/engine.events @@ -63,16 +63,16 @@ The engine resumes operation after sleeping voluntarily. The engine stops the traffic processing loop. -0,6|config_actions_computed: +0,9|config_actions_computed: The engine has computed the actions required for applying a new configuration. -0,6|configure: config +0,9|configure: config The engine begins to apply a new configuration. 'config' is the number of this configuration. -0,6|config_applied: +0,9|config_applied: The engine has applied a new configuration. -0,6|breath_order_computed: -The engine has computed the breath order of a new configuration. \ No newline at end of file +0,9|breathe_order_computed: +The engine has computed the breath order of a new configuration. diff --git a/src/core/packet.events b/src/core/packet.events index e004a6da52..752a3f25ee 100644 --- a/src/core/packet.events +++ b/src/core/packet.events @@ -1,5 +1,5 @@ -9,6|packets_preallocated: packets -DMA memory for packets had been preallocated from the operating system. +9,9|packets_preallocated: packets +DMA memory for packets has been preallocated from the operating system. 'packets' is the number of packets for which space has been reserved. 
From 5b062156540a17aefc425134045a979c64dd2b4a Mon Sep 17 00:00:00 2001 From: Max Rottenkolber Date: Fri, 7 Dec 2018 15:50:42 +0100 Subject: [PATCH 27/27] ptree.worker: emit engine_stopped/engine_started events when pausing main --- src/lib/ptree/worker.lua | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/lib/ptree/worker.lua b/src/lib/ptree/worker.lua index c54045b5af..a67d1c1396 100644 --- a/src/lib/ptree/worker.lua +++ b/src/lib/ptree/worker.lua @@ -13,6 +13,8 @@ local alarms = require("lib.yang.alarms") local channel = require("lib.ptree.channel") local action_codec = require("lib.ptree.action_codec") local ptree_alarms = require("lib.ptree.alarms") +local timeline = require("core.timeline") +local events = timeline.load_events(engine.timeline(), "core.engine") local Worker = {} @@ -102,9 +104,11 @@ function Worker:main () local function control () if next_time < engine.now() then next_time = engine.now() + self.period + events.engine_stopped() engine.setvmprofile("worker") self:handle_actions_from_manager() engine.setvmprofile("engine") + events.engine_started() end if stop < engine.now() then return true -- done