forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
use std::{
    env,
    str::FromStr,
    sync::{Arc, Mutex},
    thread,
};

use anyhow::{Context, Error};
use duckdb::Connection;
use sqlx::postgres::PgPoolOptions;

use crate::core::{create_tables, update_artist_genres};

pub mod cmd;
pub mod core;
pub mod handlers;
pub mod subscriber;
pub mod types;
pub mod xata;
20
21pub async fn serve() -> Result<(), Error> {
22 let conn = Connection::open("./rocksky-analytics.ddb")?;
23
24 create_tables(&conn).await?;
25
26 let pool = PgPoolOptions::new()
27 .max_connections(5)
28 .connect(&env::var("XATA_POSTGRES_URL")?)
29 .await?;
30
31 let conn = Arc::new(Mutex::new(conn));
32 update_artist_genres(conn.clone(), &pool).await?;
33
34 export_parquets(conn.clone());
35 cmd::serve::serve(conn).await?;
36
37 Ok(())
38}
39
40pub async fn sync() -> Result<(), Error> {
41 let pool = PgPoolOptions::new()
42 .max_connections(5)
43 .connect(&env::var("XATA_POSTGRES_URL")?)
44 .await?;
45
46 let conn = Connection::open("./rocksky-analytics.ddb")?;
47 create_tables(&conn).await?;
48
49 let conn = Arc::new(Mutex::new(conn));
50
51 cmd::sync::sync(conn, &pool).await?;
52
53 Ok(())
54}
55
56fn export_parquets(conn: Arc<Mutex<Connection>>) {
57 thread::spawn(move || {
58 // fire every 5 minutes
59 let cron_expr = "0 */5 * * * * *";
60 let schedule = cron::Schedule::from_str(cron_expr);
61 if let Err(err) = schedule {
62 tracing::error!("Failed to parse cron expression: {}", cron_expr);
63 tracing::error!(error = %err);
64 return Ok(());
65 }
66 let schedule = schedule.unwrap();
67 loop {
68 let now = chrono::Utc::now();
69 let mut upcoming = schedule.upcoming(chrono::Utc).take(1);
70
71 if let Some(next) = upcoming.next() {
72 let duration = next.signed_duration_since(now).to_std().unwrap();
73 thread::sleep(duration);
74 tracing::info!("Exporting parquets ...");
75
76 let conn = conn.lock().unwrap();
77 conn.execute_batch(
78 "BEGIN;
79 COPY (SELECT * FROM scrobbles) TO 'scrobbles.parquet' (FORMAT PARQUET);
80 COPY (SELECT * FROM artists) TO 'artists.parquet' (FORMAT PARQUET);
81 COPY (SELECT * FROM albums) TO 'albums.parquet' (FORMAT PARQUET);
82 COPY (SELECT * FROM tracks) TO 'tracks.parquet' (FORMAT PARQUET);
83 COPY (SELECT * FROM users) TO 'users.parquet' (FORMAT PARQUET);
84 COPY (SELECT * FROM album_tracks) TO 'album_tracks.parquet' (FORMAT PARQUET);
85 COPY (SELECT * FROM artist_albums) TO 'artist_albums.parquet' (FORMAT PARQUET);
86 COPY (SELECT * FROM artist_tracks) TO 'artist_tracks.parquet' (FORMAT PARQUET);
87 COPY (SELECT * FROM loved_tracks) TO 'loved_tracks.parquet' (FORMAT PARQUET);
88 COPY (SELECT * FROM user_albums) TO 'user_albums.parquet' (FORMAT PARQUET);
89 COPY (SELECT * FROM user_artists) TO 'user_artists.parquet' (FORMAT PARQUET);
90 COPY (SELECT * FROM user_tracks) TO 'user_tracks.parquet' (FORMAT PARQUET);
91 COMMIT;",
92 )?;
93
94 drop(conn);
95
96 if env::var("CF_ACCOUNT_ID").is_err() {
97 tracing::warn!("CF_ACCOUNT_ID is not set, skipping upload to R2");
98 continue;
99 }
100
101 upload_to_r2("scrobbles.parquet");
102 upload_to_r2("artists.parquet");
103 upload_to_r2("albums.parquet");
104 upload_to_r2("tracks.parquet");
105 upload_to_r2("users.parquet");
106 upload_to_r2("album_tracks.parquet");
107 upload_to_r2("artist_albums.parquet");
108 upload_to_r2("artist_tracks.parquet");
109 upload_to_r2("loved_tracks.parquet");
110 upload_to_r2("user_albums.parquet");
111 upload_to_r2("user_artists.parquet");
112 upload_to_r2("user_tracks.parquet");
113
114 tracing::info!("Exported parquets successfully.");
115 }
116 }
117
118 #[allow(unreachable_code)]
119 Ok::<(), Error>(())
120 });
121}
122
123fn upload_to_r2(file: &str) {
124 let status = std::process::Command::new("aws")
125 .arg("s3")
126 .arg("cp")
127 .arg(file)
128 .arg(format!(
129 "s3://{}",
130 env::var("R2_BUCKET_NAME").unwrap_or("rocksky-backup".to_string())
131 ))
132 .arg("--endpoint-url")
133 .arg(&format!(
134 "https://{}.r2.cloudflarestorage.com",
135 env::var("CF_ACCOUNT_ID").unwrap()
136 ))
137 .arg("--profile")
138 .arg("r2")
139 .stdout(std::process::Stdio::inherit())
140 .stderr(std::process::Stdio::inherit())
141 .status();
142 match status {
143 Ok(status) => {
144 if status.success() {
145 tracing::info!("Uploaded {} to R2 successfully.", file);
146 } else {
147 tracing::error!("Failed to upload {} to R2.", file);
148 }
149 }
150 Err(err) => {
151 tracing::error!("Failed to execute aws command: {}", err);
152 }
153 };
154}