this repo has no description
at main 15 kB view raw
// Authentication of a connection against the AT Protocol: a handle is resolved
// to a DID, the DID to a DID document, and the document's PDS endpoint is then
// used to create or refresh a session. Refresh tokens are stored in the
// `user_tokens` table together with a bcrypt hash of the password that
// created them.

const std = @import("std");
const sqlite = @import("sqlite");
const xev = @import("xev");

const db = @import("db.zig");
const irc = @import("irc.zig");
const log = @import("log.zig");

const Connection = Server.Connection;
const HeapArena = @import("HeapArena.zig");
const Server = @import("Server.zig");
const User = irc.User;
const WakeupResult = Server.WakeupResult;
const WorkerQueue = Server.WorkerQueue;

/// The fields of a DID document that this file reads. Field names match the
/// JSON keys; the document is parsed with `ignore_unknown_fields = true`.
const DidDocument = struct {
    alsoKnownAs: []const []const u8,
    service: []const Service,
};

/// One entry of a DID document's `service` array.
const Service = struct {
    id: []const u8,
    type: []const u8,
    serviceEndpoint: []const u8,
};

/// Authenticates `handle` / `password` and pushes exactly one result onto
/// `queue`.
///
/// Steps: resolve the handle to a DID, fetch the DID document, check that the
/// document lists `at://<handle>` in `alsoKnownAs`, pick the
/// `#atproto_pds` service endpoint, then call `authenticate`.
///
/// Every failure is logged and reported as an `.auth_failure` message carrying
/// `arena`, `fd` and a short static description; on success the value
/// returned by `authenticate` is pushed unchanged. No error is propagated to
/// the caller by the code in this function.
pub fn authenticateConnection(
    arena: HeapArena,
    client: *std.http.Client,
    queue: *WorkerQueue,
    pool: *sqlite.Pool,
    fd: xev.TCP,
    handle: []const u8,
    password: []const u8,
) !void {
    const did = resolveHandle(arena.allocator(), client, handle) catch |err| {
        log.err("resolving handle: {}", .{err});
        queue.push(.{
            .auth_failure = .{
                .arena = arena,
                .fd = fd,
                .msg = "error resolving handle",
            },
        });
        return;
    };

    const did_doc = resolveDid(arena.allocator(), client, did) catch |err| {
        log.err("resolving did: {}", .{err});
        queue.push(.{
            .auth_failure = .{
                .arena = arena,
                .fd = fd,
                .msg = "error resolving DID",
            },
        });
        return;
    };

    const scheme = "at://";
    // Backwards verify the alsoKnownAs field: the DID document must claim the
    // handle we started from, otherwise the handle -> DID mapping is one-sided.
    for (did_doc.alsoKnownAs) |also_known_as| {
        if (!std.mem.startsWith(u8, also_known_as, scheme)) continue;
        if (std.mem.eql(u8, also_known_as[scheme.len..], handle)) break;
    } else {
        log.err("handle doesn't match DID Document", .{});
        queue.push(.{
            .auth_failure = .{
                .arena = arena,
                .fd = fd,
                .msg = "handle doesn't match DID document",
            },
        });
        return;
    }

    const endpoint: []const u8 = for (did_doc.service) |service| {
        // Spec says id "ending with #atproto_pds"
        if (std.mem.endsWith(u8, service.id, "#atproto_pds") and
            std.ascii.eqlIgnoreCase(service.type, "AtprotoPersonalDataServer"))
            break service.serviceEndpoint;
    } else {
        log.err("DID Document has no #atproto_pds", .{});
        queue.push(.{
            .auth_failure = .{
                .arena = arena,
                .fd = fd,
                .msg = "DID Document has no #atproto_pds",
            },
        });
        return;
    };

    // Now we have the endpoint and the DID.

    const result = authenticate(arena, client, pool, fd, handle, password, did, endpoint) catch {
        log.warn("couldn't authenticate", .{});
        queue.push(.{
            .auth_failure = .{
                .arena = arena,
                .fd = fd,
                .msg = "failed to authenticate",
            },
        });
        return;
    };

    queue.push(result);
}

/// Resolves a handle to a DID.
///
/// Sends `GET https://bsky.social/xrpc/com.atproto.identity.resolveHandle`
/// and returns the `did` field of the response. The returned slice and all
/// intermediate data are allocated from `arena` and are never freed here.
///
/// Errors: `error.InvalidHandle` for an empty handle or one longer than 253
/// bytes, `error.BadRequest` for any non-200 status, plus whatever `fetch`
/// and the JSON parser return.
fn resolveHandle(arena: std.mem.Allocator, client: *std.http.Client, handle: []const u8) ![]const u8 {
    // We don't look in the db for this handle. Handles can change, and we don't know if they
    // can be reused...so we just always resolve the handle to a DID via BlueSky

    // A handle may be at most 253 bytes long; an empty one can never resolve.
    const max_handle_len = 253;
    if (handle.len == 0 or handle.len > max_handle_len) {
        return error.InvalidHandle;
    }

    // "handle=" plus at most 253 bytes always fits, so bufPrint cannot fail.
    var buf: [512]u8 = undefined;
    const query = std.fmt.bufPrint(&buf, "handle={s}", .{handle}) catch unreachable;
    const uri: std.Uri = .{
        .scheme = "https",
        .host = .{ .raw = "bsky.social" },
        .path = .{ .raw = "/xrpc/com.atproto.identity.resolveHandle" },
        .query = .{ .raw = query },
    };

    var storage = std.ArrayList(u8).init(arena);
    const req: std.http.Client.FetchOptions = .{
        .response_storage = .{ .dynamic = &storage },
        .location = .{ .uri = uri },
        .method = .GET,
    };

    const result = try fetch(client, req);

    switch (result.status) {
        .ok => {
            const Response = struct {
                did: []const u8,
            };
            const resp = try std.json.parseFromSliceLeaky(Response, arena, storage.items, .{});
            return resp.did;
        },
        .bad_request, .unauthorized => {
            const Response = struct {
                @"error": []const u8,
                message: []const u8,
            };
            const resp = try std.json.parseFromSliceLeaky(Response, arena, storage.items, .{});
            log.err("resolving handle: {s}: {s}", .{ resp.@"error", resp.message });
            return error.BadRequest;
        },
        else => log.err("resolving handle: {d}", .{result.status}),
    }
    return error.BadRequest;
}

/// Resolves a DID to a DID Document.
///
/// Only the `web` and `plc` methods are handled:
///   * `did:web:<id>` is fetched from `https://<id>/.well-known/did.json`
///   * `did:plc:<id>` is fetched from `https://plc.directory/<did>`
///
/// The document is parsed into memory owned by `arena`.
///
/// Errors: `error.InvalidDID` when the DID has fewer than three `:`-separated
/// parts or an unhandled method, `error.DidNotFound` on a 404,
/// `error.BadRequest` on any other non-200 status, plus allocation, `fetch`
/// and JSON parse errors.
fn resolveDid(arena: std.mem.Allocator, client: *std.http.Client, did: []const u8) !DidDocument {
    var iter = std.mem.splitScalar(u8, did, ':');
    _ = iter.next() orelse return error.InvalidDID;
    const method = iter.next() orelse return error.InvalidDID;
    const id = iter.next() orelse return error.InvalidDID;

    const uri: std.Uri =
        if (std.mem.eql(u8, "web", method)) blk: {
            break :blk .{
                .scheme = "https",
                .host = .{ .raw = id },
                .path = .{ .raw = "/.well-known/did.json" },
            };
        } else if (std.mem.eql(u8, "plc", method)) blk: {
            // We have to add a leading slash. Allocation can genuinely fail,
            // so the error is propagated rather than asserted away.
            const path = try std.fmt.allocPrint(arena, "/{s}", .{did});
            break :blk .{
                .scheme = "https",
                .host = .{ .raw = "plc.directory" },
                .path = .{ .raw = path },
            };
        } else return error.InvalidDID;

    var storage = std.ArrayList(u8).init(arena);
    const req: std.http.Client.FetchOptions = .{
        .response_storage = .{ .dynamic = &storage },
        .location = .{ .uri = uri },
        .method = .GET,
    };

    const result = try fetch(client, req);

    switch (result.status) {
        .ok => {
            const resp = try std.json.parseFromSliceLeaky(
                DidDocument,
                arena,
                storage.items,
                .{ .ignore_unknown_fields = true },
            );
            return resp;
        },
        .not_found => return error.DidNotFound,
        else => log.err("resolving did: {s}: {d}", .{ did, result.status }),
    }
    return error.BadRequest;
}

/// Authenticates a user whose DID and PDS endpoint are already known.
///
/// Expired rows are first deleted from `user_tokens`. Then every stored token
/// row for `did` is checked: the first row whose bcrypt hash verifies against
/// `password` is used to refresh the session. If no row matches, a new
/// session is created with the password instead.
///
/// The database connection and row cursor are released before any network
/// request is made; `refreshSession` and `createSession` acquire their own
/// connections from `pool`, so holding one here across those calls would tie
/// up two pool entries per login.
fn authenticate(
    arena: HeapArena,
    client: *std.http.Client,
    pool: *sqlite.Pool,
    fd: xev.TCP,
    handle: []const u8,
    password: []const u8,
    did: []const u8,
    endpoint: []const u8,
) !WakeupResult {
    const StoredSession = struct {
        row_id: i64,
        refresh_token: []const u8,
    };

    const stored: ?StoredSession = lookup: {
        const db_conn = pool.acquire();
        defer pool.release(db_conn);

        {
            // We delete all expired tokens
            const sql =
                \\DELETE FROM user_tokens
                \\WHERE refresh_expiry < ?;
            ;
            try db_conn.exec(sql, .{std.time.timestamp()});
        }

        // First we see if we have this user in the user_tokens table.
        const sql =
            \\SELECT id, password_hash, refresh_token
            \\FROM user_tokens
            \\WHERE user_id = (SELECT id FROM users WHERE did = ?);
        ;

        // There could be multiple entries. We'll check them all
        var rows = try db_conn.rows(sql, .{did});
        defer rows.deinit();

        while (rows.next()) |row| {
            std.crypto.pwhash.bcrypt.strVerify(
                row.text(1),
                password,
                .{ .silently_truncate_password = false },
            ) catch |err| {
                log.warn("bcrypt verification fail: {}", .{err});
                continue;
            };
            // Copy the token into the arena: it has to outlive `rows`, which
            // is closed when this block exits.
            break :lookup StoredSession{
                .row_id = row.int(0),
                .refresh_token = try arena.allocator().dupe(u8, row.text(2)),
            };
        }
        break :lookup null;
    };

    if (stored) |session| {
        return refreshSession(
            arena,
            client,
            pool,
            fd,
            handle,
            did,
            endpoint,
            session.refresh_token,
            session.row_id,
        );
    }

    // None of them matched. Maybe this is a new app password. We try to authenticate
    // that way
    return createSession(arena, client, pool, fd, handle, password, did, endpoint);
}

/// Builds the URI for an XRPC `route` on the server at `endpoint`.
///
/// If `endpoint` has no path, the path becomes `route`. Otherwise trailing
/// `/` characters are trimmed from the existing path and `route` is appended.
/// Anything allocated comes from `allocator`; the returned URI also refers to
/// `endpoint` and `route`, which must stay valid while it is used.
fn xrpcUri(allocator: std.mem.Allocator, endpoint: []const u8, route: []const u8) !std.Uri {
    var uri = try std.Uri.parse(endpoint);
    if (uri.path.isEmpty()) {
        uri.path = .{ .raw = route };
    } else {
        const original = try uri.path.toRawMaybeAlloc(allocator);
        const trimmed = std.mem.trimRight(u8, original, "/");
        uri.path = .{ .raw = try std.mem.concat(allocator, u8, &.{ trimmed, route }) };
    }
    return uri;
}

/// Exchanges a stored refresh token for a new one and saves it.
///
/// POSTs to `com.atproto.server.refreshSession` with `token` as the bearer
/// token. On a 200 response the new `refreshJwt` and an expiry 28 days from
/// now are written to the `user_tokens` row `row_id`.
///
/// On any other status the row is deleted, so that the next login creates a
/// fresh session, and `error.BadRequest` is returned. A failure to save the
/// new token is logged and the row is deleted, but the login still succeeds.
fn refreshSession(
    arena: HeapArena,
    client: *std.http.Client,
    pool: *sqlite.Pool,
    fd: xev.TCP,
    handle: []const u8,
    did: []const u8,
    endpoint: []const u8,
    token: []const u8,
    row_id: i64, // the row id we need to update with the new refresh token
) !WakeupResult {
    log.debug("refreshing session", .{});
    const delete_sql = "DELETE FROM user_tokens WHERE id = ?";

    // Do the api call
    const uri = try xrpcUri(arena.allocator(), endpoint, "/xrpc/com.atproto.server.refreshSession");

    const auth_header = try std.fmt.allocPrint(arena.allocator(), "Bearer {s}", .{token});

    var storage = std.ArrayList(u8).init(arena.allocator());
    const req: std.http.Client.FetchOptions = .{
        .response_storage = .{ .dynamic = &storage },
        .location = .{ .uri = uri },
        .method = .POST,
        .headers = .{
            .authorization = .{ .override = auth_header },
        },
    };

    const result = try fetch(client, req);

    const new_token: []const u8 = switch (result.status) {
        .ok => blk: {
            const Response = struct {
                refreshJwt: []const u8,
            };
            const resp = try std.json.parseFromSliceLeaky(
                Response,
                arena.allocator(),
                storage.items,
                .{ .ignore_unknown_fields = true },
            );
            break :blk resp.refreshJwt;
        },
        else => {
            log.err("refreshing session: {s}: {d}: {s}", .{ handle, result.status, storage.items });
            // Delete the row to force creation of a new session
            const db_conn = pool.acquire();
            defer pool.release(db_conn);
            db_conn.exec(delete_sql, .{row_id}) catch |err| {
                log.err("deleting rejected token row: {}: {s}", .{ err, db_conn.lastError() });
            };
            return error.BadRequest;
        },
    };

    const expiry = std.time.timestamp() + std.time.s_per_day * 28;

    const sql =
        \\UPDATE user_tokens
        \\SET refresh_token = ?, refresh_expiry = ?
        \\WHERE id = ?;
    ;

    const db_conn = pool.acquire();
    defer pool.release(db_conn);

    if (db_conn.exec(sql, .{ new_token, expiry, row_id })) |_| {
        log.debug("session saved", .{});
    } else |err| {
        log.err("saving refresh token to db: {}: {s}", .{ err, db_conn.lastError() });
        // Try to delete the row: the token it holds has just been exchanged
        // for one we failed to store.
        db_conn.exec(delete_sql, .{row_id}) catch |delete_err| {
            log.err("deleting token row: {}: {s}", .{ delete_err, db_conn.lastError() });
        };
    }

    return .{
        .auth_success = .{
            .arena = arena,
            .fd = fd,
            .nick = handle,
            .user = did,
            .avatar_url = "",
            .realname = "",
        },
    };
}

/// Creates a new session with `handle` and `password` and stores its refresh
/// token.
///
/// POSTs a JSON body `{"identifier": handle, "password": password}` to
/// `com.atproto.server.createSession`. On a 200 response the `refreshJwt` is
/// inserted into `user_tokens` with a bcrypt hash of the password and an
/// expiry 28 days from now; `authenticate` later matches logins against that
/// hash.
///
/// Any other status returns `error.BadRequest`. A failure to insert the token
/// row is logged only; the login still succeeds.
fn createSession(
    arena: HeapArena,
    client: *std.http.Client,
    pool: *sqlite.Pool,
    fd: xev.TCP,
    handle: []const u8,
    password: []const u8,
    did: []const u8,
    endpoint: []const u8,
) !WakeupResult {
    log.debug("creating new session: handle={s}", .{handle});

    // Do the api call
    const uri = try xrpcUri(arena.allocator(), endpoint, "/xrpc/com.atproto.server.createSession");

    // Both values are supplied by the connecting client, so the body is built
    // with the JSON serializer: quotes, backslashes and control characters are
    // escaped instead of being able to alter the structure of the request.
    const payload = try std.json.stringifyAlloc(
        arena.allocator(),
        .{ .identifier = handle, .password = password },
        .{},
    );

    var storage = std.ArrayList(u8).init(arena.allocator());
    const req: std.http.Client.FetchOptions = .{
        .response_storage = .{ .dynamic = &storage },
        .location = .{ .uri = uri },
        .method = .POST,
        .payload = payload,
        .headers = .{ .content_type = .{ .override = "application/json" } },
    };

    const result = try fetch(client, req);

    const refresh_jwt: []const u8 = switch (result.status) {
        .ok => blk: {
            const Response = struct {
                refreshJwt: []const u8,
            };
            const resp = try std.json.parseFromSliceLeaky(
                Response,
                arena.allocator(),
                storage.items,
                .{ .ignore_unknown_fields = true },
            );
            break :blk resp.refreshJwt;
        },
        else => {
            log.err("creating session: {s}: {d}: {s}", .{ handle, result.status, storage.items });
            return error.BadRequest;
        },
    };

    // `password_hash` is a slice of `buf`, so it must be used before this
    // function returns.
    var buf: [256]u8 = undefined;
    const opts: std.crypto.pwhash.bcrypt.HashOptions = .{
        .params = .owasp,
        .encoding = .crypt,
    };

    const password_hash = try std.crypto.pwhash.bcrypt.strHash(
        password,
        opts,
        &buf,
    );

    const expiry = std.time.timestamp() + std.time.s_per_day * 28;

    {
        // HACK: We create a dummy user and do a store. This ensures the user exists in our
        // query below.

        var user: User = .init();
        user.nick = handle;
        user.username = did;
        try db.storeUser(pool, &user);
    }

    const sql =
        \\INSERT INTO user_tokens (user_id, refresh_token, refresh_expiry, password_hash)
        \\VALUES (
        \\ (SELECT id FROM users WHERE did = ?),
        \\ ?, -- token
        \\ ?, -- expiry
        \\ ? -- password hash
        \\);
    ;

    const db_conn = pool.acquire();
    defer pool.release(db_conn);

    db_conn.exec(sql, .{ did, refresh_jwt, expiry, password_hash }) catch |err| {
        log.err("saving refresh token to db: {}: {s}", .{ err, db_conn.lastError() });
    };

    return .{
        .auth_success = .{
            .arena = arena,
            .fd = fd,
            .nick = handle,
            .user = did,
            .avatar_url = "",
            .realname = "",
        },
    };
}

/// Performs a fetch with retries.
///
/// A failed request is retried up to three times, sleeping 1 s, 2 s and then
/// 4 s before the respective retry; the error of the fourth failed attempt is
/// returned. Only errors from `client.fetch` trigger a retry. Any HTTP status
/// is returned to the caller as a result.
///
/// The sleep blocks the calling thread.
fn fetch(
    client: *std.http.Client,
    request: std.http.Client.FetchOptions,
) !std.http.Client.FetchResult {
    const max_retries: u2 = 3;
    var retries: u2 = 0;
    while (true) {
        const result = client.fetch(request) catch |err| {
            if (retries == max_retries) return err;
            defer retries += 1;

            // The client appends the body to the response storage. Drop
            // whatever a failed attempt managed to read so that the next
            // response does not end up behind stale bytes.
            switch (request.response_storage) {
                .ignore => {},
                .dynamic => |list| list.clearRetainingCapacity(),
                .static => |list| list.clearRetainingCapacity(),
            }

            const delay: u64 = @as(u64, 500 * std.time.ns_per_ms) << (retries + 1);
            log.warn("request failed, retrying in {d} ms", .{delay / std.time.ns_per_ms});
            std.time.sleep(delay);
            continue;
        };
        return result;
    }
}