Learn how to use Rust to build ATProto-powered applications
1use crate::db::StatusFromDb;
2use crate::lexicons;
3use crate::lexicons::xyz::statusphere::Status;
4use anyhow::anyhow;
5use async_sqlite::Pool;
6use async_trait::async_trait;
7use atrium_api::types::Collection;
8use log::error;
9use rocketman::{
10 connection::JetstreamConnection,
11 handler,
12 ingestion::LexiconIngestor,
13 options::JetstreamOptions,
14 types::event::{Event, Operation},
15};
16use serde_json::Value;
17use std::{
18 collections::HashMap,
19 sync::{Arc, Mutex},
20};
21
22#[async_trait]
23impl LexiconIngestor for StatusSphereIngester {
24 async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> {
25 if let Some(commit) = &message.commit {
26 //We manually construct the uri since Jetstream does not provide it
27 //at://{users did}/{collection: xyz.statusphere.status}{records key}
28 let record_uri = format!("at://{}/{}/{}", message.did, commit.collection, commit.rkey);
29 match commit.operation {
30 Operation::Create | Operation::Update => {
31 if let Some(record) = &commit.record {
32 let status_at_proto_record = serde_json::from_value::<
33 lexicons::xyz::statusphere::status::RecordData,
34 >(record.clone())?;
35
36 if let Some(ref _cid) = commit.cid {
37 // Although esquema does not have full validation yet,
38 // if you get to this point,
39 // You know the data structure is the same
40 let created = status_at_proto_record.created_at.as_ref();
41 let right_now = chrono::Utc::now();
42 // We save or update the record in the db
43 StatusFromDb {
44 uri: record_uri,
45 author_did: message.did.clone(),
46 status: status_at_proto_record.status.clone(),
47 created_at: created.to_utc(),
48 indexed_at: right_now,
49 handle: None,
50 }
51 .save_or_update(&self.db_pool)
52 .await?;
53 }
54 }
55 }
56 Operation::Delete => StatusFromDb::delete_by_uri(&self.db_pool, record_uri).await?,
57 }
58 } else {
59 return Err(anyhow!("Message has no commit"));
60 }
61 Ok(())
62 }
63}
64pub struct StatusSphereIngester {
65 db_pool: Arc<Pool>,
66}
67
68pub async fn start_ingester(db_pool: Arc<Pool>) {
69 // init the builder
70 let opts = JetstreamOptions::builder()
71 // your EXACT nsids
72 // Which in this case is xyz.statusphere.status
73 .wanted_collections(vec![Status::NSID.parse().unwrap()])
74 .build();
75 // create the jetstream connector
76 let jetstream = JetstreamConnection::new(opts);
77
78 // create your ingesters
79 // Which in this case is xyz.statusphere.status
80 let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
81 ingesters.insert(
82 // your EXACT nsid
83 Status::NSID.parse().unwrap(),
84 Box::new(StatusSphereIngester { db_pool }),
85 );
86
87 // tracks the last message we've processed
88 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
89
90 // get channels
91 let msg_rx = jetstream.get_msg_rx();
92 let reconnect_tx = jetstream.get_reconnect_tx();
93
94 // spawn a task to process messages from the queue.
95 // this is a simple implementation, you can use a more complex one based on needs.
96 let c_cursor = cursor.clone();
97 tokio::spawn(async move {
98 while let Ok(message) = msg_rx.recv_async().await {
99 if let Err(e) =
100 handler::handle_message(message, &ingesters, reconnect_tx.clone(), c_cursor.clone())
101 .await
102 {
103 error!("Error processing message: {}", e);
104 };
105 }
106 });
107
108 // connect to jetstream
109 // retries internally, but may fail if there is an extreme error.
110 if let Err(e) = jetstream.connect(cursor.clone()).await {
111 error!("Failed to connect to Jetstream: {}", e);
112 std::process::exit(1);
113 }
114}