+25
-1
crates/jetstream/src/repo.rs
+25
-1
crates/jetstream/src/repo.rs
···
1
+
use std::{env, sync::Arc};
2
3
use anyhow::Error;
4
use chrono::DateTime;
5
+
use futures_util::SinkExt;
6
use owo_colors::OwoColorize;
7
+
use serde_json::json;
8
use sqlx::{Pool, Postgres};
9
use tokio::sync::Mutex;
10
···
257
.fetch_all(&mut **tx)
258
.await?;
259
}
260
+
261
+
let u = &users[0];
262
+
let payload = json!({
263
+
"xata_id": u.xata_id,
264
+
"did": u.did,
265
+
"handle": u.handle,
266
+
"display_name": u.display_name,
267
+
"avatar": u.avatar,
268
+
"xata_createdat": u.xata_createdat.to_rfc3339(),
269
+
"xata_updatedat": u.xata_createdat.to_rfc3339(),
270
+
"xata_version": 0,
271
+
});
272
+
let payload = serde_json::to_string(&payload)?;
273
+
274
+
tokio::spawn(async move {
275
+
let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
276
+
let nc = async_nats::connect(&addr).await?;
277
+
tracing::info!(server = %addr.bright_green(), "Connected to NATS");
278
+
279
+
nc.publish("rocksky.user", payload.into()).await?;
280
+
Ok::<(), Error>(())
281
+
});
282
283
Ok(users[0].xata_id.clone())
284
}