···119119pub mod handlers;
120120pub mod notifications;
121121pub mod processor;
122122+pub mod search_index;
122123pub mod types;
123124124125pub use processor::process_record_to_operations;
126126+pub use search_index::SearchIndexWriter;
125127pub use types::{AggregateDelta, DatabaseOperation};
126128// SelfLabels available via types::SelfLabels if needed, but most code imports from lexica directly
127129// Notification helpers re-exported for convenience
···11+use crate::search::SearchTokenizer;
22+use chrono::Utc;
33+use deadpool_postgres::GenericClient;
44+use eyre::Context as _;
/// Writes tokenized search records for posts into the `post_search_tokens` table.
///
/// Holds a `SearchTokenizer` used to turn decompressed post text into the
/// token array stored alongside per-post metadata (lang, media/link flags,
/// author DID).
pub struct SearchIndexWriter {
    // Tokenizer applied to each post's decompressed UTF-8 content.
    tokenizer: SearchTokenizer,
}
99+1010+impl SearchIndexWriter {
1111+ pub fn new() -> Self {
1212+ Self {
1313+ tokenizer: SearchTokenizer::new(),
1414+ }
1515+ }
1616+1717+ /// Index posts for search
1818+ ///
1919+ /// Takes post data (id, content, author_did, etc.) and creates tokenized search records.
2020+ /// This is designed to be called from the database writer after posts are inserted.
2121+ pub async fn index_posts_raw<C: GenericClient>(
2222+ &self,
2323+ conn: &C,
2424+ post_data: &[(i64, Option<Vec<u8>>, String, Option<String>, bool, bool)], // (post_id, content, author_did, lang, has_media, has_links)
2525+ ) -> eyre::Result<()> {
2626+ if post_data.is_empty() {
2727+ return Ok(());
2828+ }
2929+3030+ let now = Utc::now();
3131+3232+ for (post_id, content, author_did, lang, has_media, has_links) in post_data {
3333+ // Decompress and tokenize content
3434+ let tokens = if let Some(content_bytes) = content {
3535+ let decompressed = zstd::decode_all(content_bytes.as_slice())
3636+ .wrap_err("Failed to decompress post content")?;
3737+ let text = String::from_utf8(decompressed)
3838+ .wrap_err("Failed to decode post content as UTF-8")?;
3939+ self.tokenizer.tokenize(&text)
4040+ } else {
4141+ // Empty post or stub - no tokens
4242+ vec![]
4343+ };
4444+4545+ // Insert or update search tokens
4646+ conn.execute(
4747+ "INSERT INTO post_search_tokens (post_id, indexed_at, tokens, lang, has_media, has_links, author_did)
4848+ VALUES ($1, $2, $3, $4, $5, $6, $7)
4949+ ON CONFLICT (post_id) DO UPDATE SET
5050+ tokens = EXCLUDED.tokens,
5151+ indexed_at = EXCLUDED.indexed_at,
5252+ lang = EXCLUDED.lang,
5353+ has_media = EXCLUDED.has_media,
5454+ has_links = EXCLUDED.has_links",
5555+ &[post_id, &now, &tokens, lang, has_media, has_links, author_did],
5656+ )
5757+ .await
5858+ .wrap_err("Failed to insert/update search tokens")?;
5959+ }
6060+6161+ Ok(())
6262+ }
6363+}
6464+6565+impl Default for SearchIndexWriter {
6666+ fn default() -> Self {
6767+ Self::new()
6868+ }
6969+}
+1
consumer/src/lib.rs
···1515// mod label_indexer; // Disabled - will be reimplemented with new cursor system
1616pub mod parsing;
1717mod relay;
1818+pub mod search;
1819mod sources;
1920pub mod types;
2021mod utils;
+2
consumer/src/search/mod.rs
···11+pub mod tokenizer;
22+pub use tokenizer::SearchTokenizer;
-- Create search tokens table (regular PostgreSQL table)
-- TimescaleDB conversion will happen in a future migration
CREATE TABLE post_search_tokens (
    post_id BIGINT PRIMARY KEY,
    -- When this row was (re)indexed; the writer overwrites it on upsert.
    indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Tokenized post text (produced by the consumer's search tokenizer).
    tokens TEXT[] NOT NULL,
    -- Post language, when known; NULL otherwise (see partial index below).
    lang TEXT,
    has_media BOOLEAN NOT NULL DEFAULT false,
    has_links BOOLEAN NOT NULL DEFAULT false,
    author_did TEXT NOT NULL,

    -- Search rows disappear automatically when the parent post is deleted.
    CONSTRAINT fk_post FOREIGN KEY (post_id)
        REFERENCES posts(id) ON DELETE CASCADE
);

-- Indexes
-- GIN over the token array supports array containment/overlap queries
-- (e.g. tokens @> ARRAY[...]).
CREATE INDEX idx_post_search_tokens_gin
    ON post_search_tokens USING GIN (tokens);

-- Per-author listing, newest first.
CREATE INDEX idx_post_search_author
    ON post_search_tokens (author_did, indexed_at DESC);

-- Per-language listing; partial index skips rows with unknown language.
CREATE INDEX idx_post_search_lang
    ON post_search_tokens (lang, indexed_at DESC)
    WHERE lang IS NOT NULL;

-- Global recency scan (e.g. reindexing / retention sweeps).
CREATE INDEX idx_post_search_indexed_at
    ON post_search_tokens (indexed_at DESC);

-- Note: We'll convert this to a TimescaleDB hypertable in a future migration
-- See .claude/plans/1115-2249-TOKEN_SEARCH_IMPLEMENTATION.md appendix for TimescaleDB conversion plan