Skip to content

Commit

Permalink
feat: handling http/2 connection preface
Browse files Browse the repository at this point in the history
  • Loading branch information
hendriknielaender committed Sep 25, 2024
1 parent 2c9921b commit 407c83c
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 32 deletions.
107 changes: 89 additions & 18 deletions http2.zig
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
const std = @import("std");
const http2 = @import("http2/connection.zig");
const hpack = @import("http2/hpack.zig");

const Connection = http2.Connection(std.io.AnyReader, std.io.AnyWriter);
const Hpack = hpack.Hpack;

pub fn main() !void {
// Setup a TCP listener on port 8080
const address = try std.net.Address.resolveIp("127.0.0.1", 8080);
var listener = try address.listen(.{ .reuse_address = true });
defer listener.deinit();

std.debug.print("Listening on 127.0.0.1:8080...\n", .{});

while (true) {
// Accept a new TCP connection
var conn = try listener.accept();
defer conn.stream.close();
defer conn.stream.close(); // Ensure stream is closed

std.debug.print("Accepted connection from: {any}\n", .{conn.address});

// Create a new HTTP/2 connection
var server_conn = try Connection.init(@constCast(&std.heap.page_allocator), conn.stream.reader().any(), conn.stream.writer().any(), true);
// defer server_conn.deinit();
var server_conn = Connection.init(@constCast(&std.heap.page_allocator), conn.stream.reader().any(), conn.stream.writer().any(), true) catch |err| {
if (err == error.InvalidPreface) {
std.debug.print("Invalid HTTP/2 preface, closing connection\n", .{});
// Connection will close after GOAWAY is sent
} else if (err == error.BrokenPipe) {
std.debug.print("Client disconnected (BrokenPipe)\n", .{});
} else {
std.debug.print("Error during connection initialization: {s}\n", .{@errorName(err)});
}
continue; // Continue accepting new connections after handling error
};

// Handle the HTTP/2 connection, including the SETTINGS frame and initial preface
try handleHttp2Connection(&server_conn);
handleHttp2Connection(&server_conn) catch |err| {
if (err == error.BrokenPipe) {
std.debug.print("Client disconnected (BrokenPipe)\n", .{});
} else {
std.debug.print("Error handling connection: {s}\n", .{@errorName(err)});
}
};

std.debug.print("Closing connection from: {any}\n", .{conn.address});
}
}

Expand All @@ -34,43 +49,99 @@ fn handleHttp2Connection(server_conn: *Connection) !void {

// Loop to process incoming frames
while (true) {
// Receive an HTTP/2 frame
const frame = try server_conn.receiveFrame();
const frame = server_conn.receiveFrame() catch |err| {
if (err == error.BrokenPipe or err == error.ConnectionResetByPeer) {
std.debug.print("Client disconnected: {any}\n", .{err});
return server_conn.close();
}
return err;
};

std.debug.print("Received frame of type: {s}, stream ID: {d}\n", .{ @tagName(frame.header.frame_type), frame.header.stream_id });

// Handle different types of frames
switch (frame.header.frame_type) {
.SETTINGS => {
std.debug.print("Received SETTINGS frame\n", .{});
// Send an ACK for the SETTINGS frame
try server_conn.applyFrameSettings(frame);
try server_conn.sendSettingsAck();
std.debug.print("Sent SETTINGS ACK\n", .{});
},
.PING => {
std.debug.print("Received PING frame, responding with PONG\n", .{});
// Send PONG frame in response to PING
try server_conn.sendPong(frame.payload);
},
.WINDOW_UPDATE => {
std.debug.print("Received WINDOW_UPDATE frame\n", .{});
// Handle window updates (adjust send/recv window size)
try server_conn.handleWindowUpdate(frame);
},
.HEADERS => {
std.debug.print("Received HEADERS frame\n", .{});
// Here you could implement header processing logic, e.g. parsing request headers
try processRequest(server_conn, frame.header.stream_id);
},
.DATA => {
std.debug.print("Received DATA frame\n", .{});
// Handle data frame, which could involve reading the payload and responding
// Handle data frame
},
.GOAWAY => {
std.debug.print("Received GOAWAY frame, closing connection\n", .{});
return; // Gracefully close connection on GOAWAY
return server_conn.close();
},
else => {
std.debug.print("Unhandled frame type: {s}\n", .{@tagName(frame.header.frame_type)});
std.debug.print("Unknown frame type: {s}\n", .{@tagName(frame.header.frame_type)});
},
}
}
}

fn processRequest(server_conn: *Connection, stream_id: u31) !void {
std.debug.print("Processing request for stream ID: {d}\n", .{stream_id});
var dynamic_table = try Hpack.DynamicTable.init(@constCast(&std.heap.page_allocator), 4096); // Initialize dynamic table with 4KB size

// Prepare a basic response: "Hello, World!"
const response_body = "Hello, World!";
const response_headers = [_]Hpack.HeaderField{
.{ .name = ":status", .value = "200" },
};

var buffer = std.ArrayList(u8).init(std.heap.page_allocator);
defer buffer.deinit();

// Encode headers and write them to the buffer
for (response_headers) |header| {
try Hpack.encodeHeaderField(header, &dynamic_table, &buffer);
}

const encoded_headers = buffer.items;

// Send HEADERS frame
var headers_frame = http2.Frame{
.header = http2.FrameHeader{
.length = @intCast(encoded_headers.len),
.frame_type = .HEADERS,
.flags = http2.FrameFlags{
.value = http2.FrameFlags.END_HEADERS, // Mark end of headers
},
.reserved = false,
.stream_id = stream_id,
},
.payload = encoded_headers,
};
try headers_frame.write(server_conn.writer);

// Send DATA frame with "Hello, World!" response
var data_frame = http2.Frame{
.header = http2.FrameHeader{
.length = @intCast(response_body.len),
.frame_type = .DATA,
.flags = http2.FrameFlags{
.value = http2.FrameFlags.END_STREAM, // Mark end of stream
},
.reserved = false,
.stream_id = stream_id,
},
.payload = response_body,
};
try data_frame.write(server_conn.writer);

std.debug.print("Sent 200 OK response with body: \"Hello, World!\"\n", .{});
}
77 changes: 63 additions & 14 deletions http2/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {
var preface_buf: [24]u8 = undefined;
_ = try self.reader.readAll(&preface_buf);
if (!std.mem.eql(u8, &preface_buf, http2_preface)) {
try self.sendGoAway(0, 1, "Invalid preface: PROTOCOL_ERROR");
return error.InvalidPreface;
}
std.debug.print("Valid HTTP/2 preface received\n", .{});
Expand All @@ -48,6 +49,20 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {
return self;
}

pub fn deinit(self: @This()) void {
// Deinitialize all streams.
var it = self.streams.iterator();
while (it.next()) |_| {
//const stream = entry.value_ptr;
//stream.deinit();
}

// Deinitialize the stream hash map.
// self.streams.deinit();

std.debug.print("Resources deinitialized for connection\n", .{});
}

fn sendPreface(self: @This()) !void {
try self.writer.writeAll(http2_preface);
}
Expand Down Expand Up @@ -96,7 +111,15 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {

pub fn receiveFrame(self: *@This()) !Frame {
var header_buf: [9]u8 = undefined; // Frame header size is 9 bytes
_ = try self.reader.readAll(&header_buf);

// Catching potential BrokenPipe or ConnectionResetByPeer
_ = self.reader.readAll(&header_buf) catch |err| {
if (err == error.BrokenPipe or err == error.ConnectionResetByPeer) {
std.debug.print("Client disconnected (BrokenPipe or ConnectionResetByPeer)\n", .{});
return err;
}
return err;
};

std.debug.print("Raw header bytes: {x}\n", .{header_buf});

Expand Down Expand Up @@ -146,20 +169,20 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {
return error.InvalidFrameType;
}

// The payload of the SETTINGS frame is already read into `frame.payload`
// The payload of the SETTINGS frame must be a multiple of 6 bytes
if (frame.payload.len % 6 != 0) {
std.debug.print("Invalid SETTINGS frame size: {d}\n", .{frame.payload.len});
return error.InvalidSettingsFrameSize;
}

var i: usize = 0;
while (i + 6 <= frame.payload.len) {
const setting = frame.payload[i .. i + 6];

// Decode setting ID and value
const id = std.mem.readInt(u16, setting[0..2], .big);
const value = std.mem.readInt(u32, setting[2..6], .big);
const id: u16 = std.mem.readInt(u16, setting[0..2], .big);
const value: u32 = std.mem.readInt(u32, setting[2..6], .big);

std.debug.print("Setting ID: {d}, Value: {d}\n", .{ id, value });
std.debug.print("Applying Setting ID: {d}, Value: {d}\n", .{ id, value });

// Apply the settings based on ID
switch (id) {
Expand Down Expand Up @@ -190,7 +213,13 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {
.stream_id = 0,
};

try frame_header.write(self.writer);
frame_header.write(self.writer) catch |err| {
if (err == error.BrokenPipe) {
std.debug.print("Client disconnected (BrokenPipe)\n", .{});
return err;
}
return err;
};

std.debug.print("Sent SETTINGS ACK frame\n", .{});
}
Expand Down Expand Up @@ -241,8 +270,6 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {
}
}

// ... [Existing Methods] ...

/// Sends a GOAWAY frame with the given parameters.
pub fn sendGoAway(self: @This(), last_stream_id: u31, error_code: u32, debug_data: []const u8) !void {
var buffer = std.ArrayList(u8).init(self.allocator.*);
Expand Down Expand Up @@ -283,13 +310,35 @@ pub fn Connection(comptime ReaderType: type, comptime WriterType: type) type {
}

pub fn close(self: @This()) !void {
// Here, `last_stream_id` should be the highest stream ID the server has processed.
// For simplicity, we'll use 1, as per your current implementation.
try self.sendGoAway(1, 0, "Closing connection gracefully");
// Determine the highest stream ID that was processed
var highest_stream_id: u31 = 0;

var it = self.streams.iterator();
while (it.next()) |entry| {
if (entry.key_ptr.* > highest_stream_id) {
highest_stream_id = entry.key_ptr.*;
}
}

// Error code 0 indicates graceful shutdown; adjust this if specific errors need to be reported.
const error_code: u32 = 0; // 0: NO_ERROR, indicating graceful shutdown

// Optional debug data for GOAWAY frame, informing the client about the reason
const debug_data = "Connection closing: graceful shutdown";

// Send the GOAWAY frame with the highest stream ID and debug information
try self.sendGoAway(highest_stream_id, error_code, debug_data);

// Ensure the GOAWAY frame is fully sent before closing the connection.
// try self.writer.flush();

// Close the underlying writer and terminate the connection gracefully
// try self.writer.close();

// Optionally, send a GOAWAY frame with additional debug data or different error codes as needed.
// Optionally, free up resources associated with streams
@constCast(&self.streams).deinit();

// Finally, close the writer to terminate the connection.
std.debug.print("Connection closed gracefully with GOAWAY frame\n", .{});
}

pub fn getStream(self: *@This(), stream_id: u31) !*Stream {
Expand Down

0 comments on commit 407c83c

Please sign in to comment.