Skip to content

Commit

Permalink
fix issue with mismatched reap and completion sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
mookums committed Sep 21, 2024
1 parent a6d47a4 commit 0d31bd8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 17 deletions.
53 changes: 43 additions & 10 deletions src/async/io_uring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,55 @@ pub const AsyncIoUring = struct {

pub fn reap(self: *Async) AsyncError![]Completion {
const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner));
// NOTE: this can be dynamic and then we would just have to make a single call
// which would probably be better.
var cqes: [256]std.os.linux.io_uring_cqe = [_]std.os.linux.io_uring_cqe{undefined} ** 256;
const count = uring.copy_cqes(cqes[0..], 1) catch |e| switch (e) {
// TODO: match error states.
else => unreachable,
};
var total_reaped: u64 = 0;

const min = @min(self.completions.len, count);
const min_length = @min(cqes.len, self.completions.len);
{
// only the first one blocks waiting for an initial set of completions.
const count = uring.copy_cqes(cqes[0..min_length], 1) catch |e| switch (e) {
// TODO: match error states.
else => unreachable,
};

total_reaped += count;

for (0..min) |i| {
self.completions[i] = Completion{
.result = cqes[i].res,
.context = @ptrFromInt(@as(usize, @intCast(cqes[i].user_data))),
// Copy over the first one.
for (0..total_reaped) |i| {
self.completions[i] = Completion{
.result = cqes[i].res,
.context = @ptrFromInt(@as(usize, @intCast(cqes[i].user_data))),
};
}
}

while (total_reaped < self.completions.len) {
const start = total_reaped;
const remaining = self.completions.len - total_reaped;

const count = uring.copy_cqes(cqes[0..remaining], 0) catch |e| switch (e) {
// TODO: match error states.
else => unreachable,
};

if (count == 0) {
return self.completions[0..total_reaped];
}

total_reaped += count;

for (start..total_reaped) |i| {
const cqe_index = i - start;
self.completions[i] = Completion{
.result = cqes[cqe_index].res,
.context = @ptrFromInt(@as(usize, @intCast(cqes[cqe_index].user_data))),
};
}
}

return self.completions[0..min];
return self.completions[0..total_reaped];
}

pub fn to_async(self: *AsyncIoUring) Async {
Expand Down
10 changes: 3 additions & 7 deletions src/core/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ pub fn Server(
.data = undefined,
};

var accepted = false;
_ = try backend.queue_accept(&first_provision, server_socket);
try backend.submit();

Expand All @@ -329,7 +328,7 @@ pub fn Server(

switch (p.job) {
.accept => {
accepted = true;
_ = try backend.queue_accept(&first_provision, server_socket);
const socket: Socket = completion.result;

if (socket < 0) {
Expand All @@ -339,6 +338,8 @@ pub fn Server(

// Borrow a provision from the pool otherwise close the socket.
const borrowed = provision_pool.borrow(@intCast(completion.result)) catch {
log.warn("out of provision pool entries", .{});
std.posix.close(socket);
continue :reap_loop;
};

Expand Down Expand Up @@ -603,11 +604,6 @@ pub fn Server(
}
}

if (!provision_pool.full and accepted) {
try backend.queue_accept(&first_provision, server_socket);
accepted = false;
}

try backend.submit();
}

Expand Down

0 comments on commit 0d31bd8

Please sign in to comment.