REBASED
Cargo.lock
REBASED
Cargo.lock
This patch was likely rebased, as context lines do not match.
UNCHANGED
crates/feed/Cargo.toml
UNCHANGED
crates/feed/Cargo.toml
This file has not been changed.
UNCHANGED
crates/feed/src/config.rs
UNCHANGED
crates/feed/src/config.rs
This file has not been changed.
CHANGED
crates/feed/src/feed.rs
CHANGED
crates/feed/src/feed.rs
···
1
use crate::config::Config;
2
use crate::types::{DidDocument, FeedSkeletonParameters, Service};
3
use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery};
4
use std::fmt::Debug;
5
use std::net::SocketAddr;
6
use warp::Filter;
7
8
/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods.
···
21
&mut self,
22
name: impl AsRef<str>,
23
address: impl Into<SocketAddr> + Debug + Clone + Send,
24
-
) -> impl std::future::Future<Output = ()> + Send {
25
self.start_with_config(name, Config::load_env_config(), address)
26
}
27
···
39
name: impl AsRef<str>,
40
config: Config,
41
address: impl Into<SocketAddr> + Debug + Clone + Send,
42
-
) -> impl std::future::Future<Output = ()> + Send {
43
let handler = self.handler();
44
let address = address.clone();
45
let feed_name = name.as_ref().to_string();
46
47
async move {
48
let config = config;
49
50
let did_config = config.clone();
51
let did_json = warp::path(".well-known")
···
56
let describe_feed_generator = warp::path("xrpc")
57
.and(warp::path("app.rocksky.feed.describeFeedGenerator"))
58
.and(warp::get())
59
-
.and_then(move || describe_feed_generator(feed_name.clone()));
60
61
let get_feed_handler = handler.clone();
62
let get_feed_skeleton = warp::path("xrpc")
63
.and(warp::path("app.rocksky.feed.getFeedSkeleton"))
64
.and(warp::get())
65
.and(warp::query::<FeedSkeletonParameters>())
66
-
.and_then(move |query: FeedSkeletonParameters| {
67
-
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
68
-
});
69
70
let api = did_json.or(describe_feed_generator).or(get_feed_skeleton);
71
···
101
tokio::join!(feed_server.run(address), firehose_listener)
102
.1
103
.expect("Couldn't await tasks");
104
}
105
}
106
}
···
1
use crate::config::Config;
2
use crate::types::{DidDocument, FeedSkeletonParameters, Service};
3
use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery};
4
+
use anyhow::Error;
5
+
use sqlx::postgres::PgPoolOptions;
6
+
use sqlx::{Pool, Postgres};
7
+
use std::env;
8
use std::fmt::Debug;
9
use std::net::SocketAddr;
10
+
use std::sync::Arc;
11
use warp::Filter;
12
13
/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods.
···
26
&mut self,
27
name: impl AsRef<str>,
28
address: impl Into<SocketAddr> + Debug + Clone + Send,
29
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
30
self.start_with_config(name, Config::load_env_config(), address)
31
}
32
···
44
name: impl AsRef<str>,
45
config: Config,
46
address: impl Into<SocketAddr> + Debug + Clone + Send,
47
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
48
let handler = self.handler();
49
let address = address.clone();
50
let feed_name = name.as_ref().to_string();
51
52
async move {
53
let config = config;
54
+
let pool = PgPoolOptions::new()
55
+
.max_connections(5)
56
+
.connect(&env::var("XATA_POSTGRES_URL")?)
57
+
.await?;
58
+
let pool = Arc::new(pool);
59
+
let db_filter = warp::any().map(move || pool.clone());
60
61
let did_config = config.clone();
62
let did_json = warp::path(".well-known")
···
67
let describe_feed_generator = warp::path("xrpc")
68
.and(warp::path("app.rocksky.feed.describeFeedGenerator"))
69
.and(warp::get())
70
+
.and(db_filter.clone())
71
+
.and_then(move |_pool: Arc<Pool<Postgres>>| {
72
+
describe_feed_generator(feed_name.clone())
73
+
});
74
75
let get_feed_handler = handler.clone();
76
let get_feed_skeleton = warp::path("xrpc")
77
.and(warp::path("app.rocksky.feed.getFeedSkeleton"))
78
.and(warp::get())
79
.and(warp::query::<FeedSkeletonParameters>())
80
+
.and(db_filter.clone())
81
+
.and_then(
82
+
move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| {
83
+
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
84
+
},
85
+
);
86
87
let api = did_json.or(describe_feed_generator).or(get_feed_skeleton);
88
···
118
tokio::join!(feed_server.run(address), firehose_listener)
119
.1
120
.expect("Couldn't await tasks");
121
+
122
+
Ok::<(), Error>(())
123
}
124
}
125
}
UNCHANGED
crates/feed/src/feed_handler.rs
UNCHANGED
crates/feed/src/feed_handler.rs
This file has not been changed.
CHANGED
crates/feed/src/lib.rs
CHANGED
crates/feed/src/lib.rs
···
1
use std::{env, net::SocketAddr, sync::Arc};
2
-
3
use tokio::sync::Mutex;
4
5
use crate::{
···
42
}
43
}
44
45
-
pub async fn run() {
46
let mut feed = RecentlyPlayedFeed {
47
handler: RecentlyPlayedFeedHandler {
48
scrobbles: Arc::new(Mutex::new(Vec::new())),
···
53
let addr_str = format!("{}:{}", host, port);
54
let addr: SocketAddr = addr_str.parse().expect("Invalid address format");
55
56
-
feed.start("RecentlyPlayed", addr).await;
57
}
···
1
+
use anyhow::Error;
2
use std::{env, net::SocketAddr, sync::Arc};
3
use tokio::sync::Mutex;
4
5
use crate::{
···
42
}
43
}
44
45
+
pub async fn run() -> Result<(), Error> {
46
let mut feed = RecentlyPlayedFeed {
47
handler: RecentlyPlayedFeedHandler {
48
scrobbles: Arc::new(Mutex::new(Vec::new())),
···
53
let addr_str = format!("{}:{}", host, port);
54
let addr: SocketAddr = addr_str.parse().expect("Invalid address format");
55
56
+
feed.start("RecentlyPlayed", addr).await?;
57
+
Ok(())
58
}
UNCHANGED
crates/feed/src/types.rs
UNCHANGED
crates/feed/src/types.rs
This file has not been changed.
UNCHANGED
crates/rockskyd/Cargo.toml
UNCHANGED
crates/rockskyd/Cargo.toml
This file has not been changed.
CHANGED
crates/rockskyd/src/cmd/feed.rs
CHANGED
crates/rockskyd/src/cmd/feed.rs
UNCHANGED
crates/rockskyd/src/cmd/mod.rs
UNCHANGED
crates/rockskyd/src/cmd/mod.rs
This file has not been changed.
UNCHANGED
crates/rockskyd/src/main.rs
UNCHANGED
crates/rockskyd/src/main.rs
This file has not been changed.