back interdiff of round #1 and #0

feat(feed): implement feed service #8

closed
opened by tsiry-sandratraina.com targeting main from feat/feed-generator
REBASED
Cargo.lock

This patch was likely rebased, as context lines do not match.

UNCHANGED
crates/feed/Cargo.toml

This file has not been changed.

UNCHANGED
crates/feed/src/config.rs

This file has not been changed.

CHANGED
crates/feed/src/feed.rs
··· 1 use crate::config::Config; 2 use crate::types::{DidDocument, FeedSkeletonParameters, Service}; 3 use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery}; 4 use std::fmt::Debug; 5 use std::net::SocketAddr; 6 use warp::Filter; 7 8 /// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. ··· 21 &mut self, 22 name: impl AsRef<str>, 23 address: impl Into<SocketAddr> + Debug + Clone + Send, 24 - ) -> impl std::future::Future<Output = ()> + Send { 25 self.start_with_config(name, Config::load_env_config(), address) 26 } 27 ··· 39 name: impl AsRef<str>, 40 config: Config, 41 address: impl Into<SocketAddr> + Debug + Clone + Send, 42 - ) -> impl std::future::Future<Output = ()> + Send { 43 let handler = self.handler(); 44 let address = address.clone(); 45 let feed_name = name.as_ref().to_string(); 46 47 async move { 48 let config = config; 49 50 let did_config = config.clone(); 51 let did_json = warp::path(".well-known") ··· 56 let describe_feed_generator = warp::path("xrpc") 57 .and(warp::path("app.rocksky.feed.describeFeedGenerator")) 58 .and(warp::get()) 59 - .and_then(move || describe_feed_generator(feed_name.clone())); 60 61 let get_feed_handler = handler.clone(); 62 let get_feed_skeleton = warp::path("xrpc") 63 .and(warp::path("app.rocksky.feed.getFeedSkeleton")) 64 .and(warp::get()) 65 .and(warp::query::<FeedSkeletonParameters>()) 66 - .and_then(move |query: FeedSkeletonParameters| { 67 - get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 68 - }); 69 70 let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); 71 ··· 101 tokio::join!(feed_server.run(address), firehose_listener) 102 .1 103 .expect("Couldn't await tasks"); 104 } 105 } 106 }
··· 1 use crate::config::Config; 2 use crate::types::{DidDocument, FeedSkeletonParameters, Service}; 3 use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery}; 4 + use anyhow::Error; 5 + use sqlx::postgres::PgPoolOptions; 6 + use sqlx::{Pool, Postgres}; 7 + use std::env; 8 use std::fmt::Debug; 9 use std::net::SocketAddr; 10 + use std::sync::Arc; 11 use warp::Filter; 12 13 /// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. ··· 26 &mut self, 27 name: impl AsRef<str>, 28 address: impl Into<SocketAddr> + Debug + Clone + Send, 29 + ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 30 self.start_with_config(name, Config::load_env_config(), address) 31 } 32 ··· 44 name: impl AsRef<str>, 45 config: Config, 46 address: impl Into<SocketAddr> + Debug + Clone + Send, 47 + ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 48 let handler = self.handler(); 49 let address = address.clone(); 50 let feed_name = name.as_ref().to_string(); 51 52 async move { 53 let config = config; 54 + let pool = PgPoolOptions::new() 55 + .max_connections(5) 56 + .connect(&env::var("XATA_POSTGRES_URL")?) 57 + .await?; 58 + let pool = Arc::new(pool); 59 + let db_filter = warp::any().map(move || pool.clone()); 60 61 let did_config = config.clone(); 62 let did_json = warp::path(".well-known") ··· 67 let describe_feed_generator = warp::path("xrpc") 68 .and(warp::path("app.rocksky.feed.describeFeedGenerator")) 69 .and(warp::get()) 70 + .and(db_filter.clone()) 71 + .and_then(move |_pool: Arc<Pool<Postgres>>| { 72 + describe_feed_generator(feed_name.clone()) 73 + }); 74 75 let get_feed_handler = handler.clone(); 76 let get_feed_skeleton = warp::path("xrpc") 77 .and(warp::path("app.rocksky.feed.getFeedSkeleton")) 78 .and(warp::get()) 79 .and(warp::query::<FeedSkeletonParameters>()) 80 + .and(db_filter.clone()) 81 + .and_then( 82 + move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| { 83 + get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 84 + }, 85 + ); 86 87 let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); 88 ··· 118 tokio::join!(feed_server.run(address), firehose_listener) 119 .1 120 .expect("Couldn't await tasks"); 121 + 122 + Ok::<(), Error>(()) 123 } 124 } 125 }
UNCHANGED
crates/feed/src/feed_handler.rs

This file has not been changed.

CHANGED
crates/feed/src/lib.rs
··· 1 use std::{env, net::SocketAddr, sync::Arc}; 2 - 3 use tokio::sync::Mutex; 4 5 use crate::{ ··· 42 } 43 } 44 45 - pub async fn run() { 46 let mut feed = RecentlyPlayedFeed { 47 handler: RecentlyPlayedFeedHandler { 48 scrobbles: Arc::new(Mutex::new(Vec::new())), ··· 53 let addr_str = format!("{}:{}", host, port); 54 let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); 55 56 - feed.start("RecentlyPlayed", addr).await; 57 }
··· 1 + use anyhow::Error; 2 use std::{env, net::SocketAddr, sync::Arc}; 3 use tokio::sync::Mutex; 4 5 use crate::{ ··· 42 } 43 } 44 45 + pub async fn run() -> Result<(), Error> { 46 let mut feed = RecentlyPlayedFeed { 47 handler: RecentlyPlayedFeedHandler { 48 scrobbles: Arc::new(Mutex::new(Vec::new())), ··· 53 let addr_str = format!("{}:{}", host, port); 54 let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); 55 56 + feed.start("RecentlyPlayed", addr).await?; 57 + Ok(()) 58 }
UNCHANGED
crates/feed/src/types.rs

This file has not been changed.

UNCHANGED
crates/rockskyd/Cargo.toml

This file has not been changed.

CHANGED
crates/rockskyd/src/cmd/feed.rs
··· 1 use anyhow::Error; 2 3 pub async fn serve() -> Result<(), Error> { 4 - rocksky_feed::run().await; 5 Ok(()) 6 }
··· 1 use anyhow::Error; 2 3 pub async fn serve() -> Result<(), Error> { 4 + rocksky_feed::run().await?; 5 Ok(()) 6 }
UNCHANGED
crates/rockskyd/src/cmd/mod.rs

This file has not been changed.

UNCHANGED
crates/rockskyd/src/main.rs

This file has not been changed.