-50
crates/analytics/src/main.rs
-50
crates/analytics/src/main.rs
···
1
-
use core::create_tables;
2
-
use std::{
3
-
env,
4
-
sync::{Arc, Mutex},
5
-
};
6
-
7
-
use clap::Command;
8
-
use cmd::{serve::serve, sync::sync};
9
-
use dotenv::dotenv;
10
-
use duckdb::Connection;
11
-
use sqlx::postgres::PgPoolOptions;
12
-
13
-
pub mod cmd;
14
-
pub mod core;
15
-
pub mod handlers;
16
-
pub mod subscriber;
17
-
pub mod types;
18
-
pub mod xata;
19
-
20
-
fn cli() -> Command {
21
-
Command::new("analytics")
22
-
.version(env!("CARGO_PKG_VERSION"))
23
-
.about("Rocksky Analytics CLI built with Rust and DuckDB")
24
-
.subcommand(Command::new("sync").about("Sync data from Xata to DuckDB"))
25
-
.subcommand(Command::new("serve").about("Serve the Rocksky Analytics API"))
26
-
}
27
-
28
-
#[tokio::main]
29
-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
30
-
dotenv().ok();
31
-
32
-
let pool = PgPoolOptions::new()
33
-
.max_connections(5)
34
-
.connect(&env::var("XATA_POSTGRES_URL")?)
35
-
.await?;
36
-
let conn = Connection::open("./rocksky-analytics.ddb")?;
37
-
38
-
create_tables(&conn).await?;
39
-
40
-
let args = cli().get_matches();
41
-
let conn = Arc::new(Mutex::new(conn));
42
-
43
-
match args.subcommand() {
44
-
Some(("sync", _)) => sync(conn, &pool).await?,
45
-
Some(("serve", _)) => serve(conn).await?,
46
-
_ => serve(conn).await?,
47
-
}
48
-
49
-
Ok(())
50
-
}
-37
crates/dropbox/src/main.rs
-37
crates/dropbox/src/main.rs
···
1
-
use clap::Command;
2
-
use cmd::{scan::scan, serve::serve};
3
-
use dotenv::dotenv;
4
-
5
-
pub mod client;
6
-
pub mod cmd;
7
-
pub mod consts;
8
-
pub mod crypto;
9
-
pub mod handlers;
10
-
pub mod repo;
11
-
pub mod scan;
12
-
pub mod token;
13
-
pub mod types;
14
-
pub mod xata;
15
-
16
-
fn cli() -> Command {
17
-
Command::new("dropbox")
18
-
.version(env!("CARGO_PKG_VERSION"))
19
-
.about("Rocksky Dropbox Service")
20
-
.subcommand(Command::new("scan").about("Scan Dropbox Music Folder"))
21
-
.subcommand(Command::new("serve").about("Serve Rocksky Dropbox API"))
22
-
}
23
-
24
-
#[tokio::main]
25
-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
26
-
dotenv().ok();
27
-
28
-
let args = cli().get_matches();
29
-
30
-
match args.subcommand() {
31
-
Some(("scan", _)) => scan().await?,
32
-
Some(("serve", _)) => serve().await?,
33
-
_ => serve().await?,
34
-
}
35
-
36
-
Ok(())
37
-
}
-37
crates/googledrive/src/main.rs
-37
crates/googledrive/src/main.rs
···
1
-
use clap::Command;
2
-
use cmd::{scan::scan, serve::serve};
3
-
use dotenv::dotenv;
4
-
5
-
pub mod client;
6
-
pub mod cmd;
7
-
pub mod consts;
8
-
pub mod crypto;
9
-
pub mod handlers;
10
-
pub mod repo;
11
-
pub mod scan;
12
-
pub mod token;
13
-
pub mod types;
14
-
pub mod xata;
15
-
16
-
fn cli() -> Command {
17
-
Command::new("googledrive")
18
-
.version(env!("CARGO_PKG_VERSION"))
19
-
.about("Rocksky Google Drive Service")
20
-
.subcommand(Command::new("scan").about("Scan Google Drive Music Folder"))
21
-
.subcommand(Command::new("serve").about("Serve Rocksky Google Drive API"))
22
-
}
23
-
24
-
#[tokio::main]
25
-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
26
-
dotenv().ok();
27
-
28
-
let args = cli().get_matches();
29
-
30
-
match args.subcommand() {
31
-
Some(("scan", _)) => scan().await?,
32
-
Some(("serve", _)) => serve().await?,
33
-
_ => serve().await?,
34
-
}
35
-
36
-
Ok(())
37
-
}
-37
crates/jetstream/src/main.rs
-37
crates/jetstream/src/main.rs
···
1
-
use std::{env, sync::Arc};
2
-
3
-
use dotenv::dotenv;
4
-
use subscriber::ScrobbleSubscriber;
5
-
use tokio::sync::Mutex;
6
-
7
-
use crate::webhook_worker::AppState;
8
-
9
-
pub mod profile;
10
-
pub mod repo;
11
-
pub mod subscriber;
12
-
pub mod types;
13
-
pub mod webhook;
14
-
pub mod webhook_worker;
15
-
pub mod xata;
16
-
17
-
#[tokio::main]
18
-
async fn main() -> Result<(), anyhow::Error> {
19
-
dotenv()?;
20
-
let jetstream_server = env::var("JETSTREAM_SERVER")
21
-
.unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network".to_string());
22
-
let url = format!(
23
-
"{}/subscribe?wantedCollections=app.rocksky.*",
24
-
jetstream_server
25
-
);
26
-
let subscriber = ScrobbleSubscriber::new(&url);
27
-
28
-
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
29
-
let redis = redis::Client::open(redis_url)?;
30
-
let queue_key =
31
-
env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string());
32
-
33
-
let state = Arc::new(Mutex::new(AppState { redis, queue_key }));
34
-
35
-
subscriber.run(state).await?;
36
-
Ok(())
37
-
}
-65
crates/playlists/src/main.rs
-65
crates/playlists/src/main.rs
···
1
-
use core::{create_tables, find_spotify_users, load_users, save_playlists};
2
-
use std::{
3
-
env,
4
-
sync::{Arc, Mutex},
5
-
};
6
-
7
-
use anyhow::Error;
8
-
use async_nats::connect;
9
-
use dotenv::dotenv;
10
-
use duckdb::Connection;
11
-
use owo_colors::OwoColorize;
12
-
use rocksky_playlists::subscriber::subscribe;
13
-
use spotify::get_user_playlists;
14
-
use sqlx::postgres::PgPoolOptions;
15
-
16
-
pub mod core;
17
-
pub mod crypto;
18
-
pub mod spotify;
19
-
pub mod types;
20
-
pub mod xata;
21
-
22
-
#[tokio::main]
23
-
async fn main() -> Result<(), Error> {
24
-
dotenv().ok();
25
-
26
-
let conn = Connection::open("./rocksky-playlists.ddb")?;
27
-
let conn = Arc::new(Mutex::new(conn));
28
-
create_tables(conn.clone())?;
29
-
30
-
subscribe(conn.clone()).await?;
31
-
32
-
let pool = PgPoolOptions::new()
33
-
.max_connections(5)
34
-
.connect(&env::var("XATA_POSTGRES_URL")?)
35
-
.await?;
36
-
let users = find_spotify_users(&pool, 0, 100).await?;
37
-
38
-
load_users(conn.clone(), &pool).await?;
39
-
40
-
sqlx::query(r#"
41
-
CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id)
42
-
"#)
43
-
.execute(&pool)
44
-
.await?;
45
-
let conn = conn.clone();
46
-
47
-
let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
48
-
let nc = connect(&addr).await?;
49
-
let nc = Arc::new(Mutex::new(nc));
50
-
println!("Connected to NATS server at {}", addr.bright_green());
51
-
52
-
for user in users {
53
-
let token = user.1.clone();
54
-
let did = user.2.clone();
55
-
let user_id = user.3.clone();
56
-
let playlists = get_user_playlists(token).await?;
57
-
save_playlists(&pool, conn.clone(), nc.clone(), playlists, &user_id, &did).await?;
58
-
}
59
-
60
-
println!("Done!");
61
-
62
-
loop {
63
-
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
64
-
}
65
-
}
-105
crates/scrobbler/src/main.rs
-105
crates/scrobbler/src/main.rs
···
1
-
pub mod auth;
2
-
pub mod cache;
3
-
pub mod crypto;
4
-
pub mod handlers;
5
-
pub mod listenbrainz;
6
-
pub mod musicbrainz;
7
-
pub mod params;
8
-
pub mod repo;
9
-
pub mod response;
10
-
pub mod rocksky;
11
-
pub mod scrobbler;
12
-
pub mod signature;
13
-
pub mod spotify;
14
-
pub mod types;
15
-
pub mod xata;
16
-
17
-
use actix_limitation::{Limiter, RateLimiter};
18
-
use actix_session::SessionExt as _;
19
-
use actix_web::{
20
-
dev::ServiceRequest,
21
-
web::{self, Data},
22
-
App, HttpServer,
23
-
};
24
-
use anyhow::Error;
25
-
use cache::Cache;
26
-
use dotenv::dotenv;
27
-
use owo_colors::OwoColorize;
28
-
use sqlx::postgres::PgPoolOptions;
29
-
use std::{env, sync::Arc, time::Duration};
30
-
31
-
pub const BANNER: &str = r#"
32
-
___ ___ _____ __ __ __
33
-
/ | __ ______/ (_)___ / ___/______________ / /_ / /_ / /__ _____
34
-
/ /| |/ / / / __ / / __ \ \__ \/ ___/ ___/ __ \/ __ \/ __ \/ / _ \/ ___/
35
-
/ ___ / /_/ / /_/ / / /_/ / ___/ / /__/ / / /_/ / /_/ / /_/ / / __/ /
36
-
/_/ |_\__,_/\__,_/_/\____/ /____/\___/_/ \____/_.___/_.___/_/\___/_/
37
-
38
-
This is the Rocksky Scrobbler API compatible with Last.fm AudioScrobbler API
39
-
"#;
40
-
41
-
#[tokio::main]
42
-
async fn main() -> Result<(), Error> {
43
-
dotenv().ok();
44
-
45
-
println!("{}", BANNER.magenta());
46
-
47
-
let cache = Cache::new()?;
48
-
49
-
let pool = PgPoolOptions::new()
50
-
.max_connections(5)
51
-
.connect(&env::var("XATA_POSTGRES_URL")?)
52
-
.await?;
53
-
let conn = Arc::new(pool);
54
-
55
-
let host = env::var("SCROBBLE_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
56
-
let port = env::var("SCROBBLE_PORT")
57
-
.unwrap_or_else(|_| "7882".to_string())
58
-
.parse::<u16>()
59
-
.unwrap_or(7882);
60
-
61
-
tracing::info!(
62
-
url = %format!("http://{}:{}", host, port).bright_green(),
63
-
"Starting Scrobble server @"
64
-
);
65
-
66
-
let limiter = web::Data::new(
67
-
Limiter::builder("redis://127.0.0.1")
68
-
.key_by(|req: &ServiceRequest| {
69
-
req.get_session()
70
-
.get(&"session-id")
71
-
.unwrap_or_else(|_| req.cookie(&"rate-api-id").map(|c| c.to_string()))
72
-
})
73
-
.limit(100)
74
-
.period(Duration::from_secs(60)) // 60 minutes
75
-
.build()
76
-
.unwrap(),
77
-
);
78
-
79
-
HttpServer::new(move || {
80
-
App::new()
81
-
.wrap(RateLimiter::default())
82
-
.app_data(limiter.clone())
83
-
.app_data(Data::new(conn.clone()))
84
-
.app_data(Data::new(cache.clone()))
85
-
.service(handlers::handle_methods)
86
-
.service(handlers::handle_nowplaying)
87
-
.service(handlers::handle_submission)
88
-
.service(listenbrainz::handlers::handle_submit_listens)
89
-
.service(listenbrainz::handlers::handle_validate_token)
90
-
.service(listenbrainz::handlers::handle_search_users)
91
-
.service(listenbrainz::handlers::handle_get_playing_now)
92
-
.service(listenbrainz::handlers::handle_get_listens)
93
-
.service(listenbrainz::handlers::handle_get_listen_count)
94
-
.service(listenbrainz::handlers::handle_get_artists)
95
-
.service(listenbrainz::handlers::handle_get_recordings)
96
-
.service(listenbrainz::handlers::handle_get_release_groups)
97
-
.service(handlers::index)
98
-
.service(handlers::handle_get)
99
-
})
100
-
.bind((host, port))?
101
-
.run()
102
-
.await?;
103
-
104
-
Ok(())
105
-
}