···1919 pub tags: Vec<SmolStr>,
2020 pub author_dids: Vec<SmolStr>,
2121 #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
2222+ pub created_at: chrono::DateTime<chrono::Utc>,
2323+ #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
2224 pub indexed_at: chrono::DateTime<chrono::Utc>,
2325 pub record: SmolStr,
2426}
···3436 pub path: SmolStr,
3537 pub tags: Vec<SmolStr>,
3638 pub author_dids: Vec<SmolStr>,
3939+ #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
4040+ pub created_at: chrono::DateTime<chrono::Utc>,
3741 #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
3842 pub indexed_at: chrono::DateTime<chrono::Utc>,
3943 pub record: SmolStr,
···5862 path,
5963 tags,
6064 author_dids,
6565+ created_at,
6166 indexed_at,
6267 record
6368 FROM notebooks
6469 WHERE did = ?
6570 AND (path = ? OR title = ?)
6671 AND deleted_at = toDateTime64(0, 3)
6767- ORDER BY event_time DESC
7272+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
6873 LIMIT 1
6974 "#;
7075
···102107 path,
103108 tags,
104109 author_dids,
110110+ created_at,
105111 indexed_at,
106112 record
107113 FROM notebooks
108114 WHERE did = ?
109115 AND rkey = ?
110116 AND deleted_at = toDateTime64(0, 3)
111111- ORDER BY event_time DESC
117117+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
112118 LIMIT 1
113119 "#;
114120
···131137 ///
132138 /// Note: This is a simplified version. The full implementation would
133139 /// need to join with the notebook's entryList to get proper ordering.
134134- /// For now, we just list entries by the same author.
140140+ /// For now, we just list entries by the same author, ordered by rkey (notebook order).
135141 pub async fn list_notebook_entries(
136142 &self,
137143 did: &str,
138144 limit: u32,
139145 cursor: Option<&str>,
140146 ) -> Result<Vec<EntryRow>, IndexError> {
147147+ // Note: rkey ordering is intentional here - it's the notebook's entry order (rkeys are typically TIDs, which sort by creation time)
141148 let query = if cursor.is_some() {
142149 r#"
143150 SELECT
···149156 path,
150157 tags,
151158 author_dids,
159159+ created_at,
152160 indexed_at,
153161 record
154162 FROM entries
···169177 path,
170178 tags,
171179 author_dids,
180180+ created_at,
172181 indexed_at,
173182 record
174183 FROM entries
···200209 /// Get an entry by rkey, picking the most recent version across collaborators.
201210 ///
202211 /// For collaborative entries, the same rkey may exist in multiple repos.
203203- /// This returns the most recently updated version, with indexed_at as tiebreaker.
212212+ /// This returns the most recently updated version (created_at breaks ties within an event_time bucket).
204213 ///
205214 /// `candidate_dids` should include the notebook owner + all collaborator DIDs.
206215 pub async fn get_entry(
···225234 path,
226235 tags,
227236 author_dids,
237237+ created_at,
228238 indexed_at,
229239 record
230240 FROM entries
231241 WHERE rkey = ?
232242 AND did IN ({})
233243 AND deleted_at = toDateTime64(0, 3)
234234- ORDER BY updated_at DESC, indexed_at DESC
244244+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
235245 LIMIT 1
236246 "#,
237247 placeholders.join(", ")
···271281 path,
272282 tags,
273283 author_dids,
284284+ created_at,
274285 indexed_at,
275286 record
276287 FROM entries
277288 WHERE did = ?
278289 AND rkey = ?
279290 AND deleted_at = toDateTime64(0, 3)
280280- ORDER BY updated_at DESC, indexed_at DESC
291291+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
281292 LIMIT 1
282293 "#;
283294
···312323 path,
313324 tags,
314325 author_dids,
326326+ created_at,
315327 indexed_at,
316328 record
317329 FROM entries
318330 WHERE did = ?
319331 AND (path = ? OR title = ?)
320332 AND deleted_at = toDateTime64(0, 3)
321321- ORDER BY event_time DESC
333333+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
322334 LIMIT 1
323335 "#;
324336
···336348 })?;
337349
338350 Ok(row)
351351+ }
352352+353353+ /// List notebooks for an actor.
354354+ ///
355355+ /// Returns notebooks owned by the given DID, ordered newest-first (event_time bucket, then created_at).
356356+ /// The cursor is a created_at timestamp in milliseconds; see the paging sketch after this function.
357357+ pub async fn list_actor_notebooks(
358358+ &self,
359359+ did: &str,
360360+ limit: u32,
361361+ cursor: Option<i64>,
362362+ ) -> Result<Vec<NotebookRow>, IndexError> {
363363+ let query = if cursor.is_some() {
364364+ r#"
365365+ SELECT
366366+ did,
367367+ rkey,
368368+ cid,
369369+ uri,
370370+ title,
371371+ path,
372372+ tags,
373373+ author_dids,
374374+ created_at,
375375+ indexed_at,
376376+ record
377377+ FROM notebooks
378378+ WHERE did = ?
379379+ AND deleted_at = toDateTime64(0, 3)
380380+ AND created_at < fromUnixTimestamp64Milli(?)
381381+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
382382+ LIMIT ?
383383+ "#
384384+ } else {
385385+ r#"
386386+ SELECT
387387+ did,
388388+ rkey,
389389+ cid,
390390+ uri,
391391+ title,
392392+ path,
393393+ tags,
394394+ author_dids,
395395+ created_at,
396396+ indexed_at,
397397+ record
398398+ FROM notebooks
399399+ WHERE did = ?
400400+ AND deleted_at = toDateTime64(0, 3)
401401+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
402402+ LIMIT ?
403403+ "#
404404+ };
405405+406406+ let mut q = self.inner().query(query).bind(did);
407407+408408+ if let Some(c) = cursor {
409409+ q = q.bind(c);
410410+ }
411411+412412+ let rows = q
413413+ .bind(limit)
414414+ .fetch_all::<NotebookRow>()
415415+ .await
416416+ .map_err(|e| ClickHouseError::Query {
417417+ message: "failed to list actor notebooks".into(),
418418+ source: e,
419419+ })?;
420420+421421+ Ok(rows)
422422+ }
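As a usage sketch for the millisecond cursor (the `Index` handle and `all_notebooks` helper are hypothetical, not names from this crate):

```rust
// The next cursor is the last row's created_at in milliseconds, which feeds
// the `created_at < fromUnixTimestamp64Milli(?)` filter in the query above.
async fn all_notebooks(idx: &Index, did: &str) -> Result<Vec<NotebookRow>, IndexError> {
    let mut out = Vec::new();
    let mut cursor: Option<i64> = None;
    loop {
        let page = idx.list_actor_notebooks(did, 50, cursor).await?;
        if page.is_empty() {
            break Ok(out);
        }
        cursor = page.last().map(|nb| nb.created_at.timestamp_millis());
        out.extend(page);
    }
}
```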
423423+424424+ /// List entries for an actor.
425425+ ///
426426+ /// Returns entries owned by the given DID, ordered newest-first (event_time bucket, then created_at).
427427+ /// The cursor is a created_at timestamp in milliseconds.
428428+ pub async fn list_actor_entries(
429429+ &self,
430430+ did: &str,
431431+ limit: u32,
432432+ cursor: Option<i64>,
433433+ ) -> Result<Vec<EntryRow>, IndexError> {
434434+ let query = if cursor.is_some() {
435435+ r#"
436436+ SELECT
437437+ did,
438438+ rkey,
439439+ cid,
440440+ uri,
441441+ title,
442442+ path,
443443+ tags,
444444+ author_dids,
445445+ created_at,
446446+ indexed_at,
447447+ record
448448+ FROM entries
449449+ WHERE did = ?
450450+ AND deleted_at = toDateTime64(0, 3)
451451+ AND created_at < fromUnixTimestamp64Milli(?)
452452+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
453453+ LIMIT ?
454454+ "#
455455+ } else {
456456+ r#"
457457+ SELECT
458458+ did,
459459+ rkey,
460460+ cid,
461461+ uri,
462462+ title,
463463+ path,
464464+ tags,
465465+ author_dids,
466466+ created_at,
467467+ indexed_at,
468468+ record
469469+ FROM entries
470470+ WHERE did = ?
471471+ AND deleted_at = toDateTime64(0, 3)
472472+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
473473+ LIMIT ?
474474+ "#
475475+ };
476476+477477+ let mut q = self.inner().query(query).bind(did);
478478+479479+ if let Some(c) = cursor {
480480+ q = q.bind(c);
481481+ }
482482+483483+ let rows =
484484+ q.bind(limit)
485485+ .fetch_all::<EntryRow>()
486486+ .await
487487+ .map_err(|e| ClickHouseError::Query {
488488+ message: "failed to list actor entries".into(),
489489+ source: e,
490490+ })?;
491491+492492+ Ok(rows)
493493+ }
494494+495495+ /// Get a global feed of notebooks.
496496+ ///
497497+ /// Returns notebooks ordered newest-first (chronological); the "popular"
498498+ /// algorithm is not implemented yet and currently falls back to chronological.
499499+ /// Cursor is created_at timestamp in milliseconds.
500500+ pub async fn get_notebook_feed(
501501+ &self,
502502+ algorithm: &str,
503503+ tags: Option<&[&str]>,
504504+ limit: u32,
505505+ cursor: Option<i64>,
506506+ ) -> Result<Vec<NotebookRow>, IndexError> {
507507+ // For now, just chronological; "popular" would need a join with aggregate counts.
508508+ let base_query = if tags.is_some() && cursor.is_some() {
509509+ r#"
510510+ SELECT
511511+ did,
512512+ rkey,
513513+ cid,
514514+ uri,
515515+ title,
516516+ path,
517517+ tags,
518518+ author_dids,
519519+ created_at,
520520+ indexed_at,
521521+ record
522522+ FROM notebooks
523523+ WHERE deleted_at = toDateTime64(0, 3)
524524+ AND hasAny(tags, ?)
525525+ AND created_at < fromUnixTimestamp64Milli(?)
526526+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
527527+ LIMIT ?
528528+ "#
529529+ } else if tags.is_some() {
530530+ r#"
531531+ SELECT
532532+ did,
533533+ rkey,
534534+ cid,
535535+ uri,
536536+ title,
537537+ path,
538538+ tags,
539539+ author_dids,
540540+ created_at,
541541+ indexed_at,
542542+ record
543543+ FROM notebooks
544544+ WHERE deleted_at = toDateTime64(0, 3)
545545+ AND hasAny(tags, ?)
546546+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
547547+ LIMIT ?
548548+ "#
549549+ } else if cursor.is_some() {
550550+ r#"
551551+ SELECT
552552+ did,
553553+ rkey,
554554+ cid,
555555+ uri,
556556+ title,
557557+ path,
558558+ tags,
559559+ author_dids,
560560+ created_at,
561561+ indexed_at,
562562+ record
563563+ FROM notebooks
564564+ WHERE deleted_at = toDateTime64(0, 3)
565565+ AND created_at < fromUnixTimestamp64Milli(?)
566566+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
567567+ LIMIT ?
568568+ "#
569569+ } else {
570570+ r#"
571571+ SELECT
572572+ did,
573573+ rkey,
574574+ cid,
575575+ uri,
576576+ title,
577577+ path,
578578+ tags,
579579+ author_dids,
580580+ created_at,
581581+ indexed_at,
582582+ record
583583+ FROM notebooks
584584+ WHERE deleted_at = toDateTime64(0, 3)
585585+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
586586+ LIMIT ?
587587+ "#
588588+ };
589589+590590+ let _ = algorithm; // TODO: implement popular sorting
591591+592592+ let mut q = self.inner().query(base_query);
593593+594594+ if let Some(t) = tags {
595595+ q = q.bind(t);
596596+ }
597597+ if let Some(c) = cursor {
598598+ q = q.bind(c);
599599+ }
600600+601601+ let rows = q
602602+ .bind(limit)
603603+ .fetch_all::<NotebookRow>()
604604+ .await
605605+ .map_err(|e| ClickHouseError::Query {
606606+ message: "failed to get notebook feed".into(),
607607+ source: e,
608608+ })?;
609609+610610+ Ok(rows)
611611+ }
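The `algorithm` parameter is currently ignored. Purely as a sketch of the direction the TODO hints at (the `notebook_like_counts` table is hypothetical and does not exist in this migration set), a popularity ordering might look like:

```rust
// Hypothetical "popular" variant; notebook_like_counts is an assumed
// aggregate table, shown only to illustrate the shape of the join.
const POPULAR_QUERY: &str = r#"
    SELECT n.did, n.rkey, n.cid, n.uri, n.title, n.path, n.tags,
           n.author_dids, n.created_at, n.indexed_at, n.record
    FROM notebooks n
    LEFT JOIN notebook_like_counts c ON c.did = n.did AND c.rkey = n.rkey
    WHERE n.deleted_at = toDateTime64(0, 3)
    ORDER BY c.like_count DESC, n.created_at DESC
    LIMIT ?
"#;
```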
612612+613613+ /// Get a global feed of entries.
614614+ ///
615615+ /// Returns entries ordered by created_at DESC (chronological); "popular" is not implemented yet.
616616+ /// Cursor is created_at timestamp in milliseconds.
617617+ pub async fn get_entry_feed(
618618+ &self,
619619+ algorithm: &str,
620620+ tags: Option<&[&str]>,
621621+ limit: u32,
622622+ cursor: Option<i64>,
623623+ ) -> Result<Vec<EntryRow>, IndexError> {
624624+ let base_query = if tags.is_some() && cursor.is_some() {
625625+ r#"
626626+ SELECT
627627+ did,
628628+ rkey,
629629+ cid,
630630+ uri,
631631+ title,
632632+ path,
633633+ tags,
634634+ author_dids,
635635+ created_at,
636636+ indexed_at,
637637+ record
638638+ FROM entries
639639+ WHERE deleted_at = toDateTime64(0, 3)
640640+ AND hasAny(tags, ?)
641641+ AND created_at < fromUnixTimestamp64Milli(?)
642642+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
643643+ LIMIT ?
644644+ "#
645645+ } else if tags.is_some() {
646646+ r#"
647647+ SELECT
648648+ did,
649649+ rkey,
650650+ cid,
651651+ uri,
652652+ title,
653653+ path,
654654+ tags,
655655+ author_dids,
656656+ created_at,
657657+ indexed_at,
658658+ record
659659+ FROM entries
660660+ WHERE deleted_at = toDateTime64(0, 3)
661661+ AND hasAny(tags, ?)
662662+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
663663+ LIMIT ?
664664+ "#
665665+ } else if cursor.is_some() {
666666+ r#"
667667+ SELECT
668668+ did,
669669+ rkey,
670670+ cid,
671671+ uri,
672672+ title,
673673+ path,
674674+ tags,
675675+ author_dids,
676676+ created_at,
677677+ indexed_at,
678678+ record
679679+ FROM entries
680680+ WHERE deleted_at = toDateTime64(0, 3)
681681+ AND created_at < fromUnixTimestamp64Milli(?)
682682+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
683683+ LIMIT ?
684684+ "#
685685+ } else {
686686+ r#"
687687+ SELECT
688688+ did,
689689+ rkey,
690690+ cid,
691691+ uri,
692692+ title,
693693+ path,
694694+ tags,
695695+ author_dids,
696696+ created_at,
697697+ indexed_at,
698698+ record
699699+ FROM entries
700700+ WHERE deleted_at = toDateTime64(0, 3)
701701+ ORDER BY toStartOfFiveMinutes(event_time) DESC, created_at DESC
702702+ LIMIT ?
703703+ "#
704704+ };
705705+706706+ let _ = algorithm; // TODO: implement popular sorting
707707+708708+ let mut q = self.inner().query(base_query);
709709+710710+ if let Some(t) = tags {
711711+ q = q.bind(t);
712712+ }
713713+ if let Some(c) = cursor {
714714+ q = q.bind(c);
715715+ }
716716+717717+ let rows =
718718+ q.bind(limit)
719719+ .fetch_all::<EntryRow>()
720720+ .await
721721+ .map_err(|e| ClickHouseError::Query {
722722+ message: "failed to get entry feed".into(),
723723+ source: e,
724724+ })?;
725725+726726+ Ok(rows)
727727+ }
728728+729729+ /// Get an entry at a specific index within a notebook.
730730+ ///
731731+ /// Returns the entry at the given 0-based index, plus adjacent entries for prev/next.
732732+ pub async fn get_book_entry_at_index(
733733+ &self,
734734+ notebook_did: &str,
735735+ notebook_rkey: &str,
736736+ index: u32,
737737+ ) -> Result<Option<(EntryRow, Option<EntryRow>, Option<EntryRow>)>, IndexError> {
738738+ // Fetch entries for this notebook with index context
739739+ // We need 3 entries: prev (index-1), current (index), next (index+1)
740740+ let offset = if index > 0 { index - 1 } else { 0 };
741741+ let fetch_count = if index > 0 { 3u32 } else { 2u32 };
742742+743743+ let query = r#"
744744+ SELECT
745745+ e.did,
746746+ e.rkey,
747747+ e.cid,
748748+ e.uri,
749749+ e.title,
750750+ e.path,
751751+ e.tags,
752752+ e.author_dids,
753753+ e.created_at,
754754+ e.indexed_at,
755755+ e.record
756756+ FROM notebook_entries ne FINAL
757757+ INNER JOIN entries e ON
758758+ e.did = ne.entry_did
759759+ AND e.rkey = ne.entry_rkey
760760+ AND e.deleted_at = toDateTime64(0, 3)
761761+ WHERE ne.notebook_did = ?
762762+ AND ne.notebook_rkey = ?
763763+ ORDER BY ne.position ASC
764764+ LIMIT ? OFFSET ?
765765+ "#;
766766+767767+ let rows: Vec<EntryRow> = self
768768+ .inner()
769769+ .query(query)
770770+ .bind(notebook_did)
771771+ .bind(notebook_rkey)
772772+ .bind(fetch_count)
773773+ .bind(offset)
774774+ .fetch_all()
775775+ .await
776776+ .map_err(|e| ClickHouseError::Query {
777777+ message: "failed to get book entry at index".into(),
778778+ source: e,
779779+ })?;
780780+781781+ if rows.is_empty() {
782782+ return Ok(None);
783783+ }
784784+785785+ // Determine which row is which based on the offset
786786+ let mut iter = rows.into_iter();
787787+ if index == 0 {
788788+ // No prev, rows[0] is current, rows[1] is next (if exists)
789789+ let current = iter.next();
790790+ let next = iter.next();
791791+ Ok(current.map(|c| (c, None, next)))
792792+ } else {
793793+ // rows[0] is prev, rows[1] is current, rows[2] is next
794794+ let prev = iter.next();
795795+ let current = iter.next();
796796+ let next = iter.next();
797797+ Ok(current.map(|c| (c, prev, next)))
798798+ }
339799 }
340800}
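The offset arithmetic in `get_book_entry_at_index` is easy to misread, so here is a small self-contained check of the window it requests (a sketch mirroring the branches above, not a test that exists in the crate):

```rust
// index = 0 -> OFFSET 0, LIMIT 2 -> rows are [current, next?]
// index > 0 -> OFFSET index-1, LIMIT 3 -> rows are [prev, current?, next?]
fn window(index: u32) -> (u32, u32) {
    let offset = if index > 0 { index - 1 } else { 0 };
    let fetch_count = if index > 0 { 3 } else { 2 };
    (offset, fetch_count)
}

#[test]
fn window_requests_prev_current_next() {
    assert_eq!(window(0), (0, 2)); // no prev at the start of the notebook
    assert_eq!(window(1), (0, 3)); // prev is the entry at index 0
    assert_eq!(window(5), (4, 3)); // prev is the entry at index 4
}
```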
+340-1
crates/weaver-index/src/endpoints/actor.rs
···11//! sh.weaver.actor.* endpoint handlers
22
33+use std::collections::{HashMap, HashSet};
44+
35use axum::{Json, extract::State};
46use jacquard::IntoStatic;
57use jacquard::cowstr::ToCowStr;
68use jacquard::identity::resolver::IdentityResolver;
79use jacquard::types::ident::AtIdentifier;
88-use jacquard::types::string::{Did, Handle};
1010+use jacquard::types::string::{AtUri, Cid, Did, Handle};
911use jacquard_axum::ExtractXrpc;
1212+use jacquard_axum::service_auth::{ExtractOptionalServiceAuth, VerifiedServiceAuth};
1313+use smol_str::SmolStr;
1014use weaver_api::sh_weaver::actor::{
1115 ProfileDataView, ProfileDataViewInner, ProfileView,
1616+ get_actor_entries::{GetActorEntriesOutput, GetActorEntriesRequest},
1717+ get_actor_notebooks::{GetActorNotebooksOutput, GetActorNotebooksRequest},
1218 get_profile::{GetProfileOutput, GetProfileRequest},
1319};
2020+use weaver_api::sh_weaver::notebook::{AuthorListView, EntryView, NotebookView};
1421
2222+use crate::clickhouse::ProfileRow;
1523use crate::endpoints::repo::XrpcErrorResponse;
1624use crate::server::AppState;
2525+
2626+/// Authenticated viewer context (if present)
2727+pub type Viewer = Option<VerifiedServiceAuth<'static>>;
1728
1829/// Handle sh.weaver.actor.getProfile
1930///
2031/// Returns a profile view with counts for the requested actor.
2132pub async fn get_profile(
2233 State(state): State<AppState>,
3434+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
2335 ExtractXrpc(args): ExtractXrpc<GetProfileRequest>,
2436) -> Result<Json<GetProfileOutput<'static>>, XrpcErrorResponse> {
3737+ // viewer is Some(auth) when the request carries valid service auth;
3838+ // it can be used later for viewer-specific state (e.g., "you follow this person")
3939+ let _viewer = viewer;
2540 // Resolve identifier to DID
2641 let did = resolve_actor(&state, &args.actor).await?;
2742 let did_str = did.as_str();
···156171 Some(s.to_cowstr().into_static())
157172 }
158173}
174174+175175+/// Parse cursor string to i64 timestamp millis
176176+fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> {
177177+ cursor
178178+ .map(|c| {
179179+ c.parse::<i64>()
180180+ .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))
181181+ })
182182+ .transpose()
183183+}
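A quick illustration of `parse_cursor`'s contract (expected behavior, not a test that exists in the crate):

```rust
#[test]
fn parse_cursor_contract() {
    // Absent cursor passes through; a decimal millisecond string parses;
    // anything else becomes an InvalidRequest error.
    assert!(matches!(parse_cursor(None), Ok(None)));
    assert!(matches!(parse_cursor(Some("1700000000000")), Ok(Some(1_700_000_000_000))));
    assert!(parse_cursor(Some("not-a-number")).is_err());
}
```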
184184+185185+/// Handle sh.weaver.actor.getActorNotebooks
186186+///
187187+/// Returns notebooks owned by the given actor.
188188+pub async fn get_actor_notebooks(
189189+ State(state): State<AppState>,
190190+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
191191+ ExtractXrpc(args): ExtractXrpc<GetActorNotebooksRequest>,
192192+) -> Result<Json<GetActorNotebooksOutput<'static>>, XrpcErrorResponse> {
193193+ let _viewer: Viewer = viewer;
194194+195195+ // Resolve actor to DID
196196+ let did = resolve_actor(&state, &args.actor).await?;
197197+ let did_str = did.as_str();
198198+199199+ // Fetch notebooks for this actor
200200+ let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
201201+ let cursor = parse_cursor(args.cursor.as_deref())?;
202202+203203+ let notebook_rows = state
204204+ .clickhouse
205205+ .list_actor_notebooks(did_str, limit + 1, cursor)
206206+ .await
207207+ .map_err(|e| {
208208+ tracing::error!("Failed to list actor notebooks: {}", e);
209209+ XrpcErrorResponse::internal_error("Database query failed")
210210+ })?;
211211+
212212+ // Check if there are more pages (we fetched limit + 1 rows)
213213+ let has_more = notebook_rows.len() > limit as usize;
214214+ let notebook_rows: Vec<_> = notebook_rows.into_iter().take(limit as usize).collect();
215215+216216+ // Collect author DIDs for hydration
217217+ let mut all_author_dids: HashSet<&str> = HashSet::new();
218218+ for nb in &notebook_rows {
219219+ for did in &nb.author_dids {
220220+ all_author_dids.insert(did.as_str());
221221+ }
222222+ }
223223+224224+ // Batch fetch profiles
225225+ let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
226226+ let profiles = state
227227+ .clickhouse
228228+ .get_profiles_batch(&author_dids_vec)
229229+ .await
230230+ .map_err(|e| {
231231+ tracing::error!("Failed to batch fetch profiles: {}", e);
232232+ XrpcErrorResponse::internal_error("Database query failed")
233233+ })?;
234234+235235+ let profile_map: HashMap<&str, &ProfileRow> =
236236+ profiles.iter().map(|p| (p.did.as_str(), p)).collect();
237237+238238+ // Build NotebookViews
239239+ let mut notebooks: Vec<NotebookView<'static>> = Vec::with_capacity(notebook_rows.len());
240240+ for nb_row in &notebook_rows {
241241+ let notebook_uri = AtUri::new(&nb_row.uri).map_err(|e| {
242242+ tracing::error!("Invalid notebook URI in db: {}", e);
243243+ XrpcErrorResponse::internal_error("Invalid URI stored")
244244+ })?;
245245+246246+ let notebook_cid = Cid::new(nb_row.cid.as_bytes()).map_err(|e| {
247247+ tracing::error!("Invalid notebook CID in db: {}", e);
248248+ XrpcErrorResponse::internal_error("Invalid CID stored")
249249+ })?;
250250+251251+ let authors = hydrate_authors(&nb_row.author_dids, &profile_map)?;
252252+ let record = parse_record_json(&nb_row.record)?;
253253+254254+ let notebook = NotebookView::new()
255255+ .uri(notebook_uri.into_static())
256256+ .cid(notebook_cid.into_static())
257257+ .authors(authors)
258258+ .record(record)
259259+ .indexed_at(nb_row.indexed_at.fixed_offset())
260260+ .maybe_title(non_empty_str(&nb_row.title))
261261+ .maybe_path(non_empty_str(&nb_row.path))
262262+ .build();
263263+264264+ notebooks.push(notebook);
265265+ }
266266+267267+ // Build cursor for pagination (created_at millis)
268268+ let next_cursor = if has_more {
269269+ notebook_rows
270270+ .last()
271271+ .map(|nb| nb.created_at.timestamp_millis().to_cowstr().into_static())
272272+ } else {
273273+ None
274274+ };
275275+276276+ Ok(Json(
277277+ GetActorNotebooksOutput {
278278+ notebooks,
279279+ cursor: next_cursor,
280280+ extra_data: None,
281281+ }
282282+ .into_static(),
283283+ ))
284284+}
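The handler relies on the overfetch-by-one idiom: it asks the index layer for `limit + 1` rows, and the extra row only signals that another page exists. In sketch form (variable names illustrative):

```rust
// `rows` stands for the limit + 1 fetch; the sentinel row is never returned.
let has_more = rows.len() > limit as usize;
rows.truncate(limit as usize);
let next_cursor = has_more
    .then(|| rows.last().map(|r| r.created_at.timestamp_millis()))
    .flatten();
```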
285285+286286+/// Handle sh.weaver.actor.getActorEntries
287287+///
288288+/// Returns entries owned by the given actor.
289289+pub async fn get_actor_entries(
290290+ State(state): State<AppState>,
291291+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
292292+ ExtractXrpc(args): ExtractXrpc<GetActorEntriesRequest>,
293293+) -> Result<Json<GetActorEntriesOutput<'static>>, XrpcErrorResponse> {
294294+ let _viewer: Viewer = viewer;
295295+296296+ // Resolve actor to DID
297297+ let did = resolve_actor(&state, &args.actor).await?;
298298+ let did_str = did.as_str();
299299+300300+ // Fetch entries for this actor
301301+ let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
302302+ let cursor = parse_cursor(args.cursor.as_deref())?;
303303+304304+ let entry_rows = state
305305+ .clickhouse
306306+ .list_actor_entries(did_str, limit + 1, cursor)
307307+ .await
308308+ .map_err(|e| {
309309+ tracing::error!("Failed to list actor entries: {}", e);
310310+ XrpcErrorResponse::internal_error("Database query failed")
311311+ })?;
312312+
313313+ // Check if there are more pages (we fetched limit + 1 rows)
314314+ let has_more = entry_rows.len() > limit as usize;
315315+ let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect();
316316+317317+ // Collect author DIDs for hydration
318318+ let mut all_author_dids: HashSet<&str> = HashSet::new();
319319+ for entry in &entry_rows {
320320+ for did in &entry.author_dids {
321321+ all_author_dids.insert(did.as_str());
322322+ }
323323+ }
324324+325325+ // Batch fetch profiles
326326+ let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
327327+ let profiles = state
328328+ .clickhouse
329329+ .get_profiles_batch(&author_dids_vec)
330330+ .await
331331+ .map_err(|e| {
332332+ tracing::error!("Failed to batch fetch profiles: {}", e);
333333+ XrpcErrorResponse::internal_error("Database query failed")
334334+ })?;
335335+336336+ let profile_map: HashMap<&str, &ProfileRow> =
337337+ profiles.iter().map(|p| (p.did.as_str(), p)).collect();
338338+339339+ // Build EntryViews
340340+ let mut entries: Vec<EntryView<'static>> = Vec::with_capacity(entry_rows.len());
341341+ for entry_row in &entry_rows {
342342+ let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| {
343343+ tracing::error!("Invalid entry URI in db: {}", e);
344344+ XrpcErrorResponse::internal_error("Invalid URI stored")
345345+ })?;
346346+347347+ let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| {
348348+ tracing::error!("Invalid entry CID in db: {}", e);
349349+ XrpcErrorResponse::internal_error("Invalid CID stored")
350350+ })?;
351351+352352+ let authors = hydrate_authors(&entry_row.author_dids, &profile_map)?;
353353+ let record = parse_record_json(&entry_row.record)?;
354354+355355+ let entry = EntryView::new()
356356+ .uri(entry_uri.into_static())
357357+ .cid(entry_cid.into_static())
358358+ .authors(authors)
359359+ .record(record)
360360+ .indexed_at(entry_row.indexed_at.fixed_offset())
361361+ .maybe_title(non_empty_str(&entry_row.title))
362362+ .maybe_path(non_empty_str(&entry_row.path))
363363+ .build();
364364+365365+ entries.push(entry);
366366+ }
367367+368368+ // Build cursor for pagination (created_at millis)
369369+ let next_cursor = if has_more {
370370+ entry_rows
371371+ .last()
372372+ .map(|e| e.created_at.timestamp_millis().to_cowstr().into_static())
373373+ } else {
374374+ None
375375+ };
376376+377377+ Ok(Json(
378378+ GetActorEntriesOutput {
379379+ entries,
380380+ cursor: next_cursor,
381381+ extra_data: None,
382382+ }
383383+ .into_static(),
384384+ ))
385385+}
386386+387387+/// Hydrate author list from DIDs using profile map
388388+fn hydrate_authors(
389389+ author_dids: &[SmolStr],
390390+ profile_map: &HashMap<&str, &ProfileRow>,
391391+) -> Result<Vec<AuthorListView<'static>>, XrpcErrorResponse> {
392392+ let mut authors = Vec::with_capacity(author_dids.len());
393393+394394+ for (idx, did_str) in author_dids.iter().enumerate() {
395395+ let profile_data = if let Some(profile) = profile_map.get(did_str.as_str()) {
396396+ profile_to_data_view(profile)?
397397+ } else {
398398+ // No profile found - create minimal view with just the DID
399399+ let did = Did::new(did_str).map_err(|e| {
400400+ tracing::error!("Invalid DID in author_dids: {}", e);
401401+ XrpcErrorResponse::internal_error("Invalid DID stored")
402402+ })?;
403403+404404+ let inner_profile = ProfileView::new()
405405+ .did(did.into_static())
406406+ .handle(
407407+ Handle::new(did_str)
408408+ .unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()),
409409+ )
410410+ .build();
411411+412412+ ProfileDataView::new()
413413+ .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile)))
414414+ .build()
415415+ };
416416+417417+ let author_view = AuthorListView::new()
418418+ .index(idx as i64)
419419+ .record(profile_data.into_static())
420420+ .build();
421421+422422+ authors.push(author_view);
423423+ }
424424+425425+ Ok(authors)
426426+}
427427+428428+/// Convert ProfileRow to ProfileDataView
429429+fn profile_to_data_view(
430430+ profile: &ProfileRow,
431431+) -> Result<ProfileDataView<'static>, XrpcErrorResponse> {
432432+ use jacquard::types::string::Uri;
433433+434434+ let did = Did::new(&profile.did).map_err(|e| {
435435+ tracing::error!("Invalid DID in profile: {}", e);
436436+ XrpcErrorResponse::internal_error("Invalid DID stored")
437437+ })?;
438438+439439+ let handle = if profile.handle.is_empty() {
440440+ Handle::new(&profile.did).unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap())
441441+ } else {
442442+ Handle::new(&profile.handle).map_err(|e| {
443443+ tracing::error!("Invalid handle in profile: {}", e);
444444+ XrpcErrorResponse::internal_error("Invalid handle stored")
445445+ })?
446446+ };
447447+448448+ // Build avatar URL from CID if present
449449+ let avatar = if !profile.avatar_cid.is_empty() {
450450+ let url = format!(
451451+ "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg",
452452+ profile.did, profile.avatar_cid
453453+ );
454454+ Uri::new_owned(url).ok()
455455+ } else {
456456+ None
457457+ };
458458+459459+ // Build banner URL from CID if present
460460+ let banner = if !profile.banner_cid.is_empty() {
461461+ let url = format!(
462462+ "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg",
463463+ profile.did, profile.banner_cid
464464+ );
465465+ Uri::new_owned(url).ok()
466466+ } else {
467467+ None
468468+ };
469469+470470+ let inner_profile = ProfileView::new()
471471+ .did(did.into_static())
472472+ .handle(handle.into_static())
473473+ .maybe_display_name(non_empty_str(&profile.display_name))
474474+ .maybe_description(non_empty_str(&profile.description))
475475+ .maybe_avatar(avatar)
476476+ .maybe_banner(banner)
477477+ .build();
478478+479479+ let profile_data = ProfileDataView::new()
480480+ .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile)))
481481+ .build();
482482+483483+ Ok(profile_data)
484484+}
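For reference, the CDN URLs built above come out in this shape (DID and CID values illustrative):

```rust
// avatar: https://cdn.bsky.app/img/avatar/plain/did:plc:abc123/bafkreib...@jpeg
// banner: https://cdn.bsky.app/img/banner/plain/did:plc:abc123/bafkreib...@jpeg
```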
485485+486486+/// Parse record JSON string into owned Data
487487+fn parse_record_json(
488488+ json: &str,
489489+) -> Result<jacquard::types::value::Data<'static>, XrpcErrorResponse> {
490490+ use jacquard::types::value::Data;
491491+492492+ let data: Data<'_> = serde_json::from_str(json).map_err(|e| {
493493+ tracing::error!("Failed to parse record JSON: {}", e);
494494+ XrpcErrorResponse::internal_error("Invalid record JSON stored")
495495+ })?;
496496+ Ok(data.into_static())
497497+}
+359-3
crates/weaver-index/src/endpoints/notebook.rs
···88use jacquard::types::string::{AtUri, Cid, Did, Handle, Uri};
99use jacquard::types::value::Data;
1010use jacquard_axum::ExtractXrpc;
1111+use jacquard_axum::service_auth::ExtractOptionalServiceAuth;
1112use smol_str::SmolStr;
1213use weaver_api::sh_weaver::actor::{ProfileDataView, ProfileDataViewInner, ProfileView};
1314use weaver_api::sh_weaver::notebook::{
1414- AuthorListView, BookEntryView, EntryView, NotebookView,
1515+ AuthorListView, BookEntryRef, BookEntryView, EntryView, FeedEntryView, NotebookView,
1616+ get_book_entry::{GetBookEntryOutput, GetBookEntryRequest},
1517 get_entry::{GetEntryOutput, GetEntryRequest},
1818+ get_entry_feed::{GetEntryFeedOutput, GetEntryFeedRequest},
1919+ get_notebook_feed::{GetNotebookFeedOutput, GetNotebookFeedRequest},
1620 resolve_entry::{ResolveEntryOutput, ResolveEntryRequest},
1721 resolve_notebook::{ResolveNotebookOutput, ResolveNotebookRequest},
1822};
19232020-use crate::clickhouse::ProfileRow;
2121-use crate::endpoints::actor::resolve_actor;
2424+use crate::clickhouse::{EntryRow, ProfileRow};
2525+use crate::endpoints::actor::{Viewer, resolve_actor};
2226use crate::endpoints::repo::XrpcErrorResponse;
2327use crate::server::AppState;
2428···2731/// Resolves a notebook by actor + path/title, returns notebook with entries.
2832pub async fn resolve_notebook(
2933 State(state): State<AppState>,
3434+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
3035 ExtractXrpc(args): ExtractXrpc<ResolveNotebookRequest>,
3136) -> Result<Json<ResolveNotebookOutput<'static>>, XrpcErrorResponse> {
3737+ // viewer can be used later for viewer state (bookmarks, read status, etc.)
3838+ let _viewer: Viewer = viewer;
3939+3240 // Resolve actor to DID
3341 let did = resolve_actor(&state, &args.actor).await?;
3442 let did_str = did.as_str();
···195203/// Gets an entry by AT URI.
196204pub async fn get_entry(
197205 State(state): State<AppState>,
206206+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
198207 ExtractXrpc(args): ExtractXrpc<GetEntryRequest>,
199208) -> Result<Json<GetEntryOutput<'static>>, XrpcErrorResponse> {
209209+ let _viewer: Viewer = viewer;
210210+200211 // Parse the AT URI to extract authority and rkey
201212 let uri = &args.uri;
202213 let authority = uri.authority();
···273284/// Resolves an entry by actor + notebook name + entry name.
274285pub async fn resolve_entry(
275286 State(state): State<AppState>,
287287+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
276288 ExtractXrpc(args): ExtractXrpc<ResolveEntryRequest>,
277289) -> Result<Json<ResolveEntryOutput<'static>>, XrpcErrorResponse> {
290290+ let _viewer: Viewer = viewer;
291291+278292 // Resolve actor to DID
279293 let did = resolve_actor(&state, &args.actor).await?;
280294 let did_str = did.as_str();
···294308 XrpcErrorResponse::internal_error("Database query failed")
295309 })
296310 },
311311+ // TODO: fix this - we do need the entries to know for sure which record wins in case of collisions
297312 async {
298313 state
299314 .clickhouse
···522537
523538 Ok(profile_data)
524539}
540540+
541541+/// Parse a cursor string into an i64 timestamp in milliseconds.
542542+fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> {
543543+ cursor
544544+ .map(|c| {
545545+ c.parse::<i64>()
546546+ .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))
547547+ })
548548+ .transpose()
549549+}
550550+551551+/// Handle sh.weaver.notebook.getNotebookFeed
552552+///
553553+/// Returns a global feed of notebooks.
554554+pub async fn get_notebook_feed(
555555+ State(state): State<AppState>,
556556+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
557557+ ExtractXrpc(args): ExtractXrpc<GetNotebookFeedRequest>,
558558+) -> Result<Json<GetNotebookFeedOutput<'static>>, XrpcErrorResponse> {
559559+ let _viewer: Viewer = viewer;
560560+561561+ let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
562562+ let cursor = parse_cursor(args.cursor.as_deref())?;
563563+ let algorithm = args.algorithm.as_deref().unwrap_or("chronological");
564564+565565+ // Convert tags to &[&str] if present
566566+ let tags_vec: Vec<&str> = args
567567+ .tags
568568+ .as_ref()
569569+ .map(|t| t.iter().map(|s| s.as_ref()).collect())
570570+ .unwrap_or_default();
571571+ let tags = if tags_vec.is_empty() {
572572+ None
573573+ } else {
574574+ Some(tags_vec.as_slice())
575575+ };
576576+577577+ let notebook_rows = state
578578+ .clickhouse
579579+ .get_notebook_feed(algorithm, tags, limit + 1, cursor)
580580+ .await
581581+ .map_err(|e| {
582582+ tracing::error!("Failed to get notebook feed: {}", e);
583583+ XrpcErrorResponse::internal_error("Database query failed")
584584+ })?;
585585+
586586+ // Check if there are more pages (we fetched limit + 1 rows)
587587+ let has_more = notebook_rows.len() > limit as usize;
588588+ let notebook_rows: Vec<_> = notebook_rows.into_iter().take(limit as usize).collect();
589589+590590+ // Collect author DIDs for hydration
591591+ let mut all_author_dids: HashSet<&str> = HashSet::new();
592592+ for nb in &notebook_rows {
593593+ for did in &nb.author_dids {
594594+ all_author_dids.insert(did.as_str());
595595+ }
596596+ }
597597+598598+ // Batch fetch profiles
599599+ let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
600600+ let profiles = state
601601+ .clickhouse
602602+ .get_profiles_batch(&author_dids_vec)
603603+ .await
604604+ .map_err(|e| {
605605+ tracing::error!("Failed to batch fetch profiles: {}", e);
606606+ XrpcErrorResponse::internal_error("Database query failed")
607607+ })?;
608608+609609+ let profile_map: HashMap<&str, &ProfileRow> =
610610+ profiles.iter().map(|p| (p.did.as_str(), p)).collect();
611611+612612+ // Build NotebookViews
613613+ let mut notebooks: Vec<NotebookView<'static>> = Vec::with_capacity(notebook_rows.len());
614614+ for nb_row in &notebook_rows {
615615+ let notebook_uri = AtUri::new(&nb_row.uri).map_err(|e| {
616616+ tracing::error!("Invalid notebook URI in db: {}", e);
617617+ XrpcErrorResponse::internal_error("Invalid URI stored")
618618+ })?;
619619+620620+ let notebook_cid = Cid::new(nb_row.cid.as_bytes()).map_err(|e| {
621621+ tracing::error!("Invalid notebook CID in db: {}", e);
622622+ XrpcErrorResponse::internal_error("Invalid CID stored")
623623+ })?;
624624+625625+ let authors = hydrate_authors(&nb_row.author_dids, &profile_map)?;
626626+ let record = parse_record_json(&nb_row.record)?;
627627+628628+ let notebook = NotebookView::new()
629629+ .uri(notebook_uri.into_static())
630630+ .cid(notebook_cid.into_static())
631631+ .authors(authors)
632632+ .record(record)
633633+ .indexed_at(nb_row.indexed_at.fixed_offset())
634634+ .maybe_title(non_empty_cowstr(&nb_row.title))
635635+ .maybe_path(non_empty_cowstr(&nb_row.path))
636636+ .build();
637637+638638+ notebooks.push(notebook);
639639+ }
640640+641641+ // Build cursor for pagination (created_at millis)
642642+ let next_cursor = if has_more {
643643+ notebook_rows
644644+ .last()
645645+ .map(|nb| nb.created_at.timestamp_millis().to_cowstr().into_static())
646646+ } else {
647647+ None
648648+ };
649649+650650+ Ok(Json(
651651+ GetNotebookFeedOutput {
652652+ notebooks,
653653+ cursor: next_cursor,
654654+ extra_data: None,
655655+ }
656656+ .into_static(),
657657+ ))
658658+}
659659+660660+/// Handle sh.weaver.notebook.getEntryFeed
661661+///
662662+/// Returns a global feed of entries.
663663+pub async fn get_entry_feed(
664664+ State(state): State<AppState>,
665665+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
666666+ ExtractXrpc(args): ExtractXrpc<GetEntryFeedRequest>,
667667+) -> Result<Json<GetEntryFeedOutput<'static>>, XrpcErrorResponse> {
668668+ let _viewer: Viewer = viewer;
669669+670670+ let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
671671+ let cursor = parse_cursor(args.cursor.as_deref())?;
672672+ let algorithm = args.algorithm.as_deref().unwrap_or("chronological");
673673+674674+ // Convert tags to &[&str] if present
675675+ let tags_vec: Vec<&str> = args
676676+ .tags
677677+ .as_ref()
678678+ .map(|t| t.iter().map(|s| s.as_ref()).collect())
679679+ .unwrap_or_default();
680680+ let tags = if tags_vec.is_empty() {
681681+ None
682682+ } else {
683683+ Some(tags_vec.as_slice())
684684+ };
685685+686686+ let entry_rows = state
687687+ .clickhouse
688688+ .get_entry_feed(algorithm, tags, limit + 1, cursor)
689689+ .await
690690+ .map_err(|e| {
691691+ tracing::error!("Failed to get entry feed: {}", e);
692692+ XrpcErrorResponse::internal_error("Database query failed")
693693+ })?;
694694+
695695+ // Check if there are more pages (we fetched limit + 1 rows)
696696+ let has_more = entry_rows.len() > limit as usize;
697697+ let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect();
698698+699699+ // Collect author DIDs for hydration
700700+ let mut all_author_dids: HashSet<&str> = HashSet::new();
701701+ for entry in &entry_rows {
702702+ for did in &entry.author_dids {
703703+ all_author_dids.insert(did.as_str());
704704+ }
705705+ }
706706+707707+ // Batch fetch profiles
708708+ let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
709709+ let profiles = state
710710+ .clickhouse
711711+ .get_profiles_batch(&author_dids_vec)
712712+ .await
713713+ .map_err(|e| {
714714+ tracing::error!("Failed to batch fetch profiles: {}", e);
715715+ XrpcErrorResponse::internal_error("Database query failed")
716716+ })?;
717717+718718+ let profile_map: HashMap<&str, &ProfileRow> =
719719+ profiles.iter().map(|p| (p.did.as_str(), p)).collect();
720720+721721+ // Build FeedEntryViews
722722+ let mut feed: Vec<FeedEntryView<'static>> = Vec::with_capacity(entry_rows.len());
723723+ for entry_row in &entry_rows {
724724+ let entry_view = build_entry_view(entry_row, &profile_map)?;
725725+726726+ let feed_entry = FeedEntryView::new().entry(entry_view).build();
727727+728728+ feed.push(feed_entry);
729729+ }
730730+731731+ // Build cursor for pagination (created_at millis)
732732+ let next_cursor = if has_more {
733733+ entry_rows
734734+ .last()
735735+ .map(|e| e.created_at.timestamp_millis().to_cowstr().into_static())
736736+ } else {
737737+ None
738738+ };
739739+740740+ Ok(Json(
741741+ GetEntryFeedOutput {
742742+ feed,
743743+ cursor: next_cursor,
744744+ extra_data: None,
745745+ }
746746+ .into_static(),
747747+ ))
748748+}
749749+750750+/// Handle sh.weaver.notebook.getBookEntry
751751+///
752752+/// Returns an entry at a specific index within a notebook, with prev/next navigation.
753753+pub async fn get_book_entry(
754754+ State(state): State<AppState>,
755755+ ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
756756+ ExtractXrpc(args): ExtractXrpc<GetBookEntryRequest>,
757757+) -> Result<Json<GetBookEntryOutput<'static>>, XrpcErrorResponse> {
758758+ let _viewer: Viewer = viewer;
759759+760760+ // Parse the notebook URI
761761+ let notebook_uri = &args.notebook;
762762+ let authority = notebook_uri.authority();
763763+ let notebook_rkey = notebook_uri
764764+ .rkey()
765765+ .ok_or_else(|| XrpcErrorResponse::invalid_request("Notebook URI must include rkey"))?;
766766+767767+ // Resolve authority to DID
768768+ let notebook_did = resolve_actor(&state, authority).await?;
769769+ let notebook_did_str = notebook_did.as_str();
770770+ let notebook_rkey_str = notebook_rkey.as_ref();
771771+772772+ let index = args.index.unwrap_or(0).max(0) as u32;
773773+774774+ // Fetch entry at index with prev/next
775775+ let result = state
776776+ .clickhouse
777777+ .get_book_entry_at_index(notebook_did_str, notebook_rkey_str, index)
778778+ .await
779779+ .map_err(|e| {
780780+ tracing::error!("Failed to get book entry: {}", e);
781781+ XrpcErrorResponse::internal_error("Database query failed")
782782+ })?;
783783+784784+ let (current_row, prev_row, next_row) =
785785+ result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found at index"))?;
786786+787787+ // Collect all author DIDs for hydration
788788+ let mut all_author_dids: HashSet<&str> = HashSet::new();
789789+ for did in &current_row.author_dids {
790790+ all_author_dids.insert(did.as_str());
791791+ }
792792+ if let Some(ref prev) = prev_row {
793793+ for did in &prev.author_dids {
794794+ all_author_dids.insert(did.as_str());
795795+ }
796796+ }
797797+ if let Some(ref next) = next_row {
798798+ for did in &next.author_dids {
799799+ all_author_dids.insert(did.as_str());
800800+ }
801801+ }
802802+803803+ // Batch fetch profiles
804804+ let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
805805+ let profiles = state
806806+ .clickhouse
807807+ .get_profiles_batch(&author_dids_vec)
808808+ .await
809809+ .map_err(|e| {
810810+ tracing::error!("Failed to fetch profiles: {}", e);
811811+ XrpcErrorResponse::internal_error("Database query failed")
812812+ })?;
813813+814814+ let profile_map: HashMap<&str, &ProfileRow> =
815815+ profiles.iter().map(|p| (p.did.as_str(), p)).collect();
816816+817817+ // Build the current entry view
818818+ let entry_view = build_entry_view(&current_row, &profile_map)?;
819819+820820+ // Build prev/next refs if present
821821+ let prev_ref = if let Some(ref prev) = prev_row {
822822+ let prev_view = build_entry_view(prev, &profile_map)?;
823823+ Some(BookEntryRef::new().entry(prev_view).build())
824824+ } else {
825825+ None
826826+ };
827827+828828+ let next_ref = if let Some(ref next) = next_row {
829829+ let next_view = build_entry_view(next, &profile_map)?;
830830+ Some(BookEntryRef::new().entry(next_view).build())
831831+ } else {
832832+ None
833833+ };
834834+835835+ let book_entry = BookEntryView::new()
836836+ .entry(entry_view)
837837+ .index(index as i64)
838838+ .maybe_prev(prev_ref)
839839+ .maybe_next(next_ref)
840840+ .build();
841841+842842+ Ok(Json(
843843+ GetBookEntryOutput {
844844+ value: book_entry,
845845+ extra_data: None,
846846+ }
847847+ .into_static(),
848848+ ))
849849+}
850850+851851+/// Build an EntryView from an EntryRow
852852+fn build_entry_view(
853853+ entry_row: &EntryRow,
854854+ profile_map: &HashMap<&str, &ProfileRow>,
855855+) -> Result<EntryView<'static>, XrpcErrorResponse> {
856856+ let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| {
857857+ tracing::error!("Invalid entry URI in db: {}", e);
858858+ XrpcErrorResponse::internal_error("Invalid URI stored")
859859+ })?;
860860+861861+ let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| {
862862+ tracing::error!("Invalid entry CID in db: {}", e);
863863+ XrpcErrorResponse::internal_error("Invalid CID stored")
864864+ })?;
865865+866866+ let authors = hydrate_authors(&entry_row.author_dids, profile_map)?;
867867+ let record = parse_record_json(&entry_row.record)?;
868868+869869+ let entry_view = EntryView::new()
870870+ .uri(entry_uri.into_static())
871871+ .cid(entry_cid.into_static())
872872+ .authors(authors)
873873+ .record(record)
874874+ .indexed_at(entry_row.indexed_at.fixed_offset())
875875+ .maybe_title(non_empty_cowstr(&entry_row.title))
876876+ .maybe_path(non_empty_cowstr(&entry_row.path))
877877+ .build();
878878+879879+ Ok(entry_view)
880880+}
+2
crates/weaver-index/src/lib.rs
···66pub mod indexer;
77pub mod parallel_tap;
88pub mod server;
99+pub mod service_identity;
910pub mod sqlite;
1011pub mod tap;
1112···1415pub use indexer::{FirehoseIndexer, load_cursor};
1516pub use parallel_tap::TapIndexer;
1617pub use server::{AppState, ServerConfig};
1818+pub use service_identity::ServiceIdentity;
1719pub use sqlite::{ShardKey, ShardRouter, SqliteShard};
+5-5
crates/weaver-index/src/parallel_tap.rs
···1414use crate::error::{ClickHouseError, Result};
1515use crate::tap::{TapConfig as TapConsumerConfig, TapConsumer, TapEvent};
16161717-/// TAP indexer with multiple parallel websocket connections
1717+/// Tap indexer with multiple parallel websocket connections
1818///
1919-/// Each worker maintains its own websocket connection to TAP and its own
2020-/// ClickHouse inserter. TAP distributes events across connected clients,
1919+/// Each worker maintains its own websocket connection to Tap and its own
2020+/// ClickHouse inserter. Tap distributes events across connected clients,
2121/// and its ack-gating mechanism ensures per-DID ordering is preserved
2222/// regardless of which worker handles which events.
2323pub struct TapIndexer {
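Conceptually the fan-out looks like the sketch below; every `TapConsumer` method name in it is hypothetical and stands in for whatever the real consumer API exposes. The point is that per-DID ordering comes from server-side ack gating, not from coordination between workers:

```rust
// Hypothetical sketch: each worker owns its own websocket and its own
// inserter; Tap withholds a DID's next event until the previous one is
// acked, so per-DID order holds even though workers run independently.
async fn spawn_workers(n: usize, cfg: TapConsumerConfig) -> Result<()> {
    for _ in 0..n {
        let mut consumer = TapConsumer::connect(cfg.clone()).await?; // hypothetical ctor
        tokio::spawn(async move {
            while let Some(event) = consumer.next_event().await {    // hypothetical
                insert_event(&event).await;                          // hypothetical sink
                consumer.ack(&event).await;                          // hypothetical
            }
        });
    }
    Ok(())
}
```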
···297297/// then runs INSERT queries to populate target tables for incremental MVs.
298298async fn run_backfill(client: Arc<Client>) {
299299 // Wait for in-flight inserts to settle
300300- info!("backfill: waiting 10s for in-flight inserts to settle");
301301- tokio::time::sleep(Duration::from_secs(10)).await;
300300+ info!("backfill: waiting 100s for in-flight inserts to settle");
301301+ tokio::time::sleep(Duration::from_secs(100)).await;
302302303303 let mvs = Migrator::incremental_mvs();
304304 if mvs.is_empty() {