From a41775899a26594cec2babace296a126d61ebd90 Mon Sep 17 00:00:00 2001 From: max Date: Sat, 3 Jan 2026 20:49:12 +0900 Subject: [PATCH] wip: m2m --- constellation/src/lib.rs | 2 +- constellation/src/server/mod.rs | 102 +++++++++++++ constellation/src/storage/mem_store.rs | 93 ++++++++++++ constellation/src/storage/mod.rs | 12 ++ constellation/src/storage/rocks_store.rs | 143 ++++++++++++++++++ .../templates/get-many-to-many.html.j2 | 60 ++++++++ constellation/templates/hello.html.j2 | 19 +++ constellation/templates/try-it-macros.html.j2 | 30 ++++ 8 files changed, 460 insertions(+), 1 deletion(-) create mode 100644 constellation/templates/get-many-to-many.html.j2 diff --git a/constellation/src/lib.rs b/constellation/src/lib.rs index bcb4e9b..3452ed5 100644 --- a/constellation/src/lib.rs +++ b/constellation/src/lib.rs @@ -31,7 +31,7 @@ impl> From for Did { } } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct RecordId { pub did: Did, pub collection: String, diff --git a/constellation/src/server/mod.rs b/constellation/src/server/mod.rs index 6ca2429..a70bf7e 100644 --- a/constellation/src/server/mod.rs +++ b/constellation/src/server/mod.rs @@ -88,6 +88,17 @@ where } }), ) + .route( + "/xrpc/blue.microcosm.links.getManyToMany", + get({ + let store = store.clone(); + move |accept, query| async { + spawn_blocking(|| get_many_to_many(accept, query, store)) + .await + .map_err(to500)? 
+ } + }), + ) .route( "/xrpc/blue.microcosm.links.getBacklinks", get({ @@ -587,6 +598,97 @@ fn get_links( )) } +#[derive(Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct GetManyToManyItemsQuery { + subject: String, + source: String, + /// path to the secondary link in the linking record + path_to_other: String, + /// filter to linking records (join of the m2m) by these DIDs + #[serde(default)] + did: Vec, + /// filter to specific secondary records + #[serde(default)] + other_subject: Vec, + cursor: Option, + #[serde(default = "get_default_cursor_limit")] + limit: u64, +} +#[derive(Template, Serialize)] +#[template(path = "get-many-to-many.html.j2")] +struct GetManyToManyItemsResponse { + linking_records: Vec<(String, Vec)>, + cursor: Option, + #[serde(skip_serializing)] + query: GetManyToManyItemsQuery, +} +fn get_many_to_many( + accept: ExtractAccept, + query: axum_extra::extract::Query, // supports multiple param occurrences + store: impl LinkReader, +) -> Result { + let after = query + .cursor + .clone() + .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) + .transpose()? 
+ .map(|c| c.next); + + let limit = query.limit; + if limit > DEFAULT_CURSOR_LIMIT_MAX { + return Err(http::StatusCode::BAD_REQUEST); + } + + let filter_dids: HashSet = HashSet::from_iter( + query + .did + .iter() + .map(|d| d.trim()) + .filter(|d| !d.is_empty()) + .map(|d| Did(d.to_string())), + ); + + let filter_other_subjects: HashSet = HashSet::from_iter( + query + .other_subject + .iter() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()), + ); + + let Some((collection, path)) = query.source.split_once(':') else { + return Err(http::StatusCode::BAD_REQUEST); + }; + let path = format!(".{path}"); + + let path_to_other = format!(".{}", query.path_to_other); + + let paged = store + .get_many_to_many( + &query.subject, + collection, + &path, + &path_to_other, + limit, + after, + &filter_dids, + &filter_other_subjects, + ) + .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; + + let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); + + Ok(acceptable( + accept, + GetManyToManyItemsResponse { + linking_records: paged.items, + cursor, + query: (*query).clone(), + }, + )) +} + #[derive(Clone, Deserialize)] struct GetDidItemsQuery { target: String, diff --git a/constellation/src/storage/mem_store.rs b/constellation/src/storage/mem_store.rs index abc84a2..17443c4 100644 --- a/constellation/src/storage/mem_store.rs +++ b/constellation/src/storage/mem_store.rs @@ -234,6 +234,99 @@ impl LinkReader for MemStorage { .len() as u64) } + fn get_many_to_many( + &self, + target: &str, + collection: &str, + path: &str, + path_to_other: &str, + limit: u64, + after: Option, + filter_dids: &HashSet, + filter_to_targets: &HashSet, + ) -> Result), String>> { + let empty_res = Ok(PagedOrderedCollection { + items: Vec::new(), + next: Some("".to_string()), + }); + + // struct MemStorageData { + // dids: HashMap, + // targets: HashMap>, + // links: HashMap>>, + // } + let data = self.0.lock().unwrap(); + + let Some(sources) = 
data.targets.get(&Target::new(target)) else { + return empty_res; + }; + let Some(linkers) = sources.get(&Source::new(collection, path)) else { + return empty_res; + }; + let path_to_other = RecordPath::new(path_to_other); + + // Convert filter_to_targets to Target objects for comparison + let filter_to_target_objs: HashSet = + HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s))); + + let mut grouped_links: HashMap> = HashMap::new(); + for (did, rkey) in linkers.iter().flatten().cloned() { + // Filter by DID if filter is provided + if !filter_dids.is_empty() && !filter_dids.contains(&did) { + continue; + } + if let Some(fwd_target) = data + .links + .get(&did) + .unwrap_or(&HashMap::new()) + .get(&RepoId { + collection: collection.to_string(), + rkey: rkey.clone(), + }) + .unwrap_or(&Vec::new()) + .iter() + .find_map(|(path, target)| { + if *path == path_to_other + && (filter_to_target_objs.is_empty() + || filter_to_target_objs.contains(target)) + { + Some(target) + } else { + None + } + }) + { + let record_ids = grouped_links.entry(fwd_target.clone()).or_default(); + record_ids.push(RecordId { + did, + collection: collection.to_string(), + rkey: rkey.0, + }); + } + } + + let mut items = grouped_links + .into_iter() + .map(|(t, r)| (t.0, r)) + .collect::>(); + + items.sort_by(|(a, _), (b, _)| a.cmp(b)); + + items = items + .into_iter() + .skip_while(|(t, _)| after.as_ref().map(|a| t <= a).unwrap_or(false)) + .take(limit as usize) + .collect(); + + let next = if items.len() as u64 >= limit { + items.last().map(|(t, _)| t.clone()) + } else { + None + }; + + Ok(PagedOrderedCollection { items, next }) + } + fn get_links( &self, target: &str, diff --git a/constellation/src/storage/mod.rs b/constellation/src/storage/mod.rs index 68010ca..12d4051 100644 --- a/constellation/src/storage/mod.rs +++ b/constellation/src/storage/mod.rs @@ -104,6 +104,18 @@ pub trait LinkReader: Clone + Send + Sync + 'static { fn get_all_record_counts(&self, _target: &str) -> 
Result>>; + fn get_many_to_many( + &self, + target: &str, + collection: &str, + path: &str, + path_to_other: &str, + limit: u64, + after: Option, + filter_dids: &HashSet, + filter_to_targets: &HashSet, + ) -> Result), String>>; + fn get_all_counts( &self, _target: &str, diff --git a/constellation/src/storage/rocks_store.rs b/constellation/src/storage/rocks_store.rs index 3c7f546..de3fcb9 100644 --- a/constellation/src/storage/rocks_store.rs +++ b/constellation/src/storage/rocks_store.rs @@ -1122,6 +1122,149 @@ impl LinkReader for RocksStorage { } } + fn get_many_to_many( + &self, + target: &str, + collection: &str, + path: &str, + path_to_other: &str, + limit: u64, + after: Option, + filter_dids: &HashSet, + filter_to_targets: &HashSet, + ) -> Result), String>> { + let collection = Collection(collection.to_string()); + let path = RPath(path.to_string()); + + let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); + + let after = after.map(|s| s.parse::().map(TargetId)).transpose()?; + + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { + eprintln!("Target not found for {target_key:?}"); + return Ok(Default::default()); + }; + + let filter_did_ids: HashMap = filter_dids + .iter() + .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) + .collect::>>()? 
+ .into_iter() + .map(|DidIdValue(id, active)| (id, active)) + .collect(); + + let mut filter_to_target_ids: HashSet = HashSet::new(); + for t in filter_to_targets { + for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { + filter_to_target_ids.insert(target_id); + } + } + + let linkers = self.get_target_linkers(&target_id)?; + + // we want to provide many to many which effectively means that we want to show a specific + // list of reords that is linked to by a specific number of linkers + let mut grouped_links: BTreeMap> = BTreeMap::new(); + for (did_id, rkey) in linkers.0 { + if did_id.is_empty() { + continue; + } + + if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) { + continue; + } + + // Make sure the current did is active + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { + eprintln!("failed to look up did from did_id {did_id:?}"); + continue; + }; + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); + continue; + }; + if !active { + continue; + } + + let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey.clone()); + let Some(targets) = self.get_record_link_targets(&record_link_key)? 
else { + continue; + }; + + let Some(fwd_target) = targets + .0 + .into_iter() + .filter_map(|RecordLinkTarget(rpath, target_id)| { + if rpath.0 == path_to_other + && (filter_to_target_ids.is_empty() + || filter_to_target_ids.contains(&target_id)) + { + Some(target_id) + } else { + None + } + }) + .take(1) + .next() + else { + eprintln!("no forward match found."); + continue; + }; + + // pagination logic mirrors what is currently done in get_many_to_many_counts + if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) { + continue; + } + let page_is_full = grouped_links.len() as u64 >= limit; + if page_is_full { + let current_max = grouped_links.keys().next_back().unwrap(); + if fwd_target > *current_max { + continue; + } + } + + // pagination, continued + let mut should_evict = false; + let entry = grouped_links.entry(fwd_target.clone()).or_insert_with(|| { + should_evict = page_is_full; + Vec::default() + }); + entry.push(RecordId { + did, + collection: collection.0.clone(), + rkey: rkey.0, + }); + + if should_evict { + grouped_links.pop_last(); + } + } + + let mut items: Vec<(String, Vec)> = Vec::with_capacity(grouped_links.len()); + for (fwd_target_id, records) in &grouped_links { + let Some(target_key) = self + .target_id_table + .get_val_from_id(&self.db, fwd_target_id.0)? 
+ else { + eprintln!("failed to look up target from target_id {fwd_target_id:?}"); + continue; + }; + + let target_string = target_key.0 .0; + + items.push((target_string, records.clone())); + } + + let next = if grouped_links.len() as u64 >= limit { + grouped_links.keys().next_back().map(|k| format!("{}", k.0)) + } else { + None + }; + + Ok(PagedOrderedCollection { items, next }) + } + fn get_links( &self, target: &str, diff --git a/constellation/templates/get-many-to-many.html.j2 b/constellation/templates/get-many-to-many.html.j2 new file mode 100644 index 0000000..acf6b36 --- /dev/null +++ b/constellation/templates/get-many-to-many.html.j2 @@ -0,0 +1,60 @@ +{% extends "base.html.j2" %} +{% import "try-it-macros.html.j2" as try_it %} + +{% block title %}Many-to-Many Links{% endblock %} +{% block description %}All {{ query.source }} records with many-to-many links to {{ query.subject }} joining through {{ query.path_to_other }}{% endblock %} + +{% block content %} + + {% call try_it::get_many_to_many(query.subject, query.source, query.path_to_other, query.did, query.other_subject, query.limit) %} + +

+ Many-to-many links to {{ query.subject }} + {% if let Some(browseable_uri) = query.subject|to_browseable %} + browse record + {% endif %} +

+ +

Many-to-many links from {{ query.source }} joining through {{ query.path_to_other }}

+ + + +

Many-to-many links, grouped by secondary target:

+ + {% for (target, records) in linking_records %} +

Target: {{ target }} (view all links)

+ {% for record in records %} +
DID:        {{ record.did().0 }}
+Collection: {{ record.collection }}
+RKey:       {{ record.rkey }}
+-> browse record
+ {% endfor %} + {% endfor %} + + {% if let Some(c) = cursor %} +
+ + + + {% for did in query.did %} + + {% endfor %} + {% for other in query.other_subject %} + + {% endfor %} + + + +
+ {% else %} + + {% endif %} + +
+ Raw JSON response +
{{ self|tojson }}
+
+ +{% endblock %} diff --git a/constellation/templates/hello.html.j2 b/constellation/templates/hello.html.j2 index b916d1c..3403a1b 100644 --- a/constellation/templates/hello.html.j2 +++ b/constellation/templates/hello.html.j2 @@ -81,6 +81,25 @@ ) %} +

GET /xrpc/blue.microcosm.links.getManyToMany

+ +

A list of many-to-many join records that link to a target, grouped by the secondary target each record also links to.

+ +

Query parameters:

+ +
    +
  • subject: required, must url-encode. Example: at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r

  • +
  • source: required. Example: app.bsky.feed.like:subject.uri

  • +
  • pathToOther: required. Path to the secondary link in the many-to-many record. Example: otherThing.uri

  • +
  • did: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:a4pqq234yw7fqbddawjo7y35

  • +
  • otherSubject: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple subjects. Example: at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r

  • +
  • limit: optional. Default: 16. Maximum: 100

  • +
+ +

Try it:

+ {% call try_it::get_many_to_many("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", "reply.parent.uri", [""], [""], 16) %} + +

GET /links

A list of records linking to a target.

diff --git a/constellation/templates/try-it-macros.html.j2 b/constellation/templates/try-it-macros.html.j2 index 6e765b6..61c20ce 100644 --- a/constellation/templates/try-it-macros.html.j2 +++ b/constellation/templates/try-it-macros.html.j2 @@ -66,6 +66,36 @@ {% endmacro %} +{% macro get_many_to_many(subject, source, pathToOther, dids, otherSubjects, limit) %} +
+
GET /xrpc/blue.microcosm.links.getManyToMany
+  ?subject=      
+  &source=       
+  &pathToOther=  
+  {%- for did in dids %}{% if !did.is_empty() %}
+  &did=          {% endif %}{% endfor %}
+                 
+  {%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
+  &otherSubject= {% endif %}{% endfor %}
+                 
+  &limit=         
+
+ +{% endmacro %} + {% macro links(target, collection, path, dids, limit) %}
GET /links
-- 
2.43.0


From 81b7df557d463e6c3f071838e261f8f3d85c0f87 Mon Sep 17 00:00:00 2001
From: max 
Date: Sun, 4 Jan 2026 12:23:43 +0900
Subject: [PATCH] Add tests for new get_many_to_many query handler

---
 constellation/src/storage/mod.rs | 212 +++++++++++++++++++++++++++++++
 1 file changed, 212 insertions(+)

diff --git a/constellation/src/storage/mod.rs b/constellation/src/storage/mod.rs
index 12d4051..ec0913f 100644
--- a/constellation/src/storage/mod.rs
+++ b/constellation/src/storage/mod.rs
@@ -1564,4 +1564,216 @@ mod tests {
             }
         );
     });
+
+    test_each_storage!(get_m2m_empty, |storage| {
+        assert_eq!(
+            storage.get_many_to_many(
+                "a.com",
+                "a.b.c",
+                ".d.e",
+                ".f.g",
+                10,
+                None,
+                &HashSet::new(),
+                &HashSet::new(),
+            )?,
+            PagedOrderedCollection {
+                items: vec![],
+                next: None,
+            }
+        );
+    });
+
+    test_each_storage!(get_m2m_single, |storage| {
+        storage.push(
+            &ActionableEvent::CreateLinks {
+                record_id: RecordId {
+                    did: "did:plc:asdf".into(),
+                    collection: "app.t.c".into(),
+                    rkey: "asdf".into(),
+                },
+                links: vec![
+                    CollectedLink {
+                        target: Link::Uri("a.com".into()),
+                        path: ".abc.uri".into(),
+                    },
+                    CollectedLink {
+                        target: Link::Uri("b.com".into()),
+                        path: ".def.uri".into(),
+                    },
+                    CollectedLink {
+                        target: Link::Uri("b.com".into()),
+                        path: ".ghi.uri".into(),
+                    },
+                ],
+            },
+            0,
+        )?;
+        assert_eq!(
+            storage.get_many_to_many(
+                "a.com",
+                "app.t.c",
+                ".abc.uri",
+                ".def.uri",
+                10,
+                None,
+                &HashSet::new(),
+                &HashSet::new(),
+            )?,
+            PagedOrderedCollection {
+                items: vec![(
+                    "b.com".to_string(),
+                    vec![RecordId {
+                        did: "did:plc:asdf".into(),
+                        collection: "app.t.c".into(),
+                        rkey: "asdf".into(),
+                    }]
+                )],
+                next: None,
+            }
+        );
+    });
+
+    test_each_storage!(get_m2m_filters, |storage| {
+        storage.push(
+            &ActionableEvent::CreateLinks {
+                record_id: RecordId {
+                    did: "did:plc:asdf".into(),
+                    collection: "app.t.c".into(),
+                    rkey: "asdf".into(),
+                },
+                links: vec![
+                    CollectedLink {
+                        target: Link::Uri("a.com".into()),
+                        path: ".abc.uri".into(),
+                    },
+                    CollectedLink {
+                        target: Link::Uri("b.com".into()),
+                        path: ".def.uri".into(),
+                    },
+                ],
+            },
+            0,
+        )?;
+        storage.push(
+            &ActionableEvent::CreateLinks {
+                record_id: RecordId {
+                    did: "did:plc:asdf".into(),
+                    collection: "app.t.c".into(),
+                    rkey: "asdf2".into(),
+                },
+                links: vec![
+                    CollectedLink {
+                        target: Link::Uri("a.com".into()),
+                        path: ".abc.uri".into(),
+                    },
+                    CollectedLink {
+                        target: Link::Uri("b.com".into()),
+                        path: ".def.uri".into(),
+                    },
+                ],
+            },
+            1,
+        )?;
+        storage.push(
+            &ActionableEvent::CreateLinks {
+                record_id: RecordId {
+                    did: "did:plc:fdsa".into(),
+                    collection: "app.t.c".into(),
+                    rkey: "fdsa".into(),
+                },
+                links: vec![
+                    CollectedLink {
+                        target: Link::Uri("a.com".into()),
+                        path: ".abc.uri".into(),
+                    },
+                    CollectedLink {
+                        target: Link::Uri("c.com".into()),
+                        path: ".def.uri".into(),
+                    },
+                ],
+            },
+            2,
+        )?;
+        storage.push(
+            &ActionableEvent::CreateLinks {
+                record_id: RecordId {
+                    did: "did:plc:fdsa".into(),
+                    collection: "app.t.c".into(),
+                    rkey: "fdsa2".into(),
+                },
+                links: vec![
+                    CollectedLink {
+                        target: Link::Uri("a.com".into()),
+                        path: ".abc.uri".into(),
+                    },
+                    CollectedLink {
+                        target: Link::Uri("c.com".into()),
+                        path: ".def.uri".into(),
+                    },
+                ],
+            },
+            3,
+        )?;
+
+        // Test without filters - should get all records grouped by secondary target
+        let result = storage.get_many_to_many(
+            "a.com",
+            "app.t.c",
+            ".abc.uri",
+            ".def.uri",
+            10,
+            None,
+            &HashSet::new(),
+            &HashSet::new(),
+        )?;
+        assert_eq!(result.items.len(), 2);
+        assert_eq!(result.next, None);
+        // Find b.com group
+        let (b_target, b_records) = result.items.iter().find(|(target, _)| target == "b.com").unwrap();
+        assert_eq!(b_target, "b.com");
+        assert_eq!(b_records.len(), 2);
+        assert!(b_records.iter().any(|r| r.did.0 == "did:plc:asdf" && r.rkey == "asdf"));
+        assert!(b_records.iter().any(|r| r.did.0 == "did:plc:asdf" && r.rkey == "asdf2"));
+        // Find c.com group
+        let (c_target, c_records) = result.items.iter().find(|(target, _)| target == "c.com").unwrap();
+        assert_eq!(c_target, "c.com");
+        assert_eq!(c_records.len(), 2);
+        assert!(c_records.iter().any(|r| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa"));
+        assert!(c_records.iter().any(|r| r.did.0 == "did:plc:fdsa" && r.rkey == "fdsa2"));
+
+        // Test with DID filter - should only get records from did:plc:fdsa
+        let result = storage.get_many_to_many(
+            "a.com",
+            "app.t.c",
+            ".abc.uri",
+            ".def.uri",
+            10,
+            None,
+            &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
+            &HashSet::new(),
+        )?;
+        assert_eq!(result.items.len(), 1);
+        let (target, records) = &result.items[0];
+        assert_eq!(target, "c.com");
+        assert_eq!(records.len(), 2);
+        assert!(records.iter().all(|r| r.did.0 == "did:plc:fdsa"));
+
+        // Test with target filter - should only get records linking to b.com
+        let result = storage.get_many_to_many(
+            "a.com",
+            "app.t.c",
+            ".abc.uri",
+            ".def.uri",
+            10,
+            None,
+            &HashSet::new(),
+            &HashSet::from_iter(["b.com".to_string()]),
+        )?;
+        assert_eq!(result.items.len(), 1);
+        let (target, records) = &result.items[0];
+        assert_eq!(target, "b.com");
+        assert_eq!(records.len(), 2);
+        assert!(records.iter().all(|r| r.did.0 == "did:plc:asdf"));
+    });
 }
-- 
2.43.0


From c674291799a5a4c39c99daa21a46b0a8a3a0c302 Mon Sep 17 00:00:00 2001
From: max 
Date: Sun, 4 Jan 2026 12:24:20 +0900
Subject: [PATCH] Fix get_m2m_empty test

---
 constellation/src/storage/mem_store.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/constellation/src/storage/mem_store.rs b/constellation/src/storage/mem_store.rs
index 17443c4..ed146b3 100644
--- a/constellation/src/storage/mem_store.rs
+++ b/constellation/src/storage/mem_store.rs
@@ -247,7 +247,7 @@ impl LinkReader for MemStorage {
     ) -> Result), String>> {
         let empty_res = Ok(PagedOrderedCollection {
             items: Vec::new(),
-            next: Some("".to_string()),
+            next: None,
         });
 
         // struct MemStorageData {
-- 
2.43.0