Highly ambitious AT Protocol AppView service and SDKs

add sync job queues using sqlxmq, display job history on slice sync page
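In rough terms, the flow this change enables, driven from the regenerated TypeScript client (a sketch only; the `sliceClient` instance, DID, slice URI, and 2-second poll interval are illustrative placeholders, not part of the diff):

// Sketch: enqueue a sync, poll it to completion, then load history for the
// slice sync page. `sliceClient` is an authenticated SliceSlicesSocialClient.
const res = await sliceClient.startSync({ collections: ["social.slices.slice"] });
if (!res.success || !res.jobId) throw new Error(res.message);

// Poll until the job settles; the interval is an arbitrary example value.
let job = await sliceClient.getJobStatus({ jobId: res.jobId });
while (job.status === "pending" || job.status === "running") {
  await new Promise((resolve) => setTimeout(resolve, 2000));
  job = await sliceClient.getJobStatus({ jobId: res.jobId });
}

// Most recent runs, for rendering on the slice sync page.
const { jobs } = await sliceClient.getJobHistory({
  userDid: "did:plc:example", // placeholder DID
  sliceUri: "at://did:plc:example/social.slices.slice/self", // placeholder URI
  limit: 10,
});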

+11
api/.env.example
···
+ # Database configuration
  DATABASE_URL=postgresql://slice:slice@localhost:5432/slice
+
+ # Server configuration
+ PORT=3000
+
+ # Authentication service base URL
  AUTH_BASE_URL=http://localhost:8081
+
+ # AT Protocol relay endpoint for syncing data
+ RELAY_ENDPOINT=https://relay1.us-west.bsky.network
+
+ # Logging level
  RUST_LOG=debug
+23
api/.sqlx/query-1773b673407c9c36e9fc66af2cef4ea1b28adfc37bbb5f194056f5a26d077639.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "\n INSERT INTO job_results (\n job_id, user_did, slice_uri, status, success, total_records,\n collections_synced, repos_processed, message, error_message\n ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)\n ON CONFLICT (job_id) \n DO UPDATE SET\n status = EXCLUDED.status,\n success = EXCLUDED.success,\n total_records = EXCLUDED.total_records,\n collections_synced = EXCLUDED.collections_synced,\n repos_processed = EXCLUDED.repos_processed,\n message = EXCLUDED.message,\n error_message = EXCLUDED.error_message,\n completed_at = NOW()\n ",
+   "describe": {
+     "columns": [],
+     "parameters": {
+       "Left": ["Uuid", "Text", "Text", "Text", "Bool", "Int8", "Jsonb", "Int8", "Text", "Text"]
+     },
+     "nullable": []
+   },
+   "hash": "1773b673407c9c36e9fc66af2cef4ea1b28adfc37bbb5f194056f5a26d077639"
+ }
+29
api/.sqlx/query-414527b47d83fa224abc41aab8f5ee85ce45430cd0ec6e5414a5f4d339050286.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "SELECT \"uri\", \"cid\"\n FROM \"record\"\n WHERE \"did\" = $1 AND \"collection\" = $2",
+   "describe": {
+     "columns": [
+       { "ordinal": 0, "name": "uri", "type_info": "Text" },
+       { "ordinal": 1, "name": "cid", "type_info": "Text" }
+     ],
+     "parameters": { "Left": ["Text", "Text"] },
+     "nullable": [false, false]
+   },
+   "hash": "414527b47d83fa224abc41aab8f5ee85ce45430cd0ec6e5414a5f4d339050286"
+ }
+90
api/.sqlx/query-644745eb5e81daafd5f77110aea2c812d0d4cbd60d69f98d29c90af8ddb350f4.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "\n SELECT \n job_id, user_did, slice_uri, status, success, total_records,\n collections_synced, repos_processed, message, error_message,\n created_at, completed_at\n FROM job_results \n WHERE user_did = $1 AND slice_uri = $2\n ORDER BY created_at DESC\n LIMIT $3\n ",
+   "describe": {
+     "columns": [
+       { "ordinal": 0,  "name": "job_id",             "type_info": "Uuid" },
+       { "ordinal": 1,  "name": "user_did",           "type_info": "Text" },
+       { "ordinal": 2,  "name": "slice_uri",          "type_info": "Text" },
+       { "ordinal": 3,  "name": "status",             "type_info": "Text" },
+       { "ordinal": 4,  "name": "success",            "type_info": "Bool" },
+       { "ordinal": 5,  "name": "total_records",      "type_info": "Int8" },
+       { "ordinal": 6,  "name": "collections_synced", "type_info": "Jsonb" },
+       { "ordinal": 7,  "name": "repos_processed",    "type_info": "Int8" },
+       { "ordinal": 8,  "name": "message",            "type_info": "Text" },
+       { "ordinal": 9,  "name": "error_message",      "type_info": "Text" },
+       { "ordinal": 10, "name": "created_at",         "type_info": "Timestamptz" },
+       { "ordinal": 11, "name": "completed_at",       "type_info": "Timestamptz" }
+     ],
+     "parameters": { "Left": ["Text", "Text", "Int8"] },
+     "nullable": [false, false, false, false, false, false, false, false, false, true, false, false]
+   },
+   "hash": "644745eb5e81daafd5f77110aea2c812d0d4cbd60d69f98d29c90af8ddb350f4"
+ }
-20
api/.sqlx/query-825a803ba0fae58f2f29a46901f7283fb227cb62749564fcc6a9953a2812ecdc.json
···
- {
-   "db_name": "PostgreSQL",
-   "query": "SELECT COUNT(*) as count FROM record",
-   "describe": {
-     "columns": [
-       { "ordinal": 0, "name": "count", "type_info": "Int8" }
-     ],
-     "parameters": { "Left": [] },
-     "nullable": [null]
-   },
-   "hash": "825a803ba0fae58f2f29a46901f7283fb227cb62749564fcc6a9953a2812ecdc"
- }
+46
api/.sqlx/query-d0a3202bcfadd162663281444b1650ae4aaa1f80a8410aea2b1553a15a61ce86.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "\n SELECT \n m.id,\n m.created_at,\n m.attempt_at,\n m.attempts,\n p.payload_json\n FROM mq_msgs m \n LEFT JOIN mq_payloads p ON m.id = p.id\n WHERE p.payload_json::jsonb ->> 'job_id' = $1\n ",
+   "describe": {
+     "columns": [
+       { "ordinal": 0, "name": "id",           "type_info": "Uuid" },
+       { "ordinal": 1, "name": "created_at",   "type_info": "Timestamptz" },
+       { "ordinal": 2, "name": "attempt_at",   "type_info": "Timestamptz" },
+       { "ordinal": 3, "name": "attempts",     "type_info": "Int4" },
+       { "ordinal": 4, "name": "payload_json", "type_info": "Jsonb" }
+     ],
+     "parameters": { "Left": ["Text"] },
+     "nullable": [false, true, true, false, true]
+   },
+   "hash": "d0a3202bcfadd162663281444b1650ae4aaa1f80a8410aea2b1553a15a61ce86"
+ }
+88
api/.sqlx/query-e1053720f38d7ac898d980901bd7d58a6e85277a7c7b2191cc6ab818dbfbe3b5.json
···
+ {
+   "db_name": "PostgreSQL",
+   "query": "\n SELECT \n job_id, user_did, slice_uri, status, success, total_records,\n collections_synced, repos_processed, message, error_message,\n created_at, completed_at\n FROM job_results \n WHERE job_id = $1\n ",
+   "describe": {
+     "columns": [
+       { "ordinal": 0,  "name": "job_id",             "type_info": "Uuid" },
+       { "ordinal": 1,  "name": "user_did",           "type_info": "Text" },
+       { "ordinal": 2,  "name": "slice_uri",          "type_info": "Text" },
+       { "ordinal": 3,  "name": "status",             "type_info": "Text" },
+       { "ordinal": 4,  "name": "success",            "type_info": "Bool" },
+       { "ordinal": 5,  "name": "total_records",      "type_info": "Int8" },
+       { "ordinal": 6,  "name": "collections_synced", "type_info": "Jsonb" },
+       { "ordinal": 7,  "name": "repos_processed",    "type_info": "Int8" },
+       { "ordinal": 8,  "name": "message",            "type_info": "Text" },
+       { "ordinal": 9,  "name": "error_message",      "type_info": "Text" },
+       { "ordinal": 10, "name": "created_at",         "type_info": "Timestamptz" },
+       { "ordinal": 11, "name": "completed_at",       "type_info": "Timestamptz" }
+     ],
+     "parameters": { "Left": ["Uuid"] },
+     "nullable": [false, false, false, false, false, false, false, false, false, true, false, false]
+   },
+   "hash": "e1053720f38d7ac898d980901bd7d58a6e85277a7c7b2191cc6ab818dbfbe3b5"
+ }
+76 -23
api/Cargo.lock
···
  checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100"

  [[package]]
+ name = "anymap2"
+ version = "0.13.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
+
+ [[package]]
  name = "async-trait"
  version = "0.1.89"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
  dependencies = [
    "data-encoding",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "heck",
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
    "serde",
    "serde_json",
    "sqlx",
+   "sqlxmq",
    "thiserror 1.0.69",
    "tokio",
    "tower",
···
    "indexmap",
    "log",
    "memchr",
+   "native-tls",
    "once_cell",
    "percent-encoding",
    "rustls",
···
    "tokio-stream",
    "tracing",
    "url",
+   "uuid",
    "webpki-roots 0.26.11",
  ]
···
    "quote",
    "sqlx-core",
    "sqlx-macros-core",
-   "syn",
+   "syn 2.0.106",
  ]
···
    "sqlx-mysql",
    "sqlx-postgres",
    "sqlx-sqlite",
-   "syn",
+   "syn 2.0.106",
    "tokio",
    "url",
  ]
···
    "stringprep",
    "thiserror 2.0.14",
    "tracing",
+   "uuid",
    "whoami",
  ]
···
    "stringprep",
    "thiserror 2.0.14",
    "tracing",
+   "uuid",
    "whoami",
  ]
···
    "thiserror 2.0.14",
    "tracing",
    "url",
+   "uuid",
+ ]
+
+ [[package]]
+ name = "sqlxmq"
+ version = "0.6.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "b2bd99fe20294f548a6298d60b6490871d78d53d739919a13834d77ffd656869"
+ dependencies = [
+   "anymap2",
+   "chrono",
+   "dotenvy",
+   "log",
+   "serde",
+   "serde_json",
+   "sqlx",
+   "sqlxmq_macros",
+   "tokio",
+   "uuid",
+ ]
+
+ [[package]]
+ name = "sqlxmq_macros"
+ version = "0.6.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "c791553cc438a36c90c9eeb0c288e44b79ac9f79702b7b3e9f0116450adb6681"
+ dependencies = [
+   "proc-macro2",
+   "quote",
+   "syn 1.0.109",
  ]

  [[package]]
···

  [[package]]
  name = "syn"
+ version = "1.0.109"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
+ dependencies = [
+   "proc-macro2",
+   "quote",
+   "unicode-ident",
+ ]
+
+ [[package]]
+ name = "syn"
  version = "2.0.106"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6"
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "getrandom 0.3.3",
    "js-sys",
+   "serde",
    "wasm-bindgen",
  ]
···
    "log",
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
    "wasm-bindgen-shared",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
    "wasm-bindgen-backend",
    "wasm-bindgen-shared",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
    "synstructure",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
    "synstructure",
  ]
···
  dependencies = [
    "proc-macro2",
    "quote",
-   "syn",
+   "syn 2.0.106",
  ]
+4 -1
api/Cargo.toml
···
  chrono = { version = "0.4", features = ["serde"] }

  # UUID generation
- uuid = { version = "1.0", features = ["v4"] }
+ uuid = { version = "1.0", features = ["v4", "serde"] }

  # Environment variables
  dotenvy = "0.15"
···
  # Middleware for HTTP requests with retry logic
  reqwest-middleware = { version = "0.4.2", features = ["json", "multipart"] }
  reqwest-chain = "1.0.0"
+
+ # Job queue
+ sqlxmq = "0.6"
+289
api/migrations/004_sqlxmq_setup.sql
···
+ CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
+
+ -- The UDT for creating messages
+ CREATE TYPE mq_new_t AS (
+     -- Unique message ID
+     id UUID,
+     -- Delay before message is processed
+     delay INTERVAL,
+     -- Number of retries if initial processing fails
+     retries INT,
+     -- Initial backoff between retries
+     retry_backoff INTERVAL,
+     -- Name of channel
+     channel_name TEXT,
+     -- Arguments to channel
+     channel_args TEXT,
+     -- Interval for two-phase commit (or NULL to disable two-phase commit)
+     commit_interval INTERVAL,
+     -- Whether this message should be processed in order with respect to other
+     -- ordered messages.
+     ordered BOOLEAN,
+     -- Name of message
+     name TEXT,
+     -- JSON payload
+     payload_json TEXT,
+     -- Binary payload
+     payload_bytes BYTEA
+ );
+
+ -- Small, frequently updated table of messages
+ CREATE TABLE mq_msgs (
+     id UUID PRIMARY KEY,
+     created_at TIMESTAMPTZ DEFAULT NOW(),
+     attempt_at TIMESTAMPTZ DEFAULT NOW(),
+     attempts INT NOT NULL DEFAULT 5,
+     retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second',
+     channel_name TEXT NOT NULL,
+     channel_args TEXT NOT NULL,
+     commit_interval INTERVAL,
+     after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
+ );
+
+ -- Insert dummy message so that the 'nil' UUID can be referenced
+ INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL);
+
+ -- Internal helper function to check that a UUID is neither NULL nor NIL
+ CREATE FUNCTION mq_uuid_exists(
+     id UUID
+ ) RETURNS BOOLEAN AS $$
+     SELECT id IS NOT NULL AND id != uuid_nil()
+ $$ LANGUAGE SQL IMMUTABLE;
+
+ -- Index for polling
+ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id);
+ -- Index for adding messages
+ CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
+
+ -- Index for ensuring strict message order
+ CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);
+
+ -- Large, less frequently updated table of message payloads
+ CREATE TABLE mq_payloads(
+     id UUID PRIMARY KEY,
+     name TEXT NOT NULL,
+     payload_json JSONB,
+     payload_bytes BYTEA
+ );
+
+ -- Internal helper function to return the most recently added message in a queue.
+ CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
+ RETURNS UUID AS $$
+     SELECT COALESCE(
+         (
+             SELECT id FROM mq_msgs
+             WHERE channel_name = from_channel_name
+             AND channel_args = from_channel_args
+             AND after_message_id IS NOT NULL
+             AND id != uuid_nil()
+             ORDER BY created_at DESC, id DESC
+             LIMIT 1
+         ),
+         uuid_nil()
+     )
+ $$ LANGUAGE SQL STABLE;
+
+ -- Internal helper function to randomly select a set of channels with "ready" messages.
+ CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT)
+ RETURNS TABLE(name TEXT, args TEXT) AS $$
+     SELECT channel_name, channel_args
+     FROM mq_msgs
+     WHERE id != uuid_nil()
+     AND attempt_at <= NOW()
+     AND (channel_names IS NULL OR channel_name = ANY(channel_names))
+     AND NOT mq_uuid_exists(after_message_id)
+     GROUP BY channel_name, channel_args
+     ORDER BY RANDOM()
+     LIMIT batch_size
+ $$ LANGUAGE SQL STABLE;
+
+ -- Main entry-point for job runner: pulls a batch of messages from the queue.
+ CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
+ RETURNS TABLE(
+     id UUID,
+     is_committed BOOLEAN,
+     name TEXT,
+     payload_json TEXT,
+     payload_bytes BYTEA,
+     retry_backoff INTERVAL,
+     wait_time INTERVAL
+ ) AS $$
+ BEGIN
+     RETURN QUERY UPDATE mq_msgs
+     SET
+         attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
+         attempts = mq_msgs.attempts - 1,
+         retry_backoff = mq_msgs.retry_backoff * 2
+     FROM (
+         SELECT
+             msgs.id
+         FROM mq_active_channels(channel_names, batch_size) AS active_channels
+         INNER JOIN LATERAL (
+             SELECT * FROM mq_msgs
+             WHERE mq_msgs.id != uuid_nil()
+             AND mq_msgs.attempt_at <= NOW()
+             AND mq_msgs.channel_name = active_channels.name
+             AND mq_msgs.channel_args = active_channels.args
+             AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+             ORDER BY mq_msgs.attempt_at ASC
+             LIMIT batch_size
+         ) AS msgs ON TRUE
+         LIMIT batch_size
+     ) AS messages_to_update
+     LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
+     WHERE mq_msgs.id = messages_to_update.id
+     RETURNING
+         mq_msgs.id,
+         mq_msgs.commit_interval IS NULL,
+         mq_payloads.name,
+         mq_payloads.payload_json::TEXT,
+         mq_payloads.payload_bytes,
+         mq_msgs.retry_backoff / 2,
+         interval '0' AS wait_time;
+
+     IF NOT FOUND THEN
+         RETURN QUERY SELECT
+             NULL::UUID,
+             NULL::BOOLEAN,
+             NULL::TEXT,
+             NULL::TEXT,
+             NULL::BYTEA,
+             NULL::INTERVAL,
+             MIN(mq_msgs.attempt_at) - NOW()
+         FROM mq_msgs
+         WHERE mq_msgs.id != uuid_nil()
+         AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+         AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
+     END IF;
+ END;
+ $$ LANGUAGE plpgsql;
+
+ -- Creates new messages
+ CREATE FUNCTION mq_insert(new_messages mq_new_t[])
+ RETURNS VOID AS $$
+ BEGIN
+     PERFORM pg_notify(CONCAT('mq_', channel_name), '')
+     FROM unnest(new_messages) AS new_msgs
+     GROUP BY channel_name;
+
+     IF FOUND THEN
+         PERFORM pg_notify('mq', '');
+     END IF;
+
+     INSERT INTO mq_payloads (
+         id,
+         name,
+         payload_json,
+         payload_bytes
+     ) SELECT
+         id,
+         name,
+         payload_json::JSONB,
+         payload_bytes
+     FROM UNNEST(new_messages);
+
+     INSERT INTO mq_msgs (
+         id,
+         attempt_at,
+         attempts,
+         retry_backoff,
+         channel_name,
+         channel_args,
+         commit_interval,
+         after_message_id
+     )
+     SELECT
+         id,
+         NOW() + delay + COALESCE(commit_interval, INTERVAL '0'),
+         retries + 1,
+         retry_backoff,
+         channel_name,
+         channel_args,
+         commit_interval,
+         CASE WHEN ordered
+         THEN
+             LAG(id, 1, mq_latest_message(channel_name, channel_args))
+             OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id)
+         ELSE
+             NULL
+         END
+     FROM UNNEST(new_messages);
+ END;
+ $$ LANGUAGE plpgsql;
+
+ -- Commits messages previously created with a non-NULL commit interval.
+ CREATE FUNCTION mq_commit(msg_ids UUID[])
+ RETURNS VOID AS $$
+ BEGIN
+     UPDATE mq_msgs
+     SET
+         attempt_at = attempt_at - commit_interval,
+         commit_interval = NULL
+     WHERE id = ANY(msg_ids)
+     AND commit_interval IS NOT NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+
+ -- Deletes messages from the queue. This occurs when a message has been
+ -- processed, or when it expires without being processed.
+ CREATE FUNCTION mq_delete(msg_ids UUID[])
+ RETURNS VOID AS $$
+ BEGIN
+     PERFORM pg_notify(CONCAT('mq_', channel_name), '')
+     FROM mq_msgs
+     WHERE id = ANY(msg_ids)
+     AND after_message_id = uuid_nil()
+     GROUP BY channel_name;
+
+     IF FOUND THEN
+         PERFORM pg_notify('mq', '');
+     END IF;
+
+     DELETE FROM mq_msgs WHERE id = ANY(msg_ids);
+     DELETE FROM mq_payloads WHERE id = ANY(msg_ids);
+ END;
+ $$ LANGUAGE plpgsql;
+
+ -- Can be called during the initial commit interval, or when processing
+ -- a message. Indicates that the caller is still active and will prevent either
+ -- the commit interval elapsing or the message being retried for the specified
+ -- interval.
+ CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL)
+ RETURNS VOID AS $$
+     UPDATE mq_msgs
+     SET
+         attempt_at = NOW() + duration,
+         commit_interval = commit_interval + ((NOW() + duration) - attempt_at)
+     WHERE id = ANY(msg_ids)
+     AND attempt_at < NOW() + duration;
+ $$ LANGUAGE SQL;
+
+ -- Called during lengthy processing of a message to checkpoint the progress.
+ -- As well as behaving like `mq_keep_alive`, the message payload can be
+ -- updated.
+ CREATE FUNCTION mq_checkpoint(
+     msg_id UUID,
+     duration INTERVAL,
+     new_payload_json TEXT,
+     new_payload_bytes BYTEA,
+     extra_retries INT
+ )
+ RETURNS VOID AS $$
+     UPDATE mq_msgs
+     SET
+         attempt_at = GREATEST(attempt_at, NOW() + duration),
+         attempts = attempts + COALESCE(extra_retries, 0)
+     WHERE id = msg_id;
+
+     UPDATE mq_payloads
+     SET
+         payload_json = COALESCE(new_payload_json::JSONB, payload_json),
+         payload_bytes = COALESCE(new_payload_bytes, payload_bytes)
+     WHERE
+         id = msg_id;
+ $$ LANGUAGE SQL;
+20
api/migrations/005_job_results.sql
···
+ -- Job results table to store sync job outcomes
+ CREATE TABLE job_results (
+     job_id UUID PRIMARY KEY,
+     user_did TEXT NOT NULL,
+     slice_uri TEXT NOT NULL,
+     status TEXT NOT NULL CHECK (status IN ('completed', 'failed')),
+     success BOOLEAN NOT NULL,
+     total_records BIGINT NOT NULL DEFAULT 0,
+     collections_synced JSONB NOT NULL DEFAULT '[]'::jsonb,
+     repos_processed BIGINT NOT NULL DEFAULT 0,
+     message TEXT NOT NULL,
+     error_message TEXT,
+     created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+     completed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+ );
+
+ -- Index for looking up jobs by user
+ CREATE INDEX idx_job_results_user_did ON job_results(user_did);
+ -- Index for looking up recent jobs
+ CREATE INDEX idx_job_results_completed_at ON job_results(completed_at DESC);
+99 -10
api/scripts/generate-typescript.ts
···
      ],
    });

+   // Job queue interfaces
+   sourceFile.addInterface({
+     name: "SyncJobResponse",
+     isExported: true,
+     properties: [
+       { name: "success", type: "boolean" },
+       { name: "jobId", type: "string", hasQuestionToken: true },
+       { name: "message", type: "string" },
+     ],
+   });
+
+   sourceFile.addInterface({
+     name: "SyncJobResult",
+     isExported: true,
+     properties: [
+       { name: "success", type: "boolean" },
+       { name: "totalRecords", type: "number" },
+       { name: "collectionsSynced", type: "string[]" },
+       { name: "reposProcessed", type: "number" },
+       { name: "message", type: "string" },
+     ],
+   });
+
+   sourceFile.addInterface({
+     name: "JobStatus",
+     isExported: true,
+     properties: [
+       { name: "jobId", type: "string" },
+       { name: "status", type: "string" },
+       { name: "createdAt", type: "string" },
+       { name: "startedAt", type: "string", hasQuestionToken: true },
+       { name: "completedAt", type: "string", hasQuestionToken: true },
+       { name: "result", type: "SyncJobResult", hasQuestionToken: true },
+       { name: "error", type: "string", hasQuestionToken: true },
+       { name: "retryCount", type: "number" },
+     ],
+   });
+
+   sourceFile.addInterface({
+     name: "GetJobStatusParams",
+     isExported: true,
+     properties: [
+       { name: "jobId", type: "string" },
+     ],
+   });
+
+   sourceFile.addInterface({
+     name: "GetJobHistoryParams",
+     isExported: true,
+     properties: [
+       { name: "userDid", type: "string" },
+       { name: "sliceUri", type: "string" },
+       { name: "limit", type: "number", hasQuestionToken: true },
+     ],
+   });
+
+   sourceFile.addInterface({
+     name: "GetJobHistoryResponse",
+     isExported: true,
+     properties: [
+       { name: "jobs", type: "JobStatus[]" },
+     ],
+   });
+
    sourceFile.addInterface({
      name: "CollectionStats",
      isExported: true,
···
      }
    }

-   // Add codegen and sync methods to the social.slices.slice class
+   // Add codegen method to the social.slices.slice class
    if (currentPath.length === 3 && currentPath[0] === "social" && currentPath[1] === "slices" && currentPath[2] === "slice") {
      classDeclaration.addMethod({
        name: "codegen",
···
        ],
      });

-     classDeclaration.addMethod({
-       name: "sync",
-       parameters: [{ name: "params", type: "BulkSyncParams" }],
-       returnType: "Promise<BulkSyncOutput>",
-       isAsync: true,
-       statements: [
-         `return await this.makeRequest<BulkSyncOutput>('social.slices.slice.sync', 'POST', params);`,
-       ],
-     });

      classDeclaration.addMethod({
        name: "stats",
···
        isAsync: true,
        statements: [
          `return await this.makeRequest<SliceRecordsOutput>('social.slices.slice.records', 'POST', params);`,
+       ],
+     });
+   }
+
+   // Add sync methods to the social.slices.slice class
+   if (currentPath.length === 3 && currentPath[0] === "social" && currentPath[1] === "slices" && currentPath[2] === "slice") {
+     classDeclaration.addMethod({
+       name: "startSync",
+       parameters: [{ name: "params", type: "BulkSyncParams" }],
+       returnType: "Promise<SyncJobResponse>",
+       isAsync: true,
+       statements: [
+         `const requestParams = { ...params, slice: this.sliceUri };`,
+         `return await this.makeRequest<SyncJobResponse>('social.slices.slice.startSync', 'POST', requestParams);`,
+       ],
+     });
+
+     classDeclaration.addMethod({
+       name: "getJobStatus",
+       parameters: [{ name: "params", type: "GetJobStatusParams" }],
+       returnType: "Promise<JobStatus>",
+       isAsync: true,
+       statements: [
+         `return await this.makeRequest<JobStatus>('social.slices.slice.getJobStatus', 'GET', params);`,
+       ],
+     });
+
+     classDeclaration.addMethod({
+       name: "getJobHistory",
+       parameters: [{ name: "params", type: "GetJobHistoryParams" }],
+       returnType: "Promise<GetJobHistoryResponse>",
+       isAsync: true,
+       statements: [
+         `return await this.makeRequest<GetJobHistoryResponse>('social.slices.slice.getJobHistory', 'GET', params);`,
        ],
      });
    }
+22 -8
api/src/database.rs
···
      Self { pool }
  }

+     pub fn from_pool(pool: PgPool) -> Self {
+         Self::new(pool)
+     }
+
      #[allow(dead_code)]
      pub async fn insert_record(&self, record: &Record) -> Result<(), DatabaseError> {
          sqlx::query!(
···
          Ok(())
      }

+     pub async fn get_existing_record_cids(&self, did: &str, collection: &str) -> Result<std::collections::HashMap<String, String>, DatabaseError> {
+         let records = sqlx::query!(
+             r#"SELECT "uri", "cid"
+             FROM "record"
+             WHERE "did" = $1 AND "collection" = $2"#,
+             did,
+             collection
+         )
+         .fetch_all(&self.pool)
+         .await?;
+
+         let mut cid_map = std::collections::HashMap::new();
+         for record in records {
+             cid_map.insert(record.uri, record.cid);
+         }
+         Ok(cid_map)
+     }
+
      pub async fn get_record(&self, uri: &str) -> Result<Option<IndexedRecord>, DatabaseError> {
          let record = sqlx::query_as::<_, Record>(
              r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at"
···
      }


-     pub async fn get_total_record_count(&self) -> Result<i64, DatabaseError> {
-         let count = sqlx::query!("SELECT COUNT(*) as count FROM record")
-             .fetch_one(&self.pool)
-             .await?;
-
-         Ok(count.count.unwrap_or(0))
-     }


      pub async fn update_record(&self, record: &Record) -> Result<(), DatabaseError> {
+59
api/src/handler_jobs.rs
···
+ use axum::{
+     extract::{Query, State},
+     http::StatusCode,
+     response::Json,
+ };
+ use serde::{Deserialize, Serialize};
+ use uuid::Uuid;
+ use crate::AppState;
+ use crate::jobs;
+
+ /// Query parameters for getting job status
+ #[derive(Debug, Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct GetJobStatusQuery {
+     pub job_id: Uuid,
+ }
+
+ /// Query parameters for getting slice job history
+ #[derive(Debug, Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct GetSliceJobHistoryQuery {
+     pub user_did: String,
+     pub slice_uri: String,
+     pub limit: Option<i64>,
+ }
+
+ /// Response envelope for job history, matching the generated client's
+ /// GetJobHistoryResponse shape ({ "jobs": [...] })
+ #[derive(Debug, Serialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct GetJobHistoryResponse {
+     pub jobs: Vec<jobs::JobStatus>,
+ }
+
+ /// Get the status of a specific job by ID (XRPC style)
+ pub async fn get_job_status(
+     State(state): State<AppState>,
+     Query(query): Query<GetJobStatusQuery>,
+ ) -> Result<Json<jobs::JobStatus>, StatusCode> {
+     match jobs::get_job_status(&state.database_pool, query.job_id).await {
+         Ok(Some(status)) => Ok(Json(status)),
+         Ok(None) => Err(StatusCode::NOT_FOUND),
+         Err(e) => {
+             tracing::error!("Failed to get job status: {}", e);
+             Err(StatusCode::INTERNAL_SERVER_ERROR)
+         }
+     }
+ }
+
+ /// Get job history for a specific slice (XRPC style)
+ pub async fn get_slice_job_history(
+     State(state): State<AppState>,
+     Query(query): Query<GetSliceJobHistoryQuery>,
+ ) -> Result<Json<GetJobHistoryResponse>, StatusCode> {
+     match jobs::get_slice_job_history(
+         &state.database_pool,
+         &query.user_did,
+         &query.slice_uri,
+         query.limit
+     ).await {
+         Ok(history) => Ok(Json(GetJobHistoryResponse { jobs: history })),
+         Err(e) => {
+             tracing::error!("Failed to get slice job history: {}", e);
+             Err(StatusCode::INTERNAL_SERVER_ERROR)
+         }
+     }
+ }
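For reference, the query-parameter shape these handlers expect (camelCase, per the serde renames), sketched as raw XRPC reads; the base URL assumes the PORT from .env.example and the identifiers are placeholders:

// Sketch: raw XRPC reads against the new job endpoints.
const base = "http://localhost:3000"; // assumes PORT from .env.example
const jobId = "00000000-0000-0000-0000-000000000000"; // placeholder UUID

// social.slices.slice.getJobStatus expects ?jobId=<uuid>
const status = await fetch(
  `${base}/xrpc/social.slices.slice.getJobStatus?jobId=${jobId}`,
).then((r) => r.json());

// social.slices.slice.getJobHistory expects userDid, sliceUri, optional limit
const query = new URLSearchParams({
  userDid: "did:plc:example", // placeholder DID
  sliceUri: "at://did:plc:example/social.slices.slice/self", // placeholder URI
  limit: "5",
});
const history = await fetch(
  `${base}/xrpc/social.slices.slice.getJobHistory?${query}`,
).then((r) => r.json());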
+48 -33
api/src/handler_sync.rs
···
+ use crate::AppState;
+ use crate::auth;
+ use crate::jobs;
+ use crate::models::BulkSyncParams;
  use axum::{
      extract::State,
-     http::StatusCode,
+     http::{HeaderMap, StatusCode},
      response::Json,
  };
- use crate::models::{BulkSyncOutput, BulkSyncParams};
- use crate::AppState;
+ use serde::{Deserialize, Serialize};
+ use uuid::Uuid;
+
+ #[derive(Debug, Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct SyncRequest {
+     #[serde(flatten)]
+     pub params: BulkSyncParams,
+     pub slice: String, // The slice URI
+ }
+
+ #[derive(Debug, Serialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct SyncJobResponse {
+     pub success: bool,
+     pub job_id: Option<Uuid>,
+     pub message: String,
+ }

+ /// Start a sync job (enqueue it for background processing)
  pub async fn sync(
      State(state): State<AppState>,
-     axum::extract::Json(params): axum::extract::Json<BulkSyncParams>,
- ) -> Result<Json<BulkSyncOutput>, StatusCode> {
-     match state
-         .sync_service
-         .backfill_collections(
-             params.collections.as_deref(),
-             params.external_collections.as_deref(),
-             params.repos.as_deref()
-         )
-         .await
-     {
-         Ok((repos_processed, _records_synced)) => {
-             let total_records = state.database.get_total_record_count().await.unwrap_or(0);
-             Ok(Json(BulkSyncOutput {
-                 success: true,
-                 total_records,
-                 collections_synced: [
-                     params.collections.unwrap_or_default(),
-                     params.external_collections.unwrap_or_default()
-                 ].concat(),
-                 repos_processed,
-                 message: "Sync completed successfully".to_string(),
+     headers: HeaderMap,
+     axum::extract::Json(request): axum::extract::Json<SyncRequest>,
+ ) -> Result<Json<SyncJobResponse>, StatusCode> {
+     // Extract and verify authentication
+     let token = auth::extract_bearer_token(&headers)?;
+     let user_info = auth::verify_oauth_token(&token, &state.config.auth_base_url).await?;
+
+     let user_did = user_info.sub;
+     let slice_uri = request.slice;
+
+     // Enqueue the sync job with authenticated user information
+     match jobs::enqueue_sync_job(&state.database_pool, user_did, slice_uri, request.params).await {
+         Ok(job_id) => Ok(Json(SyncJobResponse {
+             success: true,
+             job_id: Some(job_id),
+             message: format!("Sync job {} enqueued successfully", job_id),
+         })),
+         Err(e) => {
+             tracing::error!("Failed to enqueue sync job: {}", e);
+             Ok(Json(SyncJobResponse {
+                 success: false,
+                 job_id: None,
+                 message: format!("Failed to enqueue sync job: {}", e),
              }))
          }
-         Err(e) => Ok(Json(BulkSyncOutput {
-             success: false,
-             total_records: 0,
-             collections_synced: vec![],
-             repos_processed: 0,
-             message: format!("Sync failed: {}", e),
-         })),
      }
  }
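On the wire, `#[serde(flatten)]` means the BulkSyncParams fields and the slice URI sit at the same level of the JSON body, and the Bearer token is verified against AUTH_BASE_URL before anything is enqueued. A sketch of the request (base URL, token, and identifiers are placeholders):

// Sketch: what the startSync handler expects.
const accessToken = "<oauth-access-token>"; // placeholder
const response = await fetch(
  "http://localhost:3000/xrpc/social.slices.slice.startSync", // placeholder base URL
  {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${accessToken}`,
    },
    body: JSON.stringify({
      collections: ["social.slices.slice"], // BulkSyncParams, flattened
      repos: ["did:plc:example"], // placeholder repo DID
      slice: "at://did:plc:example/social.slices.slice/self", // placeholder slice URI
    }),
  },
);
const { success, jobId, message } = await response.json();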
+1
api/src/handler_xrpc_codegen.rs
···
  }

  #[derive(Serialize)]
+ #[serde(rename_all = "camelCase")]
  pub struct CodegenXrpcResponse {
      success: bool,
      generated_code: Option<String>,
+378
api/src/jobs.rs
···
+ use serde::{Deserialize, Serialize};
+ use sqlxmq::{job, CurrentJob, JobRegistry};
+ use sqlx::PgPool;
+ use uuid::Uuid;
+ use crate::sync::SyncService;
+ use crate::models::BulkSyncParams;
+ use tracing::{info, error};
+
+ /// Payload for sync jobs
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub struct SyncJobPayload {
+     pub job_id: Uuid,
+     pub user_did: String,
+     pub slice_uri: String,
+     pub params: BulkSyncParams,
+ }
+
+ /// Result stored for completed sync jobs
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct SyncJobResult {
+     pub success: bool,
+     pub total_records: i64,
+     pub collections_synced: Vec<String>,
+     pub repos_processed: i64,
+     pub message: String,
+ }
+
+ /// Initialize the job registry with all job handlers
+ pub fn registry() -> JobRegistry {
+     JobRegistry::new(&[sync_job])
+ }
+
+ /// The sync job handler
+ #[job(channel_name = "sync_queue")]
+ async fn sync_job(mut current_job: CurrentJob) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+     let payload: SyncJobPayload = current_job
+         .json()?
+         .ok_or("sync job was enqueued without a JSON payload")?;
+
+     info!(
+         "Starting sync job {} for user {} on slice {}",
+         payload.job_id, payload.user_did, payload.slice_uri
+     );
+
+     // Get database pool from job context
+     let pool = current_job.pool();
+
+     // Create sync service
+     let database = crate::database::Database::from_pool(pool.clone());
+     let relay_endpoint = std::env::var("RELAY_ENDPOINT")
+         .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
+     let sync_service = SyncService::new(database.clone(), relay_endpoint);
+
+     // Track progress
+     let start_time = std::time::Instant::now();
+
+     // Perform the sync
+     match sync_service
+         .backfill_collections(
+             payload.params.collections.as_deref(),
+             payload.params.external_collections.as_deref(),
+             payload.params.repos.as_deref(),
+         )
+         .await
+     {
+         Ok((repos_processed, records_synced)) => {
+             let result = SyncJobResult {
+                 success: true,
+                 total_records: records_synced,
+                 collections_synced: [
+                     payload.params.collections.unwrap_or_default(),
+                     payload.params.external_collections.unwrap_or_default(),
+                 ].concat(),
+                 repos_processed,
+                 message: format!(
+                     "Sync completed successfully in {:?}",
+                     start_time.elapsed()
+                 ),
+             };
+
+             // Store result in database before completing the job
+             store_job_result(
+                 pool,
+                 payload.job_id,
+                 &payload.user_did,
+                 &payload.slice_uri,
+                 &result,
+                 None,
+             ).await?;
+
+             info!(
+                 "Sync job {} completed successfully: {} repos, {} records",
+                 payload.job_id, repos_processed, records_synced
+             );
+
+             // CRITICAL: Must explicitly complete the job to prevent it from being retried
+             current_job.complete().await?;
+
+             info!(
+                 "Sync job {} marked as complete and will be cleaned up",
+                 payload.job_id
+             );
+
+             Ok(())
+         }
+         Err(e) => {
+             error!("Sync job {} failed: {}", payload.job_id, e);
+
+             let result = SyncJobResult {
+                 success: false,
+                 total_records: 0,
+                 collections_synced: vec![],
+                 repos_processed: 0,
+                 message: format!("Sync failed: {}", e),
+             };
+
+             // Store error result before returning the error
+             if let Err(db_err) = store_job_result(
+                 pool,
+                 payload.job_id,
+                 &payload.user_did,
+                 &payload.slice_uri,
+                 &result,
+                 Some(&e.to_string()),
+             ).await {
+                 error!("Failed to store job result: {}", db_err);
+             }
+
+             // Return error to trigger retry
+             Err(Box::new(e))
+         }
+     }
+ }
+
+ /// Store job result in the database for later retrieval
+ async fn store_job_result(
+     pool: &PgPool,
+     job_id: Uuid,
+     user_did: &str,
+     slice_uri: &str,
+     result: &SyncJobResult,
+     error_message: Option<&str>,
+ ) -> Result<(), sqlx::Error> {
+     info!(
+         "Storing job result: job_id={}, user_did={}, slice_uri={}, success={}",
+         job_id, user_did, slice_uri, result.success
+     );
+
+     let collections_json = serde_json::to_value(&result.collections_synced)
+         .map_err(|e| sqlx::Error::Protocol(format!("Failed to serialize collections: {}", e).into()))?;
+
+     sqlx::query!(
+         r#"
+         INSERT INTO job_results (
+             job_id, user_did, slice_uri, status, success, total_records,
+             collections_synced, repos_processed, message, error_message
+         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+         ON CONFLICT (job_id)
+         DO UPDATE SET
+             status = EXCLUDED.status,
+             success = EXCLUDED.success,
+             total_records = EXCLUDED.total_records,
+             collections_synced = EXCLUDED.collections_synced,
+             repos_processed = EXCLUDED.repos_processed,
+             message = EXCLUDED.message,
+             error_message = EXCLUDED.error_message,
+             completed_at = NOW()
+         "#,
+         job_id,
+         user_did,
+         slice_uri,
+         if result.success { "completed" } else { "failed" },
+         result.success,
+         result.total_records,
+         collections_json,
+         result.repos_processed,
+         result.message,
+         error_message,
+     )
+     .execute(pool)
+     .await?;
+
+     Ok(())
+ }
+
+ /// Enqueue a new sync job
+ pub async fn enqueue_sync_job(
+     pool: &PgPool,
+     user_did: String,
+     slice_uri: String,
+     params: BulkSyncParams,
+ ) -> Result<Uuid, Box<dyn std::error::Error + Send + Sync>> {
+     let job_id = Uuid::new_v4();
+
+     let payload = SyncJobPayload {
+         job_id,
+         user_did: user_did.clone(),
+         slice_uri: slice_uri.clone(),
+         params,
+     };
+
+     // Spawn the job using the builder pattern
+     let job_uuid = sync_job.builder()
+         .set_json(&payload)?
+         .spawn(pool)
+         .await?;
+
+     info!(
+         "Enqueued sync job {} (queue id: {}) for user {}",
+         job_id, job_uuid, user_did
+     );
+
+     Ok(job_id)
+ }
+
+ /// Status of a sync job, as exposed through the XRPC endpoints
+ #[derive(Debug, Serialize, Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ pub struct JobStatus {
+     pub job_id: Uuid,
+     pub status: String,
+     pub created_at: chrono::DateTime<chrono::Utc>,
+     pub started_at: Option<chrono::DateTime<chrono::Utc>>,
+     pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
+     pub result: Option<SyncJobResult>,
+     pub error: Option<String>,
+     pub retry_count: i32,
+ }
+
+ /// Check the status of a sync job
+ pub async fn get_job_status(
+     pool: &PgPool,
+     job_id: Uuid,
+ ) -> Result<Option<JobStatus>, sqlx::Error> {
+     // First, check if we have a stored result for this job
+     let result_row = sqlx::query!(
+         r#"
+         SELECT
+             job_id, user_did, slice_uri, status, success, total_records,
+             collections_synced, repos_processed, message, error_message,
+             created_at, completed_at
+         FROM job_results
+         WHERE job_id = $1
+         "#,
+         job_id
+     )
+     .fetch_optional(pool)
+     .await?;
+
+     if let Some(result) = result_row {
+         // We have a stored result, return it
+         let collections_synced: Vec<String> = serde_json::from_value(result.collections_synced)
+             .unwrap_or_default();
+
+         return Ok(Some(JobStatus {
+             job_id,
+             status: result.status,
+             created_at: result.created_at,
+             started_at: Some(result.created_at),
+             completed_at: Some(result.completed_at),
+             result: Some(SyncJobResult {
+                 success: result.success,
+                 total_records: result.total_records,
+                 collections_synced,
+                 repos_processed: result.repos_processed,
+                 message: result.message,
+             }),
+             error: result.error_message,
+             retry_count: 0,
+         }));
+     }
+
+     // No stored result, check if the job is still in the queue
+     let queue_row = sqlx::query!(
+         r#"
+         SELECT
+             m.id,
+             m.created_at,
+             m.attempt_at,
+             m.attempts,
+             p.payload_json
+         FROM mq_msgs m
+         LEFT JOIN mq_payloads p ON m.id = p.id
+         WHERE p.payload_json::jsonb ->> 'job_id' = $1
+         "#,
+         job_id.to_string()
+     )
+     .fetch_optional(pool)
+     .await?;
+
+     match queue_row {
+         Some(row) => {
+             let status = match row.attempt_at {
+                 // mq_poll sets attempt_at to NULL once the final attempt has been
+                 // handed out; completed jobs are deleted from the queue and land in
+                 // job_results, so a lingering row with no next attempt has run out
+                 // of retries.
+                 None => "failed".to_string(),
+                 Some(attempt_at) if attempt_at <= chrono::Utc::now() => "running".to_string(),
+                 Some(_) => "pending".to_string(),
+             };
+
+             Ok(Some(JobStatus {
+                 job_id,
+                 status: status.clone(),
+                 created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
+                 started_at: if status == "running" { row.created_at } else { None },
+                 completed_at: None,
+                 result: None,
+                 error: None,
+                 // The migration defaults each message to 5 attempts
+                 retry_count: 5 - row.attempts,
+             }))
+         },
+         None => {
+             // Job not found in queue or results - it might not exist
+             Ok(None)
+         }
+     }
+ }
+
+ /// Get job results for a specific slice, ordered by most recent first
+ pub async fn get_slice_job_history(
+     pool: &PgPool,
+     user_did: &str,
+     slice_uri: &str,
+     limit: Option<i64>,
+ ) -> Result<Vec<JobStatus>, sqlx::Error> {
+     let limit = limit.unwrap_or(10);
+
+     info!(
+         "Querying job history: user_did={}, slice_uri={}, limit={}",
+         user_did, slice_uri, limit
+     );
+
+     let rows = sqlx::query!(
+         r#"
+         SELECT
+             job_id, user_did, slice_uri, status, success, total_records,
+             collections_synced, repos_processed, message, error_message,
+             created_at, completed_at
+         FROM job_results
+         WHERE user_did = $1 AND slice_uri = $2
+         ORDER BY created_at DESC
+         LIMIT $3
+         "#,
+         user_did,
+         slice_uri,
+         limit
+     )
+     .fetch_all(pool)
+     .await?;
+
+     let mut results = Vec::new();
+     for row in rows {
+         let collections_synced: Vec<String> = serde_json::from_value(row.collections_synced)
+             .unwrap_or_default();
+
+         results.push(JobStatus {
+             job_id: row.job_id,
+             status: row.status,
+             created_at: row.created_at,
+             started_at: Some(row.created_at),
+             completed_at: Some(row.completed_at),
+             result: Some(SyncJobResult {
+                 success: row.success,
+                 total_records: row.total_records,
+                 collections_synced,
+                 repos_processed: row.repos_processed,
+                 message: row.message,
+             }),
+             error: row.error_message,
+             retry_count: 0,
+         });
+     }
+
+     Ok(results)
+ }
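The status string can come from two places: finished jobs read "completed"/"failed" out of job_results, while jobs still sitting in mq_msgs are classified from their next attempt time. A sketch of how the slice sync page might label those states (display strings and import path are illustrative, not part of this change):

// Sketch: map a JobStatus from the API onto sync-page display text.
import type { JobStatus } from "./client.ts"; // path is an assumption

function describeJob(job: JobStatus): string {
  switch (job.status) {
    case "pending":
      return "Queued, waiting for a worker";
    case "running":
      return `Running (attempt ${job.retryCount + 1})`;
    case "completed":
      return job.result
        ? `Synced ${job.result.totalRecords} records across ${job.result.reposProcessed} repos`
        : "Completed";
    case "failed":
      return job.error ?? job.result?.message ?? "Failed";
    default:
      return job.status; // tolerate states added later
  }
}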
+40 -8
api/src/main.rs
···
  mod codegen;
  mod database;
  mod errors;
+ mod handler_jobs;
  mod handler_records;
  mod handler_stats;
  mod handler_sync;
  mod handler_upload_blob;
  mod handler_xrpc_codegen;
  mod handler_xrpc_dynamic;
+ mod jobs;
  mod models;
  mod sync;
···
  use crate::database::Database;
  use crate::errors::AppError;
- use crate::sync::SyncService;

  #[derive(Clone)]
  pub struct Config {
      pub auth_base_url: String,
+     pub relay_endpoint: String,
  }

  #[derive(Clone)]
  pub struct AppState {
      database: Database,
-     sync_service: SyncService,
+     database_pool: PgPool,
      config: Config,
  }
···
      // Run migrations if needed
      sqlx::migrate!("./migrations").run(&pool).await?;

-     let database = Database::new(pool);
-     let sync_service = SyncService::new(database.clone());
-
+     let database = Database::new(pool.clone());
+
      let auth_base_url =
          env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());

-     let config = Config { auth_base_url };
+     let relay_endpoint = env::var("RELAY_ENDPOINT")
+         .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
+
+     let config = Config {
+         auth_base_url,
+         relay_endpoint,
+     };
+
+     // Start job queue runner
+     let pool_for_runner = pool.clone();
+     tokio::spawn(async move {
+         tracing::info!("Starting job queue runner...");
+         match jobs::registry()
+             .runner(&pool_for_runner)
+             .set_concurrency(2, 5) // Keep 2-5 sync jobs running at a time
+             .run()
+             .await
+         {
+             Ok(handle) => {
+                 tracing::info!("Job runner started successfully");
+                 // The runner stops as soon as its handle is dropped, so hold the
+                 // handle and park this task indefinitely.
+                 let _handle = handle;
+                 std::future::pending::<()>().await;
+             },
+             Err(e) => {
+                 tracing::error!("Failed to start job runner: {}", e);
+             }
+         }
+     });

      let state = AppState {
          database: database.clone(),
-         sync_service,
+         database_pool: pool,
          config,
      };
···
          post(handler_upload_blob::upload_blob),
      )
      // XRPC endpoints
-     .route("/xrpc/social.slices.slice.sync", post(handler_sync::sync))
+     .route("/xrpc/social.slices.slice.startSync", post(handler_sync::sync))
+     .route("/xrpc/social.slices.slice.getJobStatus", get(handler_jobs::get_job_status))
+     .route("/xrpc/social.slices.slice.getJobHistory", get(handler_jobs::get_slice_job_history))
      .route("/xrpc/social.slices.slice.stats", post(handler_stats::stats))
      .route("/xrpc/social.slices.slice.records", post(handler_records::records))
      .route(
+1 -1
api/src/models.rs
···
      pub cursor: Option<String>,
  }

- #[derive(Debug, Serialize, Deserialize)]
+ #[derive(Debug, Clone, Serialize, Deserialize)]
  #[serde(rename_all = "camelCase")]
  pub struct BulkSyncParams {
      pub collections: Option<Vec<String>>,
+35 -5
api/src/sync.rs
···
  pub struct SyncService {
      client: Client,
      database: Database,
+     relay_endpoint: String,
      atp_cache: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, AtpData>>>,
  }

  impl SyncService {
-     pub fn new(database: Database) -> Self {
+     pub fn new(database: Database, relay_endpoint: String) -> Self {
          Self {
              client: Client::new(),
              database,
+             relay_endpoint,
              atp_cache: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
          }
      }
···
      info!("🔧 Debug: {} successful tasks, {} failed tasks", successful_tasks, failed_tasks);

      let total_records = all_records.len() as i64;
-     info!("✓ Fetched {} total records", total_records);
+     info!("✓ Prepared {} new/changed records for indexing", total_records);

      // Index actors first (like the TypeScript version)
      info!("📝 Indexing actors...");
      self.index_actors(&valid_repos, &atp_map).await?;
      info!("✓ Indexed {} actors", valid_repos.len());

-     // Single batch insert for all records
+     // Single batch insert for new/changed records only
      if !all_records.is_empty() {
-         info!("📝 Indexing {} records...", total_records);
+         info!("📝 Indexing {} new/changed records...", total_records);
          self.database.batch_insert_records(&all_records).await?;
+     } else {
+         info!("📝 No new or changed records to index");
      }

      info!("✅ Backfill complete!");
···
  }

  async fn get_repos_for_collection(&self, collection: &str) -> Result<Vec<String>, SyncError> {
+     let url = format!("{}/xrpc/com.atproto.sync.listReposByCollection", self.relay_endpoint);
      let response = self.client
-         .get("https://relay1.us-west.bsky.network/xrpc/com.atproto.sync.listReposByCollection")
+         .get(&url)
          .query(&[("collection", collection)])
          .send()
          .await?;
···
  }

  async fn fetch_records_for_repo_collection(&self, repo: &str, collection: &str, pds_url: &str) -> Result<Vec<Record>, SyncError> {
+     // First, get existing record CIDs from the database
+     let existing_cids = self.database.get_existing_record_cids(repo, collection)
+         .await
+         .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;
+
+     debug!("Found {} existing records for {}/{}", existing_cids.len(), repo, collection);
+
      let mut records = Vec::new();
      let mut cursor: Option<String> = None;
+     let mut fetched_count = 0;
+     let mut skipped_count = 0;

      loop {
          let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
···
          let list_response: ListRecordsResponse = response.json().await?;

          for atproto_record in list_response.records {
+             // Check if we already have this record with the same CID
+             if let Some(existing_cid) = existing_cids.get(&atproto_record.uri) {
+                 if existing_cid == &atproto_record.cid {
+                     // Record unchanged, skip it
+                     skipped_count += 1;
+                     continue;
+                 }
+             }
+
+             // Record is new or changed, include it
              let record = Record {
                  uri: atproto_record.uri,
                  cid: atproto_record.cid,
···
                  indexed_at: Utc::now(),
              };
              records.push(record);
+             fetched_count += 1;
          }

          cursor = list_response.cursor;
          if cursor.is_none() {
              break;
          }
+     }
+
+     if skipped_count > 0 {
+         info!("Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
+             skipped_count, fetched_count, repo, collection);
      }

      Ok(records)
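The incremental part of the backfill is CID-based change detection: the uri→cid map loaded up front is compared against each listRecords page, and only new or changed records are kept for the batch insert. The same idea in a few lines of TypeScript, purely as an illustration of the Rust above (types simplified):

// Sketch: keep only records whose URI is new or whose CID changed.
interface AtprotoRecord {
  uri: string;
  cid: string;
  value: unknown;
}

function selectChangedRecords(
  fetched: AtprotoRecord[],
  existing: Map<string, string>, // uri -> cid already indexed
): AtprotoRecord[] {
  // A missing URI yields undefined, which never equals a CID, so new
  // records pass through; identical CIDs are filtered out as unchanged.
  return fetched.filter((rec) => existing.get(rec.uri) !== rec.cid);
}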
+1
frontend/deno.lock
··· 17 17 "jsr:@std/streams@^1.0.10": "1.0.11", 18 18 "npm:@shikijs/core@^3.7.0": "3.11.0", 19 19 "npm:@shikijs/engine-oniguruma@^3.7.0": "3.11.0", 20 + "npm:@shikijs/types@^3.7.0": "3.11.0", 20 21 "npm:@types/node@*": "22.15.15", 21 22 "npm:preact-render-to-string@^6.5.13": "6.5.13_preact@10.27.1", 22 23 "npm:preact@^10.27.1": "10.27.1",
+111 -51
frontend/src/client.ts
··· 1 1 // Generated TypeScript client for AT Protocol records 2 - // Generated at: 2025-08-26 15:32:09 UTC 2 + // Generated at: 2025-08-26 18:27:23 UTC 3 3 // Lexicons: 3 4 4 5 5 /** ··· 91 91 collectionsSynced: string[]; 92 92 reposProcessed: number; 93 93 message: string; 94 + } 95 + 96 + export interface SyncJobResponse { 97 + success: boolean; 98 + jobId?: string; 99 + message: string; 100 + } 101 + 102 + export interface SyncJobResult { 103 + success: boolean; 104 + totalRecords: number; 105 + collectionsSynced: string[]; 106 + reposProcessed: number; 107 + message: string; 108 + } 109 + 110 + export interface JobStatus { 111 + jobId: string; 112 + status: string; 113 + createdAt: string; 114 + startedAt?: string; 115 + completedAt?: string; 116 + result?: SyncJobResult; 117 + error?: string; 118 + retryCount: number; 119 + } 120 + 121 + export interface GetJobStatusParams { 122 + jobId: string; 123 + } 124 + 125 + export interface GetJobHistoryParams { 126 + userDid: string; 127 + sliceUri: string; 128 + limit?: number; 129 + } 130 + 131 + export interface GetJobHistoryResponse { 132 + jobs: JobStatus[]; 94 133 } 95 134 96 135 export interface CollectionStats { ··· 199 238 protected async makeRequest<T = unknown>( 200 239 endpoint: string, 201 240 method?: "GET" | "POST" | "PUT" | "DELETE", 202 - params?: Record<string, unknown> | unknown, 241 + params?: Record<string, unknown> | unknown 203 242 ): Promise<T> { 204 243 return this.makeRequestWithRetry(endpoint, method, params, false); 205 244 } ··· 208 247 endpoint: string, 209 248 method?: "GET" | "POST" | "PUT" | "DELETE", 210 249 params?: Record<string, unknown> | unknown, 211 - isRetry?: boolean, 250 + isRetry?: boolean 212 251 ): Promise<T> { 213 252 isRetry = isRetry ?? false; 214 253 const httpMethod = method || "GET"; ··· 232 271 // For write operations, OAuth tokens are required 233 272 if (httpMethod !== "GET") { 234 273 throw new Error( 235 - `Authentication required: OAuth tokens are invalid or expired. Please log in again.`, 274 + `Authentication required: OAuth tokens are invalid or expired. Please log in again.` 236 275 ); 237 276 } 238 277 ··· 267 306 268 307 // Handle 401 Unauthorized - attempt token refresh and retry once 269 308 if ( 270 - response.status === 401 && !isRetry && this.oauthClient && 309 + response.status === 401 && 310 + !isRetry && 311 + this.oauthClient && 271 312 httpMethod !== "GET" 272 313 ) { 273 314 try { ··· 277 318 return this.makeRequestWithRetry(endpoint, method, params, true); 278 319 } catch (_refreshError) { 279 320 throw new Error( 280 - `Authentication required: OAuth tokens are invalid or expired. Please log in again.`, 321 + `Authentication required: OAuth tokens are invalid or expired. 
Please log in again.`
281 322       );
282 323     }
283 324   }
284 325
285 326     throw new Error(
286 -       `Request failed: ${response.status} ${response.statusText}`,
327 +       `Request failed: ${response.status} ${response.statusText}`
287 328     );
288 329   }
289 330
290 -     return await response.json() as T;
331 +     return (await response.json()) as T;
291 332   }
292 333 }
293 334
···
300 341   }
301 342
302 343   async listRecords(
303 -     params?: ListRecordsParams,
344 +     params?: ListRecordsParams
304 345   ): Promise<ListRecordsResponse<SocialSlicesSliceRecord>> {
305 346     const requestParams = { ...params, slice: this.sliceUri };
306 347     return await this.makeRequest<ListRecordsResponse<SocialSlicesSliceRecord>>(
307 348       "social.slices.slice.list",
308 349       "GET",
309 -       requestParams,
350 +       requestParams
310 351     );
311 352   }
312 353
313 354   async getRecord(
314 -     params: GetRecordParams,
355 +     params: GetRecordParams
315 356   ): Promise<RecordResponse<SocialSlicesSliceRecord>> {
316 357     const requestParams = { ...params, slice: this.sliceUri };
317 358     return await this.makeRequest<RecordResponse<SocialSlicesSliceRecord>>(
318 359       "social.slices.slice.get",
319 360       "GET",
320 -       requestParams,
361 +       requestParams
321 362     );
322 363   }
323 364
324 365   async searchRecords(
325 -     params: SearchRecordsParams,
366 +     params: SearchRecordsParams
326 367   ): Promise<ListRecordsResponse<SocialSlicesSliceRecord>> {
327 368     const requestParams = { ...params, slice: this.sliceUri };
328 369     return await this.makeRequest<ListRecordsResponse<SocialSlicesSliceRecord>>(
329 370       "social.slices.slice.searchRecords",
330 371       "GET",
331 -       requestParams,
372 +       requestParams
332 373     );
333 374   }
334 375
335 376   async createRecord(
336 377     record: SocialSlicesSliceRecord,
337 -     useSelfRkey?: boolean,
378 +     useSelfRkey?: boolean
338 379   ): Promise<{ uri: string; cid: string }> {
339 380     const recordWithType = { $type: "social.slices.slice", ...record };
340 381     const payload = useSelfRkey
···
343 384     return await this.makeRequest<{ uri: string; cid: string }>(
344 385       "social.slices.slice.create",
345 386       "POST",
346 -       payload,
387 +       payload
347 388     );
348 389   }
349 390
350 391   async updateRecord(
351 392     rkey: string,
352 -     record: SocialSlicesSliceRecord,
393 +     record: SocialSlicesSliceRecord
353 394   ): Promise<{ uri: string; cid: string }> {
354 395     const recordWithType = { $type: "social.slices.slice", ...record };
355 396     return await this.makeRequest<{ uri: string; cid: string }>(
356 397       "social.slices.slice.update",
357 398       "POST",
358 -       { rkey, record: recordWithType },
399 +       { rkey, record: recordWithType }
359 400     );
360 401   }
361 402
···
369 410     return await this.makeRequest<CodegenXrpcResponse>(
370 411       "social.slices.slice.codegen",
371 412       "POST",
372 -       request,
373 -     );
374 -   }
375 -
376 -   async sync(params: BulkSyncParams): Promise<BulkSyncOutput> {
377 -     return await this.makeRequest<BulkSyncOutput>(
378 -       "social.slices.slice.sync",
379 -       "POST",
380 -       params,
413 +       request
381 414     );
382 415   }
383 416
···
385 418     return await this.makeRequest<SliceStatsOutput>(
386 419       "social.slices.slice.stats",
387 420       "POST",
388 -       params,
421 +       params
389 422     );
390 423   }
391 424
···
393 426     return await this.makeRequest<SliceRecordsOutput>(
394 427       "social.slices.slice.records",
395 428       "POST",
396 -       params,
429 +       params
430 +     );
431 +   }
432 +
433 +   async startSync(params: BulkSyncParams): Promise<SyncJobResponse> {
434 +     const requestParams = { ...params, slice: this.sliceUri };
435 +     return await this.makeRequest<SyncJobResponse>(
436 +       "social.slices.slice.startSync",
437 +       "POST",
438 +       requestParams
439 +     );
440 +   }
441 +
442 +   async getJobStatus(params: GetJobStatusParams): Promise<JobStatus> {
443 +     return await this.makeRequest<JobStatus>(
444 +       "social.slices.slice.getJobStatus",
445 +       "GET",
446 +       params
447 +     );
448 +   }
449 +
450 +   async getJobHistory(
451 +     params: GetJobHistoryParams
452 +   ): Promise<GetJobHistoryResponse> {
453 +     return await this.makeRequest<GetJobHistoryResponse>(
454 +       "social.slices.slice.getJobHistory",
455 +       "GET",
456 +       params
397 457     );
398 458   }
399 459 }
···
407 467   }
408 468
409 469   async listRecords(
410 -     params?: ListRecordsParams,
470 +     params?: ListRecordsParams
411 471   ): Promise<ListRecordsResponse<SocialSlicesLexiconRecord>> {
412 472     const requestParams = { ...params, slice: this.sliceUri };
413 473     return await this.makeRequest<
···
416 476   }
417 477
418 478   async getRecord(
419 -     params: GetRecordParams,
479 +     params: GetRecordParams
420 480   ): Promise<RecordResponse<SocialSlicesLexiconRecord>> {
421 481     const requestParams = { ...params, slice: this.sliceUri };
422 482     return await this.makeRequest<RecordResponse<SocialSlicesLexiconRecord>>(
423 483       "social.slices.lexicon.get",
424 484       "GET",
425 -       requestParams,
485 +       requestParams
426 486     );
427 487   }
428 488
429 489   async searchRecords(
430 -     params: SearchRecordsParams,
490 +     params: SearchRecordsParams
431 491   ): Promise<ListRecordsResponse<SocialSlicesLexiconRecord>> {
432 492     const requestParams = { ...params, slice: this.sliceUri };
433 493     return await this.makeRequest<
···
437 497
438 498   async createRecord(
439 499     record: SocialSlicesLexiconRecord,
440 -     useSelfRkey?: boolean,
500 +     useSelfRkey?: boolean
441 501   ): Promise<{ uri: string; cid: string }> {
442 502     const recordWithType = { $type: "social.slices.lexicon", ...record };
443 503     const payload = useSelfRkey
···
446 506     return await this.makeRequest<{ uri: string; cid: string }>(
447 507       "social.slices.lexicon.create",
448 508       "POST",
449 -       payload,
509 +       payload
450 510     );
451 511   }
452 512
453 513   async updateRecord(
454 514     rkey: string,
455 -     record: SocialSlicesLexiconRecord,
515 +     record: SocialSlicesLexiconRecord
456 516   ): Promise<{ uri: string; cid: string }> {
457 517     const recordWithType = { $type: "social.slices.lexicon", ...record };
458 518     return await this.makeRequest<{ uri: string; cid: string }>(
459 519       "social.slices.lexicon.update",
460 520       "POST",
461 -       { rkey, record: recordWithType },
521 +       { rkey, record: recordWithType }
462 522     );
463 523   }
464 524
···
466 526     return await this.makeRequest<void>(
467 527       "social.slices.lexicon.delete",
468 528       "POST",
469 -       { rkey },
529 +       { rkey }
470 530     );
471 531   }
472 532 }
···
480 540   }
481 541
482 542   async listRecords(
483 -     params?: ListRecordsParams,
543 +     params?: ListRecordsParams
484 544   ): Promise<ListRecordsResponse<SocialSlicesActorProfileRecord>> {
485 545     const requestParams = { ...params, slice: this.sliceUri };
486 546     return await this.makeRequest<
···
489 549   }
490 550
491 551   async getRecord(
492 -     params: GetRecordParams,
552 +     params: GetRecordParams
493 553   ): Promise<RecordResponse<SocialSlicesActorProfileRecord>> {
494 554     const requestParams = { ...params, slice: this.sliceUri };
495 555     return await this.makeRequest<
···
498 558   }
499 559
500 560   async searchRecords(
501 -     params: SearchRecordsParams,
561 +     params: SearchRecordsParams
502 562   ): Promise<ListRecordsResponse<SocialSlicesActorProfileRecord>> {
503 563     const requestParams = { ...params, slice: this.sliceUri };
504 564     return await this.makeRequest<
···
508 568
509 569   async createRecord(
510 570     record: SocialSlicesActorProfileRecord,
511 -     useSelfRkey?: boolean,
571 +     useSelfRkey?: boolean
512 572   ): Promise<{ uri: string; cid: string }> {
513 573     const recordWithType = { $type: "social.slices.actor.profile", ...record };
514 574     const payload = useSelfRkey
···
517 577     return await this.makeRequest<{ uri: string; cid: string }>(
518 578       "social.slices.actor.profile.create",
519 579       "POST",
520 -       payload,
580 +       payload
521 581     );
522 582   }
523 583
524 584   async updateRecord(
525 585     rkey: string,
526 -     record: SocialSlicesActorProfileRecord,
586 +     record: SocialSlicesActorProfileRecord
527 587   ): Promise<{ uri: string; cid: string }> {
528 588     const recordWithType = { $type: "social.slices.actor.profile", ...record };
529 589     return await this.makeRequest<{ uri: string; cid: string }>(
530 590       "social.slices.actor.profile.update",
531 591       "POST",
532 -       { rkey, record: recordWithType },
592 +       { rkey, record: recordWithType }
533 593     );
534 594   }
535 595
···
537 597     return await this.makeRequest<void>(
538 598       "social.slices.actor.profile.delete",
539 599       "POST",
540 -       { rkey },
600 +       { rkey }
541 601     );
542 602   }
543 603 }
···
552 612     this.profile = new ProfileActorSlicesSocialClient(
553 613       baseUrl,
554 614       sliceUri,
555 -       oauthClient,
615 +       oauthClient
556 616     );
557 617   }
558 618 }
···
570 630     this.lexicon = new LexiconSlicesSocialClient(
571 631       baseUrl,
572 632       sliceUri,
573 -       oauthClient,
633 +       oauthClient
574 634     );
575 635     this.actor = new ActorSlicesSocialClient(baseUrl, sliceUri, oauthClient);
576 636   }
···
605 665
606 666   private async uploadBlobWithRetry(
607 667     request: UploadBlobRequest,
608 -     isRetry?: boolean,
668 +     isRetry?: boolean
609 669   ): Promise<UploadBlobResponse> {
610 670     isRetry = isRetry ?? false;
611 671     // Special handling for blob upload with binary data
···
638 698         return this.uploadBlobWithRetry(request, true);
639 699       } catch (_refreshError) {
640 700         throw new Error(
641 -           `Authentication required: OAuth tokens are invalid or expired. Please log in again.`,
701 +           `Authentication required: OAuth tokens are invalid or expired. Please log in again.`
642 702         );
643 703       }
644 704     }
645 705
646 706     throw new Error(
647 -       `Blob upload failed: ${response.status} ${response.statusText}`,
707 +       `Blob upload failed: ${response.status} ${response.statusText}`
648 708     );
649 709   }
650 710
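With `sync` replaced by `startSync`, the generated client now hands back a job reference instead of blocking until the relay crawl finishes. A minimal consumer-side sketch of the new flow, polling `getJobStatus` until the job settles: `waitForSyncCompletion` is a hypothetical helper, and both the `{ jobId }` shape of `GetJobStatusParams` and the "completed"/"failed" status strings are assumptions, not guaranteed by this diff.

// Sketch only (not part of this diff): start a sync, then poll until it settles.
// Assumes GetJobStatusParams is { jobId } and that JobStatus.status ends at
// "completed" or "failed".
async function waitForSyncCompletion(
  client: SliceSlicesSocialClient,
  collections: string[],
  pollIntervalMs = 2000
): Promise<JobStatus> {
  const { jobId } = await client.startSync({ collections });
  for (;;) {
    const status = await client.getJobStatus({ jobId });
    if (status.status === "completed" || status.status === "failed") {
      return status;
    }
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
  }
}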
+129
frontend/src/components/JobHistory.tsx
··· 1 + interface JobResult {
2 +   success: boolean;
3 +   totalRecords: number;
4 +   collectionsSynced: string[];
5 +   reposProcessed: number;
6 +   message: string;
7 + }
8 +
9 + interface JobHistoryItem {
10 +   jobId: string;
11 +   status: string;
12 +   createdAt: string;
13 +   completedAt?: string;
14 +   result?: JobResult;
15 +   error?: string;
16 + }
17 +
18 + interface JobHistoryProps {
19 +   jobs: JobHistoryItem[];
20 + }
21 +
22 + function formatDate(dateString: string): string {
23 +   const date = new Date(dateString);
24 +   return date.toLocaleDateString() + ' ' + date.toLocaleTimeString([], {
25 +     hour: '2-digit',
26 +     minute: '2-digit'
27 +   });
28 + }
29 +
30 + function extractDurationFromMessage(message: string): string {
31 +   // Extract duration from message like "Sync completed successfully in 424.233625ms"
32 +   const match = message.match(/in ([\d.]+)(ms|s|m)/);
33 +   if (!match) return 'N/A';
34 +
35 +   const [, value, unit] = match;
36 +   const numValue = parseFloat(value);
37 +
38 +   if (unit === 'ms') {
39 +     if (numValue < 1000) return `${Math.round(numValue)}ms`;
40 +     return `${(numValue / 1000).toFixed(1)}s`;
41 +   } else if (unit === 's') {
42 +     if (numValue < 60) return `${numValue}s`;
43 +     const minutes = Math.floor(numValue / 60);
44 +     const seconds = Math.round(numValue % 60);
45 +     return `${minutes}m ${seconds}s`;
46 +   } else if (unit === 'm') {
47 +     return `${numValue}m`;
48 +   }
49 +
50 +   return `${value}${unit}`;
51 + }
52 +
53 + export function JobHistory({ jobs }: JobHistoryProps) {
54 +   if (jobs.length === 0) {
55 +     return (
56 +       <div className="bg-gray-50 border border-gray-200 rounded-lg p-6 text-center">
57 +         <div className="text-gray-500 text-sm">No sync history yet</div>
58 +         <div className="text-gray-400 text-xs mt-1">
59 +           Sync jobs will appear here once completed
60 +         </div>
61 +       </div>
62 +     );
63 +   }
64 +
65 +   return (
66 +     <div className="space-y-3">
67 +       <h3 className="text-lg font-medium text-gray-900">Recent Sync History</h3>
68 +       <div className="space-y-2">
69 +         {jobs.map((job) => (
70 +           <div
71 +             key={job.jobId}
72 +             className="bg-white border border-gray-200 rounded-lg p-4 hover:bg-gray-50 transition-colors"
73 +           >
74 +             <div className="flex items-start justify-between">
75 +               <div className="flex-1">
76 +                 <div className="flex items-center gap-2 mb-2">
77 +                   {job.result?.success ? (
78 +                     <span className="text-green-600 font-medium">✅ Success</span>
79 +                   ) : (
80 +                     <span className="text-red-600 font-medium">❌ Failed</span>
81 +                   )}
82 +                   <span className="text-gray-500 text-sm">
83 +                     {formatDate(job.createdAt)}
84 +                   </span>
85 +                   {job.result?.message && (
86 +                     <span className="text-gray-400 text-xs">
87 +                       ({extractDurationFromMessage(job.result.message)})
88 +                     </span>
89 +                   )}
90 +                 </div>
91 +
92 +                 {job.result && (
93 +                   <div className="text-sm text-gray-600 space-y-1">
94 +                     <div className="flex gap-4">
95 +                       <span>
96 +                         <strong>{job.result.totalRecords}</strong> records
97 +                       </span>
98 +                       <span>
99 +                         <strong>{job.result.reposProcessed}</strong> repositories
100 +                       </span>
101 +                       <span>
102 +                         Collections: <strong>{job.result.collectionsSynced.join(', ')}</strong>
103 +                       </span>
104 +                     </div>
105 +                     {job.result.message && (
106 +                       <div className="text-xs text-gray-500 mt-1">
107 +                         {job.result.message}
108 +                       </div>
109 +                     )}
110 +                   </div>
111 +                 )}
112 +
113 +                 {job.error && (
114 +                   <div className="text-sm text-red-600 mt-1">
115 +                     Error: {job.error}
116 +                   </div>
117 +                 )}
118 +               </div>
119 +
120 +               <div className="text-xs text-gray-400 font-mono">
121 +                 {job.jobId.split('-')[0]}...
122 +               </div>
123 +             </div>
124 +           </div>
125 +         ))}
126 +       </div>
127 +     </div>
128 +   );
129 + }
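`extractDurationFromMessage` scrapes the duration out of the worker's human-readable message rather than reading a structured field, so it only holds up while messages keep the "… in <value><unit>" shape. Illustrative expectations, derived directly from the branches above:

// Expected outputs of extractDurationFromMessage for messages shaped like the
// example in its comment (sketch, not a test file in this diff):
extractDurationFromMessage("Sync completed successfully in 424.233625ms"); // "424ms"
extractDurationFromMessage("Sync completed successfully in 1500ms");       // "1.5s"
extractDurationFromMessage("Sync completed successfully in 75.2s");        // "1m 15s"
extractDurationFromMessage("Sync completed");                              // "N/A"

Worth noting: the status badge keys off `job.result?.success`, so an item with no `result` yet (a job still pending or running) renders the ❌ Failed branch rather than an in-progress state.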
+21 -16
frontend/src/components/SyncResult.tsx
··· 2 2   success: boolean;
3 3   message?: string;
4 4   collectionsCount?: number;
5 -   reposProcessed?: number;
6 -   totalRecords?: number;
7 5   error?: string;
6 +   jobId?: string;
8 7 }
9 8
10 9 export function SyncResult({
11 10   success,
12 11   message,
13 12   collectionsCount = 0,
14 -   reposProcessed = 0,
15 -   totalRecords = 0,
16 -   error
13 +   error,
14 +   jobId,
17 15 }: SyncResultProps) {
18 16   if (success) {
19 17     return (
20 -       <div className="bg-green-50 border border-green-200 rounded-lg p-4">
21 -         <div className="text-green-800 font-medium">✅ Sync Completed Successfully</div>
22 -         <div className="text-green-700 text-sm mt-2">
23 -           <div>• Synced {collectionsCount} collection{collectionsCount === 1 ? '' : 's'}</div>
24 -           <div>• Processed {reposProcessed} repositor{reposProcessed === 1 ? 'y' : 'ies'}</div>
25 -           <div>• Total records in slice: {totalRecords}</div>
18 +       <div className="bg-blue-50 border border-blue-200 rounded-lg p-4">
19 +         <div className="text-blue-800 font-medium">🚀 Sync Job Started</div>
20 +         <div className="text-blue-700 text-sm mt-2">
21 +           <div>
22 +             • Queued {collectionsCount} collection
23 +             {collectionsCount === 1 ? "" : "s"} for syncing
24 +           </div>
25 +           {jobId && (
26 +             <div>
27 +               • Job ID:{" "}
28 +               <code className="bg-blue-100 px-1 rounded text-xs">{jobId}</code>
29 +             </div>
30 +           )}
26 31         </div>
27 -         {message && (
28 -           <div className="text-green-600 text-xs mt-2">{message}</div>
29 -         )}
32 +         {message && <div className="text-blue-600 text-xs mt-2">{message}</div>}
30 33       </div>
31 34     );
32 35   }
···
34 37   return (
35 38     <div className="bg-red-50 border border-red-200 rounded-lg p-4">
36 39       <div className="text-red-800 font-medium">❌ Sync Failed</div>
37 -       <div className="text-red-700 text-sm mt-1">{error || message || "Unknown error occurred"}</div>
40 +       <div className="text-red-700 text-sm mt-1">
41 +         {error || message || "Unknown error occurred"}
42 +       </div>
38 43     </div>
39 44   );
40 - }
45 + }
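Since the component now reports enqueueing rather than completion, it has exactly two states in use. A sketch, assuming the same `render` helper the routes already use; the job id is a placeholder UUID, not real output:

// Sketch: enqueue confirmation vs. failure.
render(
  <SyncResult
    success={true}
    collectionsCount={2}
    jobId="00000000-0000-0000-0000-000000000000"
  />
);
render(<SyncResult success={false} error="Relay unreachable" />);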
+11
frontend/src/pages/SliceSyncPage.tsx
··· 1 1 import { Layout } from "../components/Layout.tsx";
2 2 import { SliceTabs } from "../components/SliceTabs.tsx";
3 + import { JobHistory } from "../components/JobHistory.tsx";
3 4
4 5 interface SliceSyncPageProps {
5 6   sliceName?: string;
···
98 99           <div id="sync-result" className="mt-4">
99 100             {/* Results will be loaded here via htmx */}
100 101           </div>
102 +         </div>
103 +
104 +         {/* Job History */}
105 +         <div
106 +           hx-get={`/api/slices/${sliceId}/job-history`}
107 +           hx-trigger="load, every 10s"
108 +           hx-swap="innerHTML"
109 +           className="mb-6"
110 +         >
111 +           <JobHistory jobs={[]} />
101 112         </div>
102 113
103 114         <div className="bg-blue-50 border border-blue-200 rounded-lg p-6">
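The `hx-trigger="load, every 10s"` attribute fetches the history fragment once on page load and then keeps polling indefinitely, even when no job is running. If that ever becomes noisy, htmx's documented convention is that an HTTP 286 response cancels a polling trigger; a sketch of how the fragment handler could opt out when idle (`hasActiveJob` is a hypothetical helper, not part of this diff):

// Sketch: htmx stops an `every 10s` poll when the server responds with 286.
const status = hasActiveJob(jobs) ? 200 : 286;
return new Response(html, {
  status,
  headers: { "content-type": "text/html" },
});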
+78 -14
frontend/src/routes/slices.tsx
··· 14 14 import { CodegenResult } from "../components/CodegenResult.tsx";
15 15 import { SettingsResult } from "../components/SettingsResult.tsx";
16 16 import { SyncResult } from "../components/SyncResult.tsx";
17 + import { JobHistory } from "../components/JobHistory.tsx";
17 18 import { buildAtUri } from "../utils/at-uri.ts";
18 19
19 20 async function handleCreateSlice(req: Request): Promise<Response> {
···
468 469     const target = formData.get("format") || "typescript";
469 470
470 471     // Construct the slice URI
471 -     const sliceUri = buildAtUri({
472 -       did: context.currentUser.sub || "unknown",
473 -       collection: "social.slices.slice",
474 -       rkey: sliceId,
475 -     });
472 +     const sliceUri = buildSliceUri(context.currentUser.sub!, sliceId);
473 +
474 +     // Use the slice-specific client
475 +     const sliceClient = getSliceClient(context, sliceId);
476 476
477 477     // Call the codegen XRPC endpoint
478 -     const result = await atprotoClient.social.slices.slice.codegen({
478 +     const result = await sliceClient.social.slices.slice.codegen({
479 479       target: target as string,
480 480       slice: sliceUri,
481 481     });
···
614 614   }
615 615 }
616 616
617 +
618 + async function handleJobHistory(
619 +   req: Request,
620 +   params?: URLPatternResult
621 + ): Promise<Response> {
622 +   const context = await withAuth(req);
623 +   const authResponse = requireAuth(context);
624 +   if (authResponse) return authResponse;
625 +
626 +   const sliceId = params?.pathname.groups.id;
627 +   if (!sliceId) {
628 +     const html = render(
629 +       <div className="text-red-700 text-sm">❌ Invalid slice ID</div>
630 +     );
631 +     return new Response(html, {
632 +       status: 400,
633 +       headers: { "content-type": "text/html" },
634 +     });
635 +   }
636 +
637 +   try {
638 +     // Construct the slice URI
639 +     const sliceUri = buildSliceUri(context.currentUser.sub!, sliceId);
640 +
641 +     // Use the slice-specific client
642 +     const sliceClient = getSliceClient(context, sliceId);
643 +
644 +     // Get job history
645 +     const result = await sliceClient.social.slices.slice.getJobHistory({
646 +       userDid: context.currentUser.sub!,
647 +       sliceUri: sliceUri,
648 +       limit: 10,
649 +     });
650 +
651 +     const jobs = result || [];
652 +     const html = render(
653 +       <JobHistory jobs={jobs} />
654 +     );
655 +
656 +     return new Response(html, {
657 +       status: 200,
658 +       headers: { "content-type": "text/html" },
659 +     });
660 +   } catch (error) {
661 +     console.error('Failed to get job history:', error);
662 +     const html = render(
663 +       <JobHistory jobs={[]} />
664 +     );
665 +     return new Response(html, {
666 +       status: 200,
667 +       headers: { "content-type": "text/html" },
668 +     });
669 +   }
670 + }
671 +
617 672 async function handleSliceSync(
618 673   req: Request,
619 674   params?: URLPatternResult
···
669 724     });
670 725   }
671 726
672 -     // Call the generated client's sync method
673 -     const syncResult = await atprotoClient.social.slices.slice.sync({
727 +     // Start sync job using the new job queue
728 +     // Use slice-specific client to ensure consistent slice URI
729 +     const sliceClient = getSliceClient(context, sliceId);
730 +     const syncJobResponse = await sliceClient.social.slices.slice.startSync({
674 731       collections,
675 732       repos: repos.length > 0 ? repos : undefined,
676 733     });
677 734
678 735     const html = render(
679 736       <SyncResult
680 -         success={syncResult.success}
681 -         message={syncResult.message}
682 -         collectionsCount={syncResult.collectionsSynced?.length || 0}
683 -         reposProcessed={syncResult.reposProcessed || 0}
684 -         totalRecords={syncResult.totalRecords || 0}
685 -         error={syncResult.success ? undefined : syncResult.message}
737 +         success={syncJobResponse.success}
738 +         message={syncJobResponse.success
739 +           ? `Sync job started successfully. Job ID: ${syncJobResponse.jobId}`
740 +           : syncJobResponse.message
741 +         }
742 +         jobId={syncJobResponse.jobId}
743 +         collectionsCount={collections.length}
744 +         error={syncJobResponse.success ? undefined : syncJobResponse.message}
686 745       />
687 746     );
688 747
···
761 820     method: "POST",
762 821     pattern: new URLPattern({ pathname: "/api/slices/:id/sync" }),
763 822     handler: handleSliceSync,
823 +   },
824 +   {
825 +     method: "GET",
826 +     pattern: new URLPattern({ pathname: "/api/slices/:id/job-history" }),
827 +     handler: handleJobHistory,
764 828   },
765 829 ];
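One assumption baked into `handleJobHistory`: the raw `GetJobHistoryResponse` is passed straight through as the `jobs` prop, so the XRPC response type is presumed to already be an array of `JobHistoryItem`. For reference, a sketch of how the new route's `URLPattern` yields the `id` group the handler reads; the host is a placeholder:

// Sketch: URLPattern extraction for the new job-history route.
const pattern = new URLPattern({ pathname: "/api/slices/:id/job-history" });
const match = pattern.exec("https://slices.example/api/slices/my-slice/job-history");
console.log(match?.pathname.groups.id); // "my-slice"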