+1
-1
examples/firehose_demo/dune
+1
-1
examples/firehose_demo/dune
+315
-84
examples/firehose_demo/firehose_demo.ml
+315
-84
examples/firehose_demo/firehose_demo.ml
···
3
This example demonstrates how to connect to the Bluesky firehose and process
4
real-time events (posts, likes, follows, etc.).
5
6
-
The firehose uses WebSockets with DAG-CBOR encoded messages. This example
7
-
uses OCaml 5 effects for I/O.
8
9
-
Usage: dune exec examples/firehose_demo/firehose_demo.exe
10
11
Events shown:
12
- #commit: Repository changes (new posts, likes, follows, etc.)
13
- #identity: Handle/identity changes
14
- #account: Account status changes
15
- #handle: Handle updates
16
-
- #tombstone: Repository deletions *)
17
18
-
open Atproto_sync
19
20
(** Format an operation for display *)
21
let format_op (op : Firehose.operation) =
22
let action =
23
-
match op.action with
24
-
| `Create -> "CREATE"
25
-
| `Update -> "UPDATE"
26
-
| `Delete -> "DELETE"
27
in
28
-
Printf.sprintf "%s %s" action op.path
29
30
-
(** Format a commit event *)
31
let format_commit (evt : Firehose.commit_event) =
32
-
let ops_str = String.concat ", " (List.map format_op evt.ops) in
33
-
Printf.sprintf "[COMMIT] seq=%Ld repo=%s rev=%s ops=[%s]" evt.seq evt.repo
34
-
evt.rev ops_str
35
36
(** Format an identity event *)
37
let format_identity (evt : Firehose.identity_event) =
38
-
let handle_str =
39
-
match evt.handle with Some h -> h | None -> "(no handle)"
40
-
in
41
-
Printf.sprintf "[IDENTITY] seq=%Ld did=%s handle=%s" evt.seq evt.did
42
-
handle_str
43
44
(** Format an account event *)
45
let format_account (evt : Firehose.account_event) =
46
-
let status_str =
47
-
match evt.status with Some s -> s | None -> "(no status)"
48
-
in
49
-
Printf.sprintf "[ACCOUNT] seq=%Ld did=%s active=%b status=%s" evt.seq evt.did
50
-
evt.active status_str
51
52
(** Format a handle event *)
53
let format_handle (evt : Firehose.handle_event) =
54
-
Printf.sprintf "[HANDLE] seq=%Ld did=%s handle=%s" evt.seq evt.did evt.handle
55
56
(** Format a tombstone event *)
57
let format_tombstone (evt : Firehose.tombstone_event) =
58
-
Printf.sprintf "[TOMBSTONE] seq=%Ld did=%s" evt.seq evt.did
59
60
(** Format an info message *)
61
let format_info (msg : Firehose.info_message) =
62
-
let msg_str = match msg.message with Some m -> m | None -> "(no message)" in
63
-
Printf.sprintf "[INFO] name=%s message=%s" msg.name msg_str
64
65
(** Format any event *)
66
let format_event = function
···
70
| Firehose.Handle evt -> format_handle evt
71
| Firehose.Tombstone evt -> format_tombstone evt
72
| Firehose.Info msg -> format_info msg
73
-
| Firehose.StreamError err -> Printf.sprintf "[ERROR] %s" err
74
75
-
(** Event handler - prints each event and continues *)
76
-
let handle_event event =
77
-
print_endline (format_event event);
78
-
true (* Continue receiving events *)
79
80
-
(* Suppress unused warning - this is example code *)
81
-
let _ = handle_event
82
83
-
(** Main entry point *)
84
-
let () =
85
-
print_endline "AT Protocol Firehose Demo";
86
-
print_endline "=========================";
87
print_endline "";
88
-
print_endline "This demo shows the structure of a firehose client.";
89
-
print_endline "";
90
-
print_endline "The firehose module uses OCaml 5 effects for WebSocket I/O.";
91
-
print_endline "To connect to the real Bluesky firehose, you need to provide";
92
-
print_endline "an effect handler that implements:";
93
-
print_endline "";
94
-
print_endline " - Firehose.Ws_connect : Uri.t -> (websocket, string) result";
95
-
print_endline " - Firehose.Ws_recv : websocket -> (string, string) result";
96
-
print_endline " - Firehose.Ws_close : websocket -> unit";
97
print_endline "";
98
-
print_endline "Example handler using eio-websocket:";
99
print_endline "";
100
print_endline
101
-
{| let run_with_eio f =
102
-
Eio_main.run @@ fun env ->
103
-
Effect.Deep.match_with f ()
104
-
{
105
-
retc = Fun.id;
106
-
exnc = raise;
107
-
effc = fun (type a) (e : a Effect.t) ->
108
-
match e with
109
-
| Firehose.Ws_connect uri ->
110
-
Some (fun k ->
111
-
let ws = Eio_websocket.connect env uri in
112
-
Effect.Deep.continue k (Ok ws))
113
-
| Firehose.Ws_recv ws ->
114
-
Some (fun k ->
115
-
let msg = Eio_websocket.recv ws in
116
-
Effect.Deep.continue k (Ok msg))
117
-
| Firehose.Ws_close ws ->
118
-
Some (fun k ->
119
-
Eio_websocket.close ws;
120
-
Effect.Deep.continue k ())
121
-
| _ -> None
122
-
}|};
123
print_endline "";
124
-
print_endline "Then subscribe to the firehose:";
125
-
print_endline "";
126
print_endline
127
-
{| let config =
128
-
Firehose.config
129
-
~uri:(Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos")
130
-
()
131
-
in
132
-
run_with_eio (fun () ->
133
-
match Firehose.subscribe config ~handler:handle_event with
134
-
| Ok () -> print_endline "Done"
135
-
| Error e -> print_endline (Firehose.error_to_string e))|};
136
print_endline "";
137
-
print_endline "For a working example with eio, see:";
138
-
print_endline " https://github.com/bluesky-social/atproto"
···
3
This example demonstrates how to connect to the Bluesky firehose and process
4
real-time events (posts, likes, follows, etc.).
5
6
+
The firehose uses WebSockets with DAG-CBOR encoded messages.
7
8
+
Usage: dune exec examples/firehose_demo/firehose_demo.exe [cursor]
9
10
Events shown:
11
- #commit: Repository changes (new posts, likes, follows, etc.)
12
- #identity: Handle/identity changes
13
- #account: Account status changes
14
- #handle: Handle updates
15
+
- #tombstone: Repository deletions
16
+
17
+
Note: This demo requires TLS support to connect to the real firehose
18
+
(wss://). Currently it demonstrates frame decoding with sample data. *)
19
20
+
module Firehose = Atproto_sync.Firehose
21
22
(** Format an operation for display *)
23
let format_op (op : Firehose.operation) =
24
let action =
25
+
match op.action with `Create -> "+" | `Update -> "~" | `Delete -> "-"
26
in
27
+
Printf.sprintf "%s%s" action op.path
28
29
+
(** Format a commit event - shows post/like/follow activity *)
30
let format_commit (evt : Firehose.commit_event) =
31
+
let ops_str = String.concat " " (List.map format_op evt.ops) in
32
+
Printf.sprintf "\027[32m#commit\027[0m seq=%Ld repo=%s ops=[%s]" evt.seq
33
+
evt.repo ops_str
34
35
(** Format an identity event *)
36
let format_identity (evt : Firehose.identity_event) =
37
+
let handle_str = match evt.handle with Some h -> h | None -> "(none)" in
38
+
Printf.sprintf "\027[34m#identity\027[0m seq=%Ld did=%s handle=%s" evt.seq
39
+
evt.did handle_str
40
41
(** Format an account event *)
42
let format_account (evt : Firehose.account_event) =
43
+
let status_str = match evt.status with Some s -> s | None -> "(none)" in
44
+
Printf.sprintf "\027[33m#account\027[0m seq=%Ld did=%s active=%b status=%s"
45
+
evt.seq evt.did evt.active status_str
46
47
(** Format a handle event *)
48
let format_handle (evt : Firehose.handle_event) =
49
+
Printf.sprintf "\027[35m#handle\027[0m seq=%Ld did=%s handle=%s" evt.seq
50
+
evt.did evt.handle
51
52
(** Format a tombstone event *)
53
let format_tombstone (evt : Firehose.tombstone_event) =
54
+
Printf.sprintf "\027[31m#tombstone\027[0m seq=%Ld did=%s" evt.seq evt.did
55
56
(** Format an info message *)
57
let format_info (msg : Firehose.info_message) =
58
+
let msg_str = match msg.message with Some m -> m | None -> "(none)" in
59
+
Printf.sprintf "\027[36m#info\027[0m name=%s message=%s" msg.name msg_str
60
61
(** Format any event *)
62
let format_event = function
···
66
| Firehose.Handle evt -> format_handle evt
67
| Firehose.Tombstone evt -> format_tombstone evt
68
| Firehose.Info msg -> format_info msg
69
+
| Firehose.StreamError err -> Printf.sprintf "\027[31m#error\027[0m %s" err
70
+
71
+
type stats = {
72
+
mutable commits : int;
73
+
mutable identities : int;
74
+
mutable accounts : int;
75
+
mutable handles : int;
76
+
mutable tombstones : int;
77
+
mutable errors : int;
78
+
mutable last_seq : int64;
79
+
}
80
+
(** Statistics tracking *)
81
+
82
+
let create_stats () =
83
+
{
84
+
commits = 0;
85
+
identities = 0;
86
+
accounts = 0;
87
+
handles = 0;
88
+
tombstones = 0;
89
+
errors = 0;
90
+
last_seq = 0L;
91
+
}
92
+
93
+
let update_stats stats = function
94
+
| Firehose.Commit evt ->
95
+
stats.commits <- stats.commits + 1;
96
+
stats.last_seq <- evt.seq
97
+
| Firehose.Identity evt ->
98
+
stats.identities <- stats.identities + 1;
99
+
stats.last_seq <- evt.seq
100
+
| Firehose.Account evt ->
101
+
stats.accounts <- stats.accounts + 1;
102
+
stats.last_seq <- evt.seq
103
+
| Firehose.Handle evt ->
104
+
stats.handles <- stats.handles + 1;
105
+
stats.last_seq <- evt.seq
106
+
| Firehose.Tombstone evt ->
107
+
stats.tombstones <- stats.tombstones + 1;
108
+
stats.last_seq <- evt.seq
109
+
| Firehose.Info _ -> ()
110
+
| Firehose.StreamError _ -> stats.errors <- stats.errors + 1
111
+
112
+
let print_stats stats =
113
+
Printf.printf "\n--- Statistics ---\n";
114
+
Printf.printf "Commits: %d\n" stats.commits;
115
+
Printf.printf "Identities: %d\n" stats.identities;
116
+
Printf.printf "Accounts: %d\n" stats.accounts;
117
+
Printf.printf "Handles: %d\n" stats.handles;
118
+
Printf.printf "Tombstones: %d\n" stats.tombstones;
119
+
Printf.printf "Errors: %d\n" stats.errors;
120
+
Printf.printf "Last seq: %Ld\n" stats.last_seq;
121
+
Printf.printf "------------------\n%!"
122
+
123
+
(** Demo: Create sample firehose frames and decode them *)
124
+
let demo_frame_decoding () =
125
+
print_endline "=== Frame Decoding Demo ===\n";
126
+
print_endline "The firehose sends binary WebSocket frames containing:";
127
+
print_endline " 1. Header (DAG-CBOR): {\"op\": 1, \"t\": \"#commit\"}";
128
+
print_endline " 2. Payload (DAG-CBOR): Event-specific data\n";
129
+
130
+
(* Create sample events to show what they look like *)
131
+
let sample_commit =
132
+
Firehose.Commit
133
+
{
134
+
seq = 1234567890L;
135
+
repo = "did:plc:z72i7hdynmk6r22z27h6tvur";
136
+
rev = "3lbfm5ybmaf2v";
137
+
since = Some "3lbfm5yblaf2u";
138
+
commit = Atproto_ipld.Cid.of_dag_cbor "sample commit data";
139
+
blocks = "";
140
+
ops =
141
+
[
142
+
{
143
+
action = `Create;
144
+
path = "app.bsky.feed.post/3lbfm5ybmaf2v";
145
+
cid = None;
146
+
};
147
+
];
148
+
too_big = false;
149
+
}
150
+
in
151
+
152
+
let sample_like =
153
+
Firehose.Commit
154
+
{
155
+
seq = 1234567891L;
156
+
repo = "did:plc:ewvi7nxzyoun6zhxrhs64oiz";
157
+
rev = "3lbfm5ycdef2w";
158
+
since = None;
159
+
commit = Atproto_ipld.Cid.of_dag_cbor "sample like data";
160
+
blocks = "";
161
+
ops =
162
+
[
163
+
{
164
+
action = `Create;
165
+
path = "app.bsky.feed.like/3lbfm5ycdef2w";
166
+
cid = None;
167
+
};
168
+
];
169
+
too_big = false;
170
+
}
171
+
in
172
+
173
+
let sample_follow =
174
+
Firehose.Commit
175
+
{
176
+
seq = 1234567892L;
177
+
repo = "did:plc:ragtjsm2j2vknwkz3zp4oxrd";
178
+
rev = "3lbfm5ydghi2x";
179
+
since = None;
180
+
commit = Atproto_ipld.Cid.of_dag_cbor "sample follow data";
181
+
blocks = "";
182
+
ops =
183
+
[
184
+
{
185
+
action = `Create;
186
+
path = "app.bsky.graph.follow/3lbfm5ydghi2x";
187
+
cid = None;
188
+
};
189
+
];
190
+
too_big = false;
191
+
}
192
+
in
193
+
194
+
let sample_identity =
195
+
Firehose.Identity
196
+
{
197
+
seq = 1234567893L;
198
+
did = "did:plc:z72i7hdynmk6r22z27h6tvur";
199
+
time = "2024-12-28T12:00:00.000Z";
200
+
handle = Some "alice.bsky.social";
201
+
}
202
+
in
203
+
204
+
let sample_handle =
205
+
Firehose.Handle
206
+
{
207
+
seq = 1234567894L;
208
+
did = "did:plc:ewvi7nxzyoun6zhxrhs64oiz";
209
+
time = "2024-12-28T12:00:01.000Z";
210
+
handle = "bob.example.com";
211
+
}
212
+
in
213
+
214
+
let sample_account =
215
+
Firehose.Account
216
+
{
217
+
seq = 1234567895L;
218
+
did = "did:plc:ragtjsm2j2vknwkz3zp4oxrd";
219
+
time = "2024-12-28T12:00:02.000Z";
220
+
active = true;
221
+
status = Some "active";
222
+
}
223
+
in
224
225
+
let sample_tombstone =
226
+
Firehose.Tombstone
227
+
{
228
+
seq = 1234567896L;
229
+
did = "did:plc:deleted123";
230
+
time = "2024-12-28T12:00:03.000Z";
231
+
}
232
+
in
233
234
+
let sample_info =
235
+
Firehose.Info
236
+
{ name = "OutdatedCursor"; message = Some "Requested cursor is too old" }
237
+
in
238
+
239
+
let samples =
240
+
[
241
+
sample_commit;
242
+
sample_like;
243
+
sample_follow;
244
+
sample_identity;
245
+
sample_handle;
246
+
sample_account;
247
+
sample_tombstone;
248
+
sample_info;
249
+
]
250
+
in
251
+
252
+
print_endline "Sample firehose events:\n";
253
+
254
+
let stats = create_stats () in
255
+
List.iter
256
+
(fun event ->
257
+
print_endline (" " ^ format_event event);
258
+
update_stats stats event)
259
+
samples;
260
+
261
+
print_stats stats
262
+
263
+
(** Demo: Test actual frame encoding/decoding *)
264
+
let demo_roundtrip () =
265
+
print_endline "\n=== Frame Encoding/Decoding Test ===\n";
266
+
267
+
(* Create a sample commit frame manually using DAG-CBOR *)
268
+
let open Atproto_ipld in
269
+
(* Header: {"op": 1, "t": "#commit"} *)
270
+
let header =
271
+
Dag_cbor.Map [ ("op", Dag_cbor.Int 1L); ("t", Dag_cbor.String "#commit") ]
272
+
in
273
+
274
+
(* Payload: commit event data *)
275
+
let commit_cid = Cid.of_dag_cbor "test commit content" in
276
+
let payload =
277
+
Dag_cbor.Map
278
+
[
279
+
("seq", Dag_cbor.Int 999L);
280
+
("repo", Dag_cbor.String "did:plc:testuser123");
281
+
("rev", Dag_cbor.String "3abc123xyz");
282
+
("commit", Dag_cbor.Link commit_cid);
283
+
("blocks", Dag_cbor.Bytes "");
284
+
( "ops",
285
+
Dag_cbor.Array
286
+
[
287
+
Dag_cbor.Map
288
+
[
289
+
("action", Dag_cbor.String "create");
290
+
("path", Dag_cbor.String "app.bsky.feed.post/abc123");
291
+
];
292
+
] );
293
+
("tooBig", Dag_cbor.Bool false);
294
+
]
295
+
in
296
+
297
+
(* Encode header + payload *)
298
+
let header_bytes = Dag_cbor.encode header in
299
+
let payload_bytes = Dag_cbor.encode payload in
300
+
let frame = header_bytes ^ payload_bytes in
301
+
302
+
Printf.printf "Created test frame: %d bytes (header: %d, payload: %d)\n"
303
+
(String.length frame)
304
+
(String.length header_bytes)
305
+
(String.length payload_bytes);
306
+
307
+
(* Decode the frame *)
308
+
match Firehose.decode_frame frame with
309
+
| Ok event ->
310
+
Printf.printf "Decoded: %s\n" (format_event event);
311
+
print_endline "Frame roundtrip successful!"
312
+
| Error e -> Printf.printf "Decode error: %s\n" (Firehose.error_to_string e)
313
314
+
(** Instructions for connecting to real firehose *)
315
+
let print_connection_info () =
316
+
print_endline "\n=== Connecting to the Real Firehose ===\n";
317
+
print_endline
318
+
"The Bluesky firehose is at: \
319
+
wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos";
320
print_endline "";
321
+
print_endline "To connect, you need:";
322
+
print_endline " 1. A WebSocket client library (e.g., httpun-ws-eio)";
323
+
print_endline " 2. TLS support for WSS connections (e.g., tls-eio)";
324
print_endline "";
325
+
print_endline "Install required packages:";
326
+
print_endline " opam install httpun-ws-eio tls-eio ca-certs-nss";
327
print_endline "";
328
+
print_endline "The firehose module provides:";
329
+
print_endline " - Firehose.decode_frame : string -> (event, error) result";
330
print_endline
331
+
" - Event types: Commit, Identity, Account, Handle, Tombstone, Info";
332
+
print_endline " - Helper functions: event_seq, event_did";
333
print_endline "";
334
+
print_endline "Example integration pattern:";
335
print_endline
336
+
{|
337
+
(* In your WebSocket message handler *)
338
+
let on_message frame_data =
339
+
match Firehose.decode_frame frame_data with
340
+
| Ok (Firehose.Commit evt) ->
341
+
(* Process commit - new posts, likes, follows, etc. *)
342
+
List.iter (fun op ->
343
+
match op.action, op.path with
344
+
| `Create, path when String.starts_with ~prefix:"app.bsky.feed.post/" path ->
345
+
handle_new_post evt.repo path
346
+
| `Create, path when String.starts_with ~prefix:"app.bsky.feed.like/" path ->
347
+
handle_new_like evt.repo path
348
+
| _ -> ()
349
+
) evt.ops
350
+
| Ok (Firehose.Identity evt) ->
351
+
(* Handle identity change *)
352
+
handle_identity_change evt.did evt.handle
353
+
| Ok _ -> () (* Other event types *)
354
+
| Error e ->
355
+
log_error (Firehose.error_to_string e)
356
+
|};
357
+
print_endline ""
358
+
359
+
(** Main entry point *)
360
+
let () =
361
+
print_endline "AT Protocol Firehose Demo";
362
+
print_endline "=========================";
363
print_endline "";
364
+
365
+
demo_frame_decoding ();
366
+
demo_roundtrip ();
367
+
print_connection_info ();
368
+
369
+
print_endline "Demo complete!"