+34
-16
crates/jetstream/src/repo.rs
+34
-16
crates/jetstream/src/repo.rs
···
1
-
use std::{env, sync::Arc};
1
+
use std::sync::Arc;
2
2
3
3
use anyhow::Error;
4
4
use chrono::DateTime;
5
-
use futures_util::SinkExt;
6
5
use owo_colors::OwoColorize;
7
6
use serde_json::json;
8
7
use sqlx::{Pool, Postgres};
···
32
31
pub async fn save_scrobble(
33
32
state: Arc<Mutex<AppState>>,
34
33
pool: Arc<Mutex<Pool<Postgres>>>,
34
+
nc: Arc<async_nats::Client>,
35
35
did: &str,
36
36
commit: Commit,
37
37
) -> Result<(), Error> {
···
88
88
.bind(artist_id)
89
89
.bind(track_id)
90
90
.bind(uri)
91
-
.bind(user_id)
91
+
.bind(&user_id)
92
92
.bind(
93
93
DateTime::parse_from_rfc3339(&scrobble_record.created_at)
94
94
.unwrap()
···
98
98
.await?;
99
99
100
100
tx.commit().await?;
101
+
publish_user(&nc, &pool, &user_id).await?;
101
102
102
103
let users: Vec<User> =
103
104
sqlx::query_as::<_, User>("SELECT * FROM users WHERE did = $1")
···
160
161
update_artist_uri(&mut tx, &user_id, artist_record, &uri).await?;
161
162
162
163
tx.commit().await?;
164
+
publish_user(&nc, &pool, &user_id).await?;
163
165
}
164
166
165
167
if commit.collection == ALBUM_NSID {
···
172
174
update_album_uri(&mut tx, &user_id, album_record, &uri).await?;
173
175
174
176
tx.commit().await?;
177
+
publish_user(&nc, &pool, &user_id).await?;
175
178
}
176
179
177
180
if commit.collection == SONG_NSID {
···
185
188
update_track_uri(&mut tx, &user_id, song_record, &uri).await?;
186
189
187
190
tx.commit().await?;
191
+
publish_user(&nc, &pool, &user_id).await?;
188
192
}
189
193
190
194
if commit.collection == FEED_GENERATOR_NSID {
···
198
202
save_feed_generator(&mut tx, &user_id, feed_generator_record, &uri).await?;
199
203
200
204
tx.commit().await?;
205
+
publish_user(&nc, &pool, &user_id).await?;
201
206
}
202
207
203
208
if commit.collection == FOLLOW_NSID {
204
209
let mut tx = pool.begin().await?;
205
210
206
-
save_user(&mut tx, did).await?;
211
+
let user_id = save_user(&mut tx, did).await?;
207
212
let uri = format!("at://{}/app.rocksky.graph.follow/{}", did, commit.rkey);
208
213
209
214
let follow_record: FollowRecord = serde_json::from_value(commit.record)?;
210
-
save_user(&mut tx, &follow_record.subject).await?;
215
+
let subject_user_id = save_user(&mut tx, &follow_record.subject).await?;
211
216
save_follow(&mut tx, did, follow_record, &uri).await?;
212
217
213
218
tx.commit().await?;
219
+
publish_user(&nc, &pool, &user_id).await?;
220
+
publish_user(&nc, &pool, &subject_user_id).await?;
214
221
}
215
222
}
216
223
_ => {
···
258
265
.await?;
259
266
}
260
267
268
+
Ok(users[0].xata_id.clone())
269
+
}
270
+
271
+
pub async fn publish_user(
272
+
nc: &async_nats::Client,
273
+
pool: &Pool<Postgres>,
274
+
id: &str,
275
+
) -> Result<(), Error> {
276
+
let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1")
277
+
.bind(id)
278
+
.fetch_all(pool)
279
+
.await?;
280
+
281
+
if users.is_empty() {
282
+
tracing::warn!(user=%id, "user not found");
283
+
return Ok(());
284
+
}
285
+
261
286
let u = &users[0];
287
+
262
288
let payload = json!({
263
289
"xata_id": u.xata_id,
264
290
"did": u.did,
···
271
297
});
272
298
let payload = serde_json::to_string(&payload)?;
273
299
274
-
tokio::spawn(async move {
275
-
let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
276
-
let nc = async_nats::connect(&addr).await?;
277
-
tracing::info!(server = %addr.bright_green(), "Connected to NATS");
300
+
nc.publish("rocksky.user", payload.into()).await?;
301
+
nc.flush().await?;
278
302
279
-
nc.publish("rocksky.user", payload.into()).await?;
280
-
nc.flush().await?;
281
-
282
-
Ok::<(), Error>(())
283
-
});
284
-
285
-
Ok(users[0].xata_id.clone())
303
+
Ok(())
286
304
}
287
305
288
306
pub async fn save_track(
+6
-2
crates/jetstream/src/subscriber.rs
+6
-2
crates/jetstream/src/subscriber.rs
···
40
40
.await?;
41
41
let pool = Arc::new(Mutex::new(pool));
42
42
43
+
let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
44
+
let nc = Arc::new(async_nats::connect(&addr).await?);
45
+
43
46
let (mut ws_stream, _) = connect_async(&self.service_url).await?;
44
47
tracing::info!(url = %self.service_url.bright_green(), "Connected to jetstream at");
45
48
46
49
while let Some(msg) = ws_stream.next().await {
47
50
match msg {
48
51
Ok(msg) => {
49
-
if let Err(e) = handle_message(state.clone(), pool.clone(), msg) {
52
+
if let Err(e) = handle_message(state.clone(), pool.clone(), nc.clone(), msg) {
50
53
tracing::error!(error = %e, "Error handling message");
51
54
}
52
55
}
···
64
67
fn handle_message(
65
68
state: Arc<Mutex<AppState>>,
66
69
pool: Arc<Mutex<sqlx::PgPool>>,
70
+
nc: Arc<async_nats::Client>,
67
71
msg: Message,
68
72
) -> Result<(), Error> {
69
73
tokio::spawn(async move {
···
76
80
77
81
tracing::info!(message = %text, "Received message");
78
82
if let Some(commit) = message.commit {
79
-
match save_scrobble(state, pool, &message.did, commit).await {
83
+
match save_scrobble(state, pool, nc, &message.did, commit).await {
80
84
Ok(_) => {
81
85
tracing::info!(user_id = %message.did.bright_green(), "Scrobble saved successfully");
82
86
}