tangled.org trending bluesky account

We posting

Changed files
+426 -554
atproto_api
lexicons
src
bot
logic
slingshot
+1 -1
.gitignore
··· 3 3 .env 4 4 .idea 5 5 /wasm/dist 6 - ./stitch_counter.sqlite 6 + stitch_counter.sqlite
+38 -16
Cargo.lock
··· 380 380 "async-trait", 381 381 "atproto_api", 382 382 "atrium-api", 383 + "bsky-sdk", 383 384 "dotenv", 384 385 "env_logger", 385 386 "log", ··· 388 389 "rocketman", 389 390 "serde", 390 391 "serde_json", 392 + "slingshot", 391 393 "sqlx", 392 394 "tokio", 393 395 ] ··· 1713 1715 name = "logic" 1714 1716 version = "0.1.0" 1715 1717 dependencies = [ 1716 - "anyhow", 1717 1718 "async-trait", 1718 - "atproto_api", 1719 1719 "atrium-api", 1720 1720 "atrium-common", 1721 1721 "atrium-identity", ··· 1730 1730 "reqwest", 1731 1731 "rocketman", 1732 1732 "serde", 1733 - "serde_json", 1734 1733 "thiserror 2.0.12", 1735 1734 "tokio", 1736 1735 "tungstenite", ··· 2192 2191 2193 2192 [[package]] 2194 2193 name = "quinn" 2195 - version = "0.11.8" 2194 + version = "0.11.9" 2196 2195 source = "registry+https://github.com/rust-lang/crates.io-index" 2197 - checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" 2196 + checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" 2198 2197 dependencies = [ 2199 2198 "bytes", 2200 2199 "cfg_aliases", ··· 2212 2211 2213 2212 [[package]] 2214 2213 name = "quinn-proto" 2215 - version = "0.11.12" 2214 + version = "0.11.13" 2216 2215 source = "registry+https://github.com/rust-lang/crates.io-index" 2217 - checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" 2216 + checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 2218 2217 dependencies = [ 2219 2218 "bytes", 2220 2219 "getrandom 0.3.3", ··· 2233 2232 2234 2233 [[package]] 2235 2234 name = "quinn-udp" 2236 - version = "0.5.12" 2235 + version = "0.5.14" 2237 2236 source = "registry+https://github.com/rust-lang/crates.io-index" 2238 - checksum = "ee4e529991f949c5e25755532370b8af5d114acae52326361d68d47af64aa842" 2237 + checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" 2239 2238 dependencies = [ 2240 2239 "cfg_aliases", 2241 2240 "libc", 2242 2241 "once_cell", 2243 2242 "socket2", 2244 2243 "tracing", 2245 - "windows-sys 0.52.0", 2244 + "windows-sys 0.59.0", 2246 2245 ] 2247 2246 2248 2247 [[package]] ··· 2699 2698 2700 2699 [[package]] 2701 2700 name = "serde" 2702 - version = "1.0.219" 2701 + version = "1.0.223" 2703 2702 source = "registry+https://github.com/rust-lang/crates.io-index" 2704 - checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" 2703 + checksum = "a505d71960adde88e293da5cb5eda57093379f64e61cf77bf0e6a63af07a7bac" 2705 2704 dependencies = [ 2705 + "serde_core", 2706 2706 "serde_derive", 2707 2707 ] 2708 2708 ··· 2716 2716 ] 2717 2717 2718 2718 [[package]] 2719 + name = "serde_core" 2720 + version = "1.0.223" 2721 + source = "registry+https://github.com/rust-lang/crates.io-index" 2722 + checksum = "20f57cbd357666aa7b3ac84a90b4ea328f1d4ddb6772b430caa5d9e1309bb9e9" 2723 + dependencies = [ 2724 + "serde_derive", 2725 + ] 2726 + 2727 + [[package]] 2719 2728 name = "serde_derive" 2720 - version = "1.0.219" 2729 + version = "1.0.223" 2721 2730 source = "registry+https://github.com/rust-lang/crates.io-index" 2722 - checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" 2731 + checksum = "3d428d07faf17e306e699ec1e91996e5a165ba5d6bce5b5155173e91a8a01a56" 2723 2732 dependencies = [ 2724 2733 "proc-macro2", 2725 2734 "quote", ··· 2741 2750 2742 2751 [[package]] 2743 2752 name = "serde_json" 2744 - version = "1.0.140" 2753 + version = "1.0.145" 2745 2754 source = "registry+https://github.com/rust-lang/crates.io-index" 2746 - checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" 2755 + checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" 2747 2756 dependencies = [ 2748 2757 "itoa", 2749 2758 "memchr", 2750 2759 "ryu", 2751 2760 "serde", 2761 + "serde_core", 2752 2762 ] 2753 2763 2754 2764 [[package]] ··· 2826 2836 checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" 2827 2837 dependencies = [ 2828 2838 "autocfg", 2839 + ] 2840 + 2841 + [[package]] 2842 + name = "slingshot" 2843 + version = "0.1.0" 2844 + dependencies = [ 2845 + "reqwest", 2846 + "serde", 2847 + "serde_json", 2848 + "thiserror 1.0.69", 2849 + "tokio", 2850 + "url", 2829 2851 ] 2830 2852 2831 2853 [[package]]
+5 -2
Cargo.toml
··· 1 1 [workspace] 2 - members = ['atproto_api', "bot", "logic"] 2 + members = ['atproto_api', "bot", "logic", "slingshot"] 3 3 resolver = "2" 4 4 5 5 [workspace.dependencies] ··· 10 10 atrium-oauth = "0.1.4" 11 11 atrium-xrpc = "0.12.3" 12 12 atrium-xrpc-client = "0.5.14" 13 - bsky-sdk ="0.1.21" 13 + bsky-sdk = "0.1.21" 14 14 clap = { version = "4.0", features = ["derive"] } 15 15 log = "0.4.27" 16 16 logic = { path = "logic" } 17 17 serde = { version = "1.0.219", features = ["derive"] } 18 + serde_json = "1.0.145" 18 19 rocketman = "0.2.5" 19 20 tokio = { version = "1.45.0", features = ["full"] } 20 21 wasm-bindgen = { version = "0.2.100", features = ["default", "serde_json"] } 21 22 web-sys = "0.3.77" 22 23 gloo = "0.11.0" 23 24 gloo-utils = "0.2.0" 25 + reqwest = { version = "0.12", features = ["json", "charset", "rustls-tls", "gzip"] } 26 + slingshot = { path = "slingshot" }
+2
README.md
··· 1 1 # Come back later 2 2 3 + It ain't much, but it's bandiad engineering. 4 + 3 5 ![](./assets/nothing.gif)
+54
atproto_api/lexicons/repo.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "sh.tangled.repo", 4 + "needsCbor": true, 5 + "needsType": true, 6 + "defs": { 7 + "main": { 8 + "type": "record", 9 + "key": "tid", 10 + "record": { 11 + "type": "object", 12 + "required": [ 13 + "name", 14 + "knot", 15 + "owner", 16 + "createdAt" 17 + ], 18 + "properties": { 19 + "name": { 20 + "type": "string", 21 + "description": "name of the repo" 22 + }, 23 + "owner": { 24 + "type": "string", 25 + "format": "did" 26 + }, 27 + "knot": { 28 + "type": "string", 29 + "description": "knot where the repo was created" 30 + }, 31 + "spindle": { 32 + "type": "string", 33 + "description": "CI runner to send jobs to and receive results from" 34 + }, 35 + "description": { 36 + "type": "string", 37 + "minGraphemes": 1, 38 + "maxGraphemes": 140 39 + }, 40 + "source": { 41 + "type": "string", 42 + "format": "uri", 43 + "description": "source of the repo" 44 + }, 45 + "createdAt": { 46 + "type": "string", 47 + "format": "datetime" 48 + } 49 + } 50 + } 51 + } 52 + } 53 + } 54 +
+1 -1
atproto_api/src/lib.rs
··· 1 1 // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. 2 2 pub mod record; 3 - pub mod xyz; 4 3 pub mod sh; 4 + pub mod xyz;
+12
atproto_api/src/record.rs
··· 5 5 pub enum KnownRecord { 6 6 #[serde(rename = "sh.tangled.feed.star")] 7 7 ShTangledFeedStar(Box<crate::sh::tangled::feed::star::Record>), 8 + #[serde(rename = "sh.tangled.repo")] 9 + ShTangledRepo(Box<crate::sh::tangled::repo::Record>), 8 10 #[serde(rename = "xyz.statusphere.status")] 9 11 XyzStatusphereStatus(Box<crate::xyz::statusphere::status::Record>), 10 12 } ··· 16 18 impl From<crate::sh::tangled::feed::star::RecordData> for KnownRecord { 17 19 fn from(record_data: crate::sh::tangled::feed::star::RecordData) -> Self { 18 20 KnownRecord::ShTangledFeedStar(Box::new(record_data.into())) 21 + } 22 + } 23 + impl From<crate::sh::tangled::repo::Record> for KnownRecord { 24 + fn from(record: crate::sh::tangled::repo::Record) -> Self { 25 + KnownRecord::ShTangledRepo(Box::new(record)) 26 + } 27 + } 28 + impl From<crate::sh::tangled::repo::RecordData> for KnownRecord { 29 + fn from(record_data: crate::sh::tangled::repo::RecordData) -> Self { 30 + KnownRecord::ShTangledRepo(Box::new(record_data.into())) 19 31 } 20 32 } 21 33 impl From<crate::xyz::statusphere::status::Record> for KnownRecord {
+7
atproto_api/src/sh/tangled.rs
··· 1 1 // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. 2 2 //!Definitions for the `sh.tangled` namespace. 3 3 pub mod feed; 4 + pub mod repo; 5 + #[derive(Debug)] 6 + pub struct Repo; 7 + impl atrium_api::types::Collection for Repo { 8 + const NSID: &'static str = "sh.tangled.repo"; 9 + type Record = repo::Record; 10 + }
+27
atproto_api/src/sh/tangled/repo.rs
··· 1 + // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. 2 + //!Definitions for the `sh.tangled.repo` namespace. 3 + use atrium_api::types::TryFromUnknown; 4 + #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 5 + #[serde(rename_all = "camelCase")] 6 + pub struct RecordData { 7 + pub created_at: atrium_api::types::string::Datetime, 8 + #[serde(skip_serializing_if = "core::option::Option::is_none")] 9 + pub description: core::option::Option<String>, 10 + ///knot where the repo was created 11 + pub knot: String, 12 + ///name of the repo 13 + pub name: String, 14 + pub owner: atrium_api::types::string::Did, 15 + ///source of the repo 16 + #[serde(skip_serializing_if = "core::option::Option::is_none")] 17 + pub source: core::option::Option<String>, 18 + ///CI runner to send jobs to and receive results from 19 + #[serde(skip_serializing_if = "core::option::Option::is_none")] 20 + pub spindle: core::option::Option<String>, 21 + } 22 + pub type Record = atrium_api::types::Object<RecordData>; 23 + impl From<atrium_api::types::Unknown> for RecordData { 24 + fn from(value: atrium_api::types::Unknown) -> Self { 25 + Self::try_from_unknown(value).unwrap() 26 + } 27 + }
+4 -3
bot/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0.98" 8 8 atproto_api.workspace = true 9 + bsky-sdk.workspace = true 9 10 rocketman.workspace = true 11 + reqwest.workspace = true 10 12 atrium-api.workspace = true 11 13 log.workspace = true 12 14 env_logger = "0.11.8" ··· 15 17 sqlx = { version = "0.8.2", features = ["runtime-tokio", "macros", "sqlite", "chrono"] } 16 18 async-trait = "0.1.83" 17 19 serde.workspace = true 18 - serde_json = "1.0.132" 20 + serde_json.workspace = true 19 21 logic.workspace = true 20 - reqwest = { version = "0.12", features = ["json", "rustls-tls"] } 21 - 22 + slingshot.workspace = true
+37
bot/src/constellation.rs
··· 1 + use atrium_api::types::Collection; 2 + use reqwest::Url; 3 + use serde::Deserialize; 4 + 5 + #[derive(Deserialize, Debug)] 6 + struct ConstellationCountResponse { 7 + total: i64, 8 + } 9 + 10 + pub async fn fetch_constellation_count(target: &str) -> anyhow::Result<i64> { 11 + let base = std::env::var("CONSTELLATION_BASE_URL") 12 + .unwrap_or_else(|_| "https://constellation.microcosm.blue".to_string()); 13 + let endpoint = if base.ends_with('/') { 14 + format!("{}links/count", base) 15 + } else { 16 + format!("{}/links/count", base) 17 + }; 18 + 19 + // Build URL with query params in one go to avoid non-Send lifetimes 20 + let url = Url::parse_with_params( 21 + &endpoint, 22 + &[ 23 + ("target", target.to_string()), 24 + ("collection", atproto_api::sh::tangled::feed::Star::NSID.to_string()), 25 + ("path", ".subject".to_string()), 26 + ], 27 + )?; 28 + 29 + let resp = reqwest::Client::new() 30 + .get(url) 31 + .send() 32 + .await? 33 + .error_for_status()?; 34 + 35 + let parsed: ConstellationCountResponse = resp.json().await?; 36 + Ok(parsed.total) 37 + }
+128 -50
bot/src/main.rs
··· 1 + mod constellation; 2 + 1 3 extern crate dotenv; 2 4 5 + use crate::constellation::fetch_constellation_count; 6 + use atrium_api::types::Collection; 7 + use atrium_api::types::string::Language; 8 + use bsky_sdk::rich_text::RichText; 3 9 use dotenv::dotenv; 4 - use atrium_api::types::Collection; 5 - use rocketman::{connection::JetstreamConnection, handler, ingestion::LexiconIngestor, options::JetstreamOptions, types::event::Operation}; 10 + use logic::BotApi; 11 + use reqwest::Url; 12 + use rocketman::{ 13 + connection::JetstreamConnection, handler, ingestion::LexiconIngestor, 14 + options::JetstreamOptions, types::event::Operation, 15 + }; 16 + use serde::Deserialize; 17 + use slingshot::Slingshot; 6 18 use sqlx::sqlite::{SqliteConnectOptions, SqlitePool}; 7 19 use std::collections::HashMap; 20 + use std::fmt::format; 8 21 use std::sync::{Arc, Mutex}; 9 22 use std::time::Duration; 10 - use logic::BotApi; 11 - use reqwest::Url; 12 - use serde::Deserialize; 23 + 24 + #[derive(Debug)] 25 + struct Record { 26 + did: String, 27 + collection: String, 28 + rkey: String, 29 + } 30 + 31 + fn parse_uri(uri: &str) -> anyhow::Result<Record> { 32 + let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); 33 + if parts.len() != 3 { 34 + return Err(anyhow::anyhow!("Invalid URI format")); 35 + } 36 + Ok(Record { 37 + did: parts[0].to_string(), 38 + collection: parts[1].to_string(), 39 + rkey: parts[2].to_string(), 40 + }) 41 + } 13 42 14 43 #[tokio::main] 15 44 async fn main() -> anyhow::Result<()> { ··· 17 46 dotenv().ok(); 18 47 19 48 // Initialize DB pool and run migrations 20 - let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| "./stitch_counter.sqlite".to_string()); 49 + let database_url = 50 + std::env::var("DATABASE_URL").unwrap_or_else(|_| "./stitch_counter.sqlite".to_string()); 21 51 let options = SqliteConnectOptions::new() 22 52 .filename(database_url) 23 53 .create_if_missing(true) ··· 60 90 let bot_pds_url = std::env::var("BOT_PDS_URL").expect("BOT_PDS_URL must be set"); 61 91 62 92 let bot_api = BotApi::new_logged_in(bot_username, bot_password, bot_pds_url).await?; 63 - 64 - 93 + let sling_shot = Arc::new(Slingshot::new("https://slingshot.microcosm.blue")?); 65 94 // Ingestor for the star collection 66 95 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 67 96 ingestors.insert( 68 97 atproto_api::sh::tangled::feed::Star::NSID.to_string(), 69 - Box::new(StarIngestor { pool: pool.clone(), bot: Arc::new(bot_api), timeframe_hours, star_threshold, post_window_hours }), 98 + Box::new(StarIngestor { 99 + pool: pool.clone(), 100 + bot: Arc::new(bot_api), 101 + sling_shot: sling_shot.clone(), 102 + timeframe_hours, 103 + star_threshold, 104 + post_window_hours, 105 + }), 70 106 ); 71 107 let ingestors = Arc::new(ingestors); 72 108 ··· 76 112 let cursor_clone = cursor.clone(); 77 113 tokio::spawn(async move { 78 114 while let Ok(message) = msg_rx.recv_async().await { 79 - if let Err(e) = handler::handle_message(message, &ingestors_clone, reconnect_tx_clone.clone(), cursor_clone.clone()).await { 115 + if let Err(e) = handler::handle_message( 116 + message, 117 + &ingestors_clone, 118 + reconnect_tx_clone.clone(), 119 + cursor_clone.clone(), 120 + ) 121 + .await 122 + { 80 123 eprintln!("Error processing message: {}", e); 81 124 } 82 125 } 83 126 }); 84 127 85 128 // Connect to jetstream 86 - jetstream.connect(cursor.clone()).await.map_err(|e| anyhow::anyhow!(e.to_string())) 129 + jetstream 130 + .connect(cursor.clone()) 131 + .await 132 + .map_err(|e| anyhow::anyhow!(e.to_string())) 87 133 } 88 134 89 135 struct StarIngestor { 90 136 pool: SqlitePool, 91 137 bot: Arc<BotApi>, 138 + sling_shot: Arc<Slingshot>, 92 139 timeframe_hours: i64, 93 140 star_threshold: i64, 94 141 post_window_hours: i64, 95 142 } 96 143 97 - #[derive(Deserialize, Debug)] 98 - struct ConstellationCountResponse { 99 - total: i64, 100 - } 101 - 102 - async fn fetch_constellation_count(target: &str) -> anyhow::Result<i64> { 103 - let base = std::env::var("CONSTELLATION_BASE_URL") 104 - .unwrap_or_else(|_| "https://constellation.microcosm.blue".to_string()); 105 - let endpoint = if base.ends_with('/') { 106 - format!("{}links/count", base) 107 - } else { 108 - format!("{}/links/count", base) 109 - }; 110 - 111 - // Build URL with query params in one go to avoid non-Send lifetimes 112 - let url = Url::parse_with_params( 113 - &endpoint, 114 - &[ 115 - ("target", target.to_string()), 116 - ("collection", atproto_api::sh::tangled::feed::Star::NSID.to_string()), 117 - ("path", ".subject".to_string()), 118 - ], 119 - )?; 120 - 121 - let resp = reqwest::Client::new() 122 - .get(url) 123 - .send() 124 - .await? 125 - .error_for_status()?; 126 - 127 - let parsed: ConstellationCountResponse = resp.json().await?; 128 - Ok(parsed.total) 129 - } 130 - 131 144 #[async_trait::async_trait] 132 145 impl LexiconIngestor for StarIngestor { 133 - async fn ingest(&self, message: rocketman::types::event::Event<serde_json::Value>) -> anyhow::Result<()> { 146 + /// Asynchronously processes an incoming event message related to "stars" and performs database operations and other actions based on the event type. 147 + /// 148 + /// # Parameters: 149 + /// - `message`: An [`Event`](rocketman::types::event::Event) containing event data, which includes operations (`Create`, `Update`, or `Delete`) and associated metadata. 150 + /// 151 + /// # Returns: 152 + /// - An [`anyhow::Result<()>`](anyhow::Result), which is `Ok(())` if the operation succeeds, or an error if any step fails. 153 + /// 154 + /// # Functionality: 155 + /// 1. **Create or Update Operations**: 156 + /// - If the 157 + async fn ingest( 158 + &self, 159 + message: rocketman::types::event::Event<serde_json::Value>, 160 + ) -> anyhow::Result<()> { 134 161 if let Some(commit) = &message.commit { 135 162 match commit.operation { 136 163 Operation::Create | Operation::Update => { 137 164 if let Some(record) = &commit.record { 138 - let rec = serde_json::from_value::<atproto_api::sh::tangled::feed::star::RecordData>(record.clone())?; 165 + let rec = serde_json::from_value::< 166 + atproto_api::sh::tangled::feed::star::RecordData, 167 + >(record.clone())?; 139 168 // Insert or ignore duplicate per did+rkey 140 169 let result = sqlx::query( 141 170 "INSERT OR IGNORE INTO stars(createdAt, did, rkey, subject) VALUES(?, ?, ?, ?)" ··· 161 190 if count_in_window >= self.star_threshold { 162 191 log::info!( 163 192 "Star threshold met: {} stars in the last {} hour(s) (threshold: {})", 164 - count_in_window, self.timeframe_hours, self.star_threshold 193 + count_in_window, 194 + self.timeframe_hours, 195 + self.star_threshold 165 196 ); 166 197 167 198 // Check if a post was made within the last post_window_hours ··· 182 213 .execute(&self.pool) 183 214 .await?; 184 215 185 - let starts_count = fetch_constellation_count(&rec.subject).await?; 216 + let parsed = parse_uri(&rec.subject)?; 217 + let cloned_repo_owner = parsed.did.clone(); 218 + let stars = fetch_constellation_count(&rec.subject).await?; 219 + let repo_record = &self 220 + .sling_shot 221 + .get_record::<atproto_api::sh::tangled::repo::RecordData>( 222 + parsed.did.as_str(), 223 + parsed.collection.as_str(), 224 + parsed.rkey.as_str(), 225 + ) 226 + .await?; 227 + let repo_name = repo_record.value.name.clone(); 228 + let handle = 229 + &self.bot.get_handle(cloned_repo_owner.clone()).await?; 230 + let tangled_sh_url = format!( 231 + "https://tangled.sh/{cloned_repo_owner}/{repo_name}" 232 + ); 233 + let description = match repo_record.value.description.clone() { 234 + None => "".to_string(), 235 + Some(desc) => format!(" : {desc}"), 236 + }; 186 237 187 - log::info!("Threshold met and allowed to be posted, stars: {starts_count}"); 238 + let rt = RichText::new_with_detect_facets(format!( 239 + "{handle}/{repo_name}{description}\n⭐️ {stars} {tangled_sh_url}" 240 + )) 241 + .await?; 242 + let post = atrium_api::app::bsky::feed::post::RecordData { 243 + created_at: atrium_api::types::string::Datetime::now(), 244 + embed: None, 245 + entities: None, 246 + facets: rt.facets, 247 + labels: None, 248 + //You don't see a thing. No unwraps 249 + langs: Some(vec![Language::new("en".to_string()).unwrap()]), 250 + reply: None, 251 + tags: None, 252 + text: rt.text, 253 + }; 254 + 255 + match self.bot.agent.create_record(post).await { 256 + Ok(err) => { 257 + log::info!("NEW POST MADE") 258 + } 259 + Err(err) => { 260 + log::error!("{err}") 261 + } 262 + } 263 + log::info!( 264 + "Threshold met and allowed to be posted, stars: {stars}" 265 + ); 188 266 } 189 267 } 190 268 }
+1 -4
logic/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 - atproto_api.workspace = true 8 7 atrium-api.workspace = true 9 8 atrium-common.workspace = true 10 9 atrium-identity.workspace = true ··· 12 11 atrium-xrpc-client.workspace = true 13 12 bsky-sdk.workspace = true 14 13 serde.workspace = true 15 - reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] } 14 + reqwest.workspace = true 16 15 thiserror = "2.0.12" 17 - anyhow = "1.0.98" 18 - serde_json = "1.0.140" 19 16 20 17 [target.'cfg(not(target_arch = "wasm32"))'.dependencies] 21 18 flume = "0.11.1"
-106
logic/src/emoji.rs
··· 1 - use serde::Serialize; 2 - 3 - #[derive(Clone, Debug, Serialize)] 4 - #[cfg_attr(feature = "cli", derive(clap::ValueEnum))] 5 - #[serde(rename_all = "kebab-case")] 6 - pub enum Emoji { 7 - ThumbsUp, 8 - ThumbsDown, 9 - BlueHeart, 10 - PleadingFace, 11 - AnguishedFace, 12 - AngryFace, 13 - UpsideDownFace, 14 - WinkingFace, 15 - SunglassesFace, 16 - GlassesFace, 17 - RaisedEyebrowFace, 18 - PartyingFace, 19 - CryingFace, 20 - SteamFromNose, 21 - ExplodingHead, 22 - Saluting, 23 - Skull, 24 - RaisedFist, 25 - RockOn, 26 - Eyes, 27 - Brain, 28 - WomanTechnologist, 29 - PersonTechnologist, 30 - Ninja, 31 - Troll, 32 - Butterfly, 33 - Rocket, 34 - Potato, 35 - Crab, 36 - } 37 - 38 - impl Emoji { 39 - pub fn to_emoji(&self) -> &'static str { 40 - match self { 41 - Emoji::ThumbsUp => "👍", 42 - Emoji::ThumbsDown => "👎", 43 - Emoji::BlueHeart => "💙", 44 - Emoji::PleadingFace => "🥹", 45 - Emoji::AnguishedFace => "😧", 46 - Emoji::AngryFace => "😤", 47 - Emoji::UpsideDownFace => "🙃", 48 - Emoji::WinkingFace => "😉", 49 - Emoji::SunglassesFace => "😎", 50 - Emoji::GlassesFace => "🤓", 51 - Emoji::RaisedEyebrowFace => "🤨", 52 - Emoji::PartyingFace => "🥳", 53 - Emoji::CryingFace => "😭", 54 - Emoji::SteamFromNose => "😤", 55 - Emoji::ExplodingHead => "🤯", 56 - Emoji::Saluting => "🫡", 57 - Emoji::Skull => "💀", 58 - Emoji::RaisedFist => "✊", 59 - Emoji::RockOn => "🤘", 60 - Emoji::Eyes => "👀", 61 - Emoji::Brain => "🧠", 62 - Emoji::WomanTechnologist => "👩‍💻", 63 - Emoji::PersonTechnologist => "🧑‍💻", 64 - Emoji::Ninja => "🥷", 65 - Emoji::Troll => "🧌", 66 - Emoji::Butterfly => "🦋", 67 - Emoji::Rocket => "🚀", 68 - Emoji::Potato => "🥔", 69 - Emoji::Crab => "🦀", 70 - } 71 - } 72 - } 73 - 74 - pub fn emojis() -> Vec<Emoji> { 75 - vec![ 76 - Emoji::ThumbsUp, 77 - Emoji::ThumbsDown, 78 - Emoji::BlueHeart, 79 - Emoji::PleadingFace, 80 - Emoji::AnguishedFace, 81 - Emoji::AngryFace, 82 - Emoji::UpsideDownFace, 83 - Emoji::WinkingFace, 84 - Emoji::SunglassesFace, 85 - Emoji::GlassesFace, 86 - Emoji::RaisedEyebrowFace, 87 - Emoji::PartyingFace, 88 - Emoji::CryingFace, 89 - Emoji::SteamFromNose, 90 - Emoji::ExplodingHead, 91 - Emoji::Saluting, 92 - Emoji::Skull, 93 - Emoji::RaisedFist, 94 - Emoji::RockOn, 95 - Emoji::Eyes, 96 - Emoji::Brain, 97 - Emoji::WomanTechnologist, 98 - Emoji::PersonTechnologist, 99 - Emoji::Ninja, 100 - Emoji::Troll, 101 - Emoji::Butterfly, 102 - Emoji::Rocket, 103 - Emoji::Potato, 104 - Emoji::Crab, 105 - ] 106 - }
+18 -371
logic/src/lib.rs
··· 1 - pub mod emoji; 2 1 mod resolver; 3 - 4 - use crate::Error::RequestError; 5 2 use crate::resolver::ApiDNSTxtResolver; 6 - #[cfg(not(target_arch = "wasm32"))] 7 - use async_trait::async_trait; 8 - use atproto_api::record::KnownRecord; 9 3 use atrium_api::agent::atp_agent::store::MemorySessionStore; 10 4 use atrium_api::agent::atp_agent::{AtpSession, CredentialSession}; 11 5 use atrium_api::agent::{Agent, Configure}; ··· 18 12 use atrium_identity::handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig}; 19 13 use atrium_oauth::DefaultHttpClient; 20 14 use atrium_xrpc_client::reqwest::ReqwestClient; 21 - #[cfg(not(target_arch = "wasm32"))] 22 - use rocketman::{ 23 - connection::JetstreamConnection, handler, ingestion::LexiconIngestor, 24 - options::JetstreamOptions, types::event::Operation, 25 - }; 15 + use bsky_sdk::BskyAgent; 26 16 use serde::{Deserialize, Serialize}; 27 - #[cfg(not(target_arch = "wasm32"))] 28 - use std::collections::HashMap; 29 17 use std::sync::Arc; 30 - #[cfg(not(target_arch = "wasm32"))] 31 - use std::sync::Mutex; 32 - use bsky_sdk::BskyAgent; 33 18 use thiserror::Error; 34 - #[cfg(not(target_arch = "wasm32"))] 35 - use tungstenite::Message; 36 - #[cfg(target_arch = "wasm32")] 37 - use wasm_bindgen::{JsCast, closure::Closure}; 38 - #[cfg(target_arch = "wasm32")] 39 - use web_sys::{ErrorEvent, MessageEvent, WebSocket}; 40 19 41 20 #[derive(Debug, Error)] 42 21 pub enum Error { ··· 58 37 59 38 #[derive(Clone)] 60 39 pub struct BotApi { 61 - agent: Arc<BskyAgent>, 40 + pub agent: Arc<BskyAgent>, 62 41 handle_resolver: Arc<AtprotoHandleResolver<ApiDNSTxtResolver, DefaultHttpClient>>, 63 42 did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>, 64 43 authenticated: bool, 65 44 } 66 45 67 - fn get_new_session(pds_url: String) -> CredentialSession<MemoryStore<(), AtpSession>, ReqwestClient> { 46 + fn get_new_session( 47 + pds_url: String, 48 + ) -> CredentialSession<MemoryStore<(), AtpSession>, ReqwestClient> { 68 49 CredentialSession::new( 69 50 ReqwestClient::new(pds_url.as_str()), 70 51 MemorySessionStore::default(), ··· 86 67 } 87 68 88 69 impl BotApi { 89 - /// Creates a new StatusphereApi to make unauthenticated calls to atproto repos 90 - // pub fn new() -> Self { 91 - // let session = get_new_session(); 92 - // let agent = Agent::new(session); 93 - // let http_client = Arc::new(DefaultHttpClient::default()); 94 - // let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig { 95 - // dns_txt_resolver: ApiDNSTxtResolver::default(), 96 - // http_client: Arc::new(DefaultHttpClient::default()), 97 - // }); 98 - // let did_resolver = CommonDidResolver::new(CommonDidResolverConfig { 99 - // plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(), 100 - // http_client, 101 - // }); 102 - // Self { 103 - // agent: Arc::new(agent), 104 - // handle_resolver: Arc::new(handle_resolver), 105 - // did_resolver: Arc::new(did_resolver), 106 - // authenticated: false, 107 - // } 108 - // } 109 - 110 - /// Creates a new StatusphereAPi to make authenticated requests to your atproto repo 111 - pub async fn new_logged_in(handle: String, password: String, pds_url: String) -> Result<Self, Error> { 70 + pub async fn new_logged_in( 71 + handle: String, 72 + password: String, 73 + pds_url: String, 74 + ) -> Result<Self, Error> { 112 75 // let session = get_new_session(pds_url); 113 76 // if let Err(error) = session.login(&handle, &password).await { 114 77 // return Err(Error::LoginError(error.to_string())); 115 78 // } 116 79 117 - let agent = BskyAgent::builder().build().await.expect("Failed to build agent"); 80 + let agent = BskyAgent::builder() 81 + .build() 82 + .await 83 + .expect("Failed to build agent"); 118 84 agent.configure_endpoint(pds_url); 119 85 // let agent = Agent::new(session); 120 - agent.login(handle, password).await.expect("Failed to login"); 86 + agent 87 + .login(handle, password) 88 + .await 89 + .expect("Failed to login"); 121 90 let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig { 122 91 dns_txt_resolver: ApiDNSTxtResolver::default(), 123 92 http_client: Arc::new(DefaultHttpClient::default()), ··· 136 105 }) 137 106 } 138 107 139 - /// Updates a user's Statusphere status on their atproto repo 140 - pub async fn new_status(&self, status: &str) -> Result<(), Error> { 141 - if !self.authenticated { 142 - return Err(Error::NeedsAuthentication); 143 - } 144 - 145 - let status: KnownRecord = atproto_api::xyz::statusphere::status::RecordData { 146 - created_at: Datetime::now(), 147 - status: status.to_string(), 148 - } 149 - .into(); 150 - 151 - let did = self.agent.did().await.unwrap(); 152 - match self 153 - .agent 154 - .api 155 - .com 156 - .atproto 157 - .repo 158 - .create_record( 159 - atrium_api::com::atproto::repo::create_record::InputData { 160 - collection: atproto_api::xyz::statusphere::Status::NSID.parse().unwrap(), 161 - record: status.into(), 162 - repo: did.clone().parse().unwrap(), 163 - rkey: None, 164 - swap_commit: None, 165 - validate: None, 166 - } 167 - .into(), 168 - ) 169 - .await 170 - { 171 - Ok(_) => Ok(()), 172 - Err(err) => Err(Error::WriteError(err.to_string())), 173 - } 174 - } 175 - 176 - /// Gets the users latest Statusphere status if they do not have one returns None 177 - pub async fn get_latest_status( 178 - &self, 179 - handle: String, 180 - ) -> Result<Option<atproto_api::xyz::statusphere::status::RecordData>, Error> { 181 - let handle = match Handle::new(handle) { 182 - Ok(handle) => handle, 183 - Err(err) => Err(Error::ParseError(err.to_string()))?, 184 - }; 185 - 186 - let did = match self.handle_resolver.resolve(&handle).await { 187 - Ok(did) => did, 188 - Err(err) => { 189 - return Err(RequestError(format!( 190 - "Could not resolve the users handle: {:?}", 191 - err.to_string() 192 - )))?; 193 - } 194 - }; 195 - 196 - let pds_url = match self.did_resolver.resolve(&did).await { 197 - Ok(did_doc) => get_pds_from_doc(did_doc)?, 198 - Err(err) => Err(RequestError(err.to_string()))?, 199 - }; 200 - 201 - //changing the agent to call the users pds 202 - self.agent.configure_endpoint(pds_url); 203 - 204 - let result = self 205 - .agent 206 - .api 207 - .com 208 - .atproto 209 - .repo 210 - .list_records( 211 - atrium_api::com::atproto::repo::list_records::ParametersData { 212 - collection: atproto_api::xyz::statusphere::Status::NSID.parse().unwrap(), 213 - cursor: None, 214 - limit: Some(LimitedNonZeroU8::<100>::try_from(10_u8).unwrap()), 215 - repo: handle.parse().unwrap(), 216 - reverse: None, 217 - } 218 - .into(), 219 - ) 220 - .await 221 - .map_err(|e| Error::RequestError(e.to_string()))?; 222 - 223 - if let Some(record) = result.records.first() { 224 - let status: atproto_api::xyz::statusphere::status::RecordData = 225 - atproto_api::xyz::statusphere::status::RecordData::from(record.value.clone()); 226 - return Ok(Some(status)); 227 - } 228 - Ok(None) 229 - } 230 - 231 108 ///Takes a users handle like @baileytownsend.dev and resolves it ot the did 232 109 pub async fn resolve_handle(&self, handle: String) -> Result<Did, Error> { 233 110 let handle = Handle::new(handle).map_err(|e| Error::ParseError(e.to_string()))?; ··· 263 140 } 264 141 } 265 142 } 266 - 267 - /// A Jetstream listener using Rocketman 268 - #[cfg(not(target_arch = "wasm32"))] 269 - pub struct StatusSphereListener { 270 - pub cursor: Arc<Mutex<Option<u64>>>, 271 - pub ingestors: Arc<HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>>, 272 - jetstream_connection: JetstreamConnection, 273 - pub msg_rx: flume::Receiver<Message>, 274 - pub reconnect_tx: flume::Sender<()>, 275 - } 276 - 277 - /// The data we get back from handlers of the Jetstream. 278 - #[derive(Serialize, Deserialize)] 279 - pub struct HydratedStatus { 280 - ///Handle may not always be hydrated depending on the target. 281 - pub handle: Option<String>, 282 - pub did: String, 283 - pub status: atproto_api::xyz::statusphere::status::RecordData, 284 - } 285 - 286 - /// Trait to share some similar types between jetstream implementations 287 - pub trait StatusphereIngesterTrait: Send + Sync { 288 - fn ingest(&self, status: HydratedStatus) -> Result<(), Error>; 289 - } 290 - 291 - /// our non wasm32 Injester 292 - #[cfg(not(target_arch = "wasm32"))] 293 - pub struct StatusphereIngester<F> 294 - where 295 - F: StatusphereIngesterTrait, 296 - { 297 - status_sphere_agent: BotApi, 298 - handler: F, 299 - } 300 - 301 - // #[cfg(not(target_arch = "wasm32"))] 302 - // impl StatusSphereListener { 303 - // pub fn new<F>(handler: F) -> Self 304 - // where 305 - // F: StatusphereIngesterTrait + Send + Sync + 'static, 306 - // { 307 - // // init the builder 308 - // let opts = JetstreamOptions::builder() 309 - // // your EXACT nsids 310 - // .wanted_collections(vec![ 311 - // atproto_api::xyz::statusphere::Status::NSID.to_string(), 312 - // ]) 313 - // .build(); 314 - // // create the jetstream connector 315 - // let jetstream = JetstreamConnection::new(opts); 316 - // 317 - // // tracks the last message we've processed 318 - // let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 319 - // 320 - // // get channels 321 - // let msg_rx = jetstream.get_msg_rx(); 322 - // let reconnect_tx = jetstream.get_reconnect_tx(); 323 - // 324 - // // create your ingestors 325 - // let mut injestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 326 - // 327 - // injestors.insert( 328 - // atproto_api::xyz::statusphere::Status::NSID.to_string(), 329 - // Box::new(StatusphereIngester { 330 - // status_sphere_agent: BotApi::new(), 331 - // handler, 332 - // }), 333 - // ); 334 - // Self { 335 - // cursor, 336 - // ingestors: Arc::new(injestors), 337 - // jetstream_connection: jetstream, 338 - // msg_rx, 339 - // reconnect_tx, 340 - // } 341 - // } 342 - // 343 - // /// Start listening to the jetstream for new statusphere updates 344 - // pub async fn listen(&self) -> Result<(), Error> { 345 - // let msg_rx = self.msg_rx.clone(); 346 - // let ingestors = Arc::clone(&self.ingestors); 347 - // let reconnect_tx = self.reconnect_tx.clone(); 348 - // let cursor = self.cursor.clone(); 349 - // 350 - // tokio::spawn(async move { 351 - // while let Ok(message) = msg_rx.recv_async().await { 352 - // if let Err(e) = handler::handle_message( 353 - // message, 354 - // &ingestors, 355 - // reconnect_tx.clone(), 356 - // cursor.clone(), 357 - // ) 358 - // .await 359 - // { 360 - // //Just error internally since this is a template 361 - // eprintln!("Error processing message: {}", e); 362 - // }; 363 - // } 364 - // }); 365 - // 366 - // // connect to jetstream 367 - // // retries internally, but may fail if there is an extreme error. 368 - // self.jetstream_connection 369 - // .connect(self.cursor.clone()) 370 - // .await 371 - // .map_err(|e| Error::GeneralError(e.to_string())) 372 - // } 373 - // } 374 - // 375 - // /// Shows a Jestream Listener for desktop or server side 376 - // #[cfg(not(target_arch = "wasm32"))] 377 - // #[async_trait] 378 - // impl<F: StatusphereIngesterTrait + Send + Sync> LexiconIngestor for StatusphereIngester<F> { 379 - // async fn ingest( 380 - // &self, 381 - // message: rocketman::types::event::Event<serde_json::Value>, 382 - // ) -> anyhow::Result<()> { 383 - // if let Some(commit) = &message.commit { 384 - // let handle = self 385 - // .status_sphere_agent 386 - // .get_handle(message.did.clone()) 387 - // .await?; 388 - // 389 - // match commit.operation { 390 - // Operation::Create | Operation::Update => { 391 - // if let Some(record) = &commit.record { 392 - // let status_at_proto_record = serde_json::from_value::< 393 - // atproto_api::xyz::statusphere::status::RecordData, 394 - // >(record.clone())?; 395 - // 396 - // if let Some(ref _cid) = commit.cid { 397 - // self.handler.ingest(HydratedStatus { 398 - // handle: Some(handle), 399 - // did: message.did, 400 - // status: status_at_proto_record.clone(), 401 - // })?; 402 - // } 403 - // Ok::<(), anyhow::Error>(()) 404 - // } else { 405 - // Ok::<(), anyhow::Error>(()) 406 - // } 407 - // } 408 - // Operation::Delete => Ok::<(), anyhow::Error>(()), 409 - // } 410 - // } else { 411 - // Ok(()) 412 - // } 413 - // } 414 - // } 415 - 416 - #[cfg(target_arch = "wasm32")] 417 - /// Just a simple jetstream listener with a lot less features than Rocketman but just to show it can happen and sharing code 418 - pub fn listen<F>(on_messa_handler: F) -> Result<(), Error> 419 - where 420 - F: StatusphereIngesterTrait + 'static, 421 - { 422 - // Connect to an echo server 423 - let ws = WebSocket::new( 424 - "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=xyz.statusphere.status", 425 - ) 426 - //HACK prob dont want that unwrap either, but this is just a template 427 - .map_err(|e| Error::GeneralError(String::try_from(e).unwrap()))?; 428 - // For small binary messages, like CBOR, Arraybuffer is more efficient than Blob handling 429 - ws.set_binary_type(web_sys::BinaryType::Arraybuffer); 430 - // create callback 431 - let on_message = Closure::<dyn FnMut(_)>::new(move |e: MessageEvent| { 432 - //We're just handling txt to make it easy 433 - if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() { 434 - //HACK .into_Serde() on jsvalue of e.data() was not working for some reason 435 - let txt_str = String::from(txt); 436 - let envelope: WasmEvent<atproto_api::xyz::statusphere::status::RecordData> = 437 - serde_json::from_str(&txt_str).unwrap(); 438 - if let Some(commit) = envelope.commit { 439 - if let Some(record) = commit.record { 440 - let _ = on_message_handler.ingest(HydratedStatus { 441 - handle: None, 442 - did: envelope.did, 443 - status: record, 444 - }); 445 - } 446 - } 447 - } 448 - }); 449 - 450 - // set message event handler on WebSocket 451 - ws.set_onmessage(Some(on_message.as_ref().unchecked_ref())); 452 - // forget the callback to keep it alive 453 - on_message.forget(); 454 - 455 - let onerror_callback = Closure::<dyn FnMut(_)>::new(move |_e: ErrorEvent| { 456 - //HACK would put an actual error handler here 457 - }); 458 - ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); 459 - onerror_callback.forget(); 460 - let onopen_callback = Closure::<dyn FnMut()>::new(move || { 461 - //HACK 462 - }); 463 - ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref())); 464 - onopen_callback.forget(); 465 - Ok(()) 466 - } 467 - 468 - ///Shamelessly taken from rocketman. reimplmenting because Rocketman is not wasm compatible 469 - /// Ideally you would have the same types between packages but just a proof of concept 470 - /// https://github.com/espeon/cadet/blob/f034b19b73df4ac97e3ae54162d48883f81ff168/rocketman/src/types/event.rs#L14 471 - #[derive(Debug, Serialize, Deserialize)] 472 - #[serde(rename_all = "snake_case")] 473 - struct WasmEvent<T> { 474 - pub did: String, 475 - pub commit: Option<WasmCommit<T>>, 476 - } 477 - 478 - #[derive(Debug, Serialize, Deserialize)] 479 - #[serde(rename_all = "camelCase")] 480 - struct WasmCommit<T> { 481 - pub rev: String, 482 - pub operation: WasmOperation, 483 - pub collection: String, 484 - pub rkey: String, 485 - pub record: Option<T>, 486 - pub cid: Option<String>, 487 - } 488 - // 489 - #[derive(Debug, Serialize, Deserialize)] 490 - #[serde(rename_all = "lowercase")] 491 - enum WasmOperation { 492 - Create, 493 - Update, 494 - Delete, 495 - }
+15
slingshot/Cargo.toml
··· 1 + [package] 2 + name = "slingshot" 3 + version = "0.1.0" 4 + edition = "2021" 5 + license = "MIT OR Apache-2.0" 6 + 7 + [dependencies] 8 + serde.workspace = true 9 + reqwest.workspace = true 10 + serde_json.workspace = true 11 + thiserror = "1" 12 + url = "2" 13 + 14 + [dev-dependencies] 15 + tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+76
slingshot/src/lib.rs
··· 1 + use reqwest::Url; 2 + use serde::de::DeserializeOwned; 3 + use serde::Deserialize; 4 + use thiserror::Error; 5 + 6 + /// Error type for the slingshot crate 7 + #[derive(Debug, Error)] 8 + pub enum Error { 9 + #[error("invalid base url: {0}")] 10 + Url(#[from] url::ParseError), 11 + #[error("http error: {0}")] 12 + Http(#[from] reqwest::Error), 13 + #[error("request failed with status {status}: {body}")] 14 + Status { status: u16, body: String }, 15 + #[error("json error: {0}")] 16 + Json(#[from] serde_json::Error), 17 + } 18 + 19 + /// Simple HTTP client for AT Protocol endpoints 20 + /// 21 + /// Example endpoint implemented: com.atproto.repo.getRecord 22 + pub struct Slingshot { 23 + base: Url, 24 + client: reqwest::Client, 25 + } 26 + 27 + #[derive(Debug, Deserialize, Clone)] 28 + pub struct RecordRapper<T> { 29 + pub cid: String, 30 + pub uri: String, 31 + pub value: T, 32 + } 33 + 34 + impl Slingshot { 35 + /// Create a new Slingshot client with the given base URL. 36 + /// Example base URL: https://slingshot.microcosm.blue/ 37 + pub fn new(base_url: impl AsRef<str>) -> Result<Self, Error> { 38 + let mut base = Url::parse(base_url.as_ref())?; 39 + // Ensure trailing slash so join works consistently 40 + if !base.as_str().ends_with('/') { 41 + base = Url::parse(&(base.as_str().to_owned() + "/"))?; 42 + } 43 + let client = reqwest::Client::new(); 44 + Ok(Self { base, client }) 45 + } 46 + 47 + /// Call com.atproto.repo.getRecord 48 + /// 49 + /// Maps to: 50 + /// GET /xrpc/com.atproto.repo.getRecord?repo=<did>&collection=<collection>&rkey=<recordKey> 51 + pub async fn get_record<T: DeserializeOwned>( 52 + &self, 53 + did: &str, 54 + collection: &str, 55 + record_key: &str, 56 + ) -> Result<RecordRapper<T>, Error> { 57 + let mut url = self.base.join("/xrpc/com.atproto.repo.getRecord")?; 58 + url.query_pairs_mut() 59 + .append_pair("repo", did) 60 + .append_pair("collection", collection) 61 + .append_pair("rkey", record_key); 62 + 63 + let resp = self.client.get(url).send().await?; 64 + let status = resp.status(); 65 + let bytes = resp.bytes().await?; 66 + if !status.is_success() { 67 + let body = String::from_utf8_lossy(&bytes).to_string(); 68 + return Err(Error::Status { 69 + status: status.as_u16(), 70 + body, 71 + }); 72 + } 73 + let value: RecordRapper<T> = serde_json::from_slice(&bytes)?; 74 + Ok(value) 75 + } 76 + }