Skip to content

Commit

Permalink
sse kinda working
Browse files Browse the repository at this point in the history
  • Loading branch information
mookums committed Oct 27, 2024
1 parent 79fdcf2 commit 09f9c6f
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 153 deletions.
1 change: 1 addition & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub fn build(b: *std.Build) void {
zzz.linkLibrary(bearssl);

add_example(b, "basic", .http, false, target, optimize, zzz);
add_example(b, "sse", .http, false, target, optimize, zzz);
add_example(b, "custom", .http, false, target, optimize, zzz);
add_example(b, "tls", .http, true, target, optimize, zzz);
add_example(b, "minram", .http, false, target, optimize, zzz);
Expand Down
64 changes: 64 additions & 0 deletions examples/http/sse/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE Example</title>
</head>
<body>
<h1>Server-Sent Events Example</h1>
<button id="toggle-sse">Start SSE Connection</button>
<div id="messages"></div>
<script>
let eventSource = null;
const button = document.getElementById('toggle-sse');

button.addEventListener('click', function() {
if (eventSource) {
// If connection exists, close it
eventSource.close();
eventSource = null;
button.textContent = 'Start SSE Connection';
return;
}

// Start new connection
eventSource = new EventSource('/stream');
button.textContent = 'Stop SSE Connection';

eventSource.onmessage = function(event) {
const messagesDiv = document.getElementById('messages');
messagesDiv.innerHTML += `<p>${event.data}</p>`;
// Auto-scroll to bottom
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};

eventSource.onerror = (err) => {
console.error("Error occurred while connecting to SSE:", err);
eventSource.close();
eventSource = null;
button.textContent = 'Start SSE Connection';
};
});

// Clean up on page unload
window.addEventListener('unload', () => {
if (eventSource) {
eventSource.close();
}
});
</script>
<style>
#messages {
max-height: 400px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
margin-top: 10px;
}
p {
margin: 5px 0;
}
</style>
</body>
</html>
46 changes: 46 additions & 0 deletions examples/http/sse/main.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const std = @import("std");
const zzz = @import("zzz");
const http = zzz.HTTP;
const log = std.log.scoped(.@"examples/sse");

const Server = http.Server(.plain, .auto);
const Router = Server.Router;
const Context = Server.Context;
const Route = Server.Route;
const SSE = Server.SSE;

fn sse_task(sse: *SSE) !void {
log.debug("ctx ptr: {*}", .{sse.context});
try sse.send(.{ .data = "hi this is a message" }, sse_task);
std.time.sleep(std.time.ns_per_s);
}

fn sse_handler(ctx: *Context) void {
log.debug("going into sse mode", .{});
ctx.to_sse(sse_task);
}

pub fn main() !void {
const host: []const u8 = "0.0.0.0";
const port: u16 = 9862;

var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
defer _ = gpa.deinit();

var router = Router.init(allocator);
defer router.deinit();

try router.serve_embedded_file("/", http.Mime.HTML, @embedFile("index.html"));
try router.serve_route("/stream", Route.init().get(sse_handler));

var server = Server.init(.{
.router = &router,
.allocator = allocator,
.threading = .single,
});
defer server.deinit();

try server.bind(host, port);
try server.listen();
}
14 changes: 12 additions & 2 deletions src/core/job.zig
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
const std = @import("std");
const Pseudoslice = @import("lib.zig").Pseudoslice;

pub const SendType = struct {
const TaskFn = @import("tardy").TaskFn;

pub const AfterType = union(enum) {
recv,
sse: struct {
func: *const anyopaque,
ctx: *anyopaque,
},
};
pub const SendInner = struct {
after: AfterType,
slice: Pseudoslice,
count: usize,
security: union(enum) {
Expand All @@ -24,6 +34,6 @@ pub const Job = union(enum) {
accept,
handshake: struct { state: enum { recv, send }, count: usize },
recv: struct { count: usize },
send: SendType,
send: SendInner,
close,
};
2 changes: 1 addition & 1 deletion src/core/pseudoslice.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub const Pseudoslice = struct {

/// Operates like a slice. That means it does not capture the end.
/// Start is an inclusive bound and end is an exclusive bound.
pub fn get(self: *Pseudoslice, start: usize, end: usize) []const u8 {
pub fn get(self: *const Pseudoslice, start: usize, end: usize) []const u8 {
assert(end >= start);
assert(self.shared.len >= end - start);
const clamped_end = @min(end, self.len);
Expand Down
71 changes: 54 additions & 17 deletions src/http/context.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ const std = @import("std");
const assert = std.debug.assert;
const log = std.log.scoped(.@"zzz/http/context");

const Pseudoslice = @import("../core/pseudoslice.zig").Pseudoslice;

const Capture = @import("routing_trie.zig").Capture;
const QueryMap = @import("routing_trie.zig").QueryMap;
const Provision = @import("provision.zig").Provision;

const Request = @import("request.zig").Request;
const Response = @import("response.zig").Response;
const ResponseSetOptions = Response.ResponseSetOptions;
const Mime = @import("mime.zig").Mime;
const _SSE = @import("sse.zig").SSE;

const Runtime = @import("tardy").Runtime;
const Task = @import("tardy").Task;
const TaskFn = @import("tardy").TaskFn;

const raw_respond = @import("server.zig").raw_respond;

Expand All @@ -20,6 +25,7 @@ const raw_respond = @import("server.zig").raw_respond;
pub fn Context(comptime Server: type) type {
return struct {
const Self = @This();
const SSE = _SSE(Server);
allocator: std.mem.Allocator,
runtime: *Runtime,
/// The Request that triggered this handler.
Expand All @@ -30,36 +36,67 @@ pub fn Context(comptime Server: type) type {
captures: []Capture,
queries: *QueryMap,
provision: *Provision,
triggered: bool = false,

pub fn respond(self: *Self, options: ResponseSetOptions) void {
assert(!self.triggered);
self.triggered = true;
self.response.set(options);
pub fn to_sse(self: *Self, then: SSE.SSEHandlerFn) void {
self.response.set(.{
.status = .OK,
.body = "",
.mime = Mime{
.extension = ".sse",
.description = "Server-Sent Events",
.content_type = "text/event-stream",
},
});

// this will write the data into the appropriate places.
const status = raw_respond(self.provision) catch unreachable;
const headers = self.provision.response.headers_into_buffer(
self.provision.buffer,
null,
) catch unreachable;

self.provision.job = .{
.send = .{
.count = 0,
.slice = status.send,
.security = undefined,
},
};
const sse = self.allocator.create(SSE) catch unreachable;
sse.* = .{ .context = self };

const body = options.body orelse "";
const pslice = Pseudoslice.init(headers, "", self.provision.buffer);

const first_chunk = Server.prepare_send(
self.runtime,
self.provision,
body,
.{ .sse = .{
.func = then,
.ctx = sse,
} },
pslice,
) catch unreachable;

self.runtime.net.send(
*Provision,
Server.send_then_sse_task,
self.provision,
self.provision.socket,
first_chunk,
) catch unreachable;
}

pub fn respond(self: *Self, options: ResponseSetOptions) void {
self.response.set(options);

const body = options.body orelse "";
const headers = self.provision.response.headers_into_buffer(
self.provision.buffer,
@intCast(body.len),
) catch unreachable;
const pslice = Pseudoslice.init(headers, body, self.provision.buffer);

const first_chunk = Server.prepare_send(
self.runtime,
self.provision,
.recv,
pslice,
) catch unreachable;

self.runtime.net.send(
*Provision,
Server.send_task,
Server.send_then_recv_task,
self.provision,
self.provision.socket,
first_chunk,
Expand Down
Loading

0 comments on commit 09f9c6f

Please sign in to comment.