// websocket handlers for /events/in and /events/out const std = @import("std"); const zap = @import("zap"); const log = @import("../logging.zig"); const messaging = @import("../utilities/messaging.zig"); // legacy fallback const broker = @import("../broker.zig"); const event_broadcaster = @import("../services/event_broadcaster.zig"); /// Topic for Prefect events (matches Python: "events") pub const EVENTS_TOPIC = "events"; const WebSockets = zap.WebSockets; // ============================================================================ // Events In - event ingestion endpoint // ============================================================================ const EventsContext = struct {}; const EventsHandler = WebSockets.Handler(EventsContext); var events_context: EventsContext = .{}; var events_settings: EventsHandler.WebSocketSettings = .{ .on_open = onEventsOpen, .on_close = onEventsClose, .on_message = onEventsMessage, .context = &events_context, }; fn onEventsOpen(_: ?*EventsContext, _: WebSockets.WsHandle) !void { log.debug("websocket", "events connection opened", .{}); } fn onEventsClose(_: ?*EventsContext, _: isize) !void { log.debug("websocket", "events connection closed", .{}); } fn onEventsMessage(_: ?*EventsContext, _: WebSockets.WsHandle, message: []const u8, is_text: bool) !void { log.debug("events", "received message: {d} bytes, text={}", .{ message.len, is_text }); var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { log.err("events", "failed to parse event json", .{}); return; }; const obj = parsed.value.object; const getString = struct { fn f(val: ?std.json.Value) ?[]const u8 { if (val) |v| { return switch (v) { .string => |s| s, else => null, }; } return null; } }.f; const stringifyJson = struct { fn f(a: std.mem.Allocator, val: ?std.json.Value, default: []const u8) []const u8 { if (val) |v| { const formatted = std.fmt.allocPrint(a, "{f}", .{std.json.fmt(v, .{})}) catch return default; return formatted; } return default; } }.f; const id = getString(obj.get("id")); const occurred = getString(obj.get("occurred")); const event_name = getString(obj.get("event")); if (id == null or occurred == null or event_name == null) { log.err("events", "event missing required fields", .{}); return; } var resource_id: []const u8 = ""; if (obj.get("resource")) |res| { if (res == .object) { resource_id = getString(res.object.get("prefect.resource.id")) orelse ""; } } const resource_json = stringifyJson(alloc, obj.get("resource"), "{}"); const payload_json = stringifyJson(alloc, obj.get("payload"), "{}"); const related_json = stringifyJson(alloc, obj.get("related"), "[]"); const follows = getString(obj.get("follows")); // Publish to broker (single source of truth) // Format: JSON with all event fields for consumers to parse const event_json = std.fmt.allocPrint(alloc, \\{{"id":"{s}","occurred":"{s}","event":"{s}","resource_id":"{s}","resource":{s},"payload":{s},"related":{s},"follows":{s}}} , .{ id.?, occurred.?, event_name.?, resource_id, resource_json, payload_json, related_json, if (follows) |f| std.fmt.allocPrint(alloc, "\"{s}\"", .{f}) catch "null" else "null", }) catch { log.err("events", "failed to format event json", .{}); return; }; if (broker.getBroker()) |b| { b.publish(EVENTS_TOPIC, id.?, event_json) catch |err| { log.warn("events", "dropped (broker error): {s} - {}", .{ event_name.?, err }); return; }; log.debug("events", "published: {s}", .{event_name.?}); } else { // Fallback to legacy path if broker not initialized if (messaging.publishEvent(id.?, occurred.?, event_name.?, resource_id, resource_json, payload_json, related_json, follows)) { log.debug("events", "queued (legacy): {s}", .{event_name.?}); } else { log.warn("events", "dropped (backpressure): {s}", .{event_name.?}); } } } // ============================================================================ // Events Out - subscription endpoint // ============================================================================ const SubscriberState = enum { waiting_auth, waiting_filter, subscribed }; const SubscriberContext = struct { state: SubscriberState = .waiting_auth, handle: WebSockets.WsHandle = null, }; const SubscriberHandler = WebSockets.Handler(SubscriberContext); const MAX_SUBSCRIBERS = 256; var subscriber_contexts: [MAX_SUBSCRIBERS]SubscriberContext = undefined; var subscriber_settings: [MAX_SUBSCRIBERS]SubscriberHandler.WebSocketSettings = undefined; var initialized: bool = false; fn initIfNeeded() void { if (initialized) return; for (0..MAX_SUBSCRIBERS) |i| { subscriber_contexts[i] = .{ .state = .waiting_auth, .handle = null }; subscriber_settings[i] = .{ .on_open = onSubscriberOpen, .on_close = onSubscriberClose, .on_message = onSubscriberMessage, .context = &subscriber_contexts[i], }; } initialized = true; } fn onSubscriberOpen(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle) !void { log.debug("events-out", "subscriber connected", .{}); if (ctx) |c| { c.handle = handle; c.state = .waiting_auth; } } fn onSubscriberClose(ctx: ?*SubscriberContext, _: isize) !void { log.debug("events-out", "subscriber disconnected", .{}); if (ctx) |c| { if (c.handle) |h| event_broadcaster.removeSubscriber(h); c.handle = null; c.state = .waiting_auth; } } fn onSubscriberMessage(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle, message: []const u8, _: bool) !void { const c = ctx orelse return; var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { log.err("events-out", "invalid json from subscriber", .{}); SubscriberHandler.close(handle); return; }; const obj = parsed.value.object; const msg_type = if (obj.get("type")) |v| switch (v) { .string => |s| s, else => null, } else null; if (msg_type == null) { log.err("events-out", "message missing type field", .{}); SubscriberHandler.close(handle); return; } switch (c.state) { .waiting_auth => { if (!std.mem.eql(u8, msg_type.?, "auth")) { log.err("events-out", "expected auth message, got {s}", .{msg_type.?}); SubscriberHandler.close(handle); return; } SubscriberHandler.write(handle, "{\"type\":\"auth_success\"}", true) catch { log.err("events-out", "failed to send auth_success", .{}); return; }; c.state = .waiting_filter; log.debug("events-out", "auth successful, waiting for filter", .{}); }, .waiting_filter => { if (!std.mem.eql(u8, msg_type.?, "filter")) { log.err("events-out", "expected filter message, got {s}", .{msg_type.?}); SubscriberHandler.close(handle); return; } // Check backfill flag (defaults to true like Python) const wants_backfill = if (obj.get("backfill")) |v| switch (v) { .bool => |b| b, else => true, } else true; // Parse filter from client message (uses page_allocator for filter lifetime) const filter = event_broadcaster.EventFilter.parseFromJson(std.heap.page_allocator, message) catch { log.err("events-out", "failed to parse filter", .{}); SubscriberHandler.close(handle); return; }; // Backfill: send recent events before registering for live if (wants_backfill) { const db = @import("../db/sqlite.zig"); if (db.queryRecentEvents(alloc, 100)) |events| { var backfill_count: usize = 0; for (events) |event_json| { // Apply filter to backfill events if (!filter.matchesJson(event_json)) continue; const wrapped = std.fmt.allocPrint(alloc, "{{\"type\":\"event\",\"event\":{s}}}", .{event_json}) catch continue; SubscriberHandler.write(handle, wrapped, true) catch continue; backfill_count += 1; } if (backfill_count > 0) { log.debug("events-out", "sent {d} backfill events", .{backfill_count}); } } else |err| { log.warn("events-out", "backfill query failed: {}", .{err}); } } if (!event_broadcaster.addSubscriber(handle, filter)) { log.err("events-out", "failed to add subscriber (at capacity)", .{}); var f = filter; f.deinit(); SubscriberHandler.close(handle); return; } c.state = .subscribed; log.info("events-out", "subscriber registered, total: {d}", .{event_broadcaster.getSubscriberCount()}); }, .subscribed => { log.debug("events-out", "ignoring message from subscribed client", .{}); }, } } // ============================================================================ // Public interface // ============================================================================ pub fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void { const target = r.path orelse "/"; if (!std.mem.eql(u8, target_protocol, "websocket")) { r.setStatus(.bad_request); r.sendBody("{\"detail\":\"bad protocol\"}") catch {}; return; } // /events/in - event ingestion if (std.mem.eql(u8, target, "/events/in") or std.mem.eql(u8, target, "/api/events/in")) { log.debug("websocket", "upgrading {s} (events-in)", .{target}); EventsHandler.upgrade(r.h, &events_settings) catch |err| { log.err("websocket", "upgrade failed: {}", .{err}); r.setStatus(.internal_server_error); r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; }; return; } // /events/out - event subscription if (std.mem.eql(u8, target, "/events/out") or std.mem.eql(u8, target, "/api/events/out")) { log.debug("websocket", "upgrading {s} (events-out)", .{target}); initIfNeeded(); var slot: ?usize = null; for (0..MAX_SUBSCRIBERS) |i| { if (subscriber_contexts[i].handle == null) { slot = i; break; } } if (slot) |i| { subscriber_contexts[i].state = .waiting_auth; SubscriberHandler.upgrade(r.h, &subscriber_settings[i]) catch |err| { log.err("websocket", "events-out upgrade failed: {}", .{err}); r.setStatus(.internal_server_error); r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; }; } else { log.warn("websocket", "events-out at capacity", .{}); r.setStatus(.service_unavailable); r.sendBody("{\"detail\":\"too many subscribers\"}") catch {}; } return; } r.setStatus(.not_found); r.sendBody("{\"detail\":\"not found\"}") catch {}; }