use clap::Parser; use jacquard::{ api::{ app_bsky::feed::post::Post, app_bsky::richtext::facet::{ByteSlice, Facet}, com_atproto::repo::create_record::CreateRecord, com_atproto::server::create_session::CreateSession, }, client::{BasicClient, Session}, types::{ collection::Collection, datetime::Datetime, ident::AtIdentifier, language::Language, string::AtUri, value::Data, }, }; use jetstream::{ JetstreamCompression, JetstreamConfig, JetstreamConnector, events::{CommitOp, Cursor, EventKind, JetstreamEvent}, exports::Nsid, }; use url::Url; use serde::Deserialize; use std::time::Duration; type Result = std::result::Result>; #[derive(Debug, Parser)] #[command(version, about, long_about = None)] struct Args { /// pds of the bot user #[arg(short, long, env = "BOT_PDS")] pds: Url, /// handle or did of the bot user #[arg(short, long, env = "BOT_HANDLE")] identifier: String, /// app password for the bot user #[arg(short, long, env = "BOT_APP_PASSWORD")] app_password: String, /// lightweight firehose #[arg(short, long, env = "BOT_JETSTREAM_URL")] #[clap(default_value = "wss://jetstream1.us-east.fire.hose.cam/subscribe")] jetstream_url: Url, /// optional: we can pick up from a past jetstream cursor /// /// the default is to just live-tail /// /// warning: setting this can lead to rapid bot posting #[arg(long)] jetstream_cursor: Option, /// don't actually post #[arg(long, action)] dry_run: bool, /// send a checkin to this url every 5 mins #[arg(long)] healthcheck_ping: Option, } struct IssueDetails { repo_full_name: String, repo_url: String, title: String, issues_url: String, } /// com.bad-example.identity.resolveMiniDoc bit we care about #[derive(Deserialize)] struct MiniDocResponse { handle: String, } /// com.atproto.repo.getRecord wraps the record in a `value` key #[derive(Deserialize)] struct GetRecordResonse { value: T, } /// part of CreateLabelRecord: key is the label reference (ie for "good-first-issue") #[derive(Deserialize)] struct AddLabel { key: String, } /// tangled's record for adding labels to an issue #[derive(Deserialize)] struct CreateLabelRecord { add: Vec, subject: String, } /// tangled issue record #[derive(Deserialize)] struct IssueRecord { title: String, repo: String, } /// tangled repo record #[derive(Deserialize)] struct RepoRecord { name: String, } /// get some atproto record content (from slingshot) async fn get_record Deserialize<'a>>( client: &reqwest::Client, at_uri: &str, ) -> Result { let mut url: Url = "https://slingshot.microcosm.blue".parse()?; url.set_path("/xrpc/com.bad-example.repo.getUriRecord"); url.query_pairs_mut().append_pair("at_uri", at_uri); let GetRecordResonse { value } = client .get(url) .send() .await? .error_for_status()? .json() .await?; Ok(value) } /// try to resolve a bidirectionally verified handle from an identifier (via slingshot) async fn get_handle(client: &reqwest::Client, identifier: &str) -> Result> { let mut url: Url = "https://slingshot.microcosm.blue".parse()?; url.set_path("/xrpc/com.bad-example.identity.resolveMiniDoc"); url.query_pairs_mut().append_pair("identifier", identifier); let MiniDocResponse { handle } = client .get(url) .send() .await? .error_for_status()? .json() .await?; if handle == "handle.invalid" { Ok(None) } else { Ok(Some(handle)) } } fn event_to_create_label Deserialize<'a>>(event: JetstreamEvent) -> Result { if event.kind != EventKind::Commit { return Err("not a commit".into()); } let commit = event.commit.ok_or("commit event missing commit data")?; if commit.operation != CommitOp::Create { return Err("not a create event".into()); } let raw = commit.record.ok_or("commit missing record")?; // todo: delete post if label is removed // delete sample: at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.label.op/3m2jvx4c6wf22 // tldr: has a "delete" array just like "add" on the same op collection Ok(serde_json::from_str(raw.get())?) } async fn extract_issue_info( client: &reqwest::Client, adds: Vec, subject: String, ) -> Result { let mut added_good_first_issue = false; for added in adds { if added.key == "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue" { log::info!("found a good first issue label!!"); added_good_first_issue = true; break; // inner } log::debug!("found a label but it wasn't good-first-issue, ignoring..."); } if !added_good_first_issue { return Err("good-first-issue label not found in added labels".into()); } let IssueRecord { title, repo } = match get_record(client, &subject).await { Ok(m) => m, Err(e) => return Err(format!("failed to get issue record: {e} for {subject}").into()), }; let Ok(repo_uri) = AtUri::new(&repo) else { return Err("failed to parse repo to aturi for {subject}".into()); }; let RepoRecord { name: repo_name } = match get_record(client, &repo).await { Ok(m) => m, Err(e) => return Err(format!("failed to get repo record: {e} for {subject}").into()), }; let nice_tangled_repo_id = match repo_uri.authority() { AtIdentifier::Handle(h) => format!("@{h}"), AtIdentifier::Did(did) => match get_handle(client, did.as_str()).await { Err(e) => { return Err(format!( "failed to get mini doc from repo identifier: {e} for {subject}" ) .into()); } Ok(None) => did.to_string(), Ok(Some(h)) => format!("@{h}"), }, }; let repo_full_name = format!("{nice_tangled_repo_id}/{repo_name}"); let repo_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}"); let issues_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}/issues"); Ok(IssueDetails { repo_full_name, repo_url, title, issues_url, }) } async fn post( client: &BasicClient, identifier: &AtIdentifier<'_>, IssueDetails { repo_full_name, repo_url, title, issues_url, }: &IssueDetails, ) -> Result<()> { let message = format!( r#"New from {repo_full_name}: > {title}"# ); let pre_len = 9; let repo_feature = serde_json::json!({ "$type": "app.bsky.richtext.facet#link", "uri": repo_url, }); let repo_facet = Facet { features: vec![Data::from_json(&repo_feature)?], index: ByteSlice { byte_start: pre_len, byte_end: pre_len + repo_full_name.len() as i64, extra_data: Default::default(), }, extra_data: Default::default(), }; let title_starts_at = pre_len + (repo_full_name.len() + 5) as i64; let repo_issues_feature = serde_json::json!({ "$type": "app.bsky.richtext.facet#link", "uri": issues_url, }); let issues_facet = Facet { features: vec![Data::from_json(&repo_issues_feature)?], index: ByteSlice { byte_start: title_starts_at, byte_end: title_starts_at + title.len() as i64, extra_data: Default::default(), }, extra_data: Default::default(), }; // Make a post let post = Post { created_at: Datetime::now(), langs: Some(vec![Language::new("en")?]), text: message.into(), facets: Some(vec![repo_facet, issues_facet]), embed: Default::default(), entities: Default::default(), labels: Default::default(), reply: Default::default(), tags: Default::default(), extra_data: Default::default(), }; let json = serde_json::to_value(post)?; let data = Data::from_json(&json)?; log::info!("\nposting..."); client .send( CreateRecord::new() .repo(identifier.clone()) .collection(Post::nsid()) .record(data) .build(), ) .await? .into_output()?; Ok(()) } async fn hc_ping(url: Url, client: reqwest::Client) { let mut interval = tokio::time::interval(Duration::from_secs(5 * 60)); loop { interval.tick().await; log::trace!("sending healthcheck ping..."); if let Err(e) = client .get(url.clone()) .send() .await .and_then(reqwest::Response::error_for_status) { log::warn!("error sending healthcheck ping: {e}"); } } } #[tokio::main] async fn main() -> Result<()> { env_logger::init(); let args = Args::parse(); // Create HTTP client and session let client = BasicClient::new(args.pds); let bot_id = AtIdentifier::new(&args.identifier)?; let create_session = CreateSession::new() .identifier(bot_id.to_string()) .password(&args.app_password) .build(); let session = Session::from(client.send(create_session.clone()).await?.into_output()?); log::debug!("logged in as {} ({})", session.handle, session.did); client.set_session(session).await?; let slingshot_client = reqwest::Client::builder() .user_agent("hacktober_bot") .timeout(Duration::from_secs(9)) .build()?; let jetstream_config: JetstreamConfig = JetstreamConfig { endpoint: args.jetstream_url.to_string(), wanted_collections: vec![Nsid::new("sh.tangled.label.op".to_string())?], user_agent: Some("hacktober_bot".to_string()), compression: JetstreamCompression::Zstd, replay_on_reconnect: true, channel_size: 1024, // buffer up to ~1s of jetstream events ..Default::default() }; let mut receiver = JetstreamConnector::new(jetstream_config)? .connect_cursor(args.jetstream_cursor.map(Cursor::from_raw_u64)) .await?; if let Some(hc) = args.healthcheck_ping { log::info!("starting healthcheck ping task..."); tokio::task::spawn(hc_ping(hc.clone(), slingshot_client.clone())); } log::info!("receiving jetstream messages..."); loop { let Some(event) = receiver.recv().await else { log::error!("consumer: could not receive event, bailing"); break; }; let cursor = event.cursor; let CreateLabelRecord { add: adds, subject } = match event_to_create_label(event) { Ok(clr) => clr, Err(e) => { log::debug!("ignoring unparseable event (at {cursor:?}): {e}"); continue; } }; let issue_details = match extract_issue_info(&slingshot_client, adds, subject.clone()).await { Ok(deets) => deets, Err(e) => { log::warn!("failed to extract issue details (at {cursor:?}): {e}"); continue; } }; if args.dry_run { let IssueDetails { repo_full_name, repo_url, title, issues_url, } = issue_details; log::info!( r#"--dry-run, but would have posted: good-first-issue label added for {repo_full_name} ({repo_url}): > {title} ({issues_url})"# ); continue; } if let Err(e) = post(&client, &bot_id, &issue_details).await { log::warn!("failed to post for {subject}: {e}, refreshing session for one retry..."); let session = Session::from(client.send(create_session.clone()).await?.into_output()?); log::debug!("logged in as {} ({})", session.handle, session.did); client.set_session(session).await?; if let Err(e) = post(&client, &bot_id, &issue_details).await { log::error!( "failed to post after a session refresh: {e:?}, something is wrong. bye." ); break; } }; } Ok(()) }