Learn how to use Rust to build ATProto-powered applications
at main 4.4 kB view raw
1use crate::db::StatusFromDb; 2use crate::lexicons; 3use crate::lexicons::xyz::statusphere::Status; 4use anyhow::anyhow; 5use async_sqlite::Pool; 6use async_trait::async_trait; 7use atrium_api::types::Collection; 8use log::error; 9use rocketman::{ 10 connection::JetstreamConnection, 11 handler, 12 ingestion::LexiconIngestor, 13 options::JetstreamOptions, 14 types::event::{Event, Operation}, 15}; 16use serde_json::Value; 17use std::{ 18 collections::HashMap, 19 sync::{Arc, Mutex}, 20}; 21 22#[async_trait] 23impl LexiconIngestor for StatusSphereIngester { 24 async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> { 25 if let Some(commit) = &message.commit { 26 //We manually construct the uri since Jetstream does not provide it 27 //at://{users did}/{collection: xyz.statusphere.status}{records key} 28 let record_uri = format!("at://{}/{}/{}", message.did, commit.collection, commit.rkey); 29 match commit.operation { 30 Operation::Create | Operation::Update => { 31 if let Some(record) = &commit.record { 32 let status_at_proto_record = serde_json::from_value::< 33 lexicons::xyz::statusphere::status::RecordData, 34 >(record.clone())?; 35 36 if let Some(ref _cid) = commit.cid { 37 // Although esquema does not have full validation yet, 38 // if you get to this point, 39 // You know the data structure is the same 40 let created = status_at_proto_record.created_at.as_ref(); 41 let right_now = chrono::Utc::now(); 42 // We save or update the record in the db 43 StatusFromDb { 44 uri: record_uri, 45 author_did: message.did.clone(), 46 status: status_at_proto_record.status.clone(), 47 created_at: created.to_utc(), 48 indexed_at: right_now, 49 handle: None, 50 } 51 .save_or_update(&self.db_pool) 52 .await?; 53 } 54 } 55 } 56 Operation::Delete => StatusFromDb::delete_by_uri(&self.db_pool, record_uri).await?, 57 } 58 } else { 59 return Err(anyhow!("Message has no commit")); 60 } 61 Ok(()) 62 } 63} 64pub struct StatusSphereIngester { 65 db_pool: Arc<Pool>, 66} 67 68pub 
async fn start_ingester(db_pool: Arc<Pool>) { 69 // init the builder 70 let opts = JetstreamOptions::builder() 71 // your EXACT nsids 72 // Which in this case is xyz.statusphere.status 73 .wanted_collections(vec![Status::NSID.parse().unwrap()]) 74 .build(); 75 // create the jetstream connector 76 let jetstream = JetstreamConnection::new(opts); 77 78 // create your ingesters 79 // Which in this case is xyz.statusphere.status 80 let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 81 ingesters.insert( 82 // your EXACT nsid 83 Status::NSID.parse().unwrap(), 84 Box::new(StatusSphereIngester { db_pool }), 85 ); 86 87 // tracks the last message we've processed 88 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 89 90 // get channels 91 let msg_rx = jetstream.get_msg_rx(); 92 let reconnect_tx = jetstream.get_reconnect_tx(); 93 94 // spawn a task to process messages from the queue. 95 // this is a simple implementation, you can use a more complex one based on needs. 96 let c_cursor = cursor.clone(); 97 tokio::spawn(async move { 98 while let Ok(message) = msg_rx.recv_async().await { 99 if let Err(e) = 100 handler::handle_message(message, &ingesters, reconnect_tx.clone(), c_cursor.clone()) 101 .await 102 { 103 error!("Error processing message: {}", e); 104 }; 105 } 106 }); 107 108 // connect to jetstream 109 // retries internally, but may fail if there is an extreme error. 110 if let Err(e) = jetstream.connect(cursor.clone()).await { 111 error!("Failed to connect to Jetstream: {}", e); 112 std::process::exit(1); 113 } 114}