back interdiff of round #1 and #0

feat(feed): implement feed service #8

closed
opened by tsiry-sandratraina.com targeting main from feat/feed-generator
ERROR
crates/feed/Cargo.toml

Failed to calculate interdiff for this file.

ERROR
crates/feed/src/config.rs

Failed to calculate interdiff for this file.

ERROR
crates/feed/src/feed_handler.rs

Failed to calculate interdiff for this file.

ERROR
crates/feed/src/types.rs

Failed to calculate interdiff for this file.

ERROR
crates/rockskyd/Cargo.toml

Failed to calculate interdiff for this file.

REVERTED
crates/rockskyd/src/cmd/feed.rs
··· 1 - use anyhow::Error; 2 - 3 - pub async fn serve() -> Result<(), Error> { 4 - rocksky_feed::run().await; 5 - Ok(()) 6 - }
ERROR
crates/rockskyd/src/cmd/mod.rs

Failed to calculate interdiff for this file.

ERROR
crates/rockskyd/src/main.rs

Failed to calculate interdiff for this file.

NEW
crates/feed/src/feed.rs
··· 1 1 use crate::config::Config; 2 2 use crate::types::{DidDocument, FeedSkeletonParameters, Service}; 3 3 use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery}; 4 + use anyhow::Error; 5 + use sqlx::postgres::PgPoolOptions; 6 + use sqlx::{Pool, Postgres}; 7 + use std::env; 4 8 use std::fmt::Debug; 5 9 use std::net::SocketAddr; 10 + use std::sync::Arc; 6 11 use warp::Filter; 7 12 8 13 /// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. ··· 21 26 &mut self, 22 27 name: impl AsRef<str>, 23 28 address: impl Into<SocketAddr> + Debug + Clone + Send, 24 - ) -> impl std::future::Future<Output = ()> + Send { 29 + ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 25 30 self.start_with_config(name, Config::load_env_config(), address) 26 31 } 27 32 ··· 39 44 name: impl AsRef<str>, 40 45 config: Config, 41 46 address: impl Into<SocketAddr> + Debug + Clone + Send, 42 - ) -> impl std::future::Future<Output = ()> + Send { 47 + ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 43 48 let handler = self.handler(); 44 49 let address = address.clone(); 45 50 let feed_name = name.as_ref().to_string(); 46 51 47 52 async move { 48 53 let config = config; 54 + let pool = PgPoolOptions::new() 55 + .max_connections(5) 56 + .connect(&env::var("XATA_POSTGRES_URL")?) 57 + .await?; 58 + let pool = Arc::new(pool); 59 + let db_filter = warp::any().map(move || pool.clone()); 49 60 50 61 let did_config = config.clone(); 51 62 let did_json = warp::path(".well-known") ··· 56 67 let describe_feed_generator = warp::path("xrpc") 57 68 .and(warp::path("app.rocksky.feed.describeFeedGenerator")) 58 69 .and(warp::get()) 59 - .and_then(move || describe_feed_generator(feed_name.clone())); 70 + .and(db_filter.clone()) 71 + .and_then(move |_pool: Arc<Pool<Postgres>>| { 72 + describe_feed_generator(feed_name.clone()) 73 + }); 60 74 61 75 let get_feed_handler = handler.clone(); 62 76 let get_feed_skeleton = warp::path("xrpc") 63 77 .and(warp::path("app.rocksky.feed.getFeedSkeleton")) 64 78 .and(warp::get()) 65 79 .and(warp::query::<FeedSkeletonParameters>()) 66 - .and_then(move |query: FeedSkeletonParameters| { 67 - get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 68 - }); 80 + .and(db_filter.clone()) 81 + .and_then( 82 + move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| { 83 + get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 84 + }, 85 + ); 69 86 70 87 let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); 71 88 ··· 101 118 tokio::join!(feed_server.run(address), firehose_listener) 102 119 .1 103 120 .expect("Couldn't await tasks"); 121 + 122 + Ok::<(), Error>(()) 104 123 } 105 124 } 106 125 }
NEW
crates/feed/src/lib.rs
··· 1 + use anyhow::Error; 1 2 use std::{env, net::SocketAddr, sync::Arc}; 2 - 3 3 use tokio::sync::Mutex; 4 4 5 5 use crate::{ ··· 42 42 } 43 43 } 44 44 45 - pub async fn run() { 45 + pub async fn run() -> Result<(), Error> { 46 46 let mut feed = RecentlyPlayedFeed { 47 47 handler: RecentlyPlayedFeedHandler { 48 48 scrobbles: Arc::new(Mutex::new(Vec::new())), ··· 53 53 let addr_str = format!("{}:{}", host, port); 54 54 let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); 55 55 56 - feed.start("RecentlyPlayed", addr).await; 56 + feed.start("RecentlyPlayed", addr).await?; 57 + Ok(()) 57 58 }
NEW
Cargo.lock
··· 3466 3466 "windows-sys 0.59.0", 3467 3467 ] 3468 3468 3469 + [[package]] 3470 + name = "moka" 3471 + version = "0.12.11" 3472 + source = "registry+https://github.com/rust-lang/crates.io-index" 3473 + checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" 3474 + dependencies = [ 3475 + "async-lock", 3476 + "crossbeam-channel", 3477 + "crossbeam-epoch", 3478 + "crossbeam-utils", 3479 + "equivalent", 3480 + "event-listener", 3481 + "futures-util", 3482 + "parking_lot", 3483 + "portable-atomic", 3484 + "rustc_version", 3485 + "smallvec", 3486 + "tagptr", 3487 + "uuid", 3488 + ] 3489 + 3490 + [[package]] 3491 + name = "multer" 3492 + version = "2.1.0" 3493 + source = "registry+https://github.com/rust-lang/crates.io-index" 3494 + checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" 3495 + dependencies = [ 3496 + "bytes", 3497 + "encoding_rs", 3498 + "futures-util", 3499 + "http 0.2.12", 3500 + "httparse", 3501 + "log", 3502 + "memchr", 3503 + "mime", 3504 + "spin", 3505 + "version_check", 3506 + ] 3507 + 3508 + [[package]] 3509 + name = "multibase" 3510 + version = "0.9.1" 3511 + source = "registry+https://github.com/rust-lang/crates.io-index" 3512 + checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" 3513 + dependencies = [ 3514 + "base-x", 3515 + "data-encoding", 3516 + "data-encoding-macro", 3517 + ] 3518 + 3519 + [[package]] 3520 + name = "multihash" 3521 + version = "0.19.3" 3522 + source = "registry+https://github.com/rust-lang/crates.io-index" 3523 + checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" 3524 + dependencies = [ 3525 + "core2", 3526 + "serde", 3527 + "unsigned-varint", 3528 + ] 3529 + 3469 3530 [[package]] 3470 3531 name = "nanoid" 3471 3532 version = "0.4.0" ··· 3475 3536 "rand 0.8.5", 3476 3537 ] 3477 3538 3539 + [[package]] 3540 + name = "nanorand" 3541 + version = "0.7.0" 3542 + source = "registry+https://github.com/rust-lang/crates.io-index" 3543 + checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" 3544 + dependencies = [ 3545 + "getrandom 0.2.16", 3546 + ] 3547 + 3548 + [[package]] 3549 + name = "native-tls" 3550 + version = "0.2.14" 3551 + source = "registry+https://github.com/rust-lang/crates.io-index" 3552 + checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" 3553 + dependencies = [ 3554 + "libc", 3555 + "log", 3556 + "openssl", 3557 + "openssl-probe", 3558 + "openssl-sys", 3559 + "schannel", 3560 + "security-framework 2.11.1", 3561 + "security-framework-sys", 3562 + "tempfile", 3563 + ] 3564 + 3478 3565 [[package]] 3479 3566 name = "nkeys" 3480 3567 version = "0.4.4"