announcing good-first-issue tags added on @tangled.sh (not affiliated with tangled!)
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

at f565f2e017956e1db231fbe3e3faaca50899ba0f 387 lines 12 kB view raw
1use clap::Parser; 2use jacquard::{ 3 api::{ 4 app_bsky::feed::post::Post, 5 app_bsky::richtext::facet::{ByteSlice, Facet}, 6 com_atproto::repo::create_record::CreateRecord, 7 com_atproto::server::create_session::CreateSession, 8 }, 9 client::{BasicClient, Session}, 10 types::{ 11 collection::Collection, datetime::Datetime, ident::AtIdentifier, language::Language, 12 string::AtUri, value::Data, 13 }, 14}; 15use jetstream::{ 16 JetstreamCompression, JetstreamConfig, JetstreamConnector, 17 events::{CommitOp, Cursor, EventKind, JetstreamEvent}, 18 exports::Nsid, 19}; 20use url::Url; 21 22use serde::Deserialize; 23use std::time::Duration; 24 25type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 26 27#[derive(Debug, Parser)] 28#[command(version, about, long_about = None)] 29struct Args { 30 /// pds of the bot user 31 #[arg(short, long, env = "BOT_PDS")] 32 pds: Url, 33 /// handle or did of the bot user 34 #[arg(short, long, env = "BOT_HANDLE")] 35 identifier: String, 36 /// app password for the bot user 37 #[arg(short, long, env = "BOT_APP_PASSWORD")] 38 app_password: String, 39 /// lightweight firehose 40 #[arg(short, long, env = "BOT_JETSTREAM_URL")] 41 #[clap(default_value = "wss://jetstream1.us-east.fire.hose.cam/subscribe")] 42 jetstream_url: Url, 43 /// optional: we can pick up from a past jetstream cursor 44 /// 45 /// the default is to just live-tail 46 /// 47 /// warning: setting this can lead to rapid bot posting 48 #[arg(long)] 49 jetstream_cursor: Option<u64>, 50 /// don't actually post 51 #[arg(long, action)] 52 dry_run: bool, 53} 54 55struct IssueDetails { 56 repo_full_name: String, 57 repo_url: String, 58 title: String, 59 issues_url: String, 60} 61 62/// com.bad-example.identity.resolveMiniDoc bit we care about 63#[derive(Deserialize)] 64struct MiniDocResponse { 65 handle: String, 66} 67 68/// com.atproto.repo.getRecord wraps the record in a `value` key 69#[derive(Deserialize)] 70struct GetRecordResonse<T> { 71 value: T, 72} 73 74/// part of CreateLabelRecord: key is the label reference (ie for "good-first-issue") 75#[derive(Deserialize)] 76struct AddLabel { 77 key: String, 78} 79 80/// tangled's record for adding labels to an issue 81#[derive(Deserialize)] 82struct CreateLabelRecord { 83 add: Vec<AddLabel>, 84 subject: String, 85} 86 87/// tangled issue record 88#[derive(Deserialize)] 89struct IssueRecord { 90 title: String, 91 repo: String, 92} 93 94/// tangled repo record 95#[derive(Deserialize)] 96struct RepoRecord { 97 name: String, 98} 99 100/// get some atproto record content (from slingshot) 101async fn get_record<T: for<'a> Deserialize<'a>>( 102 client: &reqwest::Client, 103 at_uri: &str, 104) -> Result<T> { 105 let mut url: Url = "https://slingshot.microcosm.blue".parse()?; 106 url.set_path("/xrpc/com.bad-example.repo.getUriRecord"); 107 url.query_pairs_mut().append_pair("at_uri", at_uri); 108 let GetRecordResonse { value } = client 109 .get(url) 110 .send() 111 .await? 112 .error_for_status()? 113 .json() 114 .await?; 115 Ok(value) 116} 117 118/// try to resolve a bidirectionally verified handle from an identifier (via slingshot) 119async fn get_handle(client: &reqwest::Client, identifier: &str) -> Result<Option<String>> { 120 let mut url: Url = "https://slingshot.microcosm.blue".parse()?; 121 url.set_path("/xrpc/com.bad-example.identity.resolveMiniDoc"); 122 url.query_pairs_mut().append_pair("identifier", identifier); 123 let MiniDocResponse { handle } = client 124 .get(url) 125 .send() 126 .await? 127 .error_for_status()? 128 .json() 129 .await?; 130 if handle == "handle.invalid" { 131 Ok(None) 132 } else { 133 Ok(Some(handle)) 134 } 135} 136 137fn event_to_create_label<T: for<'a> Deserialize<'a>>(event: JetstreamEvent) -> Result<T> { 138 if event.kind != EventKind::Commit { 139 return Err("not a commit".into()); 140 } 141 let commit = event.commit.ok_or("commit event missing commit data")?; 142 if commit.operation != CommitOp::Create { 143 return Err("not a create event".into()); 144 } 145 146 let raw = commit.record.ok_or("commit missing record")?; 147 148 // todo: delete post if label is removed 149 // delete sample: at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.label.op/3m2jvx4c6wf22 150 // tldr: has a "delete" array just like "add" on the same op collection 151 Ok(serde_json::from_str(raw.get())?) 152} 153 154async fn extract_issue_info( 155 client: &reqwest::Client, 156 adds: Vec<AddLabel>, 157 subject: String, 158) -> Result<IssueDetails> { 159 let mut added_good_first_issue = false; 160 for added in adds { 161 if added.key 162 == "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue" 163 { 164 log::info!("found a good first issue label!!"); 165 added_good_first_issue = true; 166 break; // inner 167 } 168 log::debug!("found a label but it wasn't good-first-issue, ignoring..."); 169 } 170 if !added_good_first_issue { 171 return Err("good-first-issue label not found in added labels".into()); 172 } 173 174 let IssueRecord { title, repo } = match get_record(client, &subject).await { 175 Ok(m) => m, 176 Err(e) => return Err(format!("failed to get issue record: {e} for {subject}").into()), 177 }; 178 179 let Ok(repo_uri) = AtUri::new(&repo) else { 180 return Err("failed to parse repo to aturi for {subject}".into()); 181 }; 182 183 let RepoRecord { name: repo_name } = match get_record(client, &repo).await { 184 Ok(m) => m, 185 Err(e) => return Err(format!("failed to get repo record: {e} for {subject}").into()), 186 }; 187 188 let nice_tangled_repo_id = match repo_uri.authority() { 189 AtIdentifier::Handle(h) => format!("@{h}"), 190 AtIdentifier::Did(did) => match get_handle(client, did.as_str()).await { 191 Err(e) => { 192 return Err(format!( 193 "failed to get mini doc from repo identifier: {e} for {subject}" 194 ) 195 .into()); 196 } 197 Ok(None) => did.to_string(), 198 Ok(Some(h)) => format!("@{h}"), 199 }, 200 }; 201 202 let repo_full_name = format!("{nice_tangled_repo_id}/{repo_name}"); 203 let repo_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}"); 204 205 let issues_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}/issues"); 206 207 Ok(IssueDetails { 208 repo_full_name, 209 repo_url, 210 title, 211 issues_url, 212 }) 213} 214 215async fn post( 216 client: &BasicClient, 217 identifier: &AtIdentifier<'_>, 218 IssueDetails { 219 repo_full_name, 220 repo_url, 221 title, 222 issues_url, 223 }: &IssueDetails, 224) -> Result<()> { 225 let message = format!( 226 r#"New from {repo_full_name}: 227 228> {title}"# 229 ); 230 231 let pre_len = 9; 232 233 let repo_feature = serde_json::json!({ 234 "$type": "app.bsky.richtext.facet#link", 235 "uri": repo_url, 236 }); 237 let repo_facet = Facet { 238 features: vec![Data::from_json(&repo_feature)?], 239 index: ByteSlice { 240 byte_start: pre_len, 241 byte_end: pre_len + repo_full_name.len() as i64, 242 extra_data: Default::default(), 243 }, 244 extra_data: Default::default(), 245 }; 246 247 let title_starts_at = pre_len + (repo_full_name.len() + 5) as i64; 248 249 let repo_issues_feature = serde_json::json!({ 250 "$type": "app.bsky.richtext.facet#link", 251 "uri": issues_url, 252 }); 253 let issues_facet = Facet { 254 features: vec![Data::from_json(&repo_issues_feature)?], 255 index: ByteSlice { 256 byte_start: title_starts_at, 257 byte_end: title_starts_at + title.len() as i64, 258 extra_data: Default::default(), 259 }, 260 extra_data: Default::default(), 261 }; 262 263 // Make a post 264 let post = Post { 265 created_at: Datetime::now(), 266 langs: Some(vec![Language::new("en")?]), 267 text: message.into(), 268 facets: Some(vec![repo_facet, issues_facet]), 269 embed: Default::default(), 270 entities: Default::default(), 271 labels: Default::default(), 272 reply: Default::default(), 273 tags: Default::default(), 274 extra_data: Default::default(), 275 }; 276 277 let json = serde_json::to_value(post)?; 278 let data = Data::from_json(&json)?; 279 280 log::info!("\nposting..."); 281 client 282 .send( 283 CreateRecord::new() 284 .repo(identifier.clone()) 285 .collection(Post::nsid()) 286 .record(data) 287 .build(), 288 ) 289 .await? 290 .into_output()?; 291 292 Ok(()) 293} 294 295#[tokio::main] 296async fn main() -> Result<()> { 297 env_logger::init(); 298 let args = Args::parse(); 299 300 // Create HTTP client and session 301 let client = BasicClient::new(args.pds); 302 let bot_id = AtIdentifier::new(&args.identifier)?; 303 let create_session = CreateSession::new() 304 .identifier(bot_id.to_string()) 305 .password(&args.app_password) 306 .build(); 307 let session = Session::from(client.send(create_session.clone()).await?.into_output()?); 308 log::debug!("logged in as {} ({})", session.handle, session.did); 309 client.set_session(session).await?; 310 311 let slingshot_client = reqwest::Client::builder() 312 .user_agent("hacktober_bot") 313 .timeout(Duration::from_secs(9)) 314 .build()?; 315 316 let jetstream_config: JetstreamConfig = JetstreamConfig { 317 endpoint: args.jetstream_url.to_string(), 318 wanted_collections: vec![Nsid::new("sh.tangled.label.op".to_string())?], 319 user_agent: Some("hacktober_bot".to_string()), 320 compression: JetstreamCompression::Zstd, 321 replay_on_reconnect: true, 322 channel_size: 1024, // buffer up to ~1s of jetstream events 323 ..Default::default() 324 }; 325 let mut receiver = JetstreamConnector::new(jetstream_config)? 326 .connect_cursor(args.jetstream_cursor.map(Cursor::from_raw_u64)) 327 .await?; 328 329 log::info!("receiving jetstream messages..."); 330 loop { 331 let Some(event) = receiver.recv().await else { 332 log::error!("consumer: could not receive event, bailing"); 333 break; 334 }; 335 let cursor = event.cursor; 336 337 let CreateLabelRecord { add: adds, subject } = match event_to_create_label(event) { 338 Ok(clr) => clr, 339 Err(e) => { 340 log::debug!("ignoring unparseable event (at {cursor:?}): {e}"); 341 continue; 342 } 343 }; 344 345 let issue_details = match extract_issue_info(&slingshot_client, adds, subject.clone()).await 346 { 347 Ok(deets) => deets, 348 Err(e) => { 349 log::warn!("failed to extract issue details (at {cursor:?}): {e}"); 350 continue; 351 } 352 }; 353 354 if args.dry_run { 355 let IssueDetails { 356 repo_full_name, 357 repo_url, 358 title, 359 issues_url, 360 } = issue_details; 361 log::info!( 362 r#"--dry-run, but would have posted: 363 364good-first-issue label added for {repo_full_name} ({repo_url}): 365 366> {title} ({issues_url})"# 367 ); 368 continue; 369 } 370 371 if let Err(e) = post(&client, &bot_id, &issue_details).await { 372 log::warn!("failed to post for {subject}: {e}, refreshing session for one retry..."); 373 let session = Session::from(client.send(create_session.clone()).await?.into_output()?); 374 log::debug!("logged in as {} ({})", session.handle, session.did); 375 client.set_session(session).await?; 376 377 if let Err(e) = post(&client, &bot_id, &issue_details).await { 378 log::error!( 379 "failed to post after a session refresh: {e:?}, something is wrong. bye." 380 ); 381 break; 382 } 383 }; 384 } 385 386 Ok(()) 387}