From 5ddf1ac130755055a113a69da46618d2d2e39f51 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Fri, 18 Oct 2024 03:38:49 -0700 Subject: [PATCH] http/router: asynchronous fs serving This utilizes the new functionality of `Context` to serve files asynchronously instead of blocking for the entire file send. --- examples/http/fs/main.zig | 28 ++++++- src/core/server.zig | 11 ++- src/http/router.zig | 152 ++++++++++++++++++++++++++++++++------ 3 files changed, 164 insertions(+), 27 deletions(-) diff --git a/examples/http/fs/main.zig b/examples/http/fs/main.zig index 5380ff4..13bf4f3 100644 --- a/examples/http/fs/main.zig +++ b/examples/http/fs/main.zig @@ -7,13 +7,15 @@ pub fn main() !void { const host: []const u8 = "0.0.0.0"; const port: u16 = 9862; - const allocator = std.heap.page_allocator; + var gpa = std.heap.GeneralPurposeAllocator(.{ .thread_safe = true }){ .backing_allocator = std.heap.c_allocator }; + const allocator = gpa.allocator(); + defer _ = gpa.deinit(); var router = http.Router.init(allocator); defer router.deinit(); try router.serve_route("/", http.Route.init().get(struct { - pub fn handler_fn(_: http.Request, response: *http.Response, _: http.Context) void { + pub fn handler_fn(ctx: *http.Context) void { const body = \\ \\ @@ -23,7 +25,7 @@ pub fn main() !void { \\ ; - response.set(.{ + ctx.respond(.{ .status = .OK, .mime = http.Mime.HTML, .body = body[0..], @@ -31,9 +33,27 @@ pub fn main() !void { } }.handler_fn)); + try router.serve_route("/kill", http.Route.init().get(struct { + pub fn handler_fn(ctx: *http.Context) void { + ctx.runtime.stop(); + + ctx.respond(.{ + .status = .OK, + .mime = http.Mime.HTML, + .body = "", + }); + } + }.handler_fn)); + try router.serve_fs_dir("/static", "./examples/http/fs/static"); - var server = http.Server(.plain, .auto).init(.{ .allocator = allocator }); + var server = http.Server(.plain, .auto).init(.{ + .allocator = allocator, + .threading = .auto, + .size_connections_max = 256, + }); + defer server.deinit(); + try server.bind(host, port); try server.listen(.{ .router = &router }); } diff --git a/src/core/server.zig b/src/core/server.zig index 84d7f65..0ad1fe4 100644 --- a/src/core/server.zig +++ b/src/core/server.zig @@ -26,7 +26,7 @@ pub const RecvStatus = union(enum) { spawned, }; -/// Security Model to use.chinp acas +/// Security Model to use. /// /// Default: .plain (plaintext) pub const Security = union(enum) { @@ -239,7 +239,14 @@ pub fn Server( provision.job = .empty; _ = provision.arena.reset(.{ .retain_with_limit = z_config.size_connection_arena_retain }); provision.data.clean(); - provision.recv_buffer.clearRetainingCapacity(); + + // TODO: new z_config setting here! + if (provision.recv_buffer.items.len > 1024) { + provision.recv_buffer.shrinkRetainingCapacity(1024); + } else { + provision.recv_buffer.clearRetainingCapacity(); + } + pool.release(provision.index); const accept_queued = rt.storage.get_ptr("accept_queued", bool); diff --git a/src/http/router.zig b/src/http/router.zig index 89f5ca6..b2fac03 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -13,6 +13,10 @@ const Context = @import("context.zig").Context; const RoutingTrie = @import("routing_trie.zig").RoutingTrie; const QueryMap = @import("routing_trie.zig").QueryMap; +const Runtime = @import("tardy").Runtime; +const Task = @import("tardy").Task; +const Cross = @import("tardy").Cross; + pub const Router = struct { allocator: std.mem.Allocator, routes: RoutingTrie, @@ -29,16 +33,111 @@ pub const Router = struct { self.routes.deinit(); } + const FileProvision = struct { + mime: Mime, + context: *Context, + fd: std.posix.fd_t, + offset: usize, + list: std.ArrayList(u8), + buffer: []u8, + }; + + fn open_file_task(rt: *Runtime, t: *const Task, ctx: ?*anyopaque) !void { + const provision: *FileProvision = @ptrCast(@alignCast(ctx.?)); + errdefer { + provision.context.respond(.{ + .status = .@"Internal Server Error", + .mime = Mime.HTML, + .body = "", + }); + } + + const fd = t.result.?.fd; + if (!Cross.fd.is_valid(fd)) { + provision.context.respond(.{ + .status = .@"Not Found", + .mime = Mime.HTML, + .body = "File Not Found", + }); + return; + } + provision.fd = fd; + + try rt.fs.read(.{ + .fd = fd, + .buffer = provision.buffer, + .offset = 0, + .func = read_file_task, + .ctx = provision, + }); + } + + fn read_file_task(rt: *Runtime, t: *const Task, ctx: ?*anyopaque) !void { + const provision: *FileProvision = @ptrCast(@alignCast(ctx.?)); + errdefer { + provision.context.respond(.{ + .status = .@"Internal Server Error", + .mime = Mime.HTML, + .body = "", + }); + } + + const result: i32 = t.result.?.value; + if (result <= 0) { + // If we are done reading... + try rt.fs.close(.{ + .fd = provision.fd, + .func = close_file_task, + .ctx = provision, + }); + return; + } + + const length: usize = @intCast(result); + + try provision.list.appendSlice(provision.buffer[0..length]); + + // TODO: This needs to be a setting you pass in to the router. + // + //if (provision.list.items.len > 1024 * 1024 * 4) { + // provision.context.respond(.{ + // .status = .@"Content Too Large", + // .mime = Mime.HTML, + // .body = "File Too Large", + // }); + // return; + //} + + provision.offset += length; + + try rt.fs.read(.{ + .fd = provision.fd, + .buffer = provision.buffer, + .offset = provision.offset, + .func = read_file_task, + .ctx = provision, + }); + } + + fn close_file_task(_: *Runtime, _: *const Task, ctx: ?*anyopaque) !void { + const provision: *FileProvision = @ptrCast(@alignCast(ctx.?)); + + provision.context.respond(.{ + .status = .OK, + .mime = provision.mime, + .body = provision.list.items[0..], + }); + } + pub fn serve_fs_dir(self: *Router, comptime url_path: []const u8, comptime dir_path: []const u8) !void { assert(!self.locked); const route = Route.init().get(struct { - pub fn handler_fn(request: Request, response: *Response, context: Context) void { - _ = request; + pub fn handler_fn(ctx: *Context) void { + const search_path = ctx.captures[0].remaining; - const search_path = context.captures[0].remaining; - const file_path = std.fmt.allocPrint(context.allocator, "{s}/{s}", .{ dir_path, search_path }) catch { - response.set(.{ + const file_path = std.fmt.allocPrintZ(ctx.allocator, "{s}/{s}", .{ dir_path, search_path }) catch { + ctx.respond(.{ .status = .@"Internal Server Error", .mime = Mime.HTML, .body = "", @@ -46,39 +145,50 @@ pub const Router = struct { return; }; + // TODO: Ensure that paths cannot go out of scope and reference data that they shouldn't be allowed to. + // Very important. + const extension_start = std.mem.lastIndexOfScalar(u8, search_path, '.'); const mime: Mime = blk: { if (extension_start) |start| { break :blk Mime.from_extension(search_path[start..]); } else { - break :blk Mime.HTML; + break :blk Mime.BIN; } }; - const file: std.fs.File = std.fs.cwd().openFile(file_path, .{}) catch { - response.set(.{ - .status = .@"Not Found", + const provision = ctx.allocator.create(FileProvision) catch { + ctx.respond(.{ + .status = .@"Internal Server Error", .mime = Mime.HTML, - .body = "File Not Found", + .body = "", }); return; }; - defer file.close(); - const file_bytes = file.readToEndAlloc(context.allocator, 1024 * 1024 * 4) catch { - response.set(.{ - .status = .@"Content Too Large", + provision.* = .{ + .mime = mime, + .context = ctx, + .fd = -1, + .offset = 0, + .list = std.ArrayList(u8).init(ctx.allocator), + .buffer = ctx.provision.buffer, + }; + + // We also need to support chunked encoding. + // It makes a lot more sense for files atleast. + ctx.runtime.fs.open(.{ + .path = file_path, + .func = open_file_task, + .ctx = provision, + }) catch { + ctx.respond(.{ + .status = .@"Internal Server Error", .mime = Mime.HTML, - .body = "File Too Large", + .body = "", }); return; }; - - response.set(.{ - .status = .OK, - .mime = mime, - .body = file_bytes, - }); } }.handler_fn);