prefect server in zig
1// websocket handlers for /events/in and /events/out
2
3const std = @import("std");
4const zap = @import("zap");
5const log = @import("../logging.zig");
6const messaging = @import("../utilities/messaging.zig"); // legacy fallback
7const broker = @import("../broker.zig");
8const event_broadcaster = @import("../services/event_broadcaster.zig");
9
/// Broker topic that ingested Prefect events are published under.
/// The value mirrors the topic name used by the Python server ("events").
pub const EVENTS_TOPIC = "events";

/// Local shorthand for zap's WebSocket namespace.
const WebSockets = zap.WebSockets;
14
15// ============================================================================
16// Events In - event ingestion endpoint
17// ============================================================================
18
/// Context type for the ingestion endpoint. It has no fields, so the
/// callbacks receive a pointer to it but have nothing to read from it.
const EventsContext = struct {};

/// zap WebSocket handler specialised for `EventsContext`.
const EventsHandler = WebSockets.Handler(EventsContext);

/// The single context instance that `events_settings` points at.
var events_context = EventsContext{};

/// Callback table handed to `EventsHandler.upgrade` for the ingestion endpoint.
var events_settings = EventsHandler.WebSocketSettings{
    .context = &events_context,
    .on_open = onEventsOpen,
    .on_message = onEventsMessage,
    .on_close = onEventsClose,
};
29
/// `on_open` callback for the ingestion socket; it only logs the connection.
fn onEventsOpen(ctx: ?*EventsContext, handle: WebSockets.WsHandle) !void {
    // Neither argument is used: this callback keeps no per-connection state.
    _ = ctx;
    _ = handle;
    log.debug("websocket", "events connection opened", .{});
}
33
/// `on_close` callback for the ingestion socket; it only logs the disconnect.
fn onEventsClose(ctx: ?*EventsContext, value: isize) !void {
    // Both arguments are ignored: there is nothing to tear down here.
    _ = ctx;
    _ = value;
    log.debug("websocket", "events connection closed", .{});
}
37
/// `on_message` callback for the ingestion socket: validates one JSON event
/// and publishes it.
///
/// `message` must be a JSON object whose "id", "occurred" and "event" members
/// are strings; anything else is logged and dropped. The optional members
/// "resource", "payload", "related" and "follows" are forwarded when present
/// ("resource"/"payload" default to `{}`, "related" to `[]`, "follows" to
/// `null`).
///
/// The event is published to the broker under `EVENTS_TOPIC` when a broker is
/// available, otherwise it is handed to the legacy `messaging.publishEvent`
/// path. Every failure path logs and returns normally, so no error is
/// returned to the caller.
fn onEventsMessage(_: ?*EventsContext, _: WebSockets.WsHandle, message: []const u8, is_text: bool) !void {
    log.debug("events", "received message: {d} bytes, text={}", .{ message.len, is_text });

    // Per-message arena: everything allocated below is released on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch {
        log.err("events", "failed to parse event json", .{});
        return;
    };

    // Valid JSON is not necessarily an object (`[]`, `"x"`, `1`, ...). Reading
    // `.object` on any other variant is illegal, so check the tag first.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            log.err("events", "event json is not an object", .{});
            return;
        },
    };

    // Returns the string payload of `val`, or null when absent / not a string.
    const getString = struct {
        fn f(val: ?std.json.Value) ?[]const u8 {
            const v = val orelse return null;
            return switch (v) {
                .string => |s| s,
                else => null,
            };
        }
    }.f;

    // Re-serializes `val` as JSON text; falls back to `default` when the value
    // is absent or the allocation fails.
    const stringifyJson = struct {
        fn f(a: std.mem.Allocator, val: ?std.json.Value, default: []const u8) []const u8 {
            const v = val orelse return default;
            return std.fmt.allocPrint(a, "{f}", .{std.json.fmt(v, .{})}) catch default;
        }
    }.f;

    const missing_fields = "event missing required fields";
    const id = getString(obj.get("id")) orelse {
        log.err("events", missing_fields, .{});
        return;
    };
    const occurred = getString(obj.get("occurred")) orelse {
        log.err("events", missing_fields, .{});
        return;
    };
    const event_name = getString(obj.get("event")) orelse {
        log.err("events", missing_fields, .{});
        return;
    };

    const resource = obj.get("resource");
    const resource_id: []const u8 = if (resource) |res| switch (res) {
        .object => |fields| getString(fields.get("prefect.resource.id")) orelse "",
        else => "",
    } else "";

    const resource_json = stringifyJson(alloc, resource, "{}");
    const payload_json = stringifyJson(alloc, obj.get("payload"), "{}");
    const related_json = stringifyJson(alloc, obj.get("related"), "[]");
    const follows = getString(obj.get("follows"));
    const follows_value: std.json.Value = if (follows) |f| std.json.Value{ .string = f } else .null;

    // Publish to broker (single source of truth).
    // Format: JSON with all event fields for consumers to parse.
    // String members go through the JSON serializer so that quotes,
    // backslashes and control characters in client-supplied values are
    // escaped instead of corrupting (or injecting into) the document.
    const event_json = std.fmt.allocPrint(alloc,
        \\{{"id":{f},"occurred":{f},"event":{f},"resource_id":{f},"resource":{s},"payload":{s},"related":{s},"follows":{f}}}
    , .{
        std.json.fmt(std.json.Value{ .string = id }, .{}),
        std.json.fmt(std.json.Value{ .string = occurred }, .{}),
        std.json.fmt(std.json.Value{ .string = event_name }, .{}),
        std.json.fmt(std.json.Value{ .string = resource_id }, .{}),
        resource_json,
        payload_json,
        related_json,
        std.json.fmt(follows_value, .{}),
    }) catch {
        log.err("events", "failed to format event json", .{});
        return;
    };

    if (broker.getBroker()) |b| {
        b.publish(EVENTS_TOPIC, id, event_json) catch |err| {
            log.warn("events", "dropped (broker error): {s} - {}", .{ event_name, err });
            return;
        };
        log.debug("events", "published: {s}", .{event_name});
    } else {
        // Fallback to legacy path if broker not initialized. This path takes
        // the raw field values rather than the assembled JSON document.
        if (messaging.publishEvent(id, occurred, event_name, resource_id, resource_json, payload_json, related_json, follows)) {
            log.debug("events", "queued (legacy): {s}", .{event_name});
        } else {
            log.warn("events", "dropped (backpressure): {s}", .{event_name});
        }
    }
}
128
129// ============================================================================
130// Events Out - subscription endpoint
131// ============================================================================
132
/// Handshake progress of one subscription connection. `onSubscriberMessage`
/// moves a connection from `waiting_auth` to `waiting_filter` to `subscribed`.
const SubscriberState = enum {
    waiting_auth,
    waiting_filter,
    subscribed,
};

/// Per-connection slot. A slot whose `handle` is null is treated as free by
/// `onUpgrade`.
const SubscriberContext = struct {
    handle: WebSockets.WsHandle = null,
    state: SubscriberState = .waiting_auth,
};

/// zap WebSocket handler specialised for `SubscriberContext`.
const SubscriberHandler = WebSockets.Handler(SubscriberContext);

/// Size of the fixed slot tables below.
const MAX_SUBSCRIBERS = 256;

// Both tables start out undefined and are filled in by `initIfNeeded`;
// `initialized` records whether that has happened yet.
var subscriber_contexts: [MAX_SUBSCRIBERS]SubscriberContext = undefined;
var subscriber_settings: [MAX_SUBSCRIBERS]SubscriberHandler.WebSocketSettings = undefined;
var initialized = false;
146
/// Fills the subscriber slot tables on first call; later calls do nothing.
///
/// Each settings entry gets the subscriber callbacks and a pointer to the
/// context at the same index.
fn initIfNeeded() void {
    if (!initialized) {
        // Walk both tables in lockstep so each settings entry is paired with
        // the context that shares its index.
        for (&subscriber_contexts, &subscriber_settings) |*slot, *settings| {
            slot.* = .{ .state = .waiting_auth, .handle = null };
            settings.* = .{
                .on_open = onSubscriberOpen,
                .on_close = onSubscriberClose,
                .on_message = onSubscriberMessage,
                .context = slot,
            };
        }
        initialized = true;
    }
}
160
/// `on_open` callback for a subscription socket: records the connection
/// handle in its slot and restarts the handshake at `waiting_auth`.
fn onSubscriberOpen(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle) !void {
    log.debug("events-out", "subscriber connected", .{});

    // Without a context there is no slot to update.
    const slot = ctx orelse return;
    slot.handle = handle;
    slot.state = .waiting_auth;
}
168
/// `on_close` callback for a subscription socket: unregisters the handle from
/// the broadcaster (when one was recorded) and returns the slot to its free
/// state.
fn onSubscriberClose(ctx: ?*SubscriberContext, _: isize) !void {
    log.debug("events-out", "subscriber disconnected", .{});

    const slot = ctx orelse return;
    if (slot.handle) |ws| {
        event_broadcaster.removeSubscriber(ws);
    }
    // A null handle is what marks the slot as reusable.
    slot.handle = null;
    slot.state = .waiting_auth;
}
177
/// `on_message` callback for a subscription socket; drives the handshake
/// recorded in `ctx.state`.
///
/// Every message must be a JSON object with a string "type" member:
///   * `waiting_auth`   - expects "auth", replies with `auth_success`, then
///                        moves to `waiting_filter`.
///   * `waiting_filter` - expects "filter"; optionally sends backfill events
///                        (the "backfill" member defaults to true), registers
///                        the handle with the broadcaster, then moves to
///                        `subscribed`.
///   * `subscribed`     - further messages are ignored.
///
/// Malformed or out-of-order messages close the connection. Every failure
/// path logs and returns normally, so no error is returned to the caller.
fn onSubscriberMessage(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle, message: []const u8, _: bool) !void {
    const c = ctx orelse return;

    // Per-message arena for parsing and backfill buffers; freed on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch {
        log.err("events-out", "invalid json from subscriber", .{});
        SubscriberHandler.close(handle);
        return;
    };

    // Valid JSON may still be an array, string, number, ... Reading `.object`
    // on any other variant is illegal, so reject those like invalid JSON.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            log.err("events-out", "message is not a json object", .{});
            SubscriberHandler.close(handle);
            return;
        },
    };

    const type_value = obj.get("type");
    const maybe_type: ?[]const u8 = if (type_value) |v| switch (v) {
        .string => |s| s,
        else => null,
    } else null;

    const msg_type = maybe_type orelse {
        log.err("events-out", "message missing type field", .{});
        SubscriberHandler.close(handle);
        return;
    };

    switch (c.state) {
        .waiting_auth => {
            if (!std.mem.eql(u8, msg_type, "auth")) {
                log.err("events-out", "expected auth message, got {s}", .{msg_type});
                SubscriberHandler.close(handle);
                return;
            }
            SubscriberHandler.write(handle, "{\"type\":\"auth_success\"}", true) catch {
                log.err("events-out", "failed to send auth_success", .{});
                return;
            };
            c.state = .waiting_filter;
            log.debug("events-out", "auth successful, waiting for filter", .{});
        },
        .waiting_filter => {
            if (!std.mem.eql(u8, msg_type, "filter")) {
                log.err("events-out", "expected filter message, got {s}", .{msg_type});
                SubscriberHandler.close(handle);
                return;
            }

            // Check backfill flag (defaults to true like Python).
            const wants_backfill = if (obj.get("backfill")) |v| switch (v) {
                .bool => |b| b,
                else => true,
            } else true;

            // Parse filter from client message. It is allocated from
            // page_allocator rather than the arena because it must outlive
            // this call once handed to the broadcaster.
            const filter = event_broadcaster.EventFilter.parseFromJson(std.heap.page_allocator, message) catch {
                log.err("events-out", "failed to parse filter", .{});
                SubscriberHandler.close(handle);
                return;
            };

            // Backfill: send recent events before registering for live.
            if (wants_backfill) {
                const db = @import("../db/sqlite.zig");
                if (db.queryRecentEvents(alloc, 100)) |events| {
                    var backfill_count: usize = 0;
                    for (events) |event_json| {
                        // Apply filter to backfill events.
                        if (!filter.matchesJson(event_json)) continue;
                        // Backfill is best-effort: an event that cannot be
                        // wrapped or written is skipped, not fatal.
                        const wrapped = std.fmt.allocPrint(alloc, "{{\"type\":\"event\",\"event\":{s}}}", .{event_json}) catch continue;
                        SubscriberHandler.write(handle, wrapped, true) catch continue;
                        backfill_count += 1;
                    }
                    if (backfill_count > 0) {
                        log.debug("events-out", "sent {d} backfill events", .{backfill_count});
                    }
                } else |err| {
                    log.warn("events-out", "backfill query failed: {}", .{err});
                }
            }

            if (!event_broadcaster.addSubscriber(handle, filter)) {
                log.err("events-out", "failed to add subscriber (at capacity)", .{});
                // Registration failed, so the filter is released here.
                var f = filter;
                f.deinit();
                SubscriberHandler.close(handle);
                return;
            }
            c.state = .subscribed;
            log.info("events-out", "subscriber registered, total: {d}", .{event_broadcaster.getSubscriberCount()});
        },
        .subscribed => {
            log.debug("events-out", "ignoring message from subscribed client", .{});
        },
    }
}
272
273// ============================================================================
274// Public interface
275// ============================================================================
276
/// Sends `body` as the response on `r`. A failed send is logged instead of
/// being silently discarded; the response stays best-effort.
fn sendDetail(r: zap.Request, body: []const u8) void {
    r.sendBody(body) catch |err| {
        log.warn("websocket", "failed to send response body: {}", .{err});
    };
}

/// Upgrade entry point: routes a WebSocket upgrade request to the ingestion
/// or subscription handler based on the request path.
///
///   * `/events/in`, `/api/events/in`   - upgraded with the shared ingestion
///     settings.
///   * `/events/out`, `/api/events/out` - upgraded with the first subscriber
///     slot whose handle is null; responds 503 when every slot is taken.
///   * any other path                   - 404.
///
/// A `target_protocol` other than "websocket" is answered with 400. All
/// failures are reported through the HTTP response and the log; no error is
/// returned.
pub fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void {
    const target = r.path orelse "/";

    if (!std.mem.eql(u8, target_protocol, "websocket")) {
        r.setStatus(.bad_request);
        sendDetail(r, "{\"detail\":\"bad protocol\"}");
        return;
    }

    // /events/in - event ingestion
    if (std.mem.eql(u8, target, "/events/in") or std.mem.eql(u8, target, "/api/events/in")) {
        log.debug("websocket", "upgrading {s} (events-in)", .{target});
        EventsHandler.upgrade(r.h, &events_settings) catch |err| {
            log.err("websocket", "upgrade failed: {}", .{err});
            r.setStatus(.internal_server_error);
            sendDetail(r, "{\"detail\":\"upgrade failed\"}");
        };
        return;
    }

    // /events/out - event subscription
    if (std.mem.eql(u8, target, "/events/out") or std.mem.eql(u8, target, "/api/events/out")) {
        log.debug("websocket", "upgrading {s} (events-out)", .{target});
        initIfNeeded();

        // First slot with no recorded handle, or null when all are in use.
        const slot: ?usize = for (0..MAX_SUBSCRIBERS) |i| {
            if (subscriber_contexts[i].handle == null) break i;
        } else null;

        if (slot) |i| {
            subscriber_contexts[i].state = .waiting_auth;
            SubscriberHandler.upgrade(r.h, &subscriber_settings[i]) catch |err| {
                log.err("websocket", "events-out upgrade failed: {}", .{err});
                r.setStatus(.internal_server_error);
                sendDetail(r, "{\"detail\":\"upgrade failed\"}");
            };
        } else {
            log.warn("websocket", "events-out at capacity", .{});
            r.setStatus(.service_unavailable);
            sendDetail(r, "{\"detail\":\"too many subscribers\"}");
        }
        return;
    }

    r.setStatus(.not_found);
    sendDetail(r, "{\"detail\":\"not found\"}");
}