From 407c83c3b34c522b7f7688d4be1f5afaffc5937c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hendrik=20Niel=C3=A4nder?= Date: Wed, 25 Sep 2024 22:36:54 +0200 Subject: [PATCH] feat: handling http/2 connection preface --- http2.zig | 107 +++++++++++++++++++++++++++++++++++-------- http2/connection.zig | 77 +++++++++++++++++++++++++------ 2 files changed, 152 insertions(+), 32 deletions(-) diff --git a/http2.zig b/http2.zig index f5e374e..490a96d 100644 --- a/http2.zig +++ b/http2.zig @@ -1,10 +1,11 @@ const std = @import("std"); const http2 = @import("http2/connection.zig"); +const hpack = @import("http2/hpack.zig"); const Connection = http2.Connection(std.io.AnyReader, std.io.AnyWriter); +const Hpack = hpack.Hpack; pub fn main() !void { - // Setup a TCP listener on port 8080 const address = try std.net.Address.resolveIp("127.0.0.1", 8080); var listener = try address.listen(.{ .reuse_address = true }); defer listener.deinit(); @@ -12,18 +13,32 @@ pub fn main() !void { std.debug.print("Listening on 127.0.0.1:8080...\n", .{}); while (true) { - // Accept a new TCP connection var conn = try listener.accept(); - defer conn.stream.close(); + defer conn.stream.close(); // Ensure stream is closed std.debug.print("Accepted connection from: {any}\n", .{conn.address}); - // Create a new HTTP/2 connection - var server_conn = try Connection.init(@constCast(&std.heap.page_allocator), conn.stream.reader().any(), conn.stream.writer().any(), true); - // defer server_conn.deinit(); + var server_conn = Connection.init(@constCast(&std.heap.page_allocator), conn.stream.reader().any(), conn.stream.writer().any(), true) catch |err| { + if (err == error.InvalidPreface) { + std.debug.print("Invalid HTTP/2 preface, closing connection\n", .{}); + // Connection will close after GOAWAY is sent + } else if (err == error.BrokenPipe) { + std.debug.print("Client disconnected (BrokenPipe)\n", .{}); + } else { + std.debug.print("Error during connection initialization: {s}\n", .{@errorName(err)}); + } + continue; // Continue accepting new connections after handling error + }; - // Handle the HTTP/2 connection, including the SETTINGS frame and initial preface - try handleHttp2Connection(&server_conn); + handleHttp2Connection(&server_conn) catch |err| { + if (err == error.BrokenPipe) { + std.debug.print("Client disconnected (BrokenPipe)\n", .{}); + } else { + std.debug.print("Error handling connection: {s}\n", .{@errorName(err)}); + } + }; + + std.debug.print("Closing connection from: {any}\n", .{conn.address}); } } @@ -34,43 +49,99 @@ fn handleHttp2Connection(server_conn: *Connection) !void { // Loop to process incoming frames while (true) { - // Receive an HTTP/2 frame - const frame = try server_conn.receiveFrame(); + const frame = server_conn.receiveFrame() catch |err| { + if (err == error.BrokenPipe or err == error.ConnectionResetByPeer) { + std.debug.print("Client disconnected: {any}\n", .{err}); + return server_conn.close(); + } + return err; + }; + std.debug.print("Received frame of type: {s}, stream ID: {d}\n", .{ @tagName(frame.header.frame_type), frame.header.stream_id }); - // Handle different types of frames switch (frame.header.frame_type) { .SETTINGS => { std.debug.print("Received SETTINGS frame\n", .{}); - // Send an ACK for the SETTINGS frame + try server_conn.applyFrameSettings(frame); try server_conn.sendSettingsAck(); std.debug.print("Sent SETTINGS ACK\n", .{}); }, .PING => { std.debug.print("Received PING frame, responding with PONG\n", .{}); - // Send PONG frame in response to PING try server_conn.sendPong(frame.payload); }, .WINDOW_UPDATE => { std.debug.print("Received WINDOW_UPDATE frame\n", .{}); - // Handle window updates (adjust send/recv window size) try server_conn.handleWindowUpdate(frame); }, .HEADERS => { std.debug.print("Received HEADERS frame\n", .{}); - // Here you could implement header processing logic, e.g. parsing request headers + try processRequest(server_conn, frame.header.stream_id); }, .DATA => { std.debug.print("Received DATA frame\n", .{}); - // Handle data frame, which could involve reading the payload and responding + // Handle data frame }, .GOAWAY => { std.debug.print("Received GOAWAY frame, closing connection\n", .{}); - return; // Gracefully close connection on GOAWAY + return server_conn.close(); }, else => { - std.debug.print("Unhandled frame type: {s}\n", .{@tagName(frame.header.frame_type)}); + std.debug.print("Unknown frame type: {s}\n", .{@tagName(frame.header.frame_type)}); }, } } } + +fn processRequest(server_conn: *Connection, stream_id: u31) !void { + std.debug.print("Processing request for stream ID: {d}\n", .{stream_id}); + var dynamic_table = try Hpack.DynamicTable.init(@constCast(&std.heap.page_allocator), 4096); // Initialize dynamic table with 4KB size + + // Prepare a basic response: "Hello, World!" + const response_body = "Hello, World!"; + const response_headers = [_]Hpack.HeaderField{ + .{ .name = ":status", .value = "200" }, + }; + + var buffer = std.ArrayList(u8).init(std.heap.page_allocator); + defer buffer.deinit(); + + // Encode headers and write them to the buffer + for (response_headers) |header| { + try Hpack.encodeHeaderField(header, &dynamic_table, &buffer); + } + + const encoded_headers = buffer.items; + + // Send HEADERS frame + var headers_frame = http2.Frame{ + .header = http2.FrameHeader{ + .length = @intCast(encoded_headers.len), + .frame_type = .HEADERS, + .flags = http2.FrameFlags{ + .value = http2.FrameFlags.END_HEADERS, // Mark end of headers + }, + .reserved = false, + .stream_id = stream_id, + }, + .payload = encoded_headers, + }; + try headers_frame.write(server_conn.writer); + + // Send DATA frame with "Hello, World!" response + var data_frame = http2.Frame{ + .header = http2.FrameHeader{ + .length = @intCast(response_body.len), + .frame_type = .DATA, + .flags = http2.FrameFlags{ + .value = http2.FrameFlags.END_STREAM, // Mark end of stream + }, + .reserved = false, + .stream_id = stream_id, + }, + .payload = response_body, + }; + try data_frame.write(server_conn.writer); + + std.debug.print("Sent 200 OK response with body: \"Hello, World!\"\n", .{}); +} diff --git a/http2/connection.zig b/http2/connection.zig index 800f22a..ef073e3 100644 --- a/http2/connection.zig +++ b/http2/connection.zig @@ -37,6 +37,7 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { var preface_buf: [24]u8 = undefined; _ = try self.reader.readAll(&preface_buf); if (!std.mem.eql(u8, &preface_buf, http2_preface)) { + try self.sendGoAway(0, 1, "Invalid preface: PROTOCOL_ERROR"); return error.InvalidPreface; } std.debug.print("Valid HTTP/2 preface received\n", .{}); @@ -48,6 +49,20 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { return self; } + pub fn deinit(self: @This()) void { + // Deinitialize all streams. + var it = self.streams.iterator(); + while (it.next()) |_| { + //const stream = entry.value_ptr; + //stream.deinit(); + } + + // Deinitialize the stream hash map. + // self.streams.deinit(); + + std.debug.print("Resources deinitialized for connection\n", .{}); + } + fn sendPreface(self: @This()) !void { try self.writer.writeAll(http2_preface); } @@ -96,7 +111,15 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { pub fn receiveFrame(self: *@This()) !Frame { var header_buf: [9]u8 = undefined; // Frame header size is 9 bytes - _ = try self.reader.readAll(&header_buf); + + // Catching potential BrokenPipe or ConnectionResetByPeer + _ = self.reader.readAll(&header_buf) catch |err| { + if (err == error.BrokenPipe or err == error.ConnectionResetByPeer) { + std.debug.print("Client disconnected (BrokenPipe or ConnectionResetByPeer)\n", .{}); + return err; + } + return err; + }; std.debug.print("Raw header bytes: {x}\n", .{header_buf}); @@ -146,8 +169,9 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { return error.InvalidFrameType; } - // The payload of the SETTINGS frame is already read into `frame.payload` + // The payload of the SETTINGS frame must be a multiple of 6 bytes if (frame.payload.len % 6 != 0) { + std.debug.print("Invalid SETTINGS frame size: {d}\n", .{frame.payload.len}); return error.InvalidSettingsFrameSize; } @@ -155,11 +179,10 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { while (i + 6 <= frame.payload.len) { const setting = frame.payload[i .. i + 6]; - // Decode setting ID and value - const id = std.mem.readInt(u16, setting[0..2], .big); - const value = std.mem.readInt(u32, setting[2..6], .big); + const id: u16 = std.mem.readInt(u16, setting[0..2], .big); + const value: u32 = std.mem.readInt(u32, setting[2..6], .big); - std.debug.print("Setting ID: {d}, Value: {d}\n", .{ id, value }); + std.debug.print("Applying Setting ID: {d}, Value: {d}\n", .{ id, value }); // Apply the settings based on ID switch (id) { @@ -190,7 +213,13 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { .stream_id = 0, }; - try frame_header.write(self.writer); + frame_header.write(self.writer) catch |err| { + if (err == error.BrokenPipe) { + std.debug.print("Client disconnected (BrokenPipe)\n", .{}); + return err; + } + return err; + }; std.debug.print("Sent SETTINGS ACK frame\n", .{}); } @@ -241,8 +270,6 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { } } - // ... [Existing Methods] ... - /// Sends a GOAWAY frame with the given parameters. pub fn sendGoAway(self: @This(), last_stream_id: u31, error_code: u32, debug_data: []const u8) !void { var buffer = std.ArrayList(u8).init(self.allocator.*); @@ -283,13 +310,35 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type { } pub fn close(self: @This()) !void { - // Here, `last_stream_id` should be the highest stream ID the server has processed. - // For simplicity, we'll use 1, as per your current implementation. - try self.sendGoAway(1, 0, "Closing connection gracefully"); + // Determine the highest stream ID that was processed + var highest_stream_id: u31 = 0; + + var it = self.streams.iterator(); + while (it.next()) |entry| { + if (entry.key_ptr.* > highest_stream_id) { + highest_stream_id = entry.key_ptr.*; + } + } + + // Error code 0 indicates graceful shutdown; adjust this if specific errors need to be reported. + const error_code: u32 = 0; // 0: NO_ERROR, indicating graceful shutdown + + // Optional debug data for GOAWAY frame, informing the client about the reason + const debug_data = "Connection closing: graceful shutdown"; + + // Send the GOAWAY frame with the highest stream ID and debug information + try self.sendGoAway(highest_stream_id, error_code, debug_data); + + // Ensure the GOAWAY frame is fully sent before closing the connection. + // try self.writer.flush(); + + // Close the underlying writer and terminate the connection gracefully + // try self.writer.close(); - // Optionally, send a GOAWAY frame with additional debug data or different error codes as needed. + // Optionally, free up resources associated with streams + @constCast(&self.streams).deinit(); - // Finally, close the writer to terminate the connection. + std.debug.print("Connection closed gracefully with GOAWAY frame\n", .{}); } pub fn getStream(self: *@This(), stream_id: u31) !*Stream {