ERROR
crates/feed/Cargo.toml
ERROR
crates/feed/Cargo.toml
Failed to calculate interdiff for this file.
ERROR
crates/feed/src/config.rs
ERROR
crates/feed/src/config.rs
Failed to calculate interdiff for this file.
ERROR
crates/feed/src/feed_handler.rs
ERROR
crates/feed/src/feed_handler.rs
Failed to calculate interdiff for this file.
ERROR
crates/feed/src/types.rs
ERROR
crates/feed/src/types.rs
Failed to calculate interdiff for this file.
ERROR
crates/rockskyd/Cargo.toml
ERROR
crates/rockskyd/Cargo.toml
Failed to calculate interdiff for this file.
REVERTED
crates/rockskyd/src/cmd/feed.rs
REVERTED
crates/rockskyd/src/cmd/feed.rs
ERROR
crates/rockskyd/src/cmd/mod.rs
ERROR
crates/rockskyd/src/cmd/mod.rs
Failed to calculate interdiff for this file.
ERROR
crates/rockskyd/src/main.rs
ERROR
crates/rockskyd/src/main.rs
Failed to calculate interdiff for this file.
NEW
crates/feed/src/feed.rs
NEW
crates/feed/src/feed.rs
···
1
1
use crate::config::Config;
2
2
use crate::types::{DidDocument, FeedSkeletonParameters, Service};
3
3
use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery};
4
+
use anyhow::Error;
5
+
use sqlx::postgres::PgPoolOptions;
6
+
use sqlx::{Pool, Postgres};
7
+
use std::env;
4
8
use std::fmt::Debug;
5
9
use std::net::SocketAddr;
10
+
use std::sync::Arc;
6
11
use warp::Filter;
7
12
8
13
/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods.
···
21
26
&mut self,
22
27
name: impl AsRef<str>,
23
28
address: impl Into<SocketAddr> + Debug + Clone + Send,
24
-
) -> impl std::future::Future<Output = ()> + Send {
29
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
25
30
self.start_with_config(name, Config::load_env_config(), address)
26
31
}
27
32
···
39
44
name: impl AsRef<str>,
40
45
config: Config,
41
46
address: impl Into<SocketAddr> + Debug + Clone + Send,
42
-
) -> impl std::future::Future<Output = ()> + Send {
47
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
43
48
let handler = self.handler();
44
49
let address = address.clone();
45
50
let feed_name = name.as_ref().to_string();
46
51
47
52
async move {
48
53
let config = config;
54
+
let pool = PgPoolOptions::new()
55
+
.max_connections(5)
56
+
.connect(&env::var("XATA_POSTGRES_URL")?)
57
+
.await?;
58
+
let pool = Arc::new(pool);
59
+
let db_filter = warp::any().map(move || pool.clone());
49
60
50
61
let did_config = config.clone();
51
62
let did_json = warp::path(".well-known")
···
56
67
let describe_feed_generator = warp::path("xrpc")
57
68
.and(warp::path("app.rocksky.feed.describeFeedGenerator"))
58
69
.and(warp::get())
59
-
.and_then(move || describe_feed_generator(feed_name.clone()));
70
+
.and(db_filter.clone())
71
+
.and_then(move |_pool: Arc<Pool<Postgres>>| {
72
+
describe_feed_generator(feed_name.clone())
73
+
});
60
74
61
75
let get_feed_handler = handler.clone();
62
76
let get_feed_skeleton = warp::path("xrpc")
63
77
.and(warp::path("app.rocksky.feed.getFeedSkeleton"))
64
78
.and(warp::get())
65
79
.and(warp::query::<FeedSkeletonParameters>())
66
-
.and_then(move |query: FeedSkeletonParameters| {
67
-
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
68
-
});
80
+
.and(db_filter.clone())
81
+
.and_then(
82
+
move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| {
83
+
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
84
+
},
85
+
);
69
86
70
87
let api = did_json.or(describe_feed_generator).or(get_feed_skeleton);
71
88
···
101
118
tokio::join!(feed_server.run(address), firehose_listener)
102
119
.1
103
120
.expect("Couldn't await tasks");
121
+
122
+
Ok::<(), Error>(())
104
123
}
105
124
}
106
125
}
NEW
crates/feed/src/lib.rs
NEW
crates/feed/src/lib.rs
···
1
+
use anyhow::Error;
1
2
use std::{env, net::SocketAddr, sync::Arc};
2
-
3
3
use tokio::sync::Mutex;
4
4
5
5
use crate::{
···
42
42
}
43
43
}
44
44
45
-
pub async fn run() {
45
+
pub async fn run() -> Result<(), Error> {
46
46
let mut feed = RecentlyPlayedFeed {
47
47
handler: RecentlyPlayedFeedHandler {
48
48
scrobbles: Arc::new(Mutex::new(Vec::new())),
···
53
53
let addr_str = format!("{}:{}", host, port);
54
54
let addr: SocketAddr = addr_str.parse().expect("Invalid address format");
55
55
56
-
feed.start("RecentlyPlayed", addr).await;
56
+
feed.start("RecentlyPlayed", addr).await?;
57
+
Ok(())
57
58
}
NEW
Cargo.lock
NEW
Cargo.lock
···
3466
3466
"windows-sys 0.59.0",
3467
3467
]
3468
3468
3469
+
[[package]]
3470
+
name = "moka"
3471
+
version = "0.12.11"
3472
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3473
+
checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077"
3474
+
dependencies = [
3475
+
"async-lock",
3476
+
"crossbeam-channel",
3477
+
"crossbeam-epoch",
3478
+
"crossbeam-utils",
3479
+
"equivalent",
3480
+
"event-listener",
3481
+
"futures-util",
3482
+
"parking_lot",
3483
+
"portable-atomic",
3484
+
"rustc_version",
3485
+
"smallvec",
3486
+
"tagptr",
3487
+
"uuid",
3488
+
]
3489
+
3490
+
[[package]]
3491
+
name = "multer"
3492
+
version = "2.1.0"
3493
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3494
+
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
3495
+
dependencies = [
3496
+
"bytes",
3497
+
"encoding_rs",
3498
+
"futures-util",
3499
+
"http 0.2.12",
3500
+
"httparse",
3501
+
"log",
3502
+
"memchr",
3503
+
"mime",
3504
+
"spin",
3505
+
"version_check",
3506
+
]
3507
+
3508
+
[[package]]
3509
+
name = "multibase"
3510
+
version = "0.9.1"
3511
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3512
+
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
3513
+
dependencies = [
3514
+
"base-x",
3515
+
"data-encoding",
3516
+
"data-encoding-macro",
3517
+
]
3518
+
3519
+
[[package]]
3520
+
name = "multihash"
3521
+
version = "0.19.3"
3522
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3523
+
checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d"
3524
+
dependencies = [
3525
+
"core2",
3526
+
"serde",
3527
+
"unsigned-varint",
3528
+
]
3529
+
3469
3530
[[package]]
3470
3531
name = "nanoid"
3471
3532
version = "0.4.0"
···
3475
3536
"rand 0.8.5",
3476
3537
]
3477
3538
3539
+
[[package]]
3540
+
name = "nanorand"
3541
+
version = "0.7.0"
3542
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3543
+
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
3544
+
dependencies = [
3545
+
"getrandom 0.2.16",
3546
+
]
3547
+
3548
+
[[package]]
3549
+
name = "native-tls"
3550
+
version = "0.2.14"
3551
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3552
+
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
3553
+
dependencies = [
3554
+
"libc",
3555
+
"log",
3556
+
"openssl",
3557
+
"openssl-probe",
3558
+
"openssl-sys",
3559
+
"schannel",
3560
+
"security-framework 2.11.1",
3561
+
"security-framework-sys",
3562
+
"tempfile",
3563
+
]
3564
+
3478
3565
[[package]]
3479
3566
name = "nkeys"
3480
3567
version = "0.4.4"