announcing good-first-issue tags added on @tangled.sh (not affiliated with tangled!)
1use clap::Parser;
2use jacquard::{
3 api::{
4 app_bsky::feed::post::Post,
5 app_bsky::richtext::facet::{ByteSlice, Facet},
6 com_atproto::repo::create_record::CreateRecord,
7 com_atproto::server::create_session::CreateSession,
8 },
9 client::{BasicClient, Session},
10 types::{
11 collection::Collection, datetime::Datetime, ident::AtIdentifier, language::Language,
12 string::AtUri, value::Data,
13 },
14};
15use jetstream::{
16 JetstreamCompression, JetstreamConfig, JetstreamConnector,
17 events::{CommitOp, Cursor, EventKind, JetstreamEvent},
18 exports::Nsid,
19};
20use url::Url;
21
22use serde::Deserialize;
23use std::time::Duration;
24
/// File-wide result alias: failures are carried as a boxed `std::error::Error` trait object,
/// so `?` accepts any error type (and plain `&str` / `String` messages via `.into()`).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
26
27#[derive(Debug, Parser)]
28#[command(version, about, long_about = None)]
29struct Args {
30 /// pds of the bot user
31 #[arg(short, long, env = "BOT_PDS")]
32 pds: Url,
33 /// handle or did of the bot user
34 #[arg(short, long, env = "BOT_HANDLE")]
35 identifier: String,
36 /// app password for the bot user
37 #[arg(short, long, env = "BOT_APP_PASSWORD")]
38 app_password: String,
39 /// lightweight firehose
40 #[arg(short, long, env = "BOT_JETSTREAM_URL")]
41 #[clap(default_value = "wss://jetstream1.us-east.fire.hose.cam/subscribe")]
42 jetstream_url: Url,
43 /// optional: we can pick up from a past jetstream cursor
44 ///
45 /// the default is to just live-tail
46 ///
47 /// warning: setting this can lead to rapid bot posting
48 #[arg(long)]
49 jetstream_cursor: Option<u64>,
50 /// don't actually post
51 #[arg(long, action)]
52 dry_run: bool,
53 /// send a checkin to this url every 5 mins
54 #[arg(long)]
55 healthcheck_ping: Option<Url>,
56}
57
/// everything needed to announce one labelled issue
///
/// built by `extract_issue_info`, consumed by `post` (and by the `--dry-run` log line)
struct IssueDetails {
    /// display name of the repo: `<owner>/<repo name>`, where owner is `@handle` or a bare did
    repo_full_name: String,
    /// tangled.org link to the repo
    repo_url: String,
    /// title of the issue that received the label
    title: String,
    /// tangled.org link to the repo's issues page (the issue list, not the single issue)
    issues_url: String,
}
64
65/// com.bad-example.identity.resolveMiniDoc bit we care about
66#[derive(Deserialize)]
67struct MiniDocResponse {
68 handle: String,
69}
70
71/// com.atproto.repo.getRecord wraps the record in a `value` key
72#[derive(Deserialize)]
73struct GetRecordResonse<T> {
74 value: T,
75}
76
77/// part of CreateLabelRecord: key is the label reference (ie for "good-first-issue")
78#[derive(Deserialize)]
79struct AddLabel {
80 key: String,
81}
82
83/// tangled's record for adding labels to an issue
84#[derive(Deserialize)]
85struct CreateLabelRecord {
86 add: Vec<AddLabel>,
87 subject: String,
88}
89
90/// tangled issue record
91#[derive(Deserialize)]
92struct IssueRecord {
93 title: String,
94 repo: String,
95}
96
97/// tangled repo record
98#[derive(Deserialize)]
99struct RepoRecord {
100 name: String,
101}
102
103/// get some atproto record content (from slingshot)
104async fn get_record<T: for<'a> Deserialize<'a>>(
105 client: &reqwest::Client,
106 at_uri: &str,
107) -> Result<T> {
108 let mut url: Url = "https://slingshot.microcosm.blue".parse()?;
109 url.set_path("/xrpc/com.bad-example.repo.getUriRecord");
110 url.query_pairs_mut().append_pair("at_uri", at_uri);
111 let GetRecordResonse { value } = client
112 .get(url)
113 .send()
114 .await?
115 .error_for_status()?
116 .json()
117 .await?;
118 Ok(value)
119}
120
121/// try to resolve a bidirectionally verified handle from an identifier (via slingshot)
122async fn get_handle(client: &reqwest::Client, identifier: &str) -> Result<Option<String>> {
123 let mut url: Url = "https://slingshot.microcosm.blue".parse()?;
124 url.set_path("/xrpc/com.bad-example.identity.resolveMiniDoc");
125 url.query_pairs_mut().append_pair("identifier", identifier);
126 let MiniDocResponse { handle } = client
127 .get(url)
128 .send()
129 .await?
130 .error_for_status()?
131 .json()
132 .await?;
133 if handle == "handle.invalid" {
134 Ok(None)
135 } else {
136 Ok(Some(handle))
137 }
138}
139
140fn event_to_create_label<T: for<'a> Deserialize<'a>>(event: JetstreamEvent) -> Result<T> {
141 if event.kind != EventKind::Commit {
142 return Err("not a commit".into());
143 }
144 let commit = event.commit.ok_or("commit event missing commit data")?;
145 if commit.operation != CommitOp::Create {
146 return Err("not a create event".into());
147 }
148
149 let raw = commit.record.ok_or("commit missing record")?;
150
151 // todo: delete post if label is removed
152 // delete sample: at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.label.op/3m2jvx4c6wf22
153 // tldr: has a "delete" array just like "add" on the same op collection
154 Ok(serde_json::from_str(raw.get())?)
155}
156
157async fn extract_issue_info(
158 client: &reqwest::Client,
159 adds: Vec<AddLabel>,
160 subject: String,
161) -> Result<IssueDetails> {
162 let mut added_good_first_issue = false;
163 for added in adds {
164 if added.key
165 == "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue"
166 {
167 log::info!("found a good first issue label!!");
168 added_good_first_issue = true;
169 break; // inner
170 }
171 log::debug!("found a label but it wasn't good-first-issue, ignoring...");
172 }
173 if !added_good_first_issue {
174 return Err("good-first-issue label not found in added labels".into());
175 }
176
177 let IssueRecord { title, repo } = match get_record(client, &subject).await {
178 Ok(m) => m,
179 Err(e) => return Err(format!("failed to get issue record: {e} for {subject}").into()),
180 };
181
182 let Ok(repo_uri) = AtUri::new(&repo) else {
183 return Err("failed to parse repo to aturi for {subject}".into());
184 };
185
186 let RepoRecord { name: repo_name } = match get_record(client, &repo).await {
187 Ok(m) => m,
188 Err(e) => return Err(format!("failed to get repo record: {e} for {subject}").into()),
189 };
190
191 let nice_tangled_repo_id = match repo_uri.authority() {
192 AtIdentifier::Handle(h) => format!("@{h}"),
193 AtIdentifier::Did(did) => match get_handle(client, did.as_str()).await {
194 Err(e) => {
195 return Err(format!(
196 "failed to get mini doc from repo identifier: {e} for {subject}"
197 )
198 .into());
199 }
200 Ok(None) => did.to_string(),
201 Ok(Some(h)) => format!("@{h}"),
202 },
203 };
204
205 let repo_full_name = format!("{nice_tangled_repo_id}/{repo_name}");
206 let repo_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}");
207
208 let issues_url = format!("https://tangled.org/{nice_tangled_repo_id}/{repo_name}/issues");
209
210 Ok(IssueDetails {
211 repo_full_name,
212 repo_url,
213 title,
214 issues_url,
215 })
216}
217
218async fn post(
219 client: &BasicClient,
220 identifier: &AtIdentifier<'_>,
221 IssueDetails {
222 repo_full_name,
223 repo_url,
224 title,
225 issues_url,
226 }: &IssueDetails,
227) -> Result<()> {
228 let message = format!(
229 r#"New from {repo_full_name}:
230
231> {title}"#
232 );
233
234 let pre_len = 9;
235
236 let repo_feature = serde_json::json!({
237 "$type": "app.bsky.richtext.facet#link",
238 "uri": repo_url,
239 });
240 let repo_facet = Facet {
241 features: vec![Data::from_json(&repo_feature)?],
242 index: ByteSlice {
243 byte_start: pre_len,
244 byte_end: pre_len + repo_full_name.len() as i64,
245 extra_data: Default::default(),
246 },
247 extra_data: Default::default(),
248 };
249
250 let title_starts_at = pre_len + (repo_full_name.len() + 5) as i64;
251
252 let repo_issues_feature = serde_json::json!({
253 "$type": "app.bsky.richtext.facet#link",
254 "uri": issues_url,
255 });
256 let issues_facet = Facet {
257 features: vec![Data::from_json(&repo_issues_feature)?],
258 index: ByteSlice {
259 byte_start: title_starts_at,
260 byte_end: title_starts_at + title.len() as i64,
261 extra_data: Default::default(),
262 },
263 extra_data: Default::default(),
264 };
265
266 // Make a post
267 let post = Post {
268 created_at: Datetime::now(),
269 langs: Some(vec![Language::new("en")?]),
270 text: message.into(),
271 facets: Some(vec![repo_facet, issues_facet]),
272 embed: Default::default(),
273 entities: Default::default(),
274 labels: Default::default(),
275 reply: Default::default(),
276 tags: Default::default(),
277 extra_data: Default::default(),
278 };
279
280 let json = serde_json::to_value(post)?;
281 let data = Data::from_json(&json)?;
282
283 log::info!("\nposting...");
284 client
285 .send(
286 CreateRecord::new()
287 .repo(identifier.clone())
288 .collection(Post::nsid())
289 .record(data)
290 .build(),
291 )
292 .await?
293 .into_output()?;
294
295 Ok(())
296}
297
298async fn hc_ping(url: Url, client: reqwest::Client) {
299 let mut interval = tokio::time::interval(Duration::from_secs(5 * 60));
300 loop {
301 interval.tick().await;
302 log::trace!("sending healthcheck ping...");
303 if let Err(e) = client
304 .get(url.clone())
305 .send()
306 .await
307 .and_then(reqwest::Response::error_for_status)
308 {
309 log::warn!("error sending healthcheck ping: {e}");
310 }
311 }
312}
313
314#[tokio::main]
315async fn main() -> Result<()> {
316 env_logger::init();
317 let args = Args::parse();
318
319 // Create HTTP client and session
320 let client = BasicClient::new(args.pds);
321 let bot_id = AtIdentifier::new(&args.identifier)?;
322 let create_session = CreateSession::new()
323 .identifier(bot_id.to_string())
324 .password(&args.app_password)
325 .build();
326 let session = Session::from(client.send(create_session.clone()).await?.into_output()?);
327 log::debug!("logged in as {} ({})", session.handle, session.did);
328 client.set_session(session).await?;
329
330 let slingshot_client = reqwest::Client::builder()
331 .user_agent("hacktober_bot")
332 .timeout(Duration::from_secs(9))
333 .build()?;
334
335 let jetstream_config: JetstreamConfig = JetstreamConfig {
336 endpoint: args.jetstream_url.to_string(),
337 wanted_collections: vec![Nsid::new("sh.tangled.label.op".to_string())?],
338 user_agent: Some("hacktober_bot".to_string()),
339 compression: JetstreamCompression::Zstd,
340 replay_on_reconnect: true,
341 channel_size: 1024, // buffer up to ~1s of jetstream events
342 ..Default::default()
343 };
344 let mut receiver = JetstreamConnector::new(jetstream_config)?
345 .connect_cursor(args.jetstream_cursor.map(Cursor::from_raw_u64))
346 .await?;
347
348 if let Some(hc) = args.healthcheck_ping {
349 log::info!("starting healthcheck ping task...");
350 tokio::task::spawn(hc_ping(hc.clone(), slingshot_client.clone()));
351 }
352
353 log::info!("receiving jetstream messages...");
354 loop {
355 let Some(event) = receiver.recv().await else {
356 log::error!("consumer: could not receive event, bailing");
357 break;
358 };
359 let cursor = event.cursor;
360
361 let CreateLabelRecord { add: adds, subject } = match event_to_create_label(event) {
362 Ok(clr) => clr,
363 Err(e) => {
364 log::debug!("ignoring unparseable event (at {cursor:?}): {e}");
365 continue;
366 }
367 };
368
369 let issue_details = match extract_issue_info(&slingshot_client, adds, subject.clone()).await
370 {
371 Ok(deets) => deets,
372 Err(e) => {
373 log::warn!("failed to extract issue details (at {cursor:?}): {e}");
374 continue;
375 }
376 };
377
378 if args.dry_run {
379 let IssueDetails {
380 repo_full_name,
381 repo_url,
382 title,
383 issues_url,
384 } = issue_details;
385 log::info!(
386 r#"--dry-run, but would have posted:
387
388good-first-issue label added for {repo_full_name} ({repo_url}):
389
390> {title} ({issues_url})"#
391 );
392 continue;
393 }
394
395 if let Err(e) = post(&client, &bot_id, &issue_details).await {
396 log::warn!("failed to post for {subject}: {e}, refreshing session for one retry...");
397 let session = Session::from(client.send(create_session.clone()).await?.into_output()?);
398 log::debug!("logged in as {} ({})", session.handle, session.did);
399 client.set_session(session).await?;
400
401 if let Err(e) = post(&client, &bot_id, &issue_details).await {
402 log::error!(
403 "failed to post after a session refresh: {e:?}, something is wrong. bye."
404 );
405 break;
406 }
407 };
408 }
409
410 Ok(())
411}