diff --git a/src/Makefile b/src/Makefile
index efaad45cf0..6469d37715 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -26,6 +26,7 @@ RMSRC = $(shell find $(INCLUDE) -name '*.md' -not -regex './obj.*' -printf '%p
 PROGRAM = $(shell find program -regex '^[^/]+/[^/]+' -type d -printf '%p ')
 # sort to eliminate potential duplicate of programs.inc
 INCSRC = $(sort $(shell find $(INCLUDE) -regex '[^\#]*\.inc' -printf '%p ') programs.inc)
+EVTSRC= $(shell find $(INCLUDE) -regex '[^\#]*\.events' -printf '%p ')
 YANGSRC= $(shell find $(INCLUDE) -regex '[^\#]*\.yang' -printf '%p ')
 
 LUAOBJ := $(patsubst %.lua,obj/%_lua.o,$(LUASRC))
@@ -40,6 +41,7 @@ JITOBJS:= $(patsubst %,obj/jit_%.o,$(JITSRC))
 EXTRAOBJS := obj/jit_tprof.o obj/jit_vmprof.o obj/strict.o
 RMOBJS := $(patsubst %,obj/%,$(RMSRC))
 INCOBJ := $(patsubst %.inc,obj/%_inc.o, $(INCSRC))
+EVTOBJ := $(patsubst %.events,obj/%_events.o, $(EVTSRC))
 YANGOBJ:= $(patsubst %.yang,obj/%_yang.o, $(YANGSRC))
 
 EXE := bin/snabb $(patsubst %,bin/%,$(PROGRAM))
@@ -59,7 +61,7 @@ TESTSCRIPTS = $(shell find $(INCLUDE) -name "selftest.*" -executable | xargs)
 
 PATH := ../lib/luajit/src:$(PATH)
 
-snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ARCHOBJ) $(ASMOBJ) $(PFLUAASMOBJ) $(INCOBJ) $(YANGOBJ) $(LUAJIT_A)
+snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ARCHOBJ) $(ASMOBJ) $(PFLUAASMOBJ) $(INCOBJ) $(EVTOBJ) $(YANGOBJ) $(LUAJIT_A)
 	$(E) "GEN obj/version.lua.gen"
 	$(Q) ../generate-version-lua.sh > obj/version.lua.gen
 	$(E) "LUA obj/version.lua"
@@ -83,6 +85,7 @@ $(EXE): snabb bin
 	$(Q) upx -f --brute -o$@ snabb
 	@echo -n "BINARY "
 	@ls -sh $@
+
 markdown: $(RMOBJS)
 
 test: $(TESTMODS) $(TESTSCRIPTS)
@@ -182,6 +185,13 @@ $(INCOBJ): obj/%_inc.o: %.inc Makefile | $(OBJDIR)
 	   echo "]=============]") > $(basename $@).luainc
 	$(Q) raptorjit -bg -n $(subst /,.,$*)_inc $(basename $@).luainc $@
 
+$(EVTOBJ): obj/%_events.o: %.events Makefile | $(OBJDIR)
+	$(E) "EVENTS $@"
+	@(echo -n "return [=============["; \
+	   cat $<; \
+	   echo "]=============]") > $(basename $@).luainc
+	$(Q) raptorjit -bg -n $(subst /,.,$*)_events $(basename $@).luainc $@
+
 $(YANGOBJ): obj/%_yang.o: %.yang Makefile | $(OBJDIR)
 	$(E) "YANG $@"
 	@(echo -n "return [=============["; \
diff --git a/src/core/app.events b/src/core/app.events
new file mode 100644
index 0000000000..e72e76047a
--- /dev/null
+++ b/src/core/app.events
@@ -0,0 +1,25 @@
+1,9|started:
+The app has been started. (Returned from new() callback.)
+
+1,9|linked:
+The app has been linked. (Returned from link() callback.)
+
+1,9|reconfigured:
+The app has been reconfigured. (Returned from reconfig() callback.)
+
+1,9|stopped:
+The app has been stopped. (Returned from stop() callback.)
+
+
+3,3|pull: inpackets inbytes outpackets outbytes droppackets dropbytes
+Entering app pull() callback.
+
+3,3|pulled: inpackets inbytes outpackets outbytes droppackets dropbytes
+Returned from app pull() callback.
+
+
+3,3|push: inpackets inbytes outpackets outbytes droppackets dropbytes
+Entering app push() callback.
+
+3,3|pushed: inpackets inbytes outpackets outbytes droppackets dropbytes
+Returned from app push() callback.
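The .events definitions above are compiled into Lua modules by the new $(EVTOBJ) rule and turned into logging functions by core.timeline. A minimal sketch (not part of the patch) of how an app.events entry is used at run time; the timeline path and app name below are made up for illustration:

   -- Sketch only; assumes a throwaway shm path and app name.
   local timeline = require("core.timeline")
   local tl = timeline.new("example/timeline")          -- in-memory ring buffer in shm
   local e  = timeline.load_events(tl, "core.app", {app = "nic0"})
   timeline.rate(tl, 3)        -- admit events whose rate digit is >= 3
   -- "3,3|pull: inpackets inbytes ..." became a six-argument function:
   e.pull(100, 6400, 0, 0, 0, 0)

In a header such as "3,3|pull: ...", the second digit is the event's log rate that mkevent() extracts and the generated code compares against timeline.rate(); the first digit appears to be a detail level for log readers, which is an assumption since this patch does not spell it out.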
diff --git a/src/core/app.lua b/src/core/app.lua index 74201037c8..78291ada08 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -2,19 +2,20 @@ module(...,package.seeall) -local packet = require("core.packet") -local lib = require("core.lib") -local link = require("core.link") -local config = require("core.config") -local timer = require("core.timer") -local shm = require("core.shm") -local histogram = require('core.histogram') -local counter = require("core.counter") -local jit = require("jit") -local S = require("syscall") -local ffi = require("ffi") -local C = ffi.C -local S = require("syscall") +local packet = require("core.packet") +local lib = require("core.lib") +local link = require("core.link") +local config = require("core.config") +local timer = require("core.timer") +local shm = require("core.shm") +local histogram = require('core.histogram') +local counter = require("core.counter") +local timeline_mod = require("core.timeline") -- avoid collision with timeline +local jit = require("jit") +local S = require("syscall") +local ffi = require("ffi") +local C = ffi.C + require("core.packet_h") -- Packet per pull @@ -33,14 +34,48 @@ local named_program_root = shm.root .. "/" .. "by-name" program_name = false -- Auditlog state -auditlog_enabled = false +local auditlog_enabled = false function enable_auditlog () jit.auditlog(shm.path("audit.log")) auditlog_enabled = true end +-- Timeline event log +local timeline_log, events -- initialized on demand +function timeline () + if timeline_log == nil then + timeline_log = timeline_mod.new("engine/timeline") + timeline_mod.rate(timeline_log, 9) -- initially log events with rate >= 9 + events = timeline_mod.load_events(timeline_log, "core.engine") + end + return timeline_log +end + +function randomize_log_rate () + -- Randomize the log rate. Enable each rate in 5x more breaths + -- than the rate below by randomly picking from log5() distribution. + -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) + -- + -- Could be better to reduce the log rate over time to "stretch" + -- logs for long running processes? Improvements possible :-). + -- + -- We use rates 0-9 where 9 means "log always", and 0 means "log never." + local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) + timeline_mod.rate(timeline_log, rate) +end + +-- Breath latency histogram +local latency -- initialized on demand +function enable_latency_histogram () + if latency == nil then + latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) + end +end + -- The set of all active apps and links in the system, indexed by name. 
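-- Illustration (not part of the patch): randomize_log_rate() above draws the
-- rate from a log5 distribution, so each lower rate is chosen five times less
-- often than the one above it (roughly 80% of breaths stay at rate 9, 16%
-- drop to rate 8, 3.2% to rate 7, and so on down to rate 1).
-- A quick check of those proportions in a Lua prompt:
--
--    local hist = {}
--    for i = 1, 1e6 do
--       local r = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5)))
--       hist[r] = (hist[r] or 0) + 1
--    end
--    for r = 1, 9 do print(r, (hist[r] or 0)/1e6) end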
app_table, link_table = {}, {} +-- Timeline events specific to app instances +app_events = setmetatable({}, { __mode = 'k' }) configuration = config.new() @@ -311,6 +346,7 @@ function compute_config_actions (old, new) end end + events.config_actions_computed() return actions end @@ -336,14 +372,14 @@ function apply_config_actions (actions) local link = app.output[linkname] app.output[linkname] = nil remove_link_from_array(app.output, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.unlink_input (appname, linkname) local app = app_table[appname] local link = app.input[linkname] app.input[linkname] = nil remove_link_from_array(app.input, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.free_link (linkspec) link.free(link_table[linkspec], linkspec) @@ -351,7 +387,8 @@ function apply_config_actions (actions) configuration.links[linkspec] = nil end function ops.new_link (linkspec) - link_table[linkspec] = link.new(linkspec) + local link = link.new(linkspec) + link_table[linkspec] = link configuration.links[linkspec] = true end function ops.link_output (appname, linkname, linkspec) @@ -359,19 +396,20 @@ function apply_config_actions (actions) local link = assert(link_table[linkspec]) app.output[linkname] = link table.insert(app.output, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.link_input (appname, linkname, linkspec) local app = app_table[appname] local link = assert(link_table[linkspec]) app.input[linkname] = link table.insert(app.input, link) - if app.link then app:link() end + if app.link then app:link() app_events[app].linked() end end function ops.stop_app (name) local app = app_table[name] - if app.stop then app:stop() end + if app.stop then app:stop() app_events[app].stopped() end if app.shm then shm.delete_frame(app.shm) end + app_events[app] = nil app_table[name] = nil configuration.apps[name] = nil end @@ -382,6 +420,8 @@ function apply_config_actions (actions) name, tostring(app))) end local zone = app.zone or (type(class.name) == 'string' and class.name) or getfenv(class.new)._NAME or name + app_events[app] = + timeline_mod.load_events(timeline(), "core.app", {app=name}) app.appname = name app.output = {} app.input = {} @@ -392,21 +432,25 @@ function apply_config_actions (actions) app.shm = shm.create_frame("apps/"..name, app.shm) end configuration.apps[name] = { class = class, arg = arg } + app_events[app].started() end function ops.reconfig_app (name, class, arg) local app = app_table[name] - app:reconfig(arg) + app:reconfig(arg) app_events[app].reconfigured() configuration.apps[name].arg = arg end + events.configure(counter.read(configs) + 1) -- Dispatch actions. 
for _, action in ipairs(actions) do local name, args = unpack(action) if log then io.write("engine: ", name, " ", args[1], "\n") end assert(ops[name], name)(unpack(args)) end + events.config_applied() compute_breathe_order () + events.breathe_order_computed() end -- Sort the NODES topologically according to SUCCESSORS via @@ -510,23 +554,30 @@ function main (options) enable_auditlog() end - -- Setup vmprofile - setvmprofile("engine") + -- Ensure timeline is created and initialized + timeline() + -- Enable latency histogram unless explicitly disabled local breathe = breathe if options.measure_latency or options.measure_latency == nil then - local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) + enable_latency_histogram() breathe = latency:wrap_thunk(breathe, now) end + -- Setup vmprofile + setvmprofile("engine") + + events.engine_started() monotonic_now = C.get_monotonic_time() repeat breathe() - if not no_timers then timer.run() end + if not no_timers then timer.run() events.polled_timers() end if not busywait then pace_breathing() end + randomize_log_rate() -- roll random log rate until done and done() counter.commit() if not options.no_report then report(options.report) end + events.engine_stopped() -- Switch to catch-all profile setvmprofile("program") @@ -542,14 +593,18 @@ function pace_breathing () nextbreath = nextbreath or monotonic_now local sleep = tonumber(nextbreath - monotonic_now) if sleep > 1e-6 then + events.sleep_Hz(Hz, math.round(sleep*1e6)) C.usleep(sleep * 1e6) monotonic_now = C.get_monotonic_time() + events.wakeup_from_sleep() end nextbreath = math.max(nextbreath + 1/Hz, monotonic_now) else if lastfrees == counter.read(frees) then sleep = math.min(sleep + 1, maxsleep) + events.sleep_on_idle(sleep) C.usleep(sleep) + events.wakeup_from_sleep() else sleep = math.floor(sleep/2) end @@ -560,33 +615,73 @@ function pace_breathing () end function breathe () + local freed_packets0 = counter.read(frees) + local freed_bytes0 = counter.read(freebytes) + events.breath_start(counter.read(breaths), freed_packets0, freed_bytes0, + counter.read(freebits)) running = true monotonic_now = C.get_monotonic_time() + events.got_monotonic_time(C.get_time_ns()) -- Restart: restart dead apps restart_dead_apps() -- Inhale: pull work into the app network for i = 1, #breathe_pull_order do local app = breathe_pull_order[i] if app.pull and not app.dead then - with_restart(app, app.pull) + if timeline_mod.rate(timeline_log) <= 3 then + app_events[app].pull(linkstats(app)) + with_restart(app, app.pull) + app_events[app].pulled(linkstats(app)) + else + with_restart(app, app.pull) + end end end + events.breath_pulled() -- Exhale: push work out through the app network for i = 1, #breathe_push_order do local app = breathe_push_order[i] if app.push and not app.dead then - with_restart(app, app.push) + if timeline_mod.rate(timeline_log) <= 3 then + app_events[app].push(linkstats(app)) + with_restart(app, app.push) + app_events[app].pushed(linkstats(app)) + else + with_restart(app, app.push) + end end end + events.breath_pushed() + local freed + local freed_packets = counter.read(frees) - freed_packets0 + local freed_bytes = (counter.read(freebytes) - freed_bytes0) + local freed_bytes_per_packet = freed_bytes / math.max(tonumber(freed_packets), 1) + events.breath_end(counter.read(breaths), freed_packets, freed_bytes_per_packet) counter.add(breaths) -- Commit counters and rebalance freelists at a reasonable frequency if counter.read(breaths) % 100 == 0 then counter.commit() + 
events.commited_counters()
       packet.rebalance_freelists()
    end
    running = false
 end
 
+function linkstats (app)
+   local inp, inb, outp, outb, dropp, dropb = 0, 0, 0, 0, 0, 0
+   for i = 1, #app.input do
+      inp = inp + tonumber(counter.read(app.input[i].stats.rxpackets))
+      inb = inb + tonumber(counter.read(app.input[i].stats.rxbytes))
+   end
+   for i = 1, #app.output do
+      outp = outp + tonumber(counter.read(app.output[i].stats.txpackets))
+      outb = outb + tonumber(counter.read(app.output[i].stats.txbytes))
+      dropp = dropp + tonumber(counter.read(app.output[i].stats.txdrop))
+      dropb = dropb + tonumber(counter.read(app.output[i].stats.txdropbytes))
+   end
+   return inp, inb, outp, outb, dropp, dropb
+end
+
 function report (options)
    if not options or options.showload then
       report_load()
diff --git a/src/core/engine.events b/src/core/engine.events
new file mode 100644
index 0000000000..10e9e32687
--- /dev/null
+++ b/src/core/engine.events
@@ -0,0 +1,78 @@
+0,6|engine_started:
+The engine starts the traffic processing loop.
+
+
+1,5|breath_start: breath totalpackets totalbytes totaletherbits
+The engine starts an iteration of the packet-processing event loop (a
+"breath").
+
+The total counts of packets, bytes, and bits (including layer-1
+Ethernet overhead) that the engine has processed are included. These
+can be used to track the rate of traffic.
+
+
+2,3|got_monotonic_time: unixnanos
+The engine has completed initialization for the breath: synchronized
+the current time and handled any pending error recovery.
+
+'unixnanos' is the current wall-clock time in nanoseconds since the epoch.
+This can be used to synchronize the cycle timestamps with wall-clock time.
+
+
+2,4|breath_pulled:
+The engine has "pulled" new packets into the event loop for processing.
+
+2,4|breath_pushed:
+The engine has "pushed" packets one step through the processing network.
+
+
+1,5|breath_end: breath npackets bpp
+The engine completes an iteration of the event loop (a "breath").
+
+'npackets' is the total number of packets processed during the breath.
+'bpp' is the average number of bytes per packet.
+
+Note: 'npackets' is an internal measure of how many packets were
+deallocated (freed) during processing. This does not necessarily
+correspond directly to ingress or egress packets on a given interface.
+
+
+1,4|commited_counters:
+The engine commits the latest counter values to externally visible shared
+memory.
+
+1,4|polled_timers:
+The engine has polled its timers and run any that had expired.
+
+
+1,4|sleep_Hz: Hz usec
+The engine requests that the kernel suspend this process for a period of
+microseconds in order to reduce CPU utilization and achieve a fixed
+frequency of breaths per second (Hz).
+
+1,4|sleep_on_idle: usec
+The engine requests that the kernel suspend this process for a period
+of microseconds in order to reduce CPU utilization because idleness
+has been detected (a breath in which no packets were processed).
+
+1,4|wakeup_from_sleep:
+The engine resumes operation after sleeping voluntarily.
+
+
+0,6|engine_stopped:
+The engine stops the traffic processing loop.
+
+
+0,9|config_actions_computed:
+The engine has computed the actions required for applying a new configuration.
+
+0,9|configure: config
+The engine begins to apply a new configuration.
+
+'config' is the number of this configuration.
+
+0,9|config_applied:
+The engine has applied a new configuration.
+
+0,9|breathe_order_computed:
+The engine has computed the breath order of a new configuration.
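As the breath_start description says, the cumulative totals are meant for rate tracking. A small sketch (not part of the patch) of that computation over two decoded timeline entries; the record fields (tsc, totalpackets, totalbytes) mirror the event arguments, and the decoding step itself is assumed:

   -- Sketch: average rates between two decoded 'breath_start' entries.
   local function rates (a, b, tsc_hz)
      local seconds = (b.tsc - a.tsc) / tsc_hz     -- timestamps are TSC cycles
      return {pps = (b.totalpackets - a.totalpackets) / seconds,
              bps = (b.totalbytes - a.totalbytes) / seconds}
   end
   -- e.g. rates(first, second, 2.0e9) on a 2 GHz TSC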
diff --git a/src/core/link.h b/src/core/link.h index 44f9e7eddb..415c768b2c 100644 --- a/src/core/link.h +++ b/src/core/link.h @@ -9,7 +9,7 @@ struct link { // http://en.wikipedia.org/wiki/Circular_buffer struct packet *packets[LINK_RING_SIZE]; struct { - struct counter *dtime, *txbytes, *rxbytes, *txpackets, *rxpackets, *txdrop; + struct counter *dtime, *txbytes, *rxbytes, *txpackets, *rxpackets, *txdrop, *txdropbytes; } stats; // Two cursors: // read: the next element to be read diff --git a/src/core/link.lua b/src/core/link.lua index 34cab06507..4dddf6ed7c 100644 --- a/src/core/link.lua +++ b/src/core/link.lua @@ -23,7 +23,7 @@ local size = C.LINK_RING_SIZE -- NB: Huge slow-down if this is not local max = C.LINK_MAX_PACKETS local provided_counters = { - "dtime", "rxpackets", "rxbytes", "txpackets", "txbytes", "txdrop" + "dtime", "rxpackets", "rxbytes", "txpackets", "txbytes", "txdrop", "txdropbytes" } function new (name) @@ -63,6 +63,7 @@ function transmit (r, p) -- assert(p) if full(r) then counter.add(r.stats.txdrop) + counter.add(r.stats.txdropbytes, p.length) packet.free(p) else r.packets[r.write] = p diff --git a/src/core/main.lua b/src/core/main.lua index ae0677356c..288833fe36 100644 --- a/src/core/main.lua +++ b/src/core/main.lua @@ -28,7 +28,7 @@ end -- Reserve names that we want to use for global module. -- (This way we avoid errors from the 'strict' module.) -_G.config, _G.engine, _G.memory, _G.link, _G.packet, _G.timer, +_G.config, _G.engine, _G.memory, _G.link, _G.packet, _G.timer, _G.timeline, _G.main = nil ffi.cdef[[ @@ -164,6 +164,7 @@ function initialize () _G.link = require("core.link") _G.packet = require("core.packet"); _G.packet.initialize() _G.timer = require("core.timer") + _G.timeline = require("core.timeline") _G.main = getfenv() end diff --git a/src/core/packet.events b/src/core/packet.events new file mode 100644 index 0000000000..752a3f25ee --- /dev/null +++ b/src/core/packet.events @@ -0,0 +1,32 @@ +9,9|packets_preallocated: packets +DMA memory for packets has been preallocated from the operating system. + +'packets' is the number of packets for which space has been reserved. + +9,1|packet_allocated: +A packet has been allocated from the packet freelist. + +9,1|packet_freed: length +A packet has been freed to the packet freelist. + +'length' is the byte size of the packet. + + +9,4|group_freelist_wait: +The process is waiting to acquire the group freelist’s lock. + +9,4|group_freelist_locked: +The process has acquired the group freelist’s lock. + +9,4|group_freelist_released: packets +The packet freelist was rebalanced with the group freelist. + +'packets' is the number of packets released to the group freelist. + +9,4|group_freelist_reclaimed: packets +The packet freelist was refilled from the group freelist. + +'packets' is the number of packets reclaimed from the group freelist. + +9,4|group_freelist_unlocked: +The process has released the group freelist’s lock. diff --git a/src/core/packet.lua b/src/core/packet.lua index c7ea684332..c6847d4ea1 100644 --- a/src/core/packet.lua +++ b/src/core/packet.lua @@ -13,6 +13,7 @@ local memory = require("core.memory") local shm = require("core.shm") local counter = require("core.counter") local sync = require("core.sync") +local timeline = require("core.timeline") require("core.packet_h") @@ -105,11 +106,12 @@ end local packet_allocation_step = 1000 local packets_allocated = 0 -- Initialized on demand. -local packets_fl, group_fl +local packets_fl, group_fl, events -- Call to ensure packet freelist is enabled. 
function initialize () packets_fl = freelist_create("engine/packets.freelist") + events = timeline.load_events(engine.timeline(), "core.packet") end -- Call to ensure group freelist is enabled. @@ -122,12 +124,17 @@ end -- Return borrowed packets to group freelist. function rebalance_freelists () if group_fl and freelist_nfree(packets_fl) > packets_allocated then + events.group_freelist_wait() freelist_lock(group_fl) + events.group_freelist_locked() + local nfree0 = freelist_nfree(packets_fl) while freelist_nfree(packets_fl) > packets_allocated and not freelist_full(group_fl) do freelist_add(group_fl, freelist_remove(packets_fl)) end + events.group_freelist_released(nfree0 - freelist_nfree(packets_fl)) freelist_unlock(group_fl) + events.group_freelist_unlocked() end end @@ -142,17 +149,22 @@ end}) function allocate () if freelist_nfree(packets_fl) == 0 then if group_fl then + events.group_freelist_wait() freelist_lock(group_fl) + events.group_freelist_locked() while freelist_nfree(group_fl) > 0 and freelist_nfree(packets_fl) < packets_allocated do freelist_add(packets_fl, freelist_remove(group_fl)) end freelist_unlock(group_fl) + events.group_freelist_unlocked() + events.group_freelist_reclaimed(freelist_nfree(packets_fl)) end if freelist_nfree(packets_fl) == 0 then preallocate_step() end end + events.packet_allocated() return freelist_remove(packets_fl) end @@ -266,6 +278,7 @@ function account_free (p) end function free (p) + events.packet_freed(p.length) account_free(p) free_internal(p) end @@ -293,6 +306,7 @@ function preallocate_step() end packets_allocated = packets_allocated + packet_allocation_step packet_allocation_step = 2 * packet_allocation_step + events.packets_preallocated(packet_allocation_step) end function selftest () diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl new file mode 100644 index 0000000000..76e4a9b97b --- /dev/null +++ b/src/core/timeline.dasl @@ -0,0 +1,314 @@ +-- timeline: high-resolution event log using in-memory ring buffer +-- Use of this source code is governed by the Apache 2.0 license; see COPYING. + +module(..., package.seeall) + +local dasm = require("dasm") +local ffi = require("ffi") +local C = ffi.C +local S = require("syscall") +local shm = require("core.shm") +local lib = require("core.lib") + +-- Set to false to disable timeline logging +enabled = true + +-- Load a set of events for logging onto a timeline. +-- Returns a set of logging functions. +-- +-- For example: +-- e = load_events(engine.timeline, "core.app", {name="myapp", class="intel10g"}) +-- Loads the events defined in src/core/app.events and tags each event +-- with the name of the app and class. Events can then be logged: +-- e:app_pulled(inpackets, inbytes, outpackets, outbytes) +function load_events (tl, eventmodule, extra) + local category = eventmodule:match("[^.]+$") -- "core.engine" -> "engine" + -- Convert extra into " key1=value1 key2=value2 ..." attributes string. + local spec = require(eventmodule.."_events") + return load_events_from_string(tl, spec, category, extra) +end + +-- (Helper function) +function load_events_from_string (tl, spec, category, extra) + local events = {} + -- Insert a delimiter character (\a "alarm") between log messages. 
+ spec = spec:gsub("\n(%d,%d|)", "\n\a%1") + for message in spec:gmatch("[^\a]+") do + message = message:gsub("(.-)%s*$", "%1") -- trim trailing spaces + local event = message:match("([%w_]+):") + events[event] = mkevent(tl, category, message, extra) + end + -- Return the set of functions in an efficient-to-call FFI object. + local mt = {__index = events} + return ffi.new(ffi.metatype(ffi.typeof("struct{}"), mt)) +end + +------------------------------------------------------------ +-- Binary data structures + +ffi.cdef[[ + // 64B file header + struct timeline_header { + uint64_t magic; + uint16_t major_version; + uint16_t minor_version; + uint32_t log_bytes; + uint32_t strings_bytes; + uint8_t reserved[44]; + }; + + // 64B log entry + struct timeline_entry { + double tsc; // CPU timestamp (note: assumed to be first elem below) + uint16_t msgid; // msgid*16 is index into string table + uint16_t core_numa; // TSC_AUX: core (bits 0-7) + numa (12-15) + uint32_t reserved; // (available for future use) + double arg0, arg1, arg2, arg3, arg4, arg5; // message arguments + }; + + // Private local state for updating the log + struct timeline_state { + // state for the entries ring buffer + struct timeline_entry *entries; + uint32_t rate; + uint32_t next_entry; + uint32_t num_entries; + // state for the string table + char *stringtable; + int stringtable_size; + int next_string; + }; +]] + +-- Header of the log file +local magic = 0xa3ff7223441d0001ULL +local major, minor = 2, 1 + +------------------------------------------------------------ +-- API + +-- Create a new timeline under the given shared memory path. +function new (shmpath, num_entries, size_stringtable) + if not enabled then return false end + num_entries = num_entries or 1e6 + size_stringtable = size_stringtable or 1e6 + -- Calculate size based on number of log entries + local size_header = ffi.sizeof("struct timeline_header") + local size_entries = num_entries * ffi.sizeof("struct timeline_entry") + local size = size_header + size_entries + size_stringtable + -- Allocate one shm object with memory for all data structures + local memory = shm.create(shmpath, ffi.typeof("char["..size.."]")) + local header = ffi.cast("struct timeline_header *", memory) + local ring = ffi.cast("struct timeline_entry *", memory + size_header) + local stringtable = ffi.cast("char*", memory + size_header + size_entries) + -- Fill in header values + header.magic = 0xa3ff7223441d0001ULL + header.major_version = 3 + header.minor_version = 0 + header.log_bytes = size_entries + header.strings_bytes = size_stringtable + -- Private state + local state = ffi.new("struct timeline_state") + state.entries = ring + state.rate = 0 + state.next_entry = 0 + state.num_entries = num_entries + state.stringtable = stringtable + state.stringtable_size = size_stringtable + state.next_string = 0 + -- Return an object + return state +end + +function mkevent (timeline, category, message, attrs) + if not message:match("^%d,%d|([^:]+):") then + error(("event syntax error: %q"):format(message)) + end + -- Extract the sampling rate for the message + local rate = tonumber(message:match("^%d,(%d)|")) + -- Insert the category ("0,3|event:" -> "0,3|category.event:") + message = message:gsub("|", "|"..category..".", 1) + -- Insert the additional attributes. + -- e.g. "1|foo: arg" with {a1="x",a2="y"} becomes "1|foo a1=x a2=y: arg" + for k,v in pairs(attrs or {}) do + message = message:gsub(":", (" %s=%s:"):format(k, v), 1) + end + -- Count the number of arguments. 
+ -- (See http://stackoverflow.com/a/11158158/1523491) + local _, n = (message:match(":([^\n]*)")):gsub("[^%s]+","") + assert(n >= 0 and n <= 6, "illegal number of arguments: "..n) + if not enabled then return function () end end + local id = intern(timeline, message) + local event = event -- move asm function into local scope + local log = timeline + if n==0 then return function () event(log,rate,id,0,0,0,0,0,0) end end + if n==1 then return function (a) event(log,rate,id,a,0,0,0,0,0) end end + if n==2 then return function (a,b) event(log,rate,id,a,b,0,0,0,0) end end + if n==3 then return function (a,b,c) event(log,rate,id,a,b,c,0,0,0) end end + if n==4 then return function (a,b,c,d) event(log,rate,id,a,b,c,d,0,0) end end + if n==5 then return function (a,b,c,d,e) event(log,rate,id,a,b,c,d,e,0) end end + if n==6 then return function (a,b,c,d,e,f) event(log,rate,id,a,b,c,d,e,f) end end +end + +-- Get or set the current timeline log rate. +function rate (timeline, rate) + if not enabled then return 1/0 end + if rate then timeline.rate = rate end + return timeline.rate +end + +------------------------------------------------------------ +-- Defining log message formats + +-- Intern a string in the timeline stringtable. +-- Return a unique ID (16-bit offset in 16-byte words) or 0xFFFF if +-- the table is full. + +-- Cache known strings in a weak table keyed on timeline object. +-- (Timeline object is an FFI struct that can't contain a Lua tables.) +local known = setmetatable({}, {__mode='k'}) + +function intern (timeline, str) + known[timeline] = known[timeline] or {} + if known[timeline][str] then + return known[timeline][str] + end + local len = #str+1 -- count null terminator + if timeline.next_string + len >= timeline.stringtable_size then + return 0xFFFF -- overflow + else + local position = timeline.next_string + ffi.copy(timeline.stringtable + position, str) + timeline.next_string = lib.align(position + len, 16) + local id = position/16 + assert(id == math.floor(id), "timeline string alignment error") + known[timeline][str] = id + return id + end +end + +------------------------------------------------------------ +-- Logging messages + +|.arch x64 +|.actionlist actions +|.globalnames globalnames + + +-- Registers holding function parameters for x86-64 calling convention. 
+|.define p0, rdi +|.define p1, rsi +|.define p2, rdx +|.define p3, rcx +|.define p4, r8 +|.define p5, r9 + +|.type log, struct timeline_state +|.type msg, struct timeline_entry +-- void log(timeline, rate, msg, arg0, ..., arg5) +local function asmlog (Dst) + |->log: + -- Check that the enabled log rate is >= the event log rate + | mov eax, log:p0->rate + | cmp p1, rax + | jge >1 + | ret + |1: + -- Load index to write into r11 + | mov r11d, log:p0->next_entry + -- Increment next index and check for wrap-around + | mov eax, r11d + | add eax, 1 + | xor ecx, ecx + | cmp eax, log:p0->num_entries + | cmove eax, ecx + | mov log:p0->next_entry, eax + -- Convert log entry number to pointer + | shl r11, 6 -- 64B element number -> byte index + | mov r10, log:p0->entries + | add r10, r11 + -- Log the arguments from register parameters + | mov msg:r10->msgid, dx + | movsd qword msg:r10->arg0, xmm0 + | movsd qword msg:r10->arg1, xmm1 + | movsd qword msg:r10->arg2, xmm2 + | movsd qword msg:r10->arg3, xmm3 + | movsd qword msg:r10->arg4, xmm4 + | movsd qword msg:r10->arg5, xmm5 + -- Log the timestamp and core/numa aux info + | rdtscp + | mov msg:r10->core_numa, cx + -- Convert TSC in EAX:EDX to double + | shl rdx, 32 + | or rax, rdx + | cvtsi2sd xmm0, rax + | movsd qword msg:r10->tsc, xmm0 + + | ret +end + +local Dst, globals = dasm.new(actions, nil, nil, 1 + #globalnames) +asmlog(Dst) +local mcode, size = Dst:build() +local entry = dasm.globals(globals, globalnames) + +event = ffi.cast("void(*)(struct timeline_state *, int, int, double, double, double, double, double, double)", entry.log) + +_anchor = mcode + +--dasm.dump(mcode, size) + +local test_events = [[ +0,6|six: +event with rate 6 (0 args) + +0,5|five: a b c +event with rate 5 (3 args) + +0,4|four: a b c d e f +event with rate 4 (6 args) + +0,3|three: +event with rate 3 (0 args) +]] + +-- selftest is designed mostly to check that timeline logging does not +-- crash the snabb process e.g. including overflow of the log entries +-- and the string table. it does not verify the contents of the log +-- messages. +function selftest () + print("selftest: timeline") + local tl = new("selftest/timeline") + local e = load_events_from_string(tl, test_events, "selftest", + {module="timeline", func="selftest"}) + rate(tl, 4) -- won't log event three + + print("check logging individual messages") + -- First check that log entries are created + assert(tl.next_entry == 0) + e.six() assert(tl.next_entry == 1) + e.five(1, 2, 3) assert(tl.next_entry == 2) + e.four(1, 2, 3, 4, 5, 6) assert(tl.next_entry == 3) + e.three() assert(tl.next_entry == 3) -- skipped + + local n = tl.num_entries*10 + print("check wrap-around on "..lib.comma_value(n).." events") + for i = 1, n do + e.six() + e.five(1, 2, 3) + e.four(1, 2, 3, 4, 5, 6) + e.three() + end + -- overflow the string table + print("overflowing string table") + for i = 1, 1e5 do + mkevent(tl, "selftest", "0,9|dummy_event_definition:", {i=i}) + end + -- report median logging time + local sample = {} + for i = 1, 1000 do sample[i] = tl.entries[i].tsc - tl.entries[i-1].tsc end + table.sort(sample) + print("median time delta for sample:", tonumber(sample[1]).." 
cycles")
+   print("selftest: ok")
+end
+
diff --git a/src/dasm_x86.lua b/src/dasm_x86.lua
index 22fbbc4516..b31fe03f85 100644
--- a/src/dasm_x86.lua
+++ b/src/dasm_x86.lua
@@ -1232,6 +1232,7 @@ local map_op = {
   shrd_3 =	"mriqdw:0FACRmU|mrC/qq:0FADRm|mrC/dd:|mrC/ww:",
 
   rdtsc_0 =	"0F31", -- P1+
+  rdtscp_0 =	"0F01F9",-- P6+
   rdpmc_0 =	"0F33", -- P6+
   cpuid_0 =	"0FA2", -- P1+
 
diff --git a/src/lib/ipsec/esp.events b/src/lib/ipsec/esp.events
new file mode 100644
index 0000000000..0580e67f03
--- /dev/null
+++ b/src/lib/ipsec/esp.events
@@ -0,0 +1,70 @@
+8,2|encapsulate_transport6_start:
+
+ESP encapsulation for a packet in transport-IPv6 mode has started.
+
+8,2|encapsulate_tunnel_start:
+
+ESP encapsulation for a packet in tunnel mode has started.
+
+9,1|trailer_encoded:
+
+The ESP trailer has been encoded.
+
+9,1|payload_encrypted: length
+
+The ESP payload has been encrypted.
+
+'length' is the byte length of the payload.
+
+9,1|header_encoded:
+
+The ESP header has been encoded.
+
+8,2|encapsulate_transport6_end:
+
+ESP encapsulation for a packet in transport-IPv6 mode has completed.
+
+8,2|encapsulate_tunnel_end:
+
+ESP encapsulation for a packet in tunnel mode has completed.
+
+
+8,2|decapsulate_transport6_start:
+
+ESP decapsulation for a packet in transport-IPv6 mode has started.
+
+8,2|decapsulate_tunnel_start:
+
+ESP decapsulation for a packet in tunnel mode has started.
+
+9,1|sequence_number_checked: seq_low
+
+An ESP sequence number has been checked.
+
+'seq_low' is the lower 32 bits of the sequence number.
+
+9,1|payload_decrypted: length
+
+The ESP payload has been decrypted.
+
+'length' is the byte length of the ciphertext.
+
+9,6|resync_attempted: spi
+
+ESP resynchronization has been attempted.
+
+'spi' is the "Security Parameter Index" of the ESP "Security Association" (SA).
+
+9,1|sequence_number_tracked: seq
+
+A new ESP sequence number has been tracked.
+
+'seq' is the 64-bit extended sequence number.
+
+8,2|decapsulate_transport6_end:
+
+ESP decapsulation for a packet in transport-IPv6 mode has completed.
+
+8,2|decapsulate_tunnel_end:
+
+ESP decapsulation for a packet in tunnel mode has completed.
diff --git a/src/lib/ipsec/esp.lua b/src/lib/ipsec/esp.lua
index 7a55596049..3142249503 100644
--- a/src/lib/ipsec/esp.lua
+++ b/src/lib/ipsec/esp.lua
@@ -34,6 +34,8 @@ local band = bit.band
 
 local htons, htonl, ntohl = lib.htons, lib.htonl, lib.ntohl
 
+local events = timeline.load_events(engine.timeline(), ...)
+
 require("lib.ipsec.track_seq_no_h")
 
 local window_t = ffi.typeof("uint8_t[?]")
@@ -95,12 +97,14 @@ function encrypt:encode_esp_trailer (ptr, next_header, pad_length)
    local esp_trailer = ffi.cast(esp_trailer_ptr_t, ptr)
    esp_trailer.next_header = next_header
    esp_trailer.pad_length = pad_length
+   events.trailer_encoded()
 end
 
 function encrypt:encrypt_payload (ptr, length)
    self:next_seq_no()
    local seq, low, high = self.seq, self.seq:low(), self.seq:high()
    self.cipher:encrypt(ptr, seq, low, high, ptr, length, ptr + length)
+   events.payload_encrypted(length)
 end
 
 function encrypt:encode_esp_header (ptr)
@@ -108,6 +112,7 @@ function encrypt:encode_esp_header (ptr)
    esp_header.spi = htonl(self.spi)
    esp_header.seq_no = htonl(self.seq:low())
    ffi.copy(ptr + ESP_SIZE, self.seq, self.cipher.IV_SIZE)
+   events.header_encoded()
 end
 
 -- Encapsulation in transport mode is performed as follows:
@@ -117,6 +122,7 @@ end
 -- 4. Move resulting ciphertext to make room for ESP header
 -- 5.
Write ESP header function encrypt:encapsulate_transport6 (p) + events.encapsulate_transport6_start() if p.length < TRANSPORT6_PAYLOAD_OFFSET then return nil end local ip = ffi.cast(ipv6_ptr_t, p.data + ETHERNET_SIZE) @@ -141,6 +147,7 @@ function encrypt:encapsulate_transport6 (p) ip.next_header = PROTOCOL ip.payload_length = htons(payload_length + overhead) + events.encapsulate_transport6_end() return p end @@ -154,6 +161,7 @@ end -- (The resulting packet contains the raw ESP frame, without IP or Ethernet -- headers.) function encrypt:encapsulate_tunnel (p, next_header) + events.encapsulate_tunnel_start() local pad_length = self:padding(p.length) local trailer_overhead = pad_length + ESP_TAIL_SIZE + self.cipher.AUTH_SIZE local orig_length = p.length @@ -170,6 +178,7 @@ function encrypt:encapsulate_tunnel (p, next_header) self:encode_esp_header(p.data) + events.encapsulate_tunnel_end() return p end @@ -210,6 +219,7 @@ function decrypt:decrypt_payload (ptr, length, ip) local seq_high = tonumber( C.check_seq_no(seq_low, self.seq.no, self.window, self.window_size) ) + events.sequence_number_checked(seq_low) local error = nil if seq_high < 0 or not self.cipher:decrypt( @@ -221,8 +231,11 @@ function decrypt:decrypt_payload (ptr, length, ip) self.decap_fail = self.decap_fail + 1 if self.decap_fail > self.resync_threshold then seq_high = self:resync(ptr, length, seq_low, seq_high) + events.resync_attempted(self.spi) if seq_high then error = nil end end + else + events.payload_decrypted(ctext_length) end if error then @@ -234,6 +247,7 @@ function decrypt:decrypt_payload (ptr, length, ip) self.seq.no = C.track_seq_no( seq_high, seq_low, self.seq.no, self.window, self.window_size ) + events.sequence_number_tracked(self.seq.no) local esp_trailer_start = ctext_start + ctext_length - ESP_TAIL_SIZE local esp_trailer = ffi.cast(esp_trailer_ptr_t, esp_trailer_start) @@ -249,6 +263,7 @@ end -- 4. Move cleartext up to IP payload -- 5. Shrink p by ESP overhead function decrypt:decapsulate_transport6 (p) + events.decapsulate_transport6_start() if p.length - TRANSPORT6_PAYLOAD_OFFSET < self.MIN_SIZE then return nil end local ip = ffi.cast(ipv6_ptr_t, p.data + ETHERNET_SIZE) @@ -267,6 +282,7 @@ function decrypt:decapsulate_transport6 (p) C.memmove(payload, ptext_start, ptext_length) p = packet.resize(p, TRANSPORT6_PAYLOAD_OFFSET + ptext_length) + events.decapsulate_transport6_end() return p end @@ -279,6 +295,7 @@ end -- (The resulting packet contains the raw ESP payload (i.e. an IP frame), -- without an Ethernet header.) 
function decrypt:decapsulate_tunnel (p) + events.decapsulate_tunnel_start() if p.length < self.MIN_SIZE then return nil end local ptext_start, ptext_length, next_header = @@ -289,6 +306,7 @@ function decrypt:decapsulate_tunnel (p) p = packet.shiftleft(p, self.CTEXT_OFFSET) p = packet.resize(p, ptext_length) + events.decapsulate_tunnel_end() return p, next_header end diff --git a/src/lib/ptree/worker.lua b/src/lib/ptree/worker.lua index 0d102b9d20..2be6fc5664 100644 --- a/src/lib/ptree/worker.lua +++ b/src/lib/ptree/worker.lua @@ -13,6 +13,8 @@ local alarms = require("lib.yang.alarms") local channel = require("lib.ptree.channel") local action_codec = require("lib.ptree.action_codec") local ptree_alarms = require("lib.ptree.alarms") +local timeline = require("core.timeline") +local events = timeline.load_events(engine.timeline(), "core.engine") local Worker = {} @@ -31,6 +33,7 @@ function new_worker (conf) ret.period = 1/conf.Hz ret.duration = conf.duration or 1/0 ret.no_report = conf.no_report + ret.measure_latency = conf.measure_latency ret.jit_flush = conf.jit_flush ret.channel = channel.create('config-worker-channel', 1e6) alarms.install_alarm_handler(ptree_alarms:alarm_handler()) @@ -38,11 +41,6 @@ function new_worker (conf) require("jit.opt").start('sizemcode=256', 'maxmcode=2048') - ret.breathe = engine.breathe - if conf.measure_latency then - local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) - ret.breathe = latency:wrap_thunk(ret.breathe, engine.now) - end return ret end @@ -100,27 +98,26 @@ function Worker:handle_actions_from_manager() end function Worker:main () - local vmprofile = require("jit.vmprofile") local stop = engine.now() + self.duration local next_time = engine.now() - -- Setup vmprofile. - engine.setvmprofile("engine") - vmprofile.start() - - if not engine.auditlog_enabled then engine.enable_auditlog() end - - repeat - self.breathe() + local function control () if next_time < engine.now() then next_time = engine.now() + self.period + events.engine_stopped() + engine.setvmprofile("worker") self:handle_actions_from_manager() - timer.run() + engine.setvmprofile("engine") + events.engine_started() end - if not engine.busywait then engine.pace_breathing() end - until stop < engine.now() - counter.commit() - if not self.no_report then engine.report(self.report) end + if stop < engine.now() then + return true -- done + end + end + + engine.main{done=control, + report=self.report, no_report=self.no_report, + measure_latency=self.measure_latency} end function main (opts) diff --git a/src/program/vita/dispatch.events b/src/program/vita/dispatch.events new file mode 100644 index 0000000000..9a95fb7c51 --- /dev/null +++ b/src/program/vita/dispatch.events @@ -0,0 +1,14 @@ +5,2|private_dispatch_start: +PrivateDispatch started to process a packet. + +6,1|private_forward4_matched: +PrivateDispatch matched an IPv4 packet to be forwarded. + +6,1|checksum_verification_succeeded: +PrivateDispatch verified a correct IP checksum. + +6,1|checksum_verification_failed: +PrivateDispatch rejected an invalid IP checksum. + +5,2|private_dispatch_end: +PrivateDispatch has processed a packet. diff --git a/src/program/vita/dispatch.lua b/src/program/vita/dispatch.lua index 125ea18d73..18923f5ebc 100644 --- a/src/program/vita/dispatch.lua +++ b/src/program/vita/dispatch.lua @@ -11,6 +11,7 @@ local ipv6 = require("lib.protocol.ipv6") local pf_match = require("pf.match") local ffi = require("ffi") +local events = timeline.load_events(engine.timeline(), ...) 
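-- Note (illustration, not part of the patch): at chunk level `...' holds the
-- module name passed by require(), so the load_events() line above behaves like
--    local events = timeline.load_events(engine.timeline(), "program.vita.dispatch")
-- and resolves to the "program.vita.dispatch_events" module generated from
-- dispatch.events by the new $(EVTOBJ) Makefile rule.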
 PrivateDispatch = {
    name = "PrivateDispatch",
@@ -40,13 +41,16 @@ function PrivateDispatch:new (conf)
 end
 
 function PrivateDispatch:forward4 ()
+   events.private_forward4_matched()
    local p = packet.shiftleft(self.p_box[0], ethernet:sizeof())
    assert(self.ip4:new_from_mem(p.data, p.length))
    if self.ip4:checksum_ok() then
+      events.checksum_verification_succeeded()
       -- Strip datagram of any Ethernet frame padding before encapsulation.
       local d = packet.resize(p, math.min(self.ip4:total_length(), p.length))
       link.transmit(self.output.forward4, d)
    else
+      events.checksum_verification_failed()
       packet.free(p)
       counter.add(self.shm.rxerrors)
       counter.add(self.shm.checksum_errors)
@@ -79,7 +83,9 @@ function PrivateDispatch:push ()
    while not link.empty(input) do
      local p = link.receive(input)
      self.p_box[0] = p
+      events.private_dispatch_start()
      self:dispatch(p.data, p.length)
+      events.private_dispatch_end()
   end
 end
 
diff --git a/src/program/vita/route.events b/src/program/vita/route.events
new file mode 100644
index 0000000000..dab6cd3a7c
--- /dev/null
+++ b/src/program/vita/route.events
@@ -0,0 +1,24 @@
+5,2|private_route4_start:
+PrivateRouter has started to process an IPv4 packet.
+
+6,1|private_route4_lookup_success:
+PrivateRouter looked up the route for an IPv4 packet.
+
+6,1|private_route4_lookup_failure:
+PrivateRouter failed to look up a route for an IPv4 packet.
+
+5,2|private_route4_end:
+PrivateRouter has processed an IPv4 packet.
+
+
+5,2|public_route4_start:
+PublicRouter has started to process an IPv4 packet.
+
+6,1|public_route4_lookup_success:
+PublicRouter looked up the SA for an IPv4 packet.
+
+6,1|public_route4_lookup_failure:
+PublicRouter failed to look up an SA for an IPv4 packet.
+
+5,2|public_route4_end:
+PublicRouter has processed an IPv4 packet.
diff --git a/src/program/vita/route.lua b/src/program/vita/route.lua
index 003858df45..05a9029bf8 100644
--- a/src/program/vita/route.lua
+++ b/src/program/vita/route.lua
@@ -9,6 +9,7 @@
 local esp = require("lib.protocol.esp")
 local poptrie = require("lib.poptrie")
 local ffi = require("ffi")
+local events = timeline.load_events(engine.timeline(), ...)
 
 -- route := { net_cidr4=(CIDR4), gw_ip4=(IPv4), preshared_key=(KEY) }
 
@@ -58,8 +59,10 @@ end
 
 function PrivateRouter:route (p)
    assert(self.ip4:new_from_mem(p.data, p.length))
+   events.private_route4_start()
    local route = self:find_route4(self.ip4:dst())
    if route then
+      events.private_route4_lookup_success()
      if p.length <= self.mtu then
         link.transmit(route, p)
      else
@@ -72,10 +75,12 @@
         end
      end
   else
+      events.private_route4_lookup_failure()
      packet.free(p)
      counter.add(self.shm.rxerrors)
      counter.add(self.shm.route_errors)
   end
+   events.private_route4_end()
 end
 
 function PrivateRouter:push ()
@@ -141,14 +146,18 @@ function PublicRouter:push ()
    local input = self.input.input
 
    while not link.empty(input) do
+      events.public_route4_start()
      local p = link.receive(input)
      assert(self.esp:new_from_mem(p.data, p.length))
      local route = self:find_route4(self.esp:spi())
      if route then
+         events.public_route4_lookup_success()
         link.transmit(route, p)
      else
+         events.public_route4_lookup_failure()
         packet.free(p)
         counter.add(self.shm.route_errors)
      end
+      events.public_route4_end()
   end
 end
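A closing note on capturing the Vita events above: they are declared with rates 1 and 2, while the engine's timeline starts at rate 9 and randomize_log_rate() re-rolls the threshold every breath, so they only appear in the occasional breaths that roll a low rate. A sketch (not part of the patch) of forcing them on for a debugging session, assuming the per-breath randomization is not left to override the setting:

   -- Sketch: admit every event for a debugging session.
   local timeline = require("core.timeline")
   local tl = engine.timeline()    -- the engine's shared timeline ring
   timeline.rate(tl, 1)            -- events with rate >= 1, i.e. all of them
   -- Each 64-byte entry then records the TSC timestamp, core/NUMA id,
   -- message id, and up to six numeric arguments for later decoding.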