+1
-1
src/bin/blahg.rs
+1
-1
src/bin/blahg.rs
···
147
147
};
148
148
149
149
// Setup markdown render manager
150
-
let render_manager: Arc<dyn RenderManager> = Arc::new(ComrakRenderManager::new(&config.external_base));
150
+
let render_manager: Arc<dyn RenderManager> = Arc::new(ComrakRenderManager::with_theme(&config.syntect_theme, &config.external_base));
151
151
152
152
// Setup template engine
153
153
let template_env = {
+23
src/config.rs
+23
src/config.rs
···
6
6
/// Application configuration loaded from environment variables.
7
7
#[derive(Debug, Clone, Serialize, Deserialize)]
8
8
pub struct Config {
9
+
/// HTTP server configuration
9
10
pub http: HttpConfig,
11
+
/// Base URL for external links and content
10
12
pub external_base: String,
13
+
/// Certificate bundles for TLS verification
11
14
pub certificate_bundles: Vec<String>,
15
+
/// User agent string for HTTP requests
12
16
pub user_agent: String,
17
+
/// Hostname for PLC directory lookups
13
18
pub plc_hostname: String,
19
+
/// DNS nameservers for resolution
14
20
pub dns_nameservers: Vec<std::net::IpAddr>,
21
+
/// HTTP client timeout duration
15
22
pub http_client_timeout: Duration,
23
+
/// Author name for the blog
16
24
pub author: String,
25
+
/// Storage path for attachments
17
26
pub attachment_storage: String,
27
+
/// Database connection URL
18
28
pub database_url: String,
29
+
/// Path to jetstream cursor file
19
30
pub jetstream_cursor_path: Option<String>,
31
+
/// Whether to enable jetstream event consumption
20
32
pub enable_jetstream: bool,
33
+
/// Syntect theme for syntax highlighting
34
+
pub syntect_theme: String,
21
35
}
22
36
37
+
/// HTTP server configuration options.
23
38
#[derive(Debug, Clone, Serialize, Deserialize)]
24
39
pub struct HttpConfig {
40
+
/// Port to bind the HTTP server to
25
41
pub port: u16,
42
+
/// Path to static assets
26
43
pub static_path: String,
44
+
/// Path to template files
27
45
pub templates_path: String,
28
46
}
29
47
···
45
63
database_url: "sqlite://blahg.db".to_string(),
46
64
jetstream_cursor_path: None,
47
65
enable_jetstream: true,
66
+
syntect_theme: std::env::var("SYNTECT_THEME").unwrap_or_else(|_| "base16-ocean.dark".to_string()),
48
67
}
49
68
}
50
69
}
···
136
155
137
156
if let Ok(enable_jetstream) = std::env::var("ENABLE_JETSTREAM") {
138
157
config.enable_jetstream = enable_jetstream.parse().unwrap_or(true);
158
+
}
159
+
160
+
if let Ok(syntect_theme) = std::env::var("SYNTECT_THEME") {
161
+
config.syntect_theme = syntect_theme;
139
162
}
140
163
141
164
Ok(config)
+16
src/consumer.rs
+16
src/consumer.rs
···
9
9
use tokio::sync::Mutex;
10
10
use tokio::sync::mpsc;
11
11
12
+
/// Receiver for `BlahgEvent` instances from the jetstream consumer.
12
13
pub type BlahgEventReceiver = mpsc::UnboundedReceiver<BlahgEvent>;
13
14
15
+
/// ATProtocol events relevant to the blog application.
14
16
#[derive(Debug, Clone)]
15
17
pub enum BlahgEvent {
18
+
/// A record was committed to the ATProtocol network.
16
19
Commit {
20
+
/// The DID of the record author
17
21
did: String,
22
+
/// The collection name
18
23
collection: String,
24
+
/// The record key
19
25
rkey: String,
26
+
/// The content identifier
20
27
cid: String,
28
+
/// The record data
21
29
record: serde_json::Value,
22
30
},
31
+
/// A record was deleted from the ATProtocol network.
23
32
Delete {
33
+
/// The DID of the record author
24
34
did: String,
35
+
/// The collection name
25
36
collection: String,
37
+
/// The record key
26
38
rkey: String,
27
39
},
28
40
}
29
41
42
+
/// Handler for processing ATProtocol events and converting them to `BlahgEvent` instances.
30
43
pub struct BlahgEventHandler {
31
44
id: String,
32
45
event_sender: mpsc::UnboundedSender<BlahgEvent>,
···
155
168
}
156
169
}
157
170
171
+
/// Consumer for creating ATProtocol event handlers.
158
172
pub struct Consumer {}
159
173
160
174
impl Consumer {
175
+
/// Create a new Blahg event handler that processes ATProtocol events.
161
176
pub fn create_blahg_handler(&self) -> (Arc<BlahgEventHandler>, BlahgEventReceiver) {
162
177
let (sender, receiver) = mpsc::unbounded_channel();
163
178
let handler = Arc::new(BlahgEventHandler::new(
···
167
182
(handler, receiver)
168
183
}
169
184
185
+
/// Create a new cursor writer handler that persists jetstream cursor position.
170
186
pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc<CursorWriterHandler> {
171
187
Arc::new(CursorWriterHandler::new(
172
188
"cursor-writer".to_string(),
-82
src/identity.rs
-82
src/identity.rs
···
1
1
use atproto_identity::model::Document;
2
2
use atproto_identity::resolve::IdentityResolver;
3
-
use atproto_identity::storage::DidDocumentStorage;
4
3
use chrono::Utc;
5
4
use std::sync::Arc;
6
5
···
64
63
Ok(())
65
64
}
66
65
}
67
-
68
-
/// A variant that works with DidDocumentStorage instead of IdentityStorage
69
-
pub(crate) struct CachingDidDocumentResolver<T: DidDocumentStorage + ?Sized> {
70
-
/// The underlying identity resolver to use when cache misses occur
71
-
resolver: IdentityResolver,
72
-
/// The storage implementation to use for caching
73
-
storage: Arc<T>,
74
-
}
75
-
76
-
impl<T: DidDocumentStorage + ?Sized> CachingDidDocumentResolver<T> {
77
-
/// Create a new caching identity resolver with the given resolver and storage.
78
-
pub(crate) fn new(resolver: IdentityResolver, storage: Arc<T>) -> Self {
79
-
Self { resolver, storage }
80
-
}
81
-
82
-
/// Resolve a DID to a Document, using the cache when possible.
83
-
pub(crate) async fn resolve(&self, did: &str) -> Result<Document> {
84
-
// First, try to get the document from storage
85
-
if let Some(document) = self.storage.get_document_by_did(did).await? {
86
-
return Ok(document);
87
-
}
88
-
89
-
// If not in storage, resolve using the underlying resolver
90
-
let document = self.resolver.resolve(did).await?;
91
-
92
-
// Store the resolved document for future lookups
93
-
self.storage.store_document(document.clone()).await?;
94
-
95
-
Ok(document)
96
-
}
97
-
98
-
/// Resolve a handle to a Document, using the cache when possible.
99
-
pub(crate) async fn resolve_handle(&self, handle: &str) -> Result<Document> {
100
-
// For handle resolution, we need to resolve first since DidDocumentStorage
101
-
// doesn't have a get_by_handle method
102
-
let document = self.resolver.resolve(handle).await?;
103
-
104
-
// Store the resolved document for future lookups
105
-
self.storage.store_document(document.clone()).await?;
106
-
107
-
Ok(document)
108
-
}
109
-
}
110
-
111
-
#[cfg(test)]
112
-
mod tests {
113
-
use super::*;
114
-
use crate::storage::sqlite::SqliteStorage;
115
-
use atproto_identity::resolve::{InnerIdentityResolver, create_resolver};
116
-
use sqlx::SqlitePool;
117
-
use std::sync::Arc;
118
-
119
-
#[tokio::test]
120
-
async fn test_caching_identity_resolver() -> Result<()> {
121
-
// Create an in-memory SQLite database for testing
122
-
let pool = SqlitePool::connect("sqlite::memory:").await?;
123
-
let storage = Arc::new(SqliteStorage::new(pool));
124
-
125
-
// Run migrations using the Storage trait
126
-
use crate::storage::Storage;
127
-
storage.migrate().await?;
128
-
129
-
// Create a mock resolver (this would normally resolve from the network)
130
-
let dns_resolver = create_resolver(&[]);
131
-
let http_client = reqwest::Client::new();
132
-
let inner_resolver = InnerIdentityResolver {
133
-
dns_resolver,
134
-
http_client,
135
-
plc_hostname: "plc.directory".to_string(),
136
-
};
137
-
let resolver = IdentityResolver(Arc::new(inner_resolver));
138
-
139
-
// Create the caching resolver
140
-
let caching_resolver = CachingIdentityResolver::new(resolver, storage.clone());
141
-
142
-
// Test would go here - this is just a skeleton since we'd need real DIDs
143
-
// and network access to properly test the resolution
144
-
145
-
Ok(())
146
-
}
147
-
}
+17
src/lexicon.rs
+17
src/lexicon.rs
···
1
1
use chrono::{DateTime, Utc};
2
2
use serde::Deserialize;
3
3
4
+
/// A blob record containing binary data metadata.
4
5
#[derive(Debug, Clone, Deserialize)]
5
6
pub struct BlobRecord {
7
+
/// The record type
6
8
#[serde(rename = "$type")]
7
9
pub r#type: String,
8
10
11
+
/// MIME type of the blob
9
12
#[serde(rename = "mimeType")]
10
13
pub mime_type: String,
14
+
/// Size of the blob in bytes
11
15
pub size: i64,
12
16
17
+
/// Reference to the blob content
13
18
#[serde(rename = "ref")]
14
19
pub r#ref: BlobRef,
15
20
}
16
21
22
+
/// A reference to blob content via CID link.
17
23
#[derive(Debug, Clone, Deserialize)]
18
24
pub struct BlobRef {
25
+
/// The CID link to the blob content
19
26
#[serde(rename = "$link")]
20
27
pub link: String,
21
28
}
22
29
30
+
/// An attachment to a blog post.
23
31
#[derive(Debug, Clone, Deserialize)]
24
32
pub struct PostAttachment {
33
+
/// The attachment type
25
34
#[serde(rename = "$type")]
26
35
pub r#type: String,
27
36
37
+
/// The blob content of the attachment
28
38
pub content: BlobRecord,
29
39
}
30
40
41
+
/// A blog post record from the ATProtocol lexicon.
31
42
#[derive(Debug, Clone, Deserialize)]
32
43
pub struct PostRecord {
44
+
/// The record type
33
45
#[serde(rename = "$type")]
34
46
pub r#type: String,
35
47
48
+
/// Title of the blog post
36
49
pub title: String,
50
+
/// Main content of the blog post
37
51
pub content: BlobRecord,
38
52
53
+
/// Publication timestamp
39
54
#[serde(rename = "publishedAt")]
40
55
pub published_at: DateTime<Utc>,
41
56
57
+
/// List of attachments to the post
42
58
#[serde(default = "empty_attachments")]
43
59
pub attachments: Vec<PostAttachment>,
44
60
61
+
/// Languages used in the post
45
62
pub langs: Vec<String>,
46
63
}
47
64
+34
src/lib.rs
+34
src/lib.rs
···
1
+
//! # Blahg - ATProtocol Blog AppView
2
+
//!
3
+
//! Blahg is a Rust-based ATProtocol AppView that renders personal blog content.
4
+
//! It consumes ATProtocol records to create a blog-like experience by discovering
5
+
//! and displaying relevant posts.
6
+
//!
7
+
//! ## Architecture
8
+
//!
9
+
//! - **ATProtocol Integration**: Uses the atproto ecosystem crates for identity resolution and record processing
10
+
//! - **Storage Layer**: Flexible storage backends including SQLite, PostgreSQL, and content storage
11
+
//! - **Rendering**: Markdown rendering with syntax highlighting
12
+
//! - **Event Processing**: Real-time consumption of ATProtocol events via jetstream
13
+
//!
14
+
//! ## Key Components
15
+
//!
16
+
//! - [`config`] - Configuration management
17
+
//! - [`consumer`] - ATProtocol event consumption
18
+
//! - [`process`] - Event processing and record handling
19
+
//! - [`storage`] - Data persistence layer
20
+
//! - [`render`] - Markdown rendering
21
+
//! - [`http`] - Web server and API endpoints
22
+
//! - [`identity`] - ATProtocol identity resolution
23
+
//! - [`lexicon`] - ATProtocol record type definitions
24
+
1
25
#![warn(missing_docs)]
2
26
27
+
/// Configuration management for the Blahg application.
3
28
pub mod config;
29
+
/// ATProtocol event consumption and handling.
4
30
pub mod consumer;
31
+
/// Error types and handling for the Blahg application.
5
32
pub mod errors;
33
+
/// HTTP server and web API endpoints.
6
34
pub mod http;
35
+
/// ATProtocol identity resolution and caching.
7
36
pub mod identity;
37
+
/// ATProtocol record type definitions and lexicon.
8
38
pub mod lexicon;
39
+
/// Event processing and record handling logic.
9
40
pub mod process;
41
+
/// Markdown rendering with syntax highlighting.
10
42
pub mod render;
43
+
/// Data persistence layer with multiple backend support.
11
44
pub mod storage;
45
+
/// Template management and rendering.
12
46
pub mod templates;
+4
-13
src/process.rs
+4
-13
src/process.rs
···
28
28
#[derive(Debug, Deserialize, Serialize, Default)]
29
29
struct StrongRef {
30
30
#[serde(rename = "$type")]
31
-
pub type_: Option<String>,
32
-
pub uri: String,
33
-
pub cid: String,
31
+
type_: Option<String>,
32
+
uri: String,
33
+
cid: String,
34
34
}
35
35
36
36
/// app.bsky.feed.post record structure
37
37
#[derive(Debug, Deserialize)]
38
38
struct FeedPostRecord {
39
39
#[serde(rename = "$type")]
40
-
pub type_: Option<String>,
40
+
type_: Option<String>,
41
41
42
42
#[serde(default)]
43
43
facets: Vec<Facet>,
···
409
409
}
410
410
411
411
Ok(())
412
-
}
413
-
414
-
/// Check if an AT-URI references a post from a known author
415
-
fn is_known_author_post(&self, uri: &str) -> bool {
416
-
if let Ok(aturi) = ATURI::from_str(uri) {
417
-
return aturi.collection == "tools.smokesignal.blahg.content.post"
418
-
&& self.config.author == aturi.authority;
419
-
}
420
-
false
421
412
}
422
413
423
414
/// Creates a post prefix lookup hashmap with external URL prefixes mapped to AT-URIs
+12
-41
src/render.rs
+12
-41
src/render.rs
···
9
9
use std::sync::Arc;
10
10
11
11
12
+
/// Trait for rendering markdown content to HTML.
12
13
#[async_trait]
13
14
pub trait RenderManager: Send + Sync {
15
+
/// Render markdown content to HTML.
14
16
async fn render_markdown(&self, markdown: &str) -> Result<String>;
15
17
}
16
18
19
+
/// A markdown renderer using the Comrak library with syntax highlighting.
17
20
pub struct ComrakRenderManager<'a> {
18
21
syntect_adapter: Arc<SyntectAdapter>,
19
22
options: Options<'a>,
···
21
24
}
22
25
23
26
impl ComrakRenderManager<'static> {
27
+
28
+
#[cfg(test)]
29
+
/// Create a new Comrak render manager with the given external base URL.
24
30
pub fn new(external_base: &str) -> Self {
31
+
Self::with_theme("base16-ocean.dark", external_base)
32
+
}
33
+
34
+
/// Create a new Comrak render manager with the given theme and external base URL.
35
+
pub fn with_theme(theme: &str, external_base: &str) -> Self {
25
36
let syntect_adapter = Arc::new(
26
37
SyntectAdapterBuilder::new()
27
-
.theme("base16-ocean.dark")
38
+
.theme(theme)
28
39
.build(),
29
40
);
30
-
31
-
let mut options = Options::default();
32
-
options.extension.strikethrough = true;
33
-
options.extension.table = true;
34
-
options.extension.autolink = true;
35
-
options.extension.tasklist = true;
36
-
options.extension.superscript = true;
37
-
options.extension.footnotes = true;
38
-
options.extension.description_lists = true;
39
-
options.render.unsafe_ = false;
40
-
41
-
Self {
42
-
syntect_adapter,
43
-
options,
44
-
external_base: external_base.to_string(),
45
-
}
46
-
}
47
-
48
-
fn with_theme(theme: &str, external_base: &str) -> Self {
49
-
let syntect_adapter = Arc::new(SyntectAdapterBuilder::new().theme(theme).build());
50
-
51
-
let mut options = Options::default();
52
-
options.extension.strikethrough = true;
53
-
options.extension.table = true;
54
-
options.extension.autolink = true;
55
-
options.extension.tasklist = true;
56
-
options.extension.superscript = true;
57
-
options.extension.footnotes = true;
58
-
options.extension.description_lists = true;
59
-
options.render.unsafe_ = false;
60
-
61
-
Self {
62
-
syntect_adapter,
63
-
options,
64
-
external_base: external_base.to_string(),
65
-
}
66
-
}
67
-
68
-
fn with_css(external_base: &str) -> Self {
69
-
let syntect_adapter = Arc::new(SyntectAdapterBuilder::new().css().build());
70
41
71
42
let mut options = Options::default();
72
43
options.extension.strikethrough = true;
+2
-6
src/storage/cached.rs
+2
-6
src/storage/cached.rs
···
8
8
9
9
use super::{ContentStorage, Identity, IdentityStorage, Post, PostReference, PostStorage, Storage};
10
10
11
+
/// A caching layer for post storage that keeps posts in memory.
11
12
pub struct CachedPostStorage<T: Storage> {
12
13
underlying_storage: Arc<T>,
13
14
post_cache: Arc<RwLock<Option<HashMap<String, Post>>>>,
14
15
}
15
16
16
17
impl<T: Storage> CachedPostStorage<T> {
18
+
/// Create a new cached post storage with the given underlying storage.
17
19
pub fn new(underlying_storage: Arc<T>) -> Self {
18
20
Self {
19
21
underlying_storage,
···
41
43
}
42
44
43
45
*cache = Some(post_map);
44
-
Ok(())
45
-
}
46
-
47
-
async fn invalidate_cache(&self) -> Result<()> {
48
-
let mut cache = self.post_cache.write().await;
49
-
*cache = None;
50
46
Ok(())
51
47
}
52
48
+1
src/storage/content.rs
+1
src/storage/content.rs
+54
-6
src/storage/mod.rs
+54
-6
src/storage/mod.rs
···
5
5
use serde_json::Value;
6
6
use std::collections::HashMap;
7
7
8
+
/// An ATProtocol identity record.
8
9
#[derive(Clone, Serialize, Deserialize, sqlx::FromRow)]
9
10
pub struct Identity {
10
-
pub did: String,
11
-
pub handle: String,
12
-
pub record: Value,
13
-
pub created_at: DateTime<Utc>,
14
-
pub updated_at: DateTime<Utc>,
11
+
/// The decentralized identifier (DID) of the identity
12
+
pub (crate) did: String,
13
+
/// The handle associated with the identity
14
+
pub (crate) handle: String,
15
+
/// The identity document as JSON
16
+
pub (crate) record: Value,
17
+
/// When the identity was first created
18
+
pub (crate) created_at: DateTime<Utc>,
19
+
/// When the identity was last updated
20
+
pub (crate) updated_at: DateTime<Utc>,
15
21
}
16
22
17
-
// tools.smokesignal.blahg.content.post
23
+
/// A blog post record from the `tools.smokesignal.blahg.content.post` collection.
18
24
#[derive(Clone, Serialize, Deserialize, sqlx::FromRow)]
19
25
pub struct Post {
26
+
/// The AT-URI of the post record
20
27
pub aturi: String,
28
+
/// The content identifier (CID) of the post
21
29
pub cid: String,
30
+
/// The title of the blog post
22
31
pub title: String,
32
+
/// The URL slug for the post
23
33
pub slug: String,
34
+
/// The markdown content of the post
24
35
pub content: String,
36
+
/// The record key within the collection
25
37
pub record_key: String,
38
+
/// When the post was first created
26
39
pub created_at: DateTime<Utc>,
40
+
/// When the post was last updated
27
41
pub updated_at: DateTime<Utc>,
42
+
/// The original ATProtocol record as JSON
28
43
pub record: Value,
29
44
}
30
45
46
+
/// A reference to a blog post from another ATProtocol record.
31
47
#[derive(Clone, Serialize, Deserialize, sqlx::FromRow)]
32
48
pub struct PostReference {
49
+
/// The AT-URI of the reference record
33
50
pub aturi: String,
51
+
/// The content identifier (CID) of the reference
34
52
pub cid: String,
53
+
/// The DID of the author who created the reference
35
54
pub did: String,
55
+
/// The collection containing the reference
36
56
pub collection: String,
57
+
/// The AT-URI of the post being referenced
37
58
pub post_aturi: String,
59
+
/// When the reference was discovered
38
60
pub discovered_at: DateTime<Utc>,
61
+
/// The original reference record as JSON
39
62
pub record: Value,
40
63
}
41
64
65
+
/// Trait for storing and retrieving blog posts and post references.
42
66
#[async_trait]
43
67
pub trait PostStorage: Send + Sync {
68
+
/// Insert or update a blog post.
44
69
async fn upsert_post(&self, post: &Post) -> Result<()>;
45
70
71
+
/// Get a blog post by its AT-URI.
46
72
async fn get_post(&self, aturi: &str) -> Result<Option<Post>>;
47
73
74
+
/// Get all blog posts.
48
75
async fn get_posts(&self) -> Result<Vec<Post>>;
49
76
77
+
/// Delete a blog post by its AT-URI.
50
78
async fn delete_post(&self, aturi: &str) -> Result<Option<Post>>;
51
79
80
+
/// Insert or update a post reference.
52
81
async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool>;
53
82
83
+
/// Delete a post reference by its AT-URI.
54
84
async fn delete_post_reference(&self, aturi: &str) -> Result<()>;
55
85
86
+
/// Get the count of references to a post grouped by collection.
56
87
async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>>;
57
88
89
+
/// Get all references to a specific post.
58
90
async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>>;
59
91
92
+
/// Get references to a post from a specific collection.
60
93
async fn get_post_references_for_post_for_collection(
61
94
&self,
62
95
post_aturi: &str,
···
64
97
) -> Result<Vec<PostReference>>;
65
98
}
66
99
100
+
/// Trait for storing and retrieving ATProtocol identities.
67
101
#[async_trait]
68
102
pub trait IdentityStorage: Send + Sync {
103
+
/// Insert or update an identity record.
69
104
async fn upsert_identity(&self, identity: &Identity) -> Result<()>;
70
105
106
+
/// Get an identity by its DID.
71
107
async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>>;
72
108
109
+
/// Get an identity by its handle.
73
110
async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>>;
74
111
112
+
/// Delete an identity by its AT-URI.
75
113
async fn delete_identity(&self, aturi: &str) -> Result<Option<Identity>>;
76
114
}
77
115
116
+
/// Trait for storing and retrieving content by CID.
78
117
#[async_trait]
79
118
pub trait ContentStorage: Send + Sync {
119
+
/// Check if content exists for a given CID.
80
120
async fn content_exists(&self, cid: &str) -> Result<bool>;
81
121
122
+
/// Write content data for a given CID.
82
123
async fn write_content(&self, cid: &str, data: &[u8]) -> Result<()>;
83
124
125
+
/// Read content data for a given CID.
84
126
async fn read_content(&self, cid: &str) -> Result<Vec<u8>>;
85
127
}
86
128
129
+
/// Combined trait for all storage operations.
87
130
#[async_trait]
88
131
pub trait Storage: PostStorage + IdentityStorage + Send + Sync {
132
+
/// Run database migrations.
89
133
async fn migrate(&self) -> Result<()>;
90
134
}
91
135
136
+
/// Cached storage implementations.
92
137
pub mod cached;
138
+
/// Content storage implementations.
93
139
pub mod content;
140
+
/// PostgreSQL storage implementation.
94
141
pub mod postgres;
142
+
/// SQLite storage implementation.
95
143
#[cfg(feature = "sqlite")]
96
144
pub mod sqlite;
97
145