slack status without the slack status.zzstoatzz.io
hatk statusphere
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e50e1e7104625f5e672e064fa69396cce5f834bb 118 lines 4.8 kB view raw
1use crate::db::StatusFromDb; 2use crate::lexicons; 3use anyhow::anyhow; 4use async_sqlite::Pool; 5use async_trait::async_trait; 6use log::error; 7use rocketman::{ 8 connection::JetstreamConnection, 9 handler, 10 ingestion::LexiconIngestor, 11 options::JetstreamOptions, 12 types::event::{Event, Operation}, 13}; 14use serde_json::Value; 15use std::{ 16 collections::HashMap, 17 sync::{Arc, Mutex}, 18}; 19 20#[async_trait] 21impl LexiconIngestor for StatusSphereIngester { 22 async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> { 23 if let Some(commit) = &message.commit { 24 //We manually construct the uri since Jetstream does not provide it 25 //at://{users did}/{collection: xyz.statusphere.status}{records key} 26 let record_uri = format!("at://{}/{}/{}", message.did, commit.collection, commit.rkey); 27 match commit.operation { 28 Operation::Create | Operation::Update => { 29 if let Some(record) = &commit.record { 30 let status_at_proto_record = serde_json::from_value::< 31 lexicons::io::zzstoatzz::status::record::RecordData, 32 >(record.clone())?; 33 34 if let Some(ref _cid) = commit.cid { 35 // Although esquema does not have full validation yet, 36 // if you get to this point, 37 // You know the data structure is the same 38 let created = status_at_proto_record.created_at.as_ref(); 39 let right_now = chrono::Utc::now(); 40 // We save or update the record in the db 41 StatusFromDb { 42 uri: record_uri, 43 author_did: message.did.clone(), 44 status: status_at_proto_record.emoji.clone(), 45 text: status_at_proto_record.text.clone(), 46 expires_at: status_at_proto_record.expires.as_ref().map(|e| { 47 // Convert ATProto Datetime to chrono DateTime 48 chrono::DateTime::parse_from_rfc3339(e.as_str()) 49 .ok() 50 .map(|dt| dt.with_timezone(&chrono::Utc)) 51 .unwrap_or_else(chrono::Utc::now) 52 }), 53 started_at: created.to_utc(), 54 indexed_at: right_now, 55 handle: None, 56 } 57 .save_or_update(&self.db_pool) 58 .await?; 59 } 60 } 61 } 62 Operation::Delete => StatusFromDb::delete_by_uri(&self.db_pool, record_uri).await?, 63 } 64 } else { 65 return Err(anyhow!("Message has no commit")); 66 } 67 Ok(()) 68 } 69} 70pub struct StatusSphereIngester { 71 db_pool: Arc<Pool>, 72} 73 74pub async fn start_ingester(db_pool: Arc<Pool>) { 75 // init the builder 76 let opts = JetstreamOptions::builder() 77 // listen for our status record collection 78 .wanted_collections(vec!["io.zzstoatzz.status.record".parse().unwrap()]) 79 .build(); 80 // create the jetstream connector 81 let jetstream = JetstreamConnection::new(opts); 82 83 // create your ingesters 84 let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 85 ingesters.insert( 86 // your EXACT nsid 87 "io.zzstoatzz.status.record".parse().unwrap(), 88 Box::new(StatusSphereIngester { db_pool }), 89 ); 90 91 // tracks the last message we've processed 92 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 93 94 // get channels 95 let msg_rx = jetstream.get_msg_rx(); 96 let reconnect_tx = jetstream.get_reconnect_tx(); 97 98 // spawn a task to process messages from the queue. 99 // this is a simple implementation, you can use a more complex one based on needs. 100 let c_cursor = cursor.clone(); 101 tokio::spawn(async move { 102 while let Ok(message) = msg_rx.recv_async().await { 103 if let Err(e) = 104 handler::handle_message(message, &ingesters, reconnect_tx.clone(), c_cursor.clone()) 105 .await 106 { 107 error!("Error processing message: {}", e); 108 }; 109 } 110 }); 111 112 // connect to jetstream 113 // retries internally, but may fail if there is an extreme error. 114 if let Err(e) = jetstream.connect(cursor.clone()).await { 115 error!("Failed to connect to Jetstream: {}", e); 116 std::process::exit(1); 117 } 118}