prefect server in zig
at main 327 lines 12 kB view raw
1// websocket handlers for /events/in and /events/out 2 3const std = @import("std"); 4const zap = @import("zap"); 5const log = @import("../logging.zig"); 6const messaging = @import("../utilities/messaging.zig"); // legacy fallback 7const broker = @import("../broker.zig"); 8const event_broadcaster = @import("../services/event_broadcaster.zig"); 9 10/// Topic for Prefect events (matches Python: "events") 11pub const EVENTS_TOPIC = "events"; 12 13const WebSockets = zap.WebSockets; 14 15// ============================================================================ 16// Events In - event ingestion endpoint 17// ============================================================================ 18 19const EventsContext = struct {}; 20const EventsHandler = WebSockets.Handler(EventsContext); 21 22var events_context: EventsContext = .{}; 23var events_settings: EventsHandler.WebSocketSettings = .{ 24 .on_open = onEventsOpen, 25 .on_close = onEventsClose, 26 .on_message = onEventsMessage, 27 .context = &events_context, 28}; 29 30fn onEventsOpen(_: ?*EventsContext, _: WebSockets.WsHandle) !void { 31 log.debug("websocket", "events connection opened", .{}); 32} 33 34fn onEventsClose(_: ?*EventsContext, _: isize) !void { 35 log.debug("websocket", "events connection closed", .{}); 36} 37 38fn onEventsMessage(_: ?*EventsContext, _: WebSockets.WsHandle, message: []const u8, is_text: bool) !void { 39 log.debug("events", "received message: {d} bytes, text={}", .{ message.len, is_text }); 40 41 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 42 defer arena.deinit(); 43 const alloc = arena.allocator(); 44 45 const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { 46 log.err("events", "failed to parse event json", .{}); 47 return; 48 }; 49 50 const obj = parsed.value.object; 51 52 const getString = struct { 53 fn f(val: ?std.json.Value) ?[]const u8 { 54 if (val) |v| { 55 return switch (v) { 56 .string => |s| s, 57 else => null, 58 }; 59 } 60 return null; 61 } 62 }.f; 63 64 const stringifyJson = struct { 65 fn f(a: std.mem.Allocator, val: ?std.json.Value, default: []const u8) []const u8 { 66 if (val) |v| { 67 const formatted = std.fmt.allocPrint(a, "{f}", .{std.json.fmt(v, .{})}) catch return default; 68 return formatted; 69 } 70 return default; 71 } 72 }.f; 73 74 const id = getString(obj.get("id")); 75 const occurred = getString(obj.get("occurred")); 76 const event_name = getString(obj.get("event")); 77 78 if (id == null or occurred == null or event_name == null) { 79 log.err("events", "event missing required fields", .{}); 80 return; 81 } 82 83 var resource_id: []const u8 = ""; 84 if (obj.get("resource")) |res| { 85 if (res == .object) { 86 resource_id = getString(res.object.get("prefect.resource.id")) orelse ""; 87 } 88 } 89 90 const resource_json = stringifyJson(alloc, obj.get("resource"), "{}"); 91 const payload_json = stringifyJson(alloc, obj.get("payload"), "{}"); 92 const related_json = stringifyJson(alloc, obj.get("related"), "[]"); 93 const follows = getString(obj.get("follows")); 94 95 // Publish to broker (single source of truth) 96 // Format: JSON with all event fields for consumers to parse 97 const event_json = std.fmt.allocPrint(alloc, 98 \\{{"id":"{s}","occurred":"{s}","event":"{s}","resource_id":"{s}","resource":{s},"payload":{s},"related":{s},"follows":{s}}} 99 , .{ 100 id.?, 101 occurred.?, 102 event_name.?, 103 resource_id, 104 resource_json, 105 payload_json, 106 related_json, 107 if (follows) |f| std.fmt.allocPrint(alloc, "\"{s}\"", .{f}) catch "null" else "null", 108 }) catch { 109 log.err("events", "failed to format event json", .{}); 110 return; 111 }; 112 113 if (broker.getBroker()) |b| { 114 b.publish(EVENTS_TOPIC, id.?, event_json) catch |err| { 115 log.warn("events", "dropped (broker error): {s} - {}", .{ event_name.?, err }); 116 return; 117 }; 118 log.debug("events", "published: {s}", .{event_name.?}); 119 } else { 120 // Fallback to legacy path if broker not initialized 121 if (messaging.publishEvent(id.?, occurred.?, event_name.?, resource_id, resource_json, payload_json, related_json, follows)) { 122 log.debug("events", "queued (legacy): {s}", .{event_name.?}); 123 } else { 124 log.warn("events", "dropped (backpressure): {s}", .{event_name.?}); 125 } 126 } 127} 128 129// ============================================================================ 130// Events Out - subscription endpoint 131// ============================================================================ 132 133const SubscriberState = enum { waiting_auth, waiting_filter, subscribed }; 134 135const SubscriberContext = struct { 136 state: SubscriberState = .waiting_auth, 137 handle: WebSockets.WsHandle = null, 138}; 139 140const SubscriberHandler = WebSockets.Handler(SubscriberContext); 141 142const MAX_SUBSCRIBERS = 256; 143var subscriber_contexts: [MAX_SUBSCRIBERS]SubscriberContext = undefined; 144var subscriber_settings: [MAX_SUBSCRIBERS]SubscriberHandler.WebSocketSettings = undefined; 145var initialized: bool = false; 146 147fn initIfNeeded() void { 148 if (initialized) return; 149 for (0..MAX_SUBSCRIBERS) |i| { 150 subscriber_contexts[i] = .{ .state = .waiting_auth, .handle = null }; 151 subscriber_settings[i] = .{ 152 .on_open = onSubscriberOpen, 153 .on_close = onSubscriberClose, 154 .on_message = onSubscriberMessage, 155 .context = &subscriber_contexts[i], 156 }; 157 } 158 initialized = true; 159} 160 161fn onSubscriberOpen(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle) !void { 162 log.debug("events-out", "subscriber connected", .{}); 163 if (ctx) |c| { 164 c.handle = handle; 165 c.state = .waiting_auth; 166 } 167} 168 169fn onSubscriberClose(ctx: ?*SubscriberContext, _: isize) !void { 170 log.debug("events-out", "subscriber disconnected", .{}); 171 if (ctx) |c| { 172 if (c.handle) |h| event_broadcaster.removeSubscriber(h); 173 c.handle = null; 174 c.state = .waiting_auth; 175 } 176} 177 178fn onSubscriberMessage(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle, message: []const u8, _: bool) !void { 179 const c = ctx orelse return; 180 181 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 182 defer arena.deinit(); 183 const alloc = arena.allocator(); 184 185 const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { 186 log.err("events-out", "invalid json from subscriber", .{}); 187 SubscriberHandler.close(handle); 188 return; 189 }; 190 191 const obj = parsed.value.object; 192 const msg_type = if (obj.get("type")) |v| switch (v) { 193 .string => |s| s, 194 else => null, 195 } else null; 196 197 if (msg_type == null) { 198 log.err("events-out", "message missing type field", .{}); 199 SubscriberHandler.close(handle); 200 return; 201 } 202 203 switch (c.state) { 204 .waiting_auth => { 205 if (!std.mem.eql(u8, msg_type.?, "auth")) { 206 log.err("events-out", "expected auth message, got {s}", .{msg_type.?}); 207 SubscriberHandler.close(handle); 208 return; 209 } 210 SubscriberHandler.write(handle, "{\"type\":\"auth_success\"}", true) catch { 211 log.err("events-out", "failed to send auth_success", .{}); 212 return; 213 }; 214 c.state = .waiting_filter; 215 log.debug("events-out", "auth successful, waiting for filter", .{}); 216 }, 217 .waiting_filter => { 218 if (!std.mem.eql(u8, msg_type.?, "filter")) { 219 log.err("events-out", "expected filter message, got {s}", .{msg_type.?}); 220 SubscriberHandler.close(handle); 221 return; 222 } 223 224 // Check backfill flag (defaults to true like Python) 225 const wants_backfill = if (obj.get("backfill")) |v| switch (v) { 226 .bool => |b| b, 227 else => true, 228 } else true; 229 230 // Parse filter from client message (uses page_allocator for filter lifetime) 231 const filter = event_broadcaster.EventFilter.parseFromJson(std.heap.page_allocator, message) catch { 232 log.err("events-out", "failed to parse filter", .{}); 233 SubscriberHandler.close(handle); 234 return; 235 }; 236 237 // Backfill: send recent events before registering for live 238 if (wants_backfill) { 239 const db = @import("../db/sqlite.zig"); 240 if (db.queryRecentEvents(alloc, 100)) |events| { 241 var backfill_count: usize = 0; 242 for (events) |event_json| { 243 // Apply filter to backfill events 244 if (!filter.matchesJson(event_json)) continue; 245 const wrapped = std.fmt.allocPrint(alloc, "{{\"type\":\"event\",\"event\":{s}}}", .{event_json}) catch continue; 246 SubscriberHandler.write(handle, wrapped, true) catch continue; 247 backfill_count += 1; 248 } 249 if (backfill_count > 0) { 250 log.debug("events-out", "sent {d} backfill events", .{backfill_count}); 251 } 252 } else |err| { 253 log.warn("events-out", "backfill query failed: {}", .{err}); 254 } 255 } 256 257 if (!event_broadcaster.addSubscriber(handle, filter)) { 258 log.err("events-out", "failed to add subscriber (at capacity)", .{}); 259 var f = filter; 260 f.deinit(); 261 SubscriberHandler.close(handle); 262 return; 263 } 264 c.state = .subscribed; 265 log.info("events-out", "subscriber registered, total: {d}", .{event_broadcaster.getSubscriberCount()}); 266 }, 267 .subscribed => { 268 log.debug("events-out", "ignoring message from subscribed client", .{}); 269 }, 270 } 271} 272 273// ============================================================================ 274// Public interface 275// ============================================================================ 276 277pub fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void { 278 const target = r.path orelse "/"; 279 280 if (!std.mem.eql(u8, target_protocol, "websocket")) { 281 r.setStatus(.bad_request); 282 r.sendBody("{\"detail\":\"bad protocol\"}") catch {}; 283 return; 284 } 285 286 // /events/in - event ingestion 287 if (std.mem.eql(u8, target, "/events/in") or std.mem.eql(u8, target, "/api/events/in")) { 288 log.debug("websocket", "upgrading {s} (events-in)", .{target}); 289 EventsHandler.upgrade(r.h, &events_settings) catch |err| { 290 log.err("websocket", "upgrade failed: {}", .{err}); 291 r.setStatus(.internal_server_error); 292 r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 293 }; 294 return; 295 } 296 297 // /events/out - event subscription 298 if (std.mem.eql(u8, target, "/events/out") or std.mem.eql(u8, target, "/api/events/out")) { 299 log.debug("websocket", "upgrading {s} (events-out)", .{target}); 300 initIfNeeded(); 301 302 var slot: ?usize = null; 303 for (0..MAX_SUBSCRIBERS) |i| { 304 if (subscriber_contexts[i].handle == null) { 305 slot = i; 306 break; 307 } 308 } 309 310 if (slot) |i| { 311 subscriber_contexts[i].state = .waiting_auth; 312 SubscriberHandler.upgrade(r.h, &subscriber_settings[i]) catch |err| { 313 log.err("websocket", "events-out upgrade failed: {}", .{err}); 314 r.setStatus(.internal_server_error); 315 r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 316 }; 317 } else { 318 log.warn("websocket", "events-out at capacity", .{}); 319 r.setStatus(.service_unavailable); 320 r.sendBody("{\"detail\":\"too many subscribers\"}") catch {}; 321 } 322 return; 323 } 324 325 r.setStatus(.not_found); 326 r.sendBody("{\"detail\":\"not found\"}") catch {}; 327}