+98
-10
src/main.rs
+98
-10
src/main.rs
···
1
1
use clap::Parser;
2
2
use url::Url;
3
+
use jetstream::{
4
+
JetstreamCompression, JetstreamConfig, JetstreamConnector,
5
+
events::{CommitOp, Cursor, EventKind},
6
+
exports::Nsid,
7
+
};
3
8
use jacquard::{
4
9
api::app_bsky::{
5
10
feed::post::Post,
···
19
24
},
20
25
};
21
26
22
-
23
27
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
24
28
25
29
#[derive(Debug, Parser)]
···
34
38
/// app password for the bot user
35
39
#[arg(short, long, env = "BOT_APP_PASSWORD")]
36
40
app_password: String,
41
+
/// lightweight firehose
42
+
#[arg(short, long, env = "BOT_JETSTREAM_URL")]
43
+
#[clap(default_value = "wss://jetstream1.us-east.fire.hose.cam/subscribe")]
44
+
jetstream_url: Url,
45
+
/// optional: we can pick up from a past jetstream cursor
46
+
///
47
+
/// the default is to just live-tail
48
+
///
49
+
/// warning: setting this can lead to rapid bot posting
50
+
#[arg(long)]
51
+
jetstream_cursor: Option<u64>,
37
52
}
38
53
39
54
async fn post_link<C: HttpClient>(client: &AuthenticatedClient<C>, identifier: &AtIdentifier<'_>, pre_text: &str, link: Url) -> Result<()> {
···
88
103
env_logger::init();
89
104
let args = Args::parse();
90
105
91
-
92
-
// Create HTTP client
106
+
// Create HTTP client and session
93
107
let pds_uri = args.pds.as_str().trim_end_matches('/').to_string().into();
94
108
let mut client = AuthenticatedClient::new(reqwest::Client::new(), pds_uri);
95
-
96
-
// Create session
109
+
let bot_id = AtIdentifier::new(&args.identifier)?;
97
110
let session = Session::from(
98
111
client
99
112
.send(
100
113
CreateSession::new()
101
-
.identifier(&args.identifier)
114
+
.identifier(&bot_id.to_string())
102
115
.password(args.app_password)
103
116
.build(),
104
117
)
105
118
.await?
106
119
.into_output()?,
107
120
);
108
-
109
121
println!("logged in as {} ({})", session.handle, session.did);
110
122
client.set_session(session);
111
123
112
-
let identifier = AtIdentifier::new(&args.identifier)?;
124
+
let jetstream_config: JetstreamConfig = JetstreamConfig {
125
+
endpoint: args.jetstream_url.to_string(),
126
+
wanted_collections: vec![Nsid::new("sh.tangled.label.op".to_string())?],
127
+
user_agent: Some("hacktober_bot".to_string()),
128
+
compression: JetstreamCompression::Zstd,
129
+
replay_on_reconnect: true,
130
+
channel_size: 1024, // buffer up to ~1s of jetstream events
131
+
..Default::default()
132
+
};
113
133
114
-
let u2: Url = "https://bad-example.com".parse()?;
115
-
post_link(&client, &identifier, "link test 2: ", u2).await?;
134
+
let mut receiver = JetstreamConnector::new(jetstream_config)?
135
+
.connect_cursor(args.jetstream_cursor.map(Cursor::from_raw_u64))
136
+
.await?;
137
+
138
+
println!("receiving jetstream messages...");
139
+
loop {
140
+
let Some(event) = receiver.recv().await else {
141
+
eprintln!("consumer: could not receive event, bailing");
142
+
break;
143
+
};
144
+
if event.kind != EventKind::Commit {
145
+
continue;
146
+
}
147
+
let Some(ref commit) = event.commit else {
148
+
eprintln!("consumer: commit event missing commit data, ignoring");
149
+
continue;
150
+
};
151
+
if commit.operation != CommitOp::Create {
152
+
continue;
153
+
}
154
+
let Some(ref record) = commit.record else {
155
+
eprintln!("consumer: commit update/delete missing record, ignoring");
156
+
continue;
157
+
};
158
+
let jv: serde_json::Value = match record.get().parse() {
159
+
Ok(v) => v,
160
+
Err(e) => {
161
+
eprintln!("consumer: record failed to parse, ignoring: {e}");
162
+
continue;
163
+
}
164
+
};
165
+
let serde_json::Value::Object(o) = jv else {
166
+
eprintln!("record was not an object, ignoring");
167
+
continue;
168
+
};
169
+
let Some(serde_json::Value::Array(adds)) = o.get("add") else {
170
+
eprintln!("op did not have label added or was not an array");
171
+
continue;
172
+
};
173
+
let mut added_good_first_issue = false;
174
+
for added in adds {
175
+
let serde_json::Value::Object(a) = added else {
176
+
eprintln!("added item was not an obj");
177
+
continue;
178
+
};
179
+
let Some(serde_json::Value::String(key)) = a.get("key") else {
180
+
eprintln!("added was missing key prop");
181
+
continue;
182
+
};
183
+
if key == "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue" {
184
+
println!("found a good first issue label!! {:?}", event.cursor);
185
+
added_good_first_issue = true;
186
+
break;
187
+
}
188
+
eprintln!("found a label but it wasn't good-first-issue, ignoring...");
189
+
}
190
+
if !added_good_first_issue {
191
+
continue;
192
+
}
193
+
let Some(serde_json::Value::String(subject)) = o.get("subject") else {
194
+
eprintln!("could not find `subject` string for the good-first-issue label");
195
+
continue;
196
+
};
197
+
198
+
eprintln!("got {subject} {o:?}");
199
+
}
200
+
201
+
// let u2: Url = "https://bad-example.com".parse()?;
202
+
// let bot_id = AtIdentifier::new(&args.identifier)?;
203
+
// post_link(&client, &bot_id, "link test 2: ", u2).await?;
116
204
117
205
Ok(())
118
206
}