Announces good-first-issue tags added on @tangled.sh (not affiliated with Tangled!)
at main 12 kB view raw
1use clap::Parser; 2use jacquard::{ 3 api::{ 4 app_bsky::feed::post::Post, 5 app_bsky::richtext::facet::{ByteSlice, Facet}, 6 com_atproto::repo::create_record::CreateRecord, 7 com_atproto::server::create_session::CreateSession, 8 }, 9 client::{BasicClient, Session}, 10 types::{ 11 collection::Collection, datetime::Datetime, ident::AtIdentifier, language::Language, 12 string::AtUri, value::Data, 13 }, 14}; 15use jetstream::{ 16 JetstreamCompression, JetstreamConfig, JetstreamConnector, 17 events::{CommitOp, Cursor, EventKind, JetstreamEvent}, 18 exports::Nsid, 19}; 20use url::Url; 21 22use serde::Deserialize; 23use std::time::Duration; 24 25type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 26 27#[derive(Debug, Parser)] 28#[command(version, about, long_about = None)] 29struct Args { 30 /// pds of the bot user 31 #[arg(short, long, env = "BOT_PDS")] 32 pds: Url, 33 /// handle or did of the bot user 34 #[arg(short, long, env = "BOT_HANDLE")] 35 identifier: String, 36 /// app password for the bot user 37 #[arg(short, long, env = "BOT_APP_PASSWORD")] 38 app_password: String, 39 /// lightweight firehose 40 #[arg(short, long, env = "BOT_JETSTREAM_URL")] 41 #[clap(default_value = "wss://jetstream1.us-east.fire.hose.cam/subscribe")] 42 jetstream_url: Url, 43 /// optional: we can pick up from a past jetstream cursor 44 /// 45 /// the default is to just live-tail 46 /// 47 /// warning: setting this can lead to rapid bot posting 48 #[arg(long)] 49 jetstream_cursor: Option<u64>, 50 /// don't actually post 51 #[arg(long, action)] 52 dry_run: bool, 53 /// send a checkin to this url every 5 mins 54 #[arg(long)] 55 healthcheck_ping: Option<Url>, 56} 57 58struct IssueDetails { 59 repo_full_name: String, 60 repo_url: String, 61 title: String, 62 issues_url: String, 63} 64 65/// com.bad-example.identity.resolveMiniDoc bit we care about 66#[derive(Deserialize)] 67struct MiniDocResponse { 68 handle: String, 69} 70 71/// com.atproto.repo.getRecord wraps the record in a 
`value` key 72#[derive(Deserialize)] 73struct GetRecordResonse<T> { 74 value: T, 75} 76 77/// part of CreateLabelRecord: key is the label reference (ie for "good-first-issue") 78#[derive(Deserialize)] 79struct AddLabel { 80 key: String, 81} 82 83/// tangled's record for adding labels to an issue 84#[derive(Deserialize)] 85struct CreateLabelRecord { 86 add: Vec<AddLabel>, 87 subject: String, 88} 89 90/// tangled issue record 91#[derive(Deserialize)] 92struct IssueRecord { 93 title: String, 94 repo: String, 95} 96 97/// tangled repo record 98#[derive(Deserialize)] 99struct RepoRecord { 100 name: String, 101} 102 103/// get some atproto record content (from slingshot) 104async fn get_record<T: for<'a> Deserialize<'a>>( 105 client: &reqwest::Client, 106 at_uri: &str, 107) -> Result<T> { 108 let mut url: Url = "https://slingshot.microcosm.blue".parse()?; 109 url.set_path("/xrpc/com.bad-example.repo.getUriRecord"); 110 url.query_pairs_mut().append_pair("at_uri", at_uri); 111 let GetRecordResonse { value } = client 112 .get(url) 113 .send() 114 .await? 115 .error_for_status()? 116 .json() 117 .await?; 118 Ok(value) 119} 120 121/// try to resolve a bidirectionally verified handle from an identifier (via slingshot) 122async fn get_handle(client: &reqwest::Client, identifier: &str) -> Result<Option<String>> { 123 let mut url: Url = "https://slingshot.microcosm.blue".parse()?; 124 url.set_path("/xrpc/com.bad-example.identity.resolveMiniDoc"); 125 url.query_pairs_mut().append_pair("identifier", identifier); 126 let MiniDocResponse { handle } = client 127 .get(url) 128 .send() 129 .await? 130 .error_for_status()? 
131 .json() 132 .await?; 133 if handle == "handle.invalid" { 134 Ok(None) 135 } else { 136 Ok(Some(handle)) 137 } 138} 139 140fn event_to_create_label<T: for<'a> Deserialize<'a>>(event: JetstreamEvent) -> Result<T> { 141 if event.kind != EventKind::Commit { 142 return Err("not a commit".into()); 143 } 144 let commit = event.commit.ok_or("commit event missing commit data")?; 145 if commit.operation != CommitOp::Create { 146 return Err("not a create event".into()); 147 } 148 149 let raw = commit.record.ok_or("commit missing record")?; 150 151 // todo: delete post if label is removed 152 // delete sample: at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.label.op/3m2jvx4c6wf22 153 // tldr: has a "delete" array just like "add" on the same op collection 154 Ok(serde_json::from_str(raw.get())?) 155} 156 157async fn extract_issue_info( 158 client: &reqwest::Client, 159 adds: Vec<AddLabel>, 160 subject: String, 161) -> Result<IssueDetails> { 162 let mut added_good_first_issue = false; 163 for added in adds { 164 if added.key 165 == "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue" 166 { 167 log::info!("found a good first issue label!!"); 168 added_good_first_issue = true; 169 break; // inner 170 } 171 log::debug!("found a label but it wasn't good-first-issue, ignoring..."); 172 } 173 if !added_good_first_issue { 174 return Err("good-first-issue label not found in added labels".into()); 175 } 176 177 let IssueRecord { title, repo } = match get_record(client, &subject).await { 178 Ok(m) => m, 179 Err(e) => return Err(format!("failed to get issue record: {e} for {subject}").into()), 180 }; 181 182 let Ok(repo_uri) = AtUri::new(&repo) else { 183 return Err("failed to parse repo to aturi for {subject}".into()); 184 }; 185 186 let RepoRecord { name: repo_name } = match get_record(client, &repo).await { 187 Ok(m) => m, 188 Err(e) => return Err(format!("failed to get repo record: {e} for {subject}").into()), 189 }; 190 191 let nice_tangled_repo_id 
= match repo_uri.authority() { 192 AtIdentifier::Handle(h) => format!("@{h}"), 193 AtIdentifier::Did(did) => match get_handle(client, did.as_str()).await { 194 Err(e) => { 195 return Err(format!( 196 "failed to get mini doc from repo identifier: {e} for {subject}" 197 ) 198 .into()); 199 } 200 Ok(None) => did.to_string(), 201 Ok(Some(h)) => format!("@{h}"), 202 }, 203 }; 204 205 let repo_full_name = format!("{nice_tangled_repo_id}/{repo_name}"); 206 let repo_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}"); 207 208 let issues_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}/issues"); 209 210 Ok(IssueDetails { 211 repo_full_name, 212 repo_url, 213 title, 214 issues_url, 215 }) 216} 217 218async fn post( 219 client: &BasicClient, 220 identifier: &AtIdentifier<'_>, 221 IssueDetails { 222 repo_full_name, 223 repo_url, 224 title, 225 issues_url, 226 }: &IssueDetails, 227) -> Result<()> { 228 let message = format!( 229 r#"New from {repo_full_name}: 230 231> {title}"# 232 ); 233 234 let pre_len = 9; 235 236 let repo_feature = serde_json::json!({ 237 "$type": "app.bsky.richtext.facet#link", 238 "uri": repo_url, 239 }); 240 let repo_facet = Facet { 241 features: vec![Data::from_json(&repo_feature)?], 242 index: ByteSlice { 243 byte_start: pre_len, 244 byte_end: pre_len + repo_full_name.len() as i64, 245 extra_data: Default::default(), 246 }, 247 extra_data: Default::default(), 248 }; 249 250 let title_starts_at = pre_len + (repo_full_name.len() + 5) as i64; 251 252 let repo_issues_feature = serde_json::json!({ 253 "$type": "app.bsky.richtext.facet#link", 254 "uri": issues_url, 255 }); 256 let issues_facet = Facet { 257 features: vec![Data::from_json(&repo_issues_feature)?], 258 index: ByteSlice { 259 byte_start: title_starts_at, 260 byte_end: title_starts_at + title.len() as i64, 261 extra_data: Default::default(), 262 }, 263 extra_data: Default::default(), 264 }; 265 266 // Make a post 267 let post = Post { 268 created_at: 
Datetime::now(), 269 langs: Some(vec![Language::new("en")?]), 270 text: message.into(), 271 facets: Some(vec![repo_facet, issues_facet]), 272 embed: Default::default(), 273 entities: Default::default(), 274 labels: Default::default(), 275 reply: Default::default(), 276 tags: Default::default(), 277 extra_data: Default::default(), 278 }; 279 280 let json = serde_json::to_value(post)?; 281 let data = Data::from_json(&json)?; 282 283 log::info!("\nposting..."); 284 client 285 .send( 286 CreateRecord::new() 287 .repo(identifier.clone()) 288 .collection(Post::nsid()) 289 .record(data) 290 .build(), 291 ) 292 .await? 293 .into_output()?; 294 295 Ok(()) 296} 297 298async fn hc_ping(url: Url, client: reqwest::Client) { 299 let mut interval = tokio::time::interval(Duration::from_secs(5 * 60)); 300 loop { 301 interval.tick().await; 302 log::trace!("sending healthcheck ping..."); 303 if let Err(e) = client 304 .get(url.clone()) 305 .send() 306 .await 307 .and_then(reqwest::Response::error_for_status) 308 { 309 log::warn!("error sending healthcheck ping: {e}"); 310 } 311 } 312} 313 314#[tokio::main] 315async fn main() -> Result<()> { 316 env_logger::init(); 317 let args = Args::parse(); 318 319 // Create HTTP client and session 320 let client = BasicClient::new(args.pds); 321 let bot_id = AtIdentifier::new(&args.identifier)?; 322 let create_session = CreateSession::new() 323 .identifier(bot_id.to_string()) 324 .password(&args.app_password) 325 .build(); 326 let session = Session::from(client.send(create_session.clone()).await?.into_output()?); 327 log::debug!("logged in as {} ({})", session.handle, session.did); 328 client.set_session(session).await?; 329 330 let slingshot_client = reqwest::Client::builder() 331 .user_agent("hacktober_bot") 332 .timeout(Duration::from_secs(9)) 333 .build()?; 334 335 let jetstream_config: JetstreamConfig = JetstreamConfig { 336 endpoint: args.jetstream_url.to_string(), 337 wanted_collections: vec![Nsid::new("sh.tangled.label.op".to_string())?], 
338 user_agent: Some("hacktober_bot".to_string()), 339 compression: JetstreamCompression::Zstd, 340 replay_on_reconnect: true, 341 channel_size: 1024, // buffer up to ~1s of jetstream events 342 ..Default::default() 343 }; 344 let mut receiver = JetstreamConnector::new(jetstream_config)? 345 .connect_cursor(args.jetstream_cursor.map(Cursor::from_raw_u64)) 346 .await?; 347 348 if let Some(hc) = args.healthcheck_ping { 349 log::info!("starting healthcheck ping task..."); 350 tokio::task::spawn(hc_ping(hc.clone(), slingshot_client.clone())); 351 } 352 353 log::info!("receiving jetstream messages..."); 354 loop { 355 let Some(event) = receiver.recv().await else { 356 log::error!("consumer: could not receive event, bailing"); 357 break; 358 }; 359 let cursor = event.cursor; 360 361 let CreateLabelRecord { add: adds, subject } = match event_to_create_label(event) { 362 Ok(clr) => clr, 363 Err(e) => { 364 log::debug!("ignoring unparseable event (at {cursor:?}): {e}"); 365 continue; 366 } 367 }; 368 369 let issue_details = match extract_issue_info(&slingshot_client, adds, subject.clone()).await 370 { 371 Ok(deets) => deets, 372 Err(e) => { 373 log::warn!("failed to extract issue details (at {cursor:?}): {e}"); 374 continue; 375 } 376 }; 377 378 if args.dry_run { 379 let IssueDetails { 380 repo_full_name, 381 repo_url, 382 title, 383 issues_url, 384 } = issue_details; 385 log::info!( 386 r#"--dry-run, but would have posted: 387 388good-first-issue label added for {repo_full_name} ({repo_url}): 389 390> {title} ({issues_url})"# 391 ); 392 continue; 393 } 394 395 if let Err(e) = post(&client, &bot_id, &issue_details).await { 396 log::warn!("failed to post for {subject}: {e}, refreshing session for one retry..."); 397 let session = Session::from(client.send(create_session.clone()).await?.into_output()?); 398 log::debug!("logged in as {} ({})", session.handle, session.did); 399 client.set_session(session).await?; 400 401 if let Err(e) = post(&client, &bot_id, 
&issue_details).await { 402 log::error!( 403 "failed to post after a session refresh: {e:?}, something is wrong. bye." 404 ); 405 break; 406 } 407 }; 408 } 409 410 Ok(()) 411}