diff --git a/src/async/io_uring.zig b/src/async/io_uring.zig index 6aadf46..f18940e 100644 --- a/src/async/io_uring.zig +++ b/src/async/io_uring.zig @@ -72,22 +72,55 @@ pub const AsyncIoUring = struct { pub fn reap(self: *Async) AsyncError![]Completion { const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner)); + // NOTE: this can be dynamic and then we would just have to make a single call + // which would probably be better. var cqes: [256]std.os.linux.io_uring_cqe = [_]std.os.linux.io_uring_cqe{undefined} ** 256; - const count = uring.copy_cqes(cqes[0..], 1) catch |e| switch (e) { - // TODO: match error states. - else => unreachable, - }; + var total_reaped: u64 = 0; - const min = @min(self.completions.len, count); + const min_length = @min(cqes.len, self.completions.len); + { + // only the first one blocks waiting for an initial set of completions. + const count = uring.copy_cqes(cqes[0..min_length], 1) catch |e| switch (e) { + // TODO: match error states. + else => unreachable, + }; + + total_reaped += count; - for (0..min) |i| { - self.completions[i] = Completion{ - .result = cqes[i].res, - .context = @ptrFromInt(@as(usize, @intCast(cqes[i].user_data))), + // Copy over the first one. + for (0..total_reaped) |i| { + self.completions[i] = Completion{ + .result = cqes[i].res, + .context = @ptrFromInt(@as(usize, @intCast(cqes[i].user_data))), + }; + } + } + + while (total_reaped < self.completions.len) { + const start = total_reaped; + const remaining = self.completions.len - total_reaped; + + const count = uring.copy_cqes(cqes[0..remaining], 0) catch |e| switch (e) { + // TODO: match error states. + else => unreachable, }; + + if (count == 0) { + return self.completions[0..total_reaped]; + } + + total_reaped += count; + + for (start..total_reaped) |i| { + const cqe_index = i - start; + self.completions[i] = Completion{ + .result = cqes[cqe_index].res, + .context = @ptrFromInt(@as(usize, @intCast(cqes[cqe_index].user_data))), + }; + } } - return self.completions[0..min]; + return self.completions[0..total_reaped]; } pub fn to_async(self: *AsyncIoUring) Async { diff --git a/src/core/server.zig b/src/core/server.zig index 7a566e3..aa55bba 100644 --- a/src/core/server.zig +++ b/src/core/server.zig @@ -315,7 +315,6 @@ pub fn Server( .data = undefined, }; - var accepted = false; _ = try backend.queue_accept(&first_provision, server_socket); try backend.submit(); @@ -329,7 +328,7 @@ pub fn Server( switch (p.job) { .accept => { - accepted = true; + _ = try backend.queue_accept(&first_provision, server_socket); const socket: Socket = completion.result; if (socket < 0) { @@ -339,6 +338,8 @@ pub fn Server( // Borrow a provision from the pool otherwise close the socket. const borrowed = provision_pool.borrow(@intCast(completion.result)) catch { + log.warn("out of provision pool entries", .{}); + std.posix.close(socket); continue :reap_loop; }; @@ -603,11 +604,6 @@ pub fn Server( } } - if (!provision_pool.full and accepted) { - try backend.queue_accept(&first_provision, server_socket); - accepted = false; - } - try backend.submit(); }