+10
-8
Cargo.lock
+10
-8
Cargo.lock
···
1509
1509
1510
1510
[[package]]
1511
1511
name = "deranged"
1512
-
version = "0.4.0"
1512
+
version = "0.5.3"
1513
1513
source = "registry+https://github.com/rust-lang/crates.io-index"
1514
-
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
1514
+
checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc"
1515
1515
dependencies = [
1516
1516
"powerfmt",
1517
1517
"serde",
···
4931
4931
"dotenv",
4932
4932
"futures-util",
4933
4933
"owo-colors",
4934
+
"redis 0.29.5",
4934
4935
"reqwest",
4935
4936
"serde",
4936
4937
"serde_json",
4937
4938
"sha256",
4938
4939
"sqlx",
4940
+
"time",
4939
4941
"tokio",
4940
4942
"tokio-stream",
4941
4943
"tokio-tungstenite",
···
6440
6442
6441
6443
[[package]]
6442
6444
name = "time"
6443
-
version = "0.3.41"
6445
+
version = "0.3.44"
6444
6446
source = "registry+https://github.com/rust-lang/crates.io-index"
6445
-
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
6447
+
checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d"
6446
6448
dependencies = [
6447
6449
"deranged",
6448
6450
"itoa",
···
6455
6457
6456
6458
[[package]]
6457
6459
name = "time-core"
6458
-
version = "0.1.4"
6460
+
version = "0.1.6"
6459
6461
source = "registry+https://github.com/rust-lang/crates.io-index"
6460
-
checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
6462
+
checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b"
6461
6463
6462
6464
[[package]]
6463
6465
name = "time-macros"
6464
-
version = "0.2.22"
6466
+
version = "0.2.24"
6465
6467
source = "registry+https://github.com/rust-lang/crates.io-index"
6466
-
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
6468
+
checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3"
6467
6469
dependencies = [
6468
6470
"num-conv",
6469
6471
"time-core",
+2
crates/jetstream/Cargo.toml
+2
crates/jetstream/Cargo.toml
+16
-3
crates/jetstream/src/lib.rs
+16
-3
crates/jetstream/src/lib.rs
···
1
1
use anyhow::Error;
2
-
use std::env;
3
-
2
+
use std::{env, sync::Arc};
4
3
use subscriber::ScrobbleSubscriber;
4
+
use tokio::sync::Mutex;
5
+
6
+
use crate::webhook_worker::{start_worker, AppState};
5
7
6
8
pub mod profile;
7
9
pub mod repo;
8
10
pub mod subscriber;
9
11
pub mod types;
12
+
pub mod webhook;
13
+
pub mod webhook_worker;
10
14
pub mod xata;
11
15
12
16
pub async fn subscribe() -> Result<(), Error> {
17
+
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
18
+
let redis = redis::Client::open(redis_url)?;
19
+
let queue_key =
20
+
env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string());
21
+
22
+
let state = Arc::new(Mutex::new(AppState { redis, queue_key }));
23
+
24
+
start_worker(state.clone()).await?;
25
+
13
26
let jetstream_server = env::var("JETSTREAM_SERVER")
14
27
.unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network".to_string());
15
28
let url = format!(
···
18
31
);
19
32
let subscriber = ScrobbleSubscriber::new(&url);
20
33
21
-
subscriber.run().await?;
34
+
subscriber.run(state).await?;
22
35
23
36
Ok(())
24
37
}
+14
-2
crates/jetstream/src/main.rs
+14
-2
crates/jetstream/src/main.rs
···
1
-
use std::env;
1
+
use std::{env, sync::Arc};
2
2
3
3
use dotenv::dotenv;
4
4
use subscriber::ScrobbleSubscriber;
5
+
use tokio::sync::Mutex;
6
+
7
+
use crate::webhook_worker::AppState;
5
8
6
9
pub mod profile;
7
10
pub mod repo;
8
11
pub mod subscriber;
9
12
pub mod types;
13
+
pub mod webhook;
14
+
pub mod webhook_worker;
10
15
pub mod xata;
11
16
12
17
#[tokio::main]
···
20
25
);
21
26
let subscriber = ScrobbleSubscriber::new(&url);
22
27
23
-
subscriber.run().await?;
28
+
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
29
+
let redis = redis::Client::open(redis_url)?;
30
+
let queue_key =
31
+
env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string());
32
+
33
+
let state = Arc::new(Mutex::new(AppState { redis, queue_key }));
34
+
35
+
subscriber.run(state).await?;
24
36
Ok(())
25
37
}
+61
crates/jetstream/src/repo.rs
+61
crates/jetstream/src/repo.rs
···
10
10
profile::did_to_profile,
11
11
subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID},
12
12
types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord},
13
+
webhook::discord::{
14
+
self,
15
+
model::{ScrobbleData, WebhookEnvelope},
16
+
},
17
+
webhook_worker::{push_to_queue, AppState},
13
18
xata::{
14
19
album::Album, album_track::AlbumTrack, artist::Artist, artist_album::ArtistAlbum,
15
20
artist_track::ArtistTrack, track::Track, user::User, user_album::UserAlbum,
···
18
23
};
19
24
20
25
pub async fn save_scrobble(
26
+
state: Arc<Mutex<AppState>>,
21
27
pool: Arc<Mutex<Pool<Postgres>>>,
22
28
did: &str,
23
29
commit: Commit,
···
85
91
.await?;
86
92
87
93
tx.commit().await?;
94
+
95
+
let users: Vec<User> =
96
+
sqlx::query_as::<_, User>("SELECT * FROM users WHERE did = $1")
97
+
.bind(did)
98
+
.fetch_all(&*pool)
99
+
.await?;
100
+
101
+
if users.is_empty() {
102
+
return Err(anyhow::anyhow!(
103
+
"User with DID {} not found in database",
104
+
did
105
+
));
106
+
}
107
+
108
+
// Push to webhook queue (Discord)
109
+
match push_to_queue(
110
+
state,
111
+
&WebhookEnvelope {
112
+
r#type: "scrobble.created".to_string(),
113
+
id: commit.rkey.clone(),
114
+
data: ScrobbleData {
115
+
user: discord::model::User {
116
+
did: did.to_string(),
117
+
display_name: users[0].display_name.clone(),
118
+
handle: users[0].handle.clone(),
119
+
},
120
+
track: discord::model::Track {
121
+
title: scrobble_record.title.clone(),
122
+
artist: scrobble_record.artist.clone(),
123
+
album: scrobble_record.album.clone(),
124
+
duration: scrobble_record.duration,
125
+
artwork_url: scrobble_record.album_art.clone().map(|x| {
126
+
format!(
127
+
"https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}",
128
+
did,
129
+
x.r#ref.link,
130
+
x.mime_type.split('/').last().unwrap_or("jpeg")
131
+
)
132
+
}),
133
+
spotify_url: scrobble_record.spotify_link.clone(),
134
+
tidal_url: scrobble_record.tidal_link.clone(),
135
+
youtube_url: scrobble_record.youtube_link.clone(),
136
+
},
137
+
played_at: scrobble_record.created_at.clone(),
138
+
},
139
+
delivered_at: Some(chrono::Utc::now().to_rfc3339()),
140
+
},
141
+
)
142
+
.await
143
+
{
144
+
Ok(_) => {}
145
+
Err(e) => {
146
+
eprintln!("Failed to push to webhook queue: {}", e);
147
+
}
148
+
}
88
149
}
89
150
90
151
if commit.collection == ARTIST_NSID {
+9
-5
crates/jetstream/src/subscriber.rs
+9
-5
crates/jetstream/src/subscriber.rs
···
7
7
use tokio::sync::Mutex;
8
8
use tokio_tungstenite::{connect_async, tungstenite::Message};
9
9
10
-
use crate::{repo::save_scrobble, types::Root};
10
+
use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState};
11
11
12
12
pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble";
13
13
pub const ARTIST_NSID: &str = "app.rocksky.artist";
···
28
28
}
29
29
}
30
30
31
-
pub async fn run(&self) -> Result<(), Error> {
31
+
pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> {
32
32
// Get the connection string outside of the task
33
33
let db_url = env::var("XATA_POSTGRES_URL")
34
34
.context("Failed to get XATA_POSTGRES_URL environment variable")?;
···
48
48
while let Some(msg) = ws_stream.next().await {
49
49
match msg {
50
50
Ok(msg) => {
51
-
if let Err(e) = handle_message(pool.clone(), msg).await {
51
+
if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await {
52
52
eprintln!("Error handling message: {}", e);
53
53
}
54
54
}
···
63
63
}
64
64
}
65
65
66
-
async fn handle_message(pool: Arc<Mutex<sqlx::PgPool>>, msg: Message) -> Result<(), Error> {
66
+
async fn handle_message(
67
+
state: Arc<Mutex<AppState>>,
68
+
pool: Arc<Mutex<sqlx::PgPool>>,
69
+
msg: Message,
70
+
) -> Result<(), Error> {
67
71
tokio::spawn(async move {
68
72
if let Message::Text(text) = msg {
69
73
let message: Root = serde_json::from_str(&text)?;
···
74
78
75
79
println!("Received message: {:#?}", message);
76
80
if let Some(commit) = message.commit {
77
-
match save_scrobble(pool, &message.did, commit).await {
81
+
match save_scrobble(state, pool, &message.did, commit).await {
78
82
Ok(_) => {
79
83
println!("Scrobble saved successfully");
80
84
}
+48
crates/jetstream/src/webhook/discord/mod.rs
+48
crates/jetstream/src/webhook/discord/mod.rs
···
1
+
pub mod model;
2
+
3
+
use crate::webhook::discord::model::*;
4
+
use reqwest::Client;
5
+
6
+
pub fn embed_from_scrobble(s: &ScrobbleData, rkey: &str) -> DiscordEmbed {
7
+
let url = format!("https://rocksky.app/{}/scrobble/{}", s.user.did, rkey);
8
+
9
+
let mut desc = format!("**{}**\nby {}", esc(&s.track.title), esc(&s.track.artist));
10
+
desc.push_str(&format!("\non *{}*", esc(&s.track.album)));
11
+
12
+
DiscordEmbed {
13
+
title: s.user.display_name.clone(),
14
+
url,
15
+
description: Some(desc),
16
+
timestamp: Some(s.played_at.clone()),
17
+
thumbnail: s.track.artwork_url.clone().map(|u| DiscordThumb { url: u }),
18
+
footer: Some(DiscordFooter {
19
+
text: format!("Rocksky โข {}", s.user.handle.clone()),
20
+
}),
21
+
}
22
+
}
23
+
24
+
pub async fn post_embeds(
25
+
http: &Client,
26
+
discord_webhook_url: &str,
27
+
embeds: Vec<DiscordEmbed>,
28
+
) -> reqwest::Result<()> {
29
+
if discord_webhook_url.is_empty() {
30
+
println!("DISCORD_WEBHOOK_URL is not set, skipping webhook post");
31
+
return Ok(());
32
+
}
33
+
34
+
let body = DiscordWebhookPayload {
35
+
content: String::new(),
36
+
embeds,
37
+
};
38
+
let res = http.post(discord_webhook_url).json(&body).send().await?;
39
+
if !res.status().is_success() {
40
+
let text = res.text().await.unwrap_or_default();
41
+
eprintln!("Failed to post to Discord webhook: {}", text);
42
+
}
43
+
Ok(())
44
+
}
45
+
46
+
fn esc(s: &str) -> String {
47
+
s.replace(['*', '_', '~', '`', '>'], "\\$0")
48
+
}
+75
crates/jetstream/src/webhook/discord/model.rs
+75
crates/jetstream/src/webhook/discord/model.rs
···
1
+
use serde::{Deserialize, Serialize};
2
+
3
+
#[derive(Debug, Deserialize, Serialize, Clone)]
4
+
pub struct WebhookEnvelope {
5
+
#[serde(default)]
6
+
pub r#type: String,
7
+
pub id: String,
8
+
#[serde(default)]
9
+
pub delivered_at: Option<String>,
10
+
pub data: ScrobbleData,
11
+
}
12
+
13
+
#[derive(Debug, Deserialize, Serialize, Clone)]
14
+
pub struct ScrobbleData {
15
+
pub user: User,
16
+
pub track: Track,
17
+
pub played_at: String, // ISO 8601
18
+
}
19
+
20
+
#[derive(Debug, Deserialize, Serialize, Clone)]
21
+
pub struct User {
22
+
pub did: String,
23
+
pub display_name: String,
24
+
pub handle: String,
25
+
}
26
+
27
+
#[derive(Debug, Deserialize, Serialize, Clone)]
28
+
pub struct Track {
29
+
pub title: String,
30
+
pub artist: String,
31
+
pub album: String,
32
+
pub duration: i32,
33
+
#[serde(default)]
34
+
pub artwork_url: Option<String>,
35
+
#[serde(default)]
36
+
pub spotify_url: Option<String>,
37
+
#[serde(default)]
38
+
pub tidal_url: Option<String>,
39
+
#[serde(default)]
40
+
pub youtube_url: Option<String>,
41
+
}
42
+
43
+
/* ---------- Discord payloads ---------- */
44
+
45
+
#[derive(Debug, Serialize)]
46
+
pub struct DiscordWebhookPayload {
47
+
#[serde(default)]
48
+
pub content: String,
49
+
#[serde(default)]
50
+
pub embeds: Vec<DiscordEmbed>,
51
+
}
52
+
53
+
#[derive(Debug, Serialize)]
54
+
pub struct DiscordEmbed {
55
+
pub title: String,
56
+
pub url: String,
57
+
#[serde(skip_serializing_if = "Option::is_none")]
58
+
pub description: Option<String>,
59
+
#[serde(skip_serializing_if = "Option::is_none")]
60
+
pub timestamp: Option<String>,
61
+
#[serde(skip_serializing_if = "Option::is_none")]
62
+
pub thumbnail: Option<DiscordThumb>,
63
+
#[serde(skip_serializing_if = "Option::is_none")]
64
+
pub footer: Option<DiscordFooter>,
65
+
}
66
+
67
+
#[derive(Debug, Serialize)]
68
+
pub struct DiscordThumb {
69
+
pub url: String,
70
+
}
71
+
72
+
#[derive(Debug, Serialize)]
73
+
pub struct DiscordFooter {
74
+
pub text: String,
75
+
}
+1
crates/jetstream/src/webhook/mod.rs
+1
crates/jetstream/src/webhook/mod.rs
···
1
+
pub mod discord;
+139
crates/jetstream/src/webhook_worker.rs
+139
crates/jetstream/src/webhook_worker.rs
···
1
+
use crate::webhook::discord::{self, model::WebhookEnvelope};
2
+
use anyhow::Error;
3
+
use std::{
4
+
env,
5
+
sync::Arc,
6
+
time::{Duration, Instant},
7
+
};
8
+
use tokio::{sync::Mutex, time::interval};
9
+
10
+
#[derive(Clone)]
11
+
pub struct AppState {
12
+
pub redis: redis::Client,
13
+
pub queue_key: String,
14
+
}
15
+
16
+
pub async fn start_worker(state: Arc<Mutex<AppState>>) -> Result<(), Error> {
17
+
let max_rps: u32 = env::var("MAX_REQUESTS_PER_SEC")
18
+
.ok()
19
+
.and_then(|s| s.parse().ok())
20
+
.unwrap_or(5);
21
+
let max_embeds_per: usize = env::var("MAX_EMBEDS_PER_REQUEST")
22
+
.ok()
23
+
.and_then(|s| s.parse().ok())
24
+
.unwrap_or(10);
25
+
let batch_window_ms: u64 = env::var("BATCH_WINDOW_MS")
26
+
.ok()
27
+
.and_then(|s| s.parse().ok())
28
+
.unwrap_or(400);
29
+
let discord_webhook_url = env::var("DISCORD_WEBHOOK_URL").unwrap_or(String::new());
30
+
31
+
tokio::spawn(run_worker(
32
+
state.clone(),
33
+
discord_webhook_url,
34
+
max_rps,
35
+
max_embeds_per,
36
+
Duration::from_millis(batch_window_ms),
37
+
));
38
+
39
+
Ok(())
40
+
}
41
+
42
+
async fn run_worker(
43
+
st: Arc<Mutex<AppState>>,
44
+
discord_webhook_url: String,
45
+
max_rps: u32,
46
+
max_embeds_per: usize,
47
+
batch_window: Duration,
48
+
) {
49
+
let http = reqwest::Client::builder()
50
+
.user_agent("rocksky-discord-bridge/0.1")
51
+
.build()
52
+
.expect("http client");
53
+
54
+
let mut tokens = max_rps as i32;
55
+
let mut refill = interval(Duration::from_secs(1));
56
+
refill.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
57
+
58
+
loop {
59
+
tokio::select! {
60
+
_ = refill.tick() => {
61
+
tokens = (tokens + max_rps as i32).min(max_rps as i32);
62
+
}
63
+
_ = tokio::time::sleep(Duration::from_millis(10)) => { /* tick */ }
64
+
}
65
+
66
+
if tokens <= 0 {
67
+
continue;
68
+
}
69
+
70
+
let start = Instant::now();
71
+
let mut embeds = Vec::with_capacity(max_embeds_per);
72
+
73
+
while embeds.len() < max_embeds_per && start.elapsed() < batch_window {
74
+
match brpop_once(st.clone(), 1).await {
75
+
Ok(Some(json_str)) => {
76
+
if let Ok(env) = serde_json::from_str::<WebhookEnvelope>(&json_str) {
77
+
embeds.push(discord::embed_from_scrobble(&env.data, &env.id));
78
+
}
79
+
}
80
+
Ok(None) => break,
81
+
Err(e) => {
82
+
eprintln!("Failed to pop from Redis: {}", e);
83
+
break;
84
+
}
85
+
}
86
+
}
87
+
88
+
if embeds.is_empty() {
89
+
tokio::time::sleep(Duration::from_millis(50)).await;
90
+
continue;
91
+
}
92
+
93
+
tokens -= 1;
94
+
95
+
if let Err(e) = discord::post_embeds(&http, &discord_webhook_url, embeds).await {
96
+
eprintln!("Failed to post to Discord webhook: {}", e);
97
+
}
98
+
}
99
+
}
100
+
101
+
async fn brpop_once(
102
+
state: Arc<Mutex<AppState>>,
103
+
timeout_secs: u64,
104
+
) -> redis::RedisResult<Option<String>> {
105
+
let AppState {
106
+
redis: client,
107
+
queue_key: key,
108
+
} = &*state.lock().await;
109
+
let mut conn = client.get_multiplexed_async_connection().await?;
110
+
let res: Option<(String, String)> = redis::cmd("BRPOP")
111
+
.arg(key)
112
+
.arg(timeout_secs as usize)
113
+
.query_async(&mut conn)
114
+
.await?;
115
+
Ok(res.map(|(_, v)| v))
116
+
}
117
+
118
+
pub async fn push_to_queue(
119
+
state: Arc<Mutex<AppState>>,
120
+
item: &WebhookEnvelope,
121
+
) -> redis::RedisResult<()> {
122
+
let payload = serde_json::to_string(item).unwrap();
123
+
let AppState {
124
+
redis: client,
125
+
queue_key: key,
126
+
} = &*state.lock().await;
127
+
let mut conn = client.get_multiplexed_async_connection().await?;
128
+
let _: () = redis::pipe()
129
+
.cmd("RPUSH")
130
+
.arg(key)
131
+
.arg(payload)
132
+
.ignore()
133
+
.cmd("EXPIRE")
134
+
.arg(key)
135
+
.arg(60 * 60 * 24) // 24h
136
+
.query_async(&mut conn)
137
+
.await?;
138
+
Ok(())
139
+
}