From 2e3ac65a45a1710af1ea9741c06221a0a3fbc553 Mon Sep 17 00:00:00 2001
From: Muki Kiboigo
Date: Tue, 17 Dec 2024 20:35:44 -0800
Subject: [PATCH] feat: switch to tardy v0.2.0

tardy v0.2.0 replaces the raw integer results passed to completion
callbacks with typed result unions (OpenResult, StatResult, ReadResult,
AcceptResult, RecvResult, SendResult) that get unwrapped at the top of
each task. Update the fs_dir and server tasks accordingly, split the
TLS handshake task into dedicated recv/send variants, rename
_with_route to with_route_helper, raise the routing trie's eval branch
quota, and bump nixpkgs from release-24.05 to release-24.11.
---
 build.zig.zon                    |   4 +-
 flake.lock                       |   8 +--
 flake.nix                        |   2 +-
 src/http/router/fs_dir.zig       | 100 +++++++++++++++++++------------
 src/http/router/routing_trie.zig |  12 ++--
 src/http/server.zig              |  94 +++++++++++++++++++--------
 6 files changed, 145 insertions(+), 75 deletions(-)

diff --git a/build.zig.zon b/build.zig.zon
index f3c0378..93b1e2b 100644
--- a/build.zig.zon
+++ b/build.zig.zon
@@ -4,8 +4,8 @@
     .minimum_zig_version = "0.13.0",
     .dependencies = .{
         .tardy = .{
-            .url = "git+https://github.com/mookums/tardy?ref=v0.1.0#ae0970d6b3fa5b03625b14e142c664efe1fd7789",
-            .hash = "12207f5afee3b8933c1c32737e8feedc80a2e4feebe058739509094c812e4a8d2cc8",
+            .url = "git+https://github.com/mookums/tardy?ref=v0.2.0#543e6b01cba3caa691960a4a5a54d3419969f2d8",
+            .hash = "12202bc544928f0bb67ab4a30d3ff6d54c9d62643296c8d04303762b477b71fd002d",
         },
         .bearssl = .{
             .url = "git+https://github.com/mookums/bearssl-zig#37a96eee56fe2543579bbc6da148ca886f3dd32b",
diff --git a/flake.lock b/flake.lock
index 57e83c6..a8acc7b 100644
--- a/flake.lock
+++ b/flake.lock
@@ -538,16 +538,16 @@
     },
     "nixpkgs_7": {
       "locked": {
-        "lastModified": 1734465047,
-        "narHash": "sha256-60yR+QPBLsZktRn7WHrYA5hRphOabWx4BAAXG7GiVlo=",
+        "lastModified": 1734482269,
+        "narHash": "sha256-4SJCseZMuI2n7mOZY7+2Q8FNQSaZLJ3tzSeGZ3CRA6E=",
         "owner": "nixos",
         "repo": "nixpkgs",
-        "rev": "9294f1b44c212d8795ceae7bc353fa62596d16c0",
+        "rev": "7c5d525b987f5372820e72298444cd3e4b6a9351",
         "type": "github"
       },
       "original": {
         "owner": "nixos",
-        "ref": "release-24.05",
+        "ref": "release-24.11",
         "repo": "nixpkgs",
         "type": "github"
       }
diff --git a/flake.nix b/flake.nix
index 014dc16..4da8abd 100644
--- a/flake.nix
+++ b/flake.nix
@@ -3,7 +3,7 @@
     "a framework for writing performant and reliable networked services";

   inputs = {
-    nixpkgs.url = "github:nixos/nixpkgs/release-24.05";
+    nixpkgs.url = "github:nixos/nixpkgs/release-24.11";
     iguana.url = "github:mookums/iguana";
     flake-utils.url = "github:numtide/flake-utils";
   };
diff --git a/src/http/router/fs_dir.zig b/src/http/router/fs_dir.zig
index ac8f0f3..c6471a0 100644
--- a/src/http/router/fs_dir.zig
+++ b/src/http/router/fs_dir.zig
@@ -7,6 +7,11 @@ const Response = @import("../response.zig").Response;
 const Mime = @import("../mime.zig").Mime;
 const _Context = @import("../context.zig").Context;

+const OpenResult = @import("tardy").OpenResult;
+const ReadResult = @import("tardy").ReadResult;
+const SendResult = @import("tardy").SendResult;
+const StatResult = @import("tardy").StatResult;
+
 const Runtime = @import("tardy").Runtime;
 const Stat = @import("tardy").Stat;
 const Cross = @import("tardy").Cross;
@@ -27,33 +32,44 @@ pub fn FsDir(Server: type, AppState: type) type {
             buffer: []u8,
         };

-        fn open_file_task(rt: *Runtime, fd: std.posix.fd_t, provision: *FileProvision) !void {
+        fn open_file_task(rt: *Runtime, result: OpenResult, provision: *FileProvision) !void {
             errdefer provision.context.respond(.{
                 .status = .@"Internal Server Error",
                 .mime = Mime.HTML,
                 .body = "",
             }) catch unreachable;

-            if (!Cross.fd.is_valid(fd)) {
+            const fd = result.unwrap() catch |e| {
+                log.warn("file not found | {}", .{e});
                 try provision.context.respond(.{
                     .status = .@"Not Found",
                     .mime = Mime.HTML,
                     .body = "File Not Found",
                 });
                 return;
-            }
+            };

             provision.fd = fd;
             try rt.fs.stat(provision, stat_file_task, fd);
         }

-        fn stat_file_task(rt: *Runtime, stat: Stat, provision: *FileProvision) !void {
+        fn stat_file_task(rt: *Runtime, result: StatResult, provision: *FileProvision) !void {
             errdefer provision.context.respond(.{
                 .status = .@"Internal Server Error",
                 .mime = Mime.HTML,
                 .body = "",
             }) catch unreachable;

+            const stat = result.unwrap() catch |e| {
+                log.warn("stat on fd={d} failed | {}", .{ provision.fd, e });
+                try provision.context.respond(.{
+                    .status = .@"Not Found",
+                    .mime = Mime.HTML,
+                    .body = "File Not Found",
+                });
+                return;
+            };
+
             // Set file size.
             provision.file_size = stat.size;
             log.debug("file size: {d}", .{provision.file_size});
@@ -109,50 +125,56 @@
             );
         }

-        fn read_file_task(rt: *Runtime, result: i32, provision: *FileProvision) !void {
+        fn read_file_task(rt: *Runtime, result: ReadResult, provision: *FileProvision) !void {
             errdefer {
                 std.posix.close(provision.fd);
                 provision.context.close() catch unreachable;
             }

-            if (result <= -1) {
-                log.warn("read file task failed", .{});
-                std.posix.close(provision.fd);
-                try provision.context.close();
-                return;
-            }
+            const length = result.unwrap() catch |e| {
+                switch (e) {
+                    error.EndOfFile => {
+                        log.debug("done streaming file | rd off: {d} | f size: {d} ", .{
+                            provision.rd_offset,
+                            provision.file_size,
+                        });
+
+                        std.posix.close(provision.fd);
+                        try provision.context.send_then_recv(
+                            provision.buffer[0..provision.current_length],
+                        );
+                        return;
+                    },
+                    else => {
+                        log.warn("reading on fd={d} failed | {}", .{ provision.fd, e });
+                        std.posix.close(provision.fd);
+                        try provision.context.close();
+                        return;
+                    },
+                }
+            };

-            const length: usize = @intCast(result);
-            provision.rd_offset += length;
-            provision.current_length += length;
+            const length_as_usize: usize = @intCast(length);
+            provision.rd_offset += length_as_usize;
+            provision.current_length += length_as_usize;
             log.debug("current offset: {d} | fd: {}", .{ provision.rd_offset, provision.fd });

-            if (provision.rd_offset >= provision.file_size or result == 0) {
-                log.debug("done streaming file | rd off: {d} | f size: {d} | result: {d}", .{
-                    provision.rd_offset,
-                    provision.file_size,
-                    result,
-                });
-
-                std.posix.close(provision.fd);
-                try provision.context.send_then_recv(provision.buffer[0..provision.current_length]);
+            assert(provision.rd_offset <= provision.file_size);
+            assert(provision.current_length <= provision.buffer.len);
+            if (provision.current_length == provision.buffer.len) {
+                try provision.context.send_then(
+                    provision.buffer[0..provision.current_length],
+                    provision,
+                    send_file_task,
+                );
             } else {
-                assert(provision.current_length <= provision.buffer.len);
-                if (provision.current_length == provision.buffer.len) {
-                    try provision.context.send_then(
-                        provision.buffer[0..provision.current_length],
-                        provision,
-                        send_file_task,
-                    );
-                } else {
-                    try rt.fs.read(
-                        provision,
-                        read_file_task,
-                        provision.fd,
-                        provision.buffer[provision.current_length..],
-                        provision.rd_offset,
-                    );
-                }
+                try rt.fs.read(
+                    provision,
+                    read_file_task,
+                    provision.fd,
+                    provision.buffer[provision.current_length..],
+                    provision.rd_offset,
+                );
             }
         }

diff --git a/src/http/router/routing_trie.zig b/src/http/router/routing_trie.zig
index d8f6545..f31dc3c 100644
--- a/src/http/router/routing_trie.zig
+++ b/src/http/router/routing_trie.zig
@@ -146,12 +146,16 @@ pub fn RoutingTrie(comptime Server: type, comptime AppState: type) type {
         }

         /// Initialize new trie node for the next token.
-        fn _with_route(comptime node: *const Node, comptime iterator: *std.mem.TokenIterator(u8, .scalar), comptime route: Route) Node {
+        fn with_route_helper(
+            comptime node: *const Node,
+            comptime iterator: *std.mem.TokenIterator(u8, .scalar),
+            comptime route: Route,
+        ) Node {
             if (iterator.next()) |chunk| {
                 // Parse the current chunk.
                 const token: Token = Token.parse_chunk(chunk);
                 // Alter the child of the current node.
-                return node.with_child(token, &(_with_route(
+                return node.with_child(token, &(with_route_helper(
                     node.children.get_optional(token) orelse &(Node.init(token, null)),
                     iterator,
                     route,
@@ -168,13 +172,13 @@

         /// Copy the current routing trie to add the provided route.
         pub fn with_route(comptime self: *const Self, comptime route: Route) Self {
-            @setEvalBranchQuota(10000);
+            @setEvalBranchQuota(1_000_000);

             // This is where we will parse out the path.
             comptime var iterator = std.mem.tokenizeScalar(u8, route.path, '/');

             return Self{
-                .root = _with_route(&(self.root), &iterator, route),
+                .root = with_route_helper(&(self.root), &iterator, route),
             };
         }

diff --git a/src/http/server.zig b/src/http/server.zig
index 8ddd14f..9dd9f3f 100644
--- a/src/http/server.zig
+++ b/src/http/server.zig
@@ -34,6 +34,10 @@ pub const AsyncIOType = @import("tardy").AsyncIOType;
 const TardyCreator = @import("tardy").Tardy;
 const Cross = @import("tardy").Cross;

+const AcceptResult = @import("tardy").AcceptResult;
+const RecvResult = @import("tardy").RecvResult;
+const SendResult = @import("tardy").SendResult;
+
 pub const RecvStatus = union(enum) {
     kill,
     recv,
@@ -278,9 +282,17 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
             }
         }

-        fn accept_task(rt: *Runtime, child_socket: std.posix.socket_t, socket: std.posix.socket_t) !void {
-            const pool = rt.storage.get_ptr("__zzz_provision_pool", Pool(Provision));
+        fn accept_task(rt: *Runtime, result: AcceptResult, socket: std.posix.socket_t) !void {
             const accept_queued = rt.storage.get_ptr("__zzz_accept_queued", bool);

+            const child_socket = result.unwrap() catch |e| {
+                log.err("socket accept failed | {}", .{e});
+                accept_queued.* = true;
+                try rt.net.accept(socket, accept_task, socket);
+                return;
+            };
+
+            const pool = rt.storage.get_ptr("__zzz_provision_pool", Pool(Provision));
             accept_queued.* = false;

             if (rt.scheduler.tasks.clean() >= 2) {
                 accept_queued.* = true;
                 try rt.net.accept(socket, accept_task, socket);
             }

-            if (!Cross.socket.is_valid(child_socket)) {
-                log.err("socket accept failed", .{});
-                return error.AcceptFailed;
-            }
-
             // This should never fail. It means that we have a dangling item.
             assert(pool.clean() > 0);
             const borrowed = pool.borrow() catch unreachable;
@@ -339,7 +346,7 @@
                     };

                     provision.job = .{ .handshake = .{ .state = .recv, .count = 0 } };
-                    try rt.net.recv(borrowed.item, handshake_task, child_socket, recv_buf);
+                    try rt.net.recv(borrowed.item, handshake_recv_task, child_socket, recv_buf);
                 },
                 .plain => {
                     provision.job = .{ .recv = .{ .count = 0 } };
@@ -348,8 +355,18 @@
             }
         }

-        fn recv_task(rt: *Runtime, length: i32, provision: *Provision) !void {
+        fn recv_task(rt: *Runtime, result: RecvResult, provision: *Provision) !void {
             assert(provision.job == .recv);
+
+            const length = result.unwrap() catch |e| {
+                if (e != error.Closed) {
+                    log.warn("socket recv failed | {}", .{e});
+                }
+                provision.job = .close;
+                try rt.net.close(provision, close_task, provision.socket);
+                return;
+            };
+
             const config = rt.storage.get_const_ptr("__zzz_config", ServerConfig);
             const router = rt.storage.get_const_ptr("__zzz_router", Router);

@@ -424,7 +441,37 @@
             }
         }

-        fn handshake_task(rt: *Runtime, length: i32, provision: *Provision) !void {
+        fn handshake_recv_task(rt: *Runtime, result: RecvResult, provision: *Provision) !void {
+            assert(security == .tls);
+
+            const length = result.unwrap() catch |e| {
+                if (e != error.Closed) {
+                    log.warn("socket recv failed | {}", .{e});
+                }
+                provision.job = .close;
+                try rt.net.close(provision, close_task, provision.socket);
+                return error.TLSHandshakeClosed;
+            };
+
+            try handshake_inner_task(rt, length, provision);
+        }
+
+        fn handshake_send_task(rt: *Runtime, result: SendResult, provision: *Provision) !void {
+            assert(security == .tls);
+
+            const length = result.unwrap() catch |e| {
+                if (e != error.ConnectionReset) {
+                    log.warn("socket send failed | {}", .{e});
+                }
+                provision.job = .close;
+                try rt.net.close(provision, close_task, provision.socket);
+                return error.TLSHandshakeClosed;
+            };
+
+            try handshake_inner_task(rt, length, provision);
+        }
+
+        fn handshake_inner_task(rt: *Runtime, length: i32, provision: *Provision) !void {
             assert(security == .tls);
             if (comptime security == .tls) {
                 const tls_slice = rt.storage.get("__zzz_tls_slice", []TLSType);
@@ -437,13 +484,6 @@
                 log.debug("processing handshake", .{});
                 handshake_job.count += 1;

-                if (length <= 0) {
-                    log.debug("handshake connection closed", .{});
-                    provision.job = .close;
-                    try rt.net.close(provision, close_task, provision.socket);
-                    return error.TLSHandshakeClosed;
-                }
-
                 if (handshake_job.count >= 50) {
                     log.debug("handshake taken too many cycles", .{});
                     provision.job = .close;
@@ -467,12 +507,12 @@
                     .recv => |buf| {
                         log.debug("queueing recv in handshake", .{});
                         handshake_job.state = .recv;
-                        try rt.net.recv(provision, handshake_task, provision.socket, buf);
+                        try rt.net.recv(provision, handshake_recv_task, provision.socket, buf);
                     },
                     .send => |buf| {
                         log.debug("queueing send in handshake", .{});
                         handshake_job.state = .send;
-                        try rt.net.send(provision, handshake_task, provision.socket, buf);
+                        try rt.net.send(provision, handshake_send_task, provision.socket, buf);
                     },
                     .complete => {
                         log.debug("handshake complete", .{});
@@ -575,17 +615,21 @@
                 }
             }.inner);

-        pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(i32, *Provision) {
+        pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(SendResult, *Provision) {
             return struct {
-                fn send_then_inner(rt: *Runtime, length: i32, provision: *Provision) !void {
+                fn send_then_inner(rt: *Runtime, result: SendResult, provision: *Provision) !void {
                     assert(provision.job == .send);
                     const config = rt.storage.get_const_ptr("__zzz_config", ServerConfig);

-                    // If the socket is closed.
-                    if (length <= 0) {
-                        try @call(.always_inline, func, .{ rt, false, provision });
+                    const length = result.unwrap() catch |e| {
+                        // If the socket is closed.
+                        if (e != error.ConnectionReset) {
+                            log.warn("socket send failed | {}", .{e});
+                        }
+
+                        try @call(.auto, func, .{ rt, false, provision });
                         return;
-                    }
+                    };

                     const send_job = &provision.job.send;
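
Every converted callback above follows the same unwrap-or-bail shape:
unwrap the typed result at the top of the task, handle the expected
error (Closed, ConnectionReset, EndOfFile) by cleaning up and
returning, and only then run the happy path. For reference, below is a
standalone sketch of that shape. The Result type is an illustrative
stand-in, not tardy's actual definition; only the unwrap-then-catch
flow mirrors the API as used in this patch.

    const std = @import("std");

    /// Illustrative stand-in for tardy v0.2.0's typed results.
    fn Result(comptime T: type, comptime E: type) type {
        return union(enum) {
            actual: T,
            err: E,

            /// Convert the union into a regular error union.
            pub fn unwrap(self: @This()) E!T {
                return switch (self) {
                    .actual => |v| v,
                    .err => |e| e,
                };
            }
        };
    }

    const RecvError = error{ Closed, Unexpected };
    const RecvResult = Result(usize, RecvError);

    // v0.1.0 callbacks received a raw i32 and tested `length <= 0`;
    // v0.2.0 callbacks unwrap once and bail on error.
    fn on_recv(result: RecvResult) !void {
        const length = result.unwrap() catch |e| {
            // A peer closing the connection is expected; only log the rest.
            if (e != error.Closed) std.debug.print("recv failed | {}\n", .{e});
            return;
        };
        std.debug.print("received {d} bytes\n", .{length});
    }

    pub fn main() !void {
        try on_recv(.{ .actual = 128 });
        try on_recv(.{ .err = error.Closed });
    }

Folding the error branch into a catch block keeps the success path at
the top level of the task, which is why the converted tasks read
flatter than the old `length <= 0` checks they replace.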