slack status without the slack
status.zzstoatzz.io
hatk
statusphere
1use crate::db::StatusFromDb;
2use crate::lexicons;
3use anyhow::anyhow;
4use async_sqlite::Pool;
5use async_trait::async_trait;
6use log::error;
7use rocketman::{
8 connection::JetstreamConnection,
9 handler,
10 ingestion::LexiconIngestor,
11 options::JetstreamOptions,
12 types::event::{Event, Operation},
13};
14use serde_json::Value;
15use std::{
16 collections::HashMap,
17 sync::{Arc, Mutex},
18};
19
20#[async_trait]
21impl LexiconIngestor for StatusSphereIngester {
22 async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> {
23 if let Some(commit) = &message.commit {
24 //We manually construct the uri since Jetstream does not provide it
25 //at://{users did}/{collection: xyz.statusphere.status}{records key}
26 let record_uri = format!("at://{}/{}/{}", message.did, commit.collection, commit.rkey);
27 match commit.operation {
28 Operation::Create | Operation::Update => {
29 if let Some(record) = &commit.record {
30 let status_at_proto_record = serde_json::from_value::<
31 lexicons::io::zzstoatzz::status::record::RecordData,
32 >(record.clone())?;
33
34 if let Some(ref _cid) = commit.cid {
35 // Although esquema does not have full validation yet,
36 // if you get to this point,
37 // You know the data structure is the same
38 let created = status_at_proto_record.created_at.as_ref();
39 let right_now = chrono::Utc::now();
40 // We save or update the record in the db
41 StatusFromDb {
42 uri: record_uri,
43 author_did: message.did.clone(),
44 status: status_at_proto_record.emoji.clone(),
45 text: status_at_proto_record.text.clone(),
46 expires_at: status_at_proto_record.expires.as_ref().map(|e| {
47 // Convert ATProto Datetime to chrono DateTime
48 chrono::DateTime::parse_from_rfc3339(e.as_str())
49 .ok()
50 .map(|dt| dt.with_timezone(&chrono::Utc))
51 .unwrap_or_else(chrono::Utc::now)
52 }),
53 started_at: created.to_utc(),
54 indexed_at: right_now,
55 handle: None,
56 }
57 .save_or_update(&self.db_pool)
58 .await?;
59 }
60 }
61 }
62 Operation::Delete => StatusFromDb::delete_by_uri(&self.db_pool, record_uri).await?,
63 }
64 } else {
65 return Err(anyhow!("Message has no commit"));
66 }
67 Ok(())
68 }
69}
70pub struct StatusSphereIngester {
71 db_pool: Arc<Pool>,
72}
73
74pub async fn start_ingester(db_pool: Arc<Pool>) {
75 // init the builder
76 let opts = JetstreamOptions::builder()
77 // listen for our status record collection
78 .wanted_collections(vec!["io.zzstoatzz.status.record".parse().unwrap()])
79 .build();
80 // create the jetstream connector
81 let jetstream = JetstreamConnection::new(opts);
82
83 // create your ingesters
84 let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
85 ingesters.insert(
86 // your EXACT nsid
87 "io.zzstoatzz.status.record".parse().unwrap(),
88 Box::new(StatusSphereIngester { db_pool }),
89 );
90
91 // tracks the last message we've processed
92 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
93
94 // get channels
95 let msg_rx = jetstream.get_msg_rx();
96 let reconnect_tx = jetstream.get_reconnect_tx();
97
98 // spawn a task to process messages from the queue.
99 // this is a simple implementation, you can use a more complex one based on needs.
100 let c_cursor = cursor.clone();
101 tokio::spawn(async move {
102 while let Ok(message) = msg_rx.recv_async().await {
103 if let Err(e) =
104 handler::handle_message(message, &ingesters, reconnect_tx.clone(), c_cursor.clone())
105 .await
106 {
107 error!("Error processing message: {}", e);
108 };
109 }
110 });
111
112 // connect to jetstream
113 // retries internally, but may fail if there is an extreme error.
114 if let Err(e) = jetstream.connect(cursor.clone()).await {
115 error!("Failed to connect to Jetstream: {}", e);
116 std::process::exit(1);
117 }
118}