feat: switch to tardy v0.2.0
mookums committed Dec 18, 2024
1 parent 0bf3cbf commit 2e3ac65
Showing 6 changed files with 145 additions and 75 deletions.
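
The core of the migration is mechanical: tardy v0.2.0 hands I/O callbacks typed result wrappers (OpenResult, StatResult, ReadResult, AcceptResult, RecvResult, SendResult) instead of raw i32/fd values, and every handler now calls unwrap() and branches on the error. Below is a minimal before/after sketch of that pattern, using only the unwrap() API visible in the diffs that follow; the Ctx type, scoped logger, and handler bodies are illustrative and not part of this commit.

    const std = @import("std");
    const log = std.log.scoped(.example);

    const Runtime = @import("tardy").Runtime;
    const RecvResult = @import("tardy").RecvResult;

    // Hypothetical per-connection state, standing in for zzz's Provision.
    const Ctx = struct { socket: std.posix.socket_t };

    // tardy v0.1.0 style: the callback received a raw i32 and had to treat
    // non-positive values as "closed or failed".
    fn recv_task_old(rt: *Runtime, length: i32, ctx: *Ctx) !void {
        _ = rt;
        _ = ctx;
        if (length <= 0) return error.Closed;
        log.debug("received {d} bytes", .{length});
    }

    // tardy v0.2.0 style: the callback receives a typed result, unwraps it,
    // and branches on specific errors such as error.Closed.
    fn recv_task_new(rt: *Runtime, result: RecvResult, ctx: *Ctx) !void {
        _ = rt;
        _ = ctx;
        const length = result.unwrap() catch |e| {
            if (e != error.Closed) log.warn("socket recv failed | {}", .{e});
            return e;
        };
        log.debug("received {d} bytes", .{length});
    }

The same unwrap-and-branch shape repeats below for open, stat, read, accept, send, and the TLS handshake paths.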
4 changes: 2 additions & 2 deletions build.zig.zon
@@ -4,8 +4,8 @@
.minimum_zig_version = "0.13.0",
.dependencies = .{
.tardy = .{
.url = "git+https://github.com/mookums/tardy?ref=v0.1.0#ae0970d6b3fa5b03625b14e142c664efe1fd7789",
.hash = "12207f5afee3b8933c1c32737e8feedc80a2e4feebe058739509094c812e4a8d2cc8",
.url = "git+https://github.com/mookums/tardy?ref=v0.2.0#543e6b01cba3caa691960a4a5a54d3419969f2d8",
.hash = "12202bc544928f0bb67ab4a30d3ff6d54c9d62643296c8d04303762b477b71fd002d",
},
.bearssl = .{
.url = "git+https://github.com/mookums/bearssl-zig#37a96eee56fe2543579bbc6da148ca886f3dd32b",
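For reference, the url/hash pair above is the pinned-tag form the Zig package manager records for a git dependency. Assuming Zig 0.13's package CLI, a command along the lines of zig fetch --save "git+https://github.com/mookums/tardy?ref=v0.2.0" regenerates both fields rather than editing the multihash by hand; the exact values shown here come from the commit itself.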
8 changes: 4 additions & 4 deletions flake.lock

(Generated lock file; diff not rendered.)

2 changes: 1 addition & 1 deletion flake.nix
@@ -3,7 +3,7 @@
"a framework for writing performant and reliable networked services";

inputs = {
nixpkgs.url = "github:nixos/nixpkgs/release-24.05";
nixpkgs.url = "github:nixos/nixpkgs/release-24.11";
iguana.url = "github:mookums/iguana";
flake-utils.url = "github:numtide/flake-utils";
};
100 changes: 61 additions & 39 deletions src/http/router/fs_dir.zig
@@ -7,6 +7,11 @@ const Response = @import("../response.zig").Response;
const Mime = @import("../mime.zig").Mime;
const _Context = @import("../context.zig").Context;

const OpenResult = @import("tardy").OpenResult;
const ReadResult = @import("tardy").ReadResult;
const SendResult = @import("tardy").SendResult;
const StatResult = @import("tardy").StatResult;

const Runtime = @import("tardy").Runtime;
const Stat = @import("tardy").Stat;
const Cross = @import("tardy").Cross;
@@ -27,33 +32,44 @@ pub fn FsDir(Server: type, AppState: type) type {
buffer: []u8,
};

fn open_file_task(rt: *Runtime, fd: std.posix.fd_t, provision: *FileProvision) !void {
fn open_file_task(rt: *Runtime, result: OpenResult, provision: *FileProvision) !void {
errdefer provision.context.respond(.{
.status = .@"Internal Server Error",
.mime = Mime.HTML,
.body = "",
}) catch unreachable;

if (!Cross.fd.is_valid(fd)) {
const fd = result.unwrap() catch |e| {
log.warn("file not found | {}", .{e});
try provision.context.respond(.{
.status = .@"Not Found",
.mime = Mime.HTML,
.body = "File Not Found",
});
return;
}
};
provision.fd = fd;

try rt.fs.stat(provision, stat_file_task, fd);
}

fn stat_file_task(rt: *Runtime, stat: Stat, provision: *FileProvision) !void {
fn stat_file_task(rt: *Runtime, result: StatResult, provision: *FileProvision) !void {
errdefer provision.context.respond(.{
.status = .@"Internal Server Error",
.mime = Mime.HTML,
.body = "",
}) catch unreachable;

const stat = result.unwrap() catch |e| {
log.warn("stat on fd={d} failed | {}", .{ provision.fd, e });
try provision.context.respond(.{
.status = .@"Not Found",
.mime = Mime.HTML,
.body = "File Not Found",
});
return;
};

// Set file size.
provision.file_size = stat.size;
log.debug("file size: {d}", .{provision.file_size});
@@ -109,50 +125,56 @@ pub fn FsDir(Server: type, AppState: type) type {
);
}

fn read_file_task(rt: *Runtime, result: i32, provision: *FileProvision) !void {
fn read_file_task(rt: *Runtime, result: ReadResult, provision: *FileProvision) !void {
errdefer {
std.posix.close(provision.fd);
provision.context.close() catch unreachable;
}

if (result <= -1) {
log.warn("read file task failed", .{});
std.posix.close(provision.fd);
try provision.context.close();
return;
}
const length = result.unwrap() catch |e| {
switch (e) {
error.EndOfFile => {
log.debug("done streaming file | rd off: {d} | f size: {d} ", .{
provision.rd_offset,
provision.file_size,
});

std.posix.close(provision.fd);
try provision.context.send_then_recv(
provision.buffer[0..provision.current_length],
);
return;
},
else => {
log.warn("reading on fd={d} failed | {}", .{ provision.fd, e });
std.posix.close(provision.fd);
try provision.context.close();
return;
},
}
};

const length: usize = @intCast(result);
provision.rd_offset += length;
provision.current_length += length;
const length_as_usize: usize = @intCast(length);
provision.rd_offset += length_as_usize;
provision.current_length += length_as_usize;
log.debug("current offset: {d} | fd: {}", .{ provision.rd_offset, provision.fd });

if (provision.rd_offset >= provision.file_size or result == 0) {
log.debug("done streaming file | rd off: {d} | f size: {d} | result: {d}", .{
provision.rd_offset,
provision.file_size,
result,
});

std.posix.close(provision.fd);
try provision.context.send_then_recv(provision.buffer[0..provision.current_length]);
assert(provision.rd_offset <= length_as_usize);
assert(provision.current_length <= provision.buffer.len);
if (provision.current_length == provision.buffer.len) {
try provision.context.send_then(
provision.buffer[0..provision.current_length],
provision,
send_file_task,
);
} else {
assert(provision.current_length <= provision.buffer.len);
if (provision.current_length == provision.buffer.len) {
try provision.context.send_then(
provision.buffer[0..provision.current_length],
provision,
send_file_task,
);
} else {
try rt.fs.read(
provision,
read_file_task,
provision.fd,
provision.buffer[provision.current_length..],
provision.rd_offset,
);
}
try rt.fs.read(
provision,
read_file_task,
provision.fd,
provision.buffer[provision.current_length..],
provision.rd_offset,
);
}
}

12 changes: 8 additions & 4 deletions src/http/router/routing_trie.zig
@@ -146,12 +146,16 @@ pub fn RoutingTrie(comptime Server: type, comptime AppState: type) type {
}

/// Initialize new trie node for the next token.
fn _with_route(comptime node: *const Node, comptime iterator: *std.mem.TokenIterator(u8, .scalar), comptime route: Route) Node {
fn with_route_helper(
comptime node: *const Node,
comptime iterator: *std.mem.TokenIterator(u8, .scalar),
comptime route: Route,
) Node {
if (iterator.next()) |chunk| {
// Parse the current chunk.
const token: Token = Token.parse_chunk(chunk);
// Alter the child of the current node.
return node.with_child(token, &(_with_route(
return node.with_child(token, &(with_route_helper(
node.children.get_optional(token) orelse &(Node.init(token, null)),
iterator,
route,
@@ -168,13 +172,13 @@ pub fn RoutingTrie(comptime Server: type, comptime AppState: type) type {

/// Copy the current routing trie to add the provided route.
pub fn with_route(comptime self: *const Self, comptime route: Route) Self {
@setEvalBranchQuota(10000);
@setEvalBranchQuota(1_000_000);

// This is where we will parse out the path.
comptime var iterator = std.mem.tokenizeScalar(u8, route.path, '/');

return Self{
.root = _with_route(&(self.root), &iterator, route),
.root = with_route_helper(&(self.root), &iterator, route),
};
}

94 changes: 69 additions & 25 deletions src/http/server.zig
@@ -34,6 +34,10 @@ pub const AsyncIOType = @import("tardy").AsyncIOType;
const TardyCreator = @import("tardy").Tardy;
const Cross = @import("tardy").Cross;

const AcceptResult = @import("tardy").AcceptResult;
const RecvResult = @import("tardy").RecvResult;
const SendResult = @import("tardy").SendResult;

pub const RecvStatus = union(enum) {
kill,
recv,
@@ -278,21 +282,24 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
}
}

fn accept_task(rt: *Runtime, child_socket: std.posix.socket_t, socket: std.posix.socket_t) !void {
const pool = rt.storage.get_ptr("__zzz_provision_pool", Pool(Provision));
fn accept_task(rt: *Runtime, result: AcceptResult, socket: std.posix.socket_t) !void {
const accept_queued = rt.storage.get_ptr("__zzz_accept_queued", bool);

const child_socket = result.unwrap() catch |e| {
log.err("socket accept failed | {}", .{e});
accept_queued.* = true;
try rt.net.accept(socket, accept_task, socket);
return;
};

const pool = rt.storage.get_ptr("__zzz_provision_pool", Pool(Provision));
accept_queued.* = false;

if (rt.scheduler.tasks.clean() >= 2) {
accept_queued.* = true;
try rt.net.accept(socket, accept_task, socket);
}

if (!Cross.socket.is_valid(child_socket)) {
log.err("socket accept failed", .{});
return error.AcceptFailed;
}

// This should never fail. It means that we have a dangling item.
assert(pool.clean() > 0);
const borrowed = pool.borrow() catch unreachable;
@@ -339,7 +346,7 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
};

provision.job = .{ .handshake = .{ .state = .recv, .count = 0 } };
try rt.net.recv(borrowed.item, handshake_task, child_socket, recv_buf);
try rt.net.recv(borrowed.item, handshake_recv_task, child_socket, recv_buf);
},
.plain => {
provision.job = .{ .recv = .{ .count = 0 } };
@@ -348,8 +355,18 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
}
}

fn recv_task(rt: *Runtime, length: i32, provision: *Provision) !void {
fn recv_task(rt: *Runtime, result: RecvResult, provision: *Provision) !void {
assert(provision.job == .recv);

const length = result.unwrap() catch |e| {
if (e != error.Closed) {
log.warn("socket recv failed | {}", .{e});
}
provision.job = .close;
try rt.net.close(provision, close_task, provision.socket);
return;
};

const config = rt.storage.get_const_ptr("__zzz_config", ServerConfig);
const router = rt.storage.get_const_ptr("__zzz_router", Router);

@@ -424,7 +441,37 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
}
}

fn handshake_task(rt: *Runtime, length: i32, provision: *Provision) !void {
fn handshake_recv_task(rt: *Runtime, result: RecvResult, provision: *Provision) !void {
assert(security == .tls);

const length = result.unwrap() catch |e| {
if (e != error.Closed) {
log.warn("socket recv failed | {}", .{e});
}
provision.job = .close;
try rt.net.close(provision, close_task, provision.socket);
return error.TLSHandshakeClosed;
};

try handshake_inner_task(rt, length, provision);
}

fn handshake_send_task(rt: *Runtime, result: SendResult, provision: *Provision) !void {
assert(security == .tls);

const length = result.unwrap() catch |e| {
if (e != error.ConnectionReset) {
log.warn("socket send failed | {}", .{e});
}
provision.job = .close;
try rt.net.close(provision, close_task, provision.socket);
return error.TLSHandshakeClosed;
};

try handshake_inner_task(rt, length, provision);
}

fn handshake_inner_task(rt: *Runtime, length: i32, provision: *Provision) !void {
assert(security == .tls);
if (comptime security == .tls) {
const tls_slice = rt.storage.get("__zzz_tls_slice", []TLSType);
Expand All @@ -437,13 +484,6 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
log.debug("processing handshake", .{});
handshake_job.count += 1;

if (length <= 0) {
log.debug("handshake connection closed", .{});
provision.job = .close;
try rt.net.close(provision, close_task, provision.socket);
return error.TLSHandshakeClosed;
}

if (handshake_job.count >= 50) {
log.debug("handshake taken too many cycles", .{});
provision.job = .close;
@@ -467,12 +507,12 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
.recv => |buf| {
log.debug("queueing recv in handshake", .{});
handshake_job.state = .recv;
try rt.net.recv(provision, handshake_task, provision.socket, buf);
try rt.net.recv(provision, handshake_recv_task, provision.socket, buf);
},
.send => |buf| {
log.debug("queueing send in handshake", .{});
handshake_job.state = .send;
try rt.net.send(provision, handshake_task, provision.socket, buf);
try rt.net.send(provision, handshake_send_task, provision.socket, buf);
},
.complete => {
log.debug("handshake complete", .{});
@@ -575,17 +615,21 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
}
}.inner);

pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(i32, *Provision) {
pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(SendResult, *Provision) {
return struct {
fn send_then_inner(rt: *Runtime, length: i32, provision: *Provision) !void {
fn send_then_inner(rt: *Runtime, result: SendResult, provision: *Provision) !void {
assert(provision.job == .send);
const config = rt.storage.get_const_ptr("__zzz_config", ServerConfig);

// If the socket is closed.
if (length <= 0) {
try @call(.always_inline, func, .{ rt, false, provision });
const length = result.unwrap() catch |e| {
// If the socket is closed.
if (e != error.ConnectionReset) {
log.warn("socket send failed: {}", .{e});
}

try @call(.auto, func, .{ rt, false, provision });
return;
}
};

const send_job = &provision.job.send;

