Skip to content

Commit

Permalink
fix QueueFull error
Browse files Browse the repository at this point in the history
  • Loading branch information
mookums committed Sep 23, 2024
1 parent 8442fc7 commit 031413b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
17 changes: 13 additions & 4 deletions src/core/pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub fn Pool(comptime T: type) type {
// Buffer for the Pool.
items: []T,
dirty: std.DynamicBitSet,
full: bool = false,

/// Initalizes our items buffer as undefined.
pub fn init(
Expand Down Expand Up @@ -60,12 +59,24 @@ pub fn Pool(comptime T: type) type {
return &self.items[index];
}

pub fn empty(self: *const Self) bool {
return self.dirty.count() == 0;
}

pub fn full(self: *const Self) bool {
return self.dirty.count() == self.items.len;
}

// The id is supposed to be a unique identification for
// this element. It gets hashed and used to find an empty element.
//
// Returns a tuple of the index into the pool and a pointer to the item.
// Returns null otherwise.
pub fn borrow(self: *Self, id: u32) !Borrow(T) {
if (self.full()) {
return error.Full;
}

const bytes = std.mem.toBytes(id)[0..];
const hash = @mod(std.hash.Wyhash.hash(0, bytes), self.items.len);

Expand All @@ -85,16 +96,14 @@ pub fn Pool(comptime T: type) type {
}
}

self.full = true;
return error.Full;
unreachable;
}

// Releases the item with the given index back to the Pool.
// Asserts that the given index was borrowed.
pub fn release(self: *Self, index: usize) void {
assert(self.dirty.isSet(index));
self.dirty.unset(index);
self.full = false;
}
};
}
Expand Down
12 changes: 10 additions & 2 deletions src/core/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,11 @@ pub fn Server(
defer provision_pool.release(provision.index);

log.info("{d} - closing connection", .{provision.index});
std.posix.close(provision.socket);
provision.socket = -1;
_ = provision.arena.reset(.{ .retain_with_limit = config.size_connection_arena_retain });
provision.data.clean();
std.posix.close(provision.socket);
provision.recv_buffer.clearRetainingCapacity();
}

fn run(
Expand Down Expand Up @@ -318,6 +320,8 @@ pub fn Server(
_ = try backend.queue_accept(&first_provision, server_socket);
try backend.submit();

var accept_queued = true;

while (true) {
const completions = try backend.reap();
const completions_count = completions.len;
Expand All @@ -328,7 +332,7 @@ pub fn Server(

switch (p.job) {
.accept => {
_ = try backend.queue_accept(&first_provision, server_socket);
accept_queued = false;
const socket: Socket = completion.result;

if (socket < 0) {
Expand Down Expand Up @@ -604,6 +608,10 @@ pub fn Server(
}
}

if (!accept_queued and !provision_pool.full()) {
_ = try backend.queue_accept(&first_provision, server_socket);
}

try backend.submit();
}

Expand Down

0 comments on commit 031413b

Please sign in to comment.