An ATProto Lexicon validator for Gleam.
1// Example: Validating xyz.statusphere.status records from Jetstream using honk
2//
3// This example connects to Bluesky's Jetstream firehose, filters for
4// xyz.statusphere.status records, and validates them in real-time using honk.
5
6import gleam/dynamic/decode
7import gleam/io
8import gleam/json
9import gleam/option
10import gleam/string
11import goose
12import honk
13import honk/errors.{DataValidation, InvalidSchema, LexiconNotFound}
14
15pub fn main() {
16 io.println("🦢 Honk + Goose: Jetstream Validation Example")
17 io.println("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
18 io.println("")
19 io.println("Connecting to Jetstream...")
20 io.println("Filtering for: xyz.statusphere.status")
21 io.println("Validating records with honk...")
22 io.println("")
23
24 // Define the xyz.statusphere.status lexicon
25 let lexicon = create_statusphere_lexicon()
26
27 // Configure goose to connect to Jetstream
28 let config =
29 goose.JetstreamConfig(
30 endpoint: "wss://jetstream2.us-west.bsky.network/subscribe",
31 wanted_collections: ["xyz.statusphere.status"],
32 wanted_dids: [],
33 cursor: option.None,
34 max_message_size_bytes: option.None,
35 compress: True,
36 require_hello: False,
37 )
38
39 // Start consuming events (this blocks forever)
40 goose.start_consumer(config, handle_event(_, lexicon))
41}
42
43/// Handles each Jetstream event
44fn handle_event(json_event: String, lexicon: json.Json) -> Nil {
45 let event = goose.parse_event(json_event)
46
47 case event {
48 // Handle commit events (create/update/delete)
49 goose.CommitEvent(did, time_us, commit) -> {
50 case commit.operation {
51 "create" -> handle_create(did, time_us, commit, lexicon)
52 "update" -> handle_update(did, time_us, commit, lexicon)
53 "delete" -> handle_delete(did, time_us, commit)
54 _ -> Nil
55 }
56 }
57
58 // Ignore identity and account events for this example
59 goose.IdentityEvent(_, _, _) -> Nil
60 goose.AccountEvent(_, _, _) -> Nil
61 goose.UnknownEvent(raw) -> {
62 io.println("⚠️ Unknown event: " <> raw)
63 }
64 }
65}
66
67/// Handles create operations - validates the new record
68fn handle_create(
69 did: String,
70 _time_us: Int,
71 commit: goose.CommitData,
72 lexicon: json.Json,
73) -> Nil {
74 case commit.record {
75 option.Some(record_dynamic) -> {
76 // Convert Dynamic to JSON for honk validation
77 let record_json = dynamic_to_json(record_dynamic)
78
79 // Validate the record using honk
80 case
81 honk.validate_record([lexicon], "xyz.statusphere.status", record_json)
82 {
83 Ok(_) -> {
84 // Extract status emoji for display
85 let status_emoji = extract_status(record_dynamic)
86 io.println(
87 "✓ VALID | "
88 <> truncate_did(did)
89 <> " | "
90 <> status_emoji
91 <> " | "
92 <> commit.rkey,
93 )
94 }
95 Error(err) -> {
96 io.println(
97 "✗ INVALID | "
98 <> truncate_did(did)
99 <> " | "
100 <> format_error(err)
101 <> " | "
102 <> commit.rkey,
103 )
104 }
105 }
106 }
107 option.None -> {
108 io.println("⚠️ CREATE event without record data")
109 }
110 }
111}
112
113/// Handles update operations - validates the updated record
114fn handle_update(
115 did: String,
116 _time_us: Int,
117 commit: goose.CommitData,
118 lexicon: json.Json,
119) -> Nil {
120 case commit.record {
121 option.Some(record_dynamic) -> {
122 let record_json = dynamic_to_json(record_dynamic)
123
124 case
125 honk.validate_record([lexicon], "xyz.statusphere.status", record_json)
126 {
127 Ok(_) -> {
128 let status_emoji = extract_status(record_dynamic)
129 io.println(
130 "✓ UPDATED | "
131 <> truncate_did(did)
132 <> " | "
133 <> status_emoji
134 <> " | "
135 <> commit.rkey,
136 )
137 }
138 Error(err) -> {
139 io.println(
140 "✗ INVALID | "
141 <> truncate_did(did)
142 <> " | "
143 <> format_error(err)
144 <> " | "
145 <> commit.rkey,
146 )
147 }
148 }
149 }
150 option.None -> {
151 io.println("⚠️ UPDATE event without record data")
152 }
153 }
154}
155
156/// Handles delete operations - no validation needed
157fn handle_delete(did: String, _time_us: Int, commit: goose.CommitData) -> Nil {
158 io.println("🗑️ DELETED | " <> truncate_did(did) <> " | " <> commit.rkey)
159}
160
161/// Creates the xyz.statusphere.status lexicon definition
162fn create_statusphere_lexicon() -> json.Json {
163 json.object([
164 #("lexicon", json.int(1)),
165 #("id", json.string("xyz.statusphere.status")),
166 #(
167 "defs",
168 json.object([
169 #(
170 "main",
171 json.object([
172 #("type", json.string("record")),
173 #("key", json.string("tid")),
174 #(
175 "record",
176 json.object([
177 #("type", json.string("object")),
178 #(
179 "required",
180 json.preprocessed_array([
181 json.string("status"),
182 json.string("createdAt"),
183 ]),
184 ),
185 #(
186 "properties",
187 json.object([
188 #(
189 "status",
190 json.object([
191 #("type", json.string("string")),
192 #("minLength", json.int(1)),
193 #("maxGraphemes", json.int(1)),
194 #("maxLength", json.int(32)),
195 ]),
196 ),
197 #(
198 "createdAt",
199 json.object([
200 #("type", json.string("string")),
201 #("format", json.string("datetime")),
202 ]),
203 ),
204 ]),
205 ),
206 ]),
207 ),
208 ]),
209 ),
210 ]),
211 ),
212 ])
213}
214
215/// Converts Dynamic to Json (they're the same underlying type)
216@external(erlang, "gleam@dynamic", "unsafe_coerce")
217fn dynamic_to_json(value: decode.Dynamic) -> json.Json
218
219/// Extracts the status emoji from a record for display
220fn extract_status(record: decode.Dynamic) -> String {
221 let decoder = {
222 use status <- decode.field("status", decode.string)
223 decode.success(status)
224 }
225 case decode.run(record, decoder) {
226 Ok(status) -> status
227 Error(_) -> "�"
228 }
229}
230
231/// Formats a validation error for display
232fn format_error(err: honk.ValidationError) -> String {
233 case err {
234 InvalidSchema(msg) -> "Schema: " <> msg
235 DataValidation(msg) -> "Data: " <> msg
236 LexiconNotFound(id) -> "Not found: " <> id
237 }
238}
239
240/// Truncates a DID for cleaner display
241fn truncate_did(did: String) -> String {
242 case string.split(did, ":") {
243 [_, _, suffix] ->
244 case string.length(suffix) > 12 {
245 True -> string.slice(suffix, 0, 12) <> "..."
246 False -> suffix
247 }
248 _ -> did
249 }
250}