Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Add support for reverse ordering in link queries (Issue #1) #6

closed · opened by seoul.systems, targeting main from seoul.systems/microcosm-rs: order_query

The following adds support for reverse-chronological ordering when fetching back-links.

We add a non-required "reverse" query parameter that allows reversing the default order of returned back-links.

Supports both current storage backends (mem_store and rocks_store).

Labels

None yet.

Participants 2
AT URI
at://did:plc:53wellrw53o7sw4zlpfenvuh/sh.tangled.repo.pull/3majn54wt2l22
+143 -89
Interdiff #0 → #1
+19 -3
constellation/src/bin/main.rs
··· 26 #[arg(long)] 27 #[clap(default_value = "0.0.0.0:6789")] 28 bind: SocketAddr, 29 /// metrics server's listen address 30 #[arg(long)] 31 #[clap(default_value = "0.0.0.0:8765")] ··· 92 let bind = args.bind; 93 let metrics_bind = args.bind_metrics; 94 95 let stay_alive = CancellationToken::new(); 96 97 match args.backend { ··· 102 stream, 103 bind, 104 metrics_bind, 105 stay_alive, 106 ), 107 #[cfg(feature = "rocks")] ··· 136 stream, 137 bind, 138 metrics_bind, 139 stay_alive, 140 ); 141 eprintln!("run finished: {r:?}"); ··· 147 } 148 } 149 150 fn run( 151 mut storage: impl LinkStorage, 152 fixture: Option<PathBuf>, ··· 154 stream: String, 155 bind: SocketAddr, 156 metrics_bind: SocketAddr, 157 stay_alive: CancellationToken, 158 ) -> Result<()> { 159 ctrlc::set_handler({ ··· 198 .build() 199 .expect("axum startup") 200 .block_on(async { 201 - install_metrics_server(metrics_bind)?; 202 serve(readable, bind, staying_alive).await 203 }) 204 .unwrap(); ··· 206 } 207 }); 208 209 - s.spawn(move || { // monitor thread 210 let stay_alive = stay_alive.clone(); 211 let check_alive = stay_alive.clone(); 212 ··· 258 } 259 } 260 stay_alive.drop_guard(); 261 - }); 262 }); 263 264 println!("byeeee");
··· 26 #[arg(long)] 27 #[clap(default_value = "0.0.0.0:6789")] 28 bind: SocketAddr, 29 + /// optionally disable the metrics server 30 + #[arg(long)] 31 + #[clap(default_value_t = false)] 32 + collect_metrics: bool, 33 /// metrics server's listen address 34 #[arg(long)] 35 #[clap(default_value = "0.0.0.0:8765")] ··· 96 let bind = args.bind; 97 let metrics_bind = args.bind_metrics; 98 99 + let collect_metrics = args.collect_metrics; 100 let stay_alive = CancellationToken::new(); 101 102 match args.backend { ··· 107 stream, 108 bind, 109 metrics_bind, 110 + collect_metrics, 111 stay_alive, 112 ), 113 #[cfg(feature = "rocks")] ··· 142 stream, 143 bind, 144 metrics_bind, 145 + collect_metrics, 146 stay_alive, 147 ); 148 eprintln!("run finished: {r:?}"); ··· 154 } 155 } 156 157 + #[allow(clippy::too_many_lines)] 158 + #[allow(clippy::too_many_arguments)] 159 fn run( 160 mut storage: impl LinkStorage, 161 fixture: Option<PathBuf>, ··· 163 stream: String, 164 bind: SocketAddr, 165 metrics_bind: SocketAddr, 166 + collect_metrics: bool, 167 stay_alive: CancellationToken, 168 ) -> Result<()> { 169 ctrlc::set_handler({ ··· 208 .build() 209 .expect("axum startup") 210 .block_on(async { 211 + // Install metrics server only if requested 212 + if collect_metrics { 213 + install_metrics_server(metrics_bind)?; 214 + } 215 serve(readable, bind, staying_alive).await 216 }) 217 .unwrap(); ··· 219 } 220 }); 221 222 + // only spawn monitoring thread if the metrics server is running 223 + if collect_metrics { 224 + s.spawn(move || { // monitor thread 225 let stay_alive = stay_alive.clone(); 226 let check_alive = stay_alive.clone(); 227 ··· 273 } 274 } 275 stay_alive.drop_guard(); 276 + }); 277 + } 278 }); 279 280 println!("byeeee");
+24 -6
constellation/src/server/mod.rs
··· 17 use tokio::task::spawn_blocking; 18 use tokio_util::sync::CancellationToken; 19 20 - use crate::storage::{LinkReader, StorageStats}; 21 use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; ··· 25 26 use acceptable::{acceptable, ExtractAccept}; 27 28 - const DEFAULT_CURSOR_LIMIT: u64 = 16; 29 - const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100; 30 31 fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT ··· 298 299 let path_to_other = format!(".{}", query.path_to_other); 300 301 let paged = store 302 .get_many_to_many_counts( 303 &query.subject, 304 collection, 305 &path, 306 &path_to_other, 307 - query.reverse, 308 limit, 309 cursor_key, 310 &filter_dids, ··· 461 }; 462 let path = format!(".{path}"); 463 464 let paged = store 465 .get_links( 466 &query.subject, 467 collection, 468 &path, 469 - query.reverse, 470 limit, 471 until, 472 &filter_dids, ··· 566 } 567 } 568 569 let paged = store 570 .get_links( 571 &query.target, 572 &query.collection, 573 &query.path, 574 - query.reverse, 575 limit, 576 until, 577 &filter_dids,
··· 17 use tokio::task::spawn_blocking; 18 use tokio_util::sync::CancellationToken; 19 20 + use crate::storage::{LinkReader, Order, StorageStats}; 21 use crate::{CountsByCount, Did, RecordId}; 22 23 mod acceptable; ··· 25 26 use acceptable::{acceptable, ExtractAccept}; 27 28 + const DEFAULT_CURSOR_LIMIT: u64 = 100; 29 + const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000; 30 31 fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT ··· 298 299 let path_to_other = format!(".{}", query.path_to_other); 300 301 + let order = if query.reverse { 302 + Order::OldestToNewest 303 + } else { 304 + Order::NewestToOldest 305 + }; 306 + 307 let paged = store 308 .get_many_to_many_counts( 309 &query.subject, 310 collection, 311 &path, 312 &path_to_other, 313 + order, 314 limit, 315 cursor_key, 316 &filter_dids, ··· 467 }; 468 let path = format!(".{path}"); 469 470 + let order = if query.reverse { 471 + Order::OldestToNewest 472 + } else { 473 + Order::NewestToOldest 474 + }; 475 + 476 let paged = store 477 .get_links( 478 &query.subject, 479 collection, 480 &path, 481 + order, 482 limit, 483 until, 484 &filter_dids, ··· 578 } 579 } 580 581 + let order = if query.reverse { 582 + Order::OldestToNewest 583 + } else { 584 + Order::NewestToOldest 585 + }; 586 + 587 let paged = store 588 .get_links( 589 &query.target, 590 &query.collection, 591 &query.path, 592 + order, 593 limit, 594 until, 595 &filter_dids,
+30 -24
constellation/src/storage/mem_store.rs
··· 1 use super::{ 2 - LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 }; 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 5 use anyhow::Result; ··· 140 collection: &str, 141 path: &str, 142 path_to_other: &str, 143 - reverse: bool, 144 limit: u64, 145 after: Option<String>, 146 filter_dids: &HashSet<Did>, ··· 199 .iter() 200 .map(|(k, (n, u, _))| (k.0.clone(), *n, u.len() as u64)) 201 .collect(); 202 - // sort in reverse order to show entries from oldest to newest 203 - if reverse { 204 - items.sort_by(|a, b| b.cmp(a)); 205 - } else { 206 - items.sort(); 207 } 208 items = items 209 .into_iter() ··· 250 target: &str, 251 collection: &str, 252 path: &str, 253 - reverse: bool, 254 limit: u64, 255 until: Option<u64>, 256 filter_dids: &HashSet<Did>, ··· 293 let end: usize; 294 let next: Option<u64>; 295 296 - if reverse { 297 - begin = until.map(|u| (u) as usize).unwrap_or(0); 298 - end = std::cmp::min(begin + limit as usize, total); 299 - 300 - next = if end < total { 301 - Some(end as u64 + 1) 302 - } else { 303 - None 304 - }; 305 - } else { 306 - end = until 307 - .map(|u| std::cmp::min(u as usize, total)) 308 - .unwrap_or(total); 309 - begin = end.saturating_sub(limit as usize); 310 - next = if begin == 0 { None } else { Some(begin as u64) }; 311 } 312 313 let alive = did_rkeys.iter().flatten().count(); ··· 325 }) 326 .collect(); 327 328 - if reverse { 329 items.reverse(); 330 } 331
··· 1 use super::{ 2 + LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, 3 + StorageStats, 4 }; 5 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 6 use anyhow::Result; ··· 141 collection: &str, 142 path: &str, 143 path_to_other: &str, 144 + order: Order, 145 limit: u64, 146 after: Option<String>, 147 filter_dids: &HashSet<Did>, ··· 200 .iter() 201 .map(|(k, (n, u, _))| (k.0.clone(), *n, u.len() as u64)) 202 .collect(); 203 + // Sort based on order: OldestToNewest uses descending order, NewestToOldest uses ascending 204 + match order { 205 + Order::OldestToNewest => items.sort_by(|a, b| b.cmp(a)), 206 + Order::NewestToOldest => items.sort(), 207 } 208 items = items 209 .into_iter() ··· 250 target: &str, 251 collection: &str, 252 path: &str, 253 + order: Order, 254 limit: u64, 255 until: Option<u64>, 256 filter_dids: &HashSet<Did>, ··· 293 let end: usize; 294 let next: Option<u64>; 295 296 + match order { 297 + // OldestToNewest: start from the beginning, paginate forward 298 + Order::OldestToNewest => { 299 + begin = until.map(|u| (u) as usize).unwrap_or(0); 300 + end = std::cmp::min(begin + limit as usize, total); 301 + 302 + next = if end < total { 303 + Some(end as u64 + 1) 304 + } else { 305 + None 306 + }; 307 + } 308 + // NewestToOldest: start from the end, paginate backward 309 + Order::NewestToOldest => { 310 + end = until 311 + .map(|u| std::cmp::min(u as usize, total)) 312 + .unwrap_or(total); 313 + begin = end.saturating_sub(limit as usize); 314 + next = if begin == 0 { None } else { Some(begin as u64) }; 315 + } 316 } 317 318 let alive = did_rkeys.iter().flatten().count(); ··· 330 }) 331 .collect(); 332 333 + // For OldestToNewest, reverse the items to maintain forward chronological order 334 + if order == Order::OldestToNewest { 335 items.reverse(); 336 } 337
+43 -34
constellation/src/storage/mod.rs
··· 11 #[cfg(feature = "rocks")] 12 pub use rocks_store::RocksStorage; 13 14 #[derive(Debug, PartialEq)] 15 pub struct PagedAppendingCollection<T> { 16 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" ··· 72 collection: &str, 73 path: &str, 74 path_to_other: &str, 75 - reverse: bool, 76 limit: u64, 77 after: Option<String>, 78 filter_dids: &HashSet<Did>, ··· 88 target: &str, 89 collection: &str, 90 path: &str, 91 - reverse: bool, 92 limit: u64, 93 until: Option<u64>, 94 filter_dids: &HashSet<Did>, ··· 182 "a.com", 183 "app.t.c", 184 ".abc.uri", 185 - false, 186 100, 187 None, 188 &HashSet::default() ··· 686 "a.com", 687 "app.t.c", 688 ".abc.uri", 689 - false, 690 100, 691 None, 692 &HashSet::default() ··· 735 "a.com", 736 "app.t.c", 737 ".abc.uri", 738 - false, 739 2, 740 None, 741 &HashSet::default(), ··· 774 "a.com", 775 "app.t.c", 776 ".abc.uri", 777 - false, 778 2, 779 links.next, 780 &HashSet::default(), ··· 813 "a.com", 814 "app.t.c", 815 ".abc.uri", 816 - false, 817 2, 818 links.next, 819 &HashSet::default(), ··· 862 )?; 863 } 864 865 - // Test reverse: true (oldest first) 866 let links = storage.get_links( 867 "a.com", 868 "app.t.c", 869 ".abc.uri", 870 - true, 871 2, 872 None, 873 &HashSet::default(), ··· 892 total: 5, 893 } 894 ); 895 - // Test reverse: false (newest first) 896 let links = storage.get_links( 897 "a.com", 898 "app.t.c", 899 ".abc.uri", 900 - false, 901 2, 902 None, 903 &HashSet::default(), ··· 930 "a.com", 931 "app.t.c", 932 ".abc.uri", 933 - false, 934 2, 935 None, 936 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 964 "a.com", 965 "app.t.c", 966 ".abc.uri", 967 - false, 968 2, 969 None, 970 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 987 "a.com", 988 "app.t.c", 989 ".abc.uri", 990 - false, 991 2, 992 None, 993 &HashSet::from([Did("did:plc:someone-else".to_string())]), ··· 1035 "a.com", 1036 "app.t.c", 1037 ".abc.uri", 1038 - false, 
1039 2, 1040 None, 1041 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 1065 "a.com", 1066 "app.t.c", 1067 ".abc.uri", 1068 - false, 1069 2, 1070 None, 1071 &HashSet::from([ ··· 1098 "a.com", 1099 "app.t.c", 1100 ".abc.uri", 1101 - false, 1102 2, 1103 None, 1104 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), ··· 1135 "a.com", 1136 "app.t.c", 1137 ".abc.uri", 1138 - false, 1139 2, 1140 None, 1141 &HashSet::default(), ··· 1164 "a.com", 1165 "app.t.c", 1166 ".abc.uri", 1167 - false, 1168 2, 1169 links.next, 1170 &HashSet::default(), ··· 1213 "a.com", 1214 "app.t.c", 1215 ".abc.uri", 1216 - false, 1217 2, 1218 None, 1219 &HashSet::default(), ··· 1256 "a.com", 1257 "app.t.c", 1258 ".abc.uri", 1259 - false, 1260 2, 1261 links.next, 1262 &HashSet::default(), ··· 1305 "a.com", 1306 "app.t.c", 1307 ".abc.uri", 1308 - false, 1309 2, 1310 None, 1311 &HashSet::default(), ··· 1342 "a.com", 1343 "app.t.c", 1344 ".abc.uri", 1345 - false, 1346 2, 1347 links.next, 1348 &HashSet::default(), ··· 1384 "a.com", 1385 "app.t.c", 1386 ".abc.uri", 1387 - false, 1388 2, 1389 None, 1390 &HashSet::default(), ··· 1417 "a.com", 1418 "app.t.c", 1419 ".abc.uri", 1420 - false, 1421 2, 1422 links.next, 1423 &HashSet::default(), ··· 1499 "a.b.c", 1500 ".d.e", 1501 ".f.g", 1502 - false, 1503 10, 1504 None, 1505 &HashSet::new(), ··· 1543 "app.t.c", 1544 ".abc.uri", 1545 ".def.uri", 1546 - false, 1547 10, 1548 None, 1549 &HashSet::new(), ··· 1643 "app.t.c", 1644 ".abc.uri", 1645 ".def.uri", 1646 - false, 1647 10, 1648 None, 1649 &HashSet::new(), ··· 1660 "app.t.c", 1661 ".abc.uri", 1662 ".def.uri", 1663 - false, 1664 10, 1665 None, 1666 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), ··· 1677 "app.t.c", 1678 ".abc.uri", 1679 ".def.uri", 1680 - false, 1681 10, 1682 None, 1683 &HashSet::new(), ··· 1753 2, 1754 )?; 1755 1756 - // Test reverse: false (default order - by target ascending) 1757 let counts = storage.get_many_to_many_counts( 1758 "a.com", 1759 "app.t.c", 1760 
".abc.uri", 1761 ".def.uri", 1762 - false, 1763 10, 1764 None, 1765 &HashSet::new(), ··· 1771 assert_eq!(counts.items[1].0, "c.com"); 1772 assert_eq!(counts.items[2].0, "d.com"); 1773 1774 - // Test reverse: true (descending order - by target descending) 1775 let counts = storage.get_many_to_many_counts( 1776 "a.com", 1777 "app.t.c", 1778 ".abc.uri", 1779 ".def.uri", 1780 - true, 1781 10, 1782 None, 1783 &HashSet::new(),
··· 11 #[cfg(feature = "rocks")] 12 pub use rocks_store::RocksStorage; 13 14 + /// Ordering for paginated link queries 15 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 16 + pub enum Order { 17 + /// Newest links first (default) 18 + NewestToOldest, 19 + /// Oldest links first 20 + OldestToNewest, 21 + } 22 + 23 #[derive(Debug, PartialEq)] 24 pub struct PagedAppendingCollection<T> { 25 pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" ··· 81 collection: &str, 82 path: &str, 83 path_to_other: &str, 84 + order: Order, 85 limit: u64, 86 after: Option<String>, 87 filter_dids: &HashSet<Did>, ··· 97 target: &str, 98 collection: &str, 99 path: &str, 100 + order: Order, 101 limit: u64, 102 until: Option<u64>, 103 filter_dids: &HashSet<Did>, ··· 191 "a.com", 192 "app.t.c", 193 ".abc.uri", 194 + Order::NewestToOldest, 195 100, 196 None, 197 &HashSet::default() ··· 695 "a.com", 696 "app.t.c", 697 ".abc.uri", 698 + Order::NewestToOldest, 699 100, 700 None, 701 &HashSet::default() ··· 744 "a.com", 745 "app.t.c", 746 ".abc.uri", 747 + Order::NewestToOldest, 748 2, 749 None, 750 &HashSet::default(), ··· 783 "a.com", 784 "app.t.c", 785 ".abc.uri", 786 + Order::NewestToOldest, 787 2, 788 links.next, 789 &HashSet::default(), ··· 822 "a.com", 823 "app.t.c", 824 ".abc.uri", 825 + Order::NewestToOldest, 826 2, 827 links.next, 828 &HashSet::default(), ··· 871 )?; 872 } 873 874 + // Test OldestToNewest order (oldest first) 875 let links = storage.get_links( 876 "a.com", 877 "app.t.c", 878 ".abc.uri", 879 + Order::OldestToNewest, 880 2, 881 None, 882 &HashSet::default(), ··· 901 total: 5, 902 } 903 ); 904 + // Test NewestToOldest order (newest first) 905 let links = storage.get_links( 906 "a.com", 907 "app.t.c", 908 ".abc.uri", 909 + Order::NewestToOldest, 910 2, 911 None, 912 &HashSet::default(), ··· 939 "a.com", 940 "app.t.c", 941 ".abc.uri", 942 + Order::NewestToOldest, 943 2, 944 None, 945 
&HashSet::from([Did("did:plc:linker".to_string())]), ··· 973 "a.com", 974 "app.t.c", 975 ".abc.uri", 976 + Order::NewestToOldest, 977 2, 978 None, 979 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 996 "a.com", 997 "app.t.c", 998 ".abc.uri", 999 + Order::NewestToOldest, 1000 2, 1001 None, 1002 &HashSet::from([Did("did:plc:someone-else".to_string())]), ··· 1044 "a.com", 1045 "app.t.c", 1046 ".abc.uri", 1047 + Order::NewestToOldest, 1048 2, 1049 None, 1050 &HashSet::from([Did("did:plc:linker".to_string())]), ··· 1074 "a.com", 1075 "app.t.c", 1076 ".abc.uri", 1077 + Order::NewestToOldest, 1078 2, 1079 None, 1080 &HashSet::from([ ··· 1107 "a.com", 1108 "app.t.c", 1109 ".abc.uri", 1110 + Order::NewestToOldest, 1111 2, 1112 None, 1113 &HashSet::from([Did("did:plc:someone-unknown".to_string())]), ··· 1144 "a.com", 1145 "app.t.c", 1146 ".abc.uri", 1147 + Order::NewestToOldest, 1148 2, 1149 None, 1150 &HashSet::default(), ··· 1173 "a.com", 1174 "app.t.c", 1175 ".abc.uri", 1176 + Order::NewestToOldest, 1177 2, 1178 links.next, 1179 &HashSet::default(), ··· 1222 "a.com", 1223 "app.t.c", 1224 ".abc.uri", 1225 + Order::NewestToOldest, 1226 2, 1227 None, 1228 &HashSet::default(), ··· 1265 "a.com", 1266 "app.t.c", 1267 ".abc.uri", 1268 + Order::NewestToOldest, 1269 2, 1270 links.next, 1271 &HashSet::default(), ··· 1314 "a.com", 1315 "app.t.c", 1316 ".abc.uri", 1317 + Order::NewestToOldest, 1318 2, 1319 None, 1320 &HashSet::default(), ··· 1351 "a.com", 1352 "app.t.c", 1353 ".abc.uri", 1354 + Order::NewestToOldest, 1355 2, 1356 links.next, 1357 &HashSet::default(), ··· 1393 "a.com", 1394 "app.t.c", 1395 ".abc.uri", 1396 + Order::NewestToOldest, 1397 2, 1398 None, 1399 &HashSet::default(), ··· 1426 "a.com", 1427 "app.t.c", 1428 ".abc.uri", 1429 + Order::NewestToOldest, 1430 2, 1431 links.next, 1432 &HashSet::default(), ··· 1508 "a.b.c", 1509 ".d.e", 1510 ".f.g", 1511 + Order::NewestToOldest, 1512 10, 1513 None, 1514 &HashSet::new(), ··· 1552 "app.t.c", 1553 ".abc.uri", 
1554 ".def.uri", 1555 + Order::NewestToOldest, 1556 10, 1557 None, 1558 &HashSet::new(), ··· 1652 "app.t.c", 1653 ".abc.uri", 1654 ".def.uri", 1655 + Order::NewestToOldest, 1656 10, 1657 None, 1658 &HashSet::new(), ··· 1669 "app.t.c", 1670 ".abc.uri", 1671 ".def.uri", 1672 + Order::NewestToOldest, 1673 10, 1674 None, 1675 &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), ··· 1686 "app.t.c", 1687 ".abc.uri", 1688 ".def.uri", 1689 + Order::NewestToOldest, 1690 10, 1691 None, 1692 &HashSet::new(), ··· 1762 2, 1763 )?; 1764 1765 + // Test NewestToOldest order (default order - by target ascending) 1766 let counts = storage.get_many_to_many_counts( 1767 "a.com", 1768 "app.t.c", 1769 ".abc.uri", 1770 ".def.uri", 1771 + Order::NewestToOldest, 1772 10, 1773 None, 1774 &HashSet::new(), ··· 1780 assert_eq!(counts.items[1].0, "c.com"); 1781 assert_eq!(counts.items[2].0, "d.com"); 1782 1783 + // Test OldestToNewest order (descending order - by target descending) 1784 let counts = storage.get_many_to_many_counts( 1785 "a.com", 1786 "app.t.c", 1787 ".abc.uri", 1788 ".def.uri", 1789 + Order::OldestToNewest, 1790 10, 1791 None, 1792 &HashSet::new(),
+27 -22
constellation/src/storage/rocks_store.rs
··· 1 use super::{ 2 - ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, 3 - StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 use anyhow::{bail, Result}; ··· 941 collection: &str, 942 path: &str, 943 path_to_other: &str, 944 - reverse: bool, 945 limit: u64, 946 after: Option<String>, 947 filter_dids: &HashSet<Did>, ··· 1084 items.push((target.0 .0, *n, dids.len() as u64)); 1085 } 1086 1087 - // Sort in desired direction 1088 - if reverse { 1089 - items.sort_by(|a, b| b.cmp(a)); // descending 1090 - } else { 1091 - items.sort(); // ascending 1092 } 1093 1094 let next = if grouped_counts.len() as u64 >= limit { ··· 1136 target: &str, 1137 collection: &str, 1138 path: &str, 1139 - reverse: bool, 1140 limit: u64, 1141 until: Option<u64>, 1142 filter_dids: &HashSet<Did>, ··· 1182 let begin: usize; 1183 let next: Option<u64>; 1184 1185 - if reverse { 1186 - begin = until.map(|u| (u - 1) as usize).unwrap_or(0); 1187 - end = std::cmp::min(begin + limit as usize, total as usize); 1188 - 1189 - next = if end < total as usize { 1190 - Some(end as u64 + 1) 1191 - } else { 1192 - None 1193 } 1194 - } else { 1195 - end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1196 - begin = end.saturating_sub(limit as usize); 1197 - next = if begin == 0 { None } else { Some(begin as u64) }; 1198 } 1199 1200 let mut did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1201 1202 - if reverse { 1203 did_id_rkeys.reverse(); 1204 } 1205
··· 1 use super::{ 2 + ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection, 3 + PagedOrderedCollection, StorageStats, 4 }; 5 use crate::{CountsByCount, Did, RecordId}; 6 use anyhow::{bail, Result}; ··· 941 collection: &str, 942 path: &str, 943 path_to_other: &str, 944 + order: Order, 945 limit: u64, 946 after: Option<String>, 947 filter_dids: &HashSet<Did>, ··· 1084 items.push((target.0 .0, *n, dids.len() as u64)); 1085 } 1086 1087 + // Sort based on order: OldestToNewest uses descending order, NewestToOldest uses ascending 1088 + match order { 1089 + Order::OldestToNewest => items.sort_by(|a, b| b.cmp(a)), // descending 1090 + Order::NewestToOldest => items.sort(), // ascending 1091 } 1092 1093 let next = if grouped_counts.len() as u64 >= limit { ··· 1135 target: &str, 1136 collection: &str, 1137 path: &str, 1138 + order: Order, 1139 limit: u64, 1140 until: Option<u64>, 1141 filter_dids: &HashSet<Did>, ··· 1181 let begin: usize; 1182 let next: Option<u64>; 1183 1184 + match order { 1185 + // OldestToNewest: start from the beginning, paginate forward 1186 + Order::OldestToNewest => { 1187 + begin = until.map(|u| (u - 1) as usize).unwrap_or(0); 1188 + end = std::cmp::min(begin + limit as usize, total as usize); 1189 + 1190 + next = if end < total as usize { 1191 + Some(end as u64 + 1) 1192 + } else { 1193 + None 1194 + } 1195 + } 1196 + // NewestToOldest: start from the end, paginate backward 1197 + Order::NewestToOldest => { 1198 + end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1199 + begin = end.saturating_sub(limit as usize); 1200 + next = if begin == 0 { None } else { Some(begin as u64) }; 1201 } 1202 } 1203 1204 let mut did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1205 1206 + // For OldestToNewest, reverse the items to maintain forward chronological order 1207 + if order == Order::OldestToNewest { 1208 did_id_rkeys.reverse(); 1209 } 1210
constellation/templates/base.html.j2

This file has not been changed.

constellation/templates/get-backlinks.html.j2

This file has not been changed.

constellation/templates/get-many-to-many-counts.html.j2

This file has not been changed.

constellation/templates/hello.html.j2

This file has not been changed.

constellation/templates/links.html.j2

This file has not been changed.

constellation/templates/try-it-macros.html.j2

This file has not been changed.

History

2 rounds 23 comments
sign up or login to add to the discussion
6 commits
expand
Add reverse ordering support to link query endpoints
Fix failing tests
Format tests
Add tests for reverse ordering in link queries
Fix pagination logic for reverse-ordered link queries
Replace boolean reverse parameter with Order enum
expand 16 comments

Damn, I just realized I have not yet added a lexicon for the new endpoint. Will do that now. At least the endpoint logic itself should not be affected by that and thus be ready for review, given its similarity to the existing getManyToManyCounts endpoint.

As Lexicons do not offer support for tuples as primitive field types I had to convert the return to use a new RecordsBySubject struct instead, which closely matches what you already did for the m2m counts endpoint.

The comments obviously don't belong here... please refer to the comments made for the PR #7

alright! reading back my own code gave me such a headache with this!

i'm reverting a few parts of this:

  • deprecated links endpoint: going to hard-code it to the existing order (deprecated endpoints don't get new features)

  • reverse ordering on many-to-many counts.

the many-to-many counts endpoint is a weird one to think about because it's not obvious what an "order" on it means. especially since the counts are aggregated, which row should be first in the response?

the current behaviour is deterministic but arbitrary: items are sorted by the other subject's target id (ie., the first time we saw the joined thingy), which is potentially unrelated to the actual many-to-many records themselves. if someone later creates a many-to-many record linking an earlier other subject, it'll become first in the list.

so: the determinism of that other-subject-target-id sorting is what enables paging to work at this endpoint, but it doesn't impose a client-meaningful order of items in the response. reversing an arbitrary order just gives you a different arbitrary order.

hopefully that makes sense ha

(additionally: there is a bit of optimization in the main join loop in the rocksdb implementation that assumes target-id-order to save work from unused pages, and i could be wrong, but i think it wouldn't stay valid under reverse paging)

(oh but i do think we can add a reverse option to the distinct-dids endpoint! probably under a new xrpc version.)

despite the above, this is merged!

i did a couple edits locally:

Just so that I understand you correctly here: We decided not to proceed with implementing a reverse-order parameter for many-to-many related things, as the current order is deterministic but arbitrary in nature, and thus reversing it essentially does not provide any additional meaningful information.

If it's not too much trouble, do you mind explaining why you used Iterator::skip and ::take for the slicing? Is this simply more idiomatic or more performant even? :)

Really liked the way you resolved empty results with the empty impl by the way. Definitely less verbose and even clearer than what I opted for haha

many-to-many: yes exactly. it's not clear what expectation we would be trying to meet for a client requesting the many-to-many counts in reverse.

i'm open to being wrong here! when thinking about ordering, i was focused on the order of the many-to-many join-records themselves. which is where things felt meaningless/arbitrary because a) they're aggregated together (at this endpoint) and b) their order of creation is independent of the order this endpoint returns its results in.

but maybe having results ordered by the other-target-id does have client-relevant meaning. joined subjects are sorted by their age in a global sense?

thinking through it with concrete examples, i'm more inclined to bring it back:

  • joining tangled issues to a label: issues are ordered oldest-to-newest (independent from when the label was added)
  • joining bluesky users to a bluesky list: users are ordered by the age of their account (again independent from when they were added to the list)

hmm.

the thing that still feels off to me for these is the fact that this endpoint is an aggregate of many-to-many counts. that implies to me that the count is the thing of interest, and the order of responses in relation to the count is arbitrary again.

trying to keep the non-aggregated version of this endpoint in view, does ordering by the linked subject age make sense then? (ordering by the link actually feels more natural to me in that case) (i think i just talked myself out of and then back into bringing back your ordering here!!)

skip and take: yeah, it's a little more functional-style which works well for my brain. i like a big pipeline of chained transformations operating on sequences so i took the chance to do a little refactor with it. especially since it's immediately setting up an iterator right after slicing, it feels more cohesive to me.

it can be technically the other things too:

indexing ranges like thing[start..end] can panic if either index is out of bounds. in some cases the compiler can prove we already did the bounds checks and remove the panic hooks, but i suspect it can't here. even though we can prove that the indices are in range by reasoning about the code, it's nice not to have things-that-can-panic in the code.

i think it eliminated a Vec::reverse() in favour of an Iterator::rev(), which is potentially a very tiny efficiency improvement

m2m: I think this just boils down to how we communicate this fact to users of our API imo. As your internal back and forth shows there can be valid arguments in both ways. I think most users might naively - as I did at first - simply expect the links/records to be returned in the order they were created in. But, clearly documenting this in the endpoint description seems like a good idea here to make sure we don't leave any kind of ambiguity on the table.

skip and take: That's actually a great argument and I tend to agree with you on both fronts. Having worked professionally more with Typescript than, say, C, I always liked that you could chain transformations to a specific object together (map, filter and so on). The additional safety we get here from using iterators to slice data is new for me but definitely worth it.

m2m (again): Depending on how you see this I might reopen the PR again and introduce the parameter again. I let you make the final judgement call ofc :)

thanks for following up! i really appreciate the discussion!

100% agree the most important thing is communicating and setting accurate expectations in the api docs regardless of how this lands.

I really had convinced myself that the order was "meaningless" for clients, but then every example i was thinking of was like, welllll it kind of is actually meaningful. After a few days I'm still feeling that way, so I'm interested in bringing this back.

(the other thing that got me was switching to the other M2M PR, and remembering that this endpoint is useful as a "distinct" version of that query. it's not only useful for its counts)

i do think there's some extra logic needed in one of the loops that currently tries to avoid some work based on cursor assumptions.

aaaaaand i think there might be a bug in the existing logic, so this will be extra fun sorry! (i will see if i can finally get that to a proper repro or disprove it, so we can hopefully fix that first)

Sounds great! I think no matter how you look at it, the order is not entirely arbitrary as we collect results on the order they're stored.

Do you mind elaborating a bit further on your last two remarks before I open the PR again?! Might be easier for me to know what to look for then!

closed without merging
5 commits
expand
Add reverse ordering support to link query endpoints
Fix failing tests
Format tests
Add tests for reverse ordering in link queries
Fix pagination logic for reverse-ordered link queries
expand 7 comments

Looks like we might need a rebase, seems like this picked up (and is trying to undo) the changes from your other two PRs

✅ naming the query param reverse sounds good to me since it's consistent with com.atproto.repo.listRecords (and i assume/hopefully most other atproto lexicons)

i think we double-reverse for reverse-order backlinks:

https://tangled.org/microcosm.blue/microcosm-rs/pulls/6/round/0#constellation%2fsrc%2fstorage%2frocks_store.rs-N1200

could probably work it so we only call reverse once (when "forward". not confusing at all.)

Looks like we might need a rebase, seems like this picked up (and is trying to undo) the changes from your other two PRs

I think merging upstream into the fork might work as well and makes things easier for you as the reviewer. I don't mind rebasing ofc.

i think we double-reverse for reverse-order backlinks:

Yeah the existing .rev() calls were a bit confusing at first. Maybe changing the default order to return the oldest links per default might make things a bit clearer? Though I think this is mostly helping us as the maintainers as users might expect to get served the newest ones most of the time.

Another option could be introducing an "order" query parameter with two possible values asc and desc (or something similar) to make things a bit more clear what the default order is, but then we would lose the consistency between our endpoint and com.atproto.repo.listRecords.

We could reduce the number of times we reverse using the following I think:

// Collect the stored (oldest → newest) slice of linkers for this page.
let mut did_id_rkeys = linkers.0[begin..end].iter().collect::<Vec<_>>();

// Storage order is chronological (oldest-first), so the default
// (non-reverse) response needs an explicit reversal to serve newest-first.
if !reverse {
    did_id_rkeys.reverse();
}

Some comments might be justified though. That conditional looks super confusing lol

Upon thinking about this a bit more I think we should keep the reverse query parameter as is and don't touch the default order as this would be an unnecessary API break. Instead we could just introduce an enum and convert the query parameter to an enum value immediately after receiving. Other operations would be clearer then. We should still add some more context somewhere in the comments though. I had something like this in mind. What do you think?

/// Ordering for returned back-links.
///
/// Back-links are stored chronologically (oldest → newest), so serving the
/// default newest-first order requires a reversal at read time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Order {
    /// Default order (`reverse=false`): newest links first.
    NewestToOldest,
    /// Chronological order (`reverse=true`): oldest links first.
    OldestToNewest,
}

/// Maps the `reverse` query parameter onto an [`Order`].
///
/// `false` (the parameter's default) yields newest-first; `true` yields
/// chronological (oldest-first) order.
impl From<bool> for Order {
    fn from(reverse: bool) -> Self {
        match reverse {
            true => Order::OldestToNewest,
            false => Order::NewestToOldest,
        }
    }
}

// In server/mod.rs, where we receive the query parameter
/// HTTP handler for the back-link query endpoint.
///
/// Converts the boolean `reverse` query parameter into an [`Order`] at the
/// API boundary so the storage layer only ever sees the explicit enum.
/// NOTE(review): `limit`, `until`, and `filter_dids` are presumably derived
/// from `query` in the elided portion of the handler — confirm against the
/// full server/mod.rs.
fn get_links(
    accept: ExtractAccept,
    query: axum_extra::extract::Query<GetLinkItemsQuery>,
    store: impl LinkReader,
) -> Result<impl IntoResponse, http::StatusCode> {
    // Convert boolean to enum right at the boundary
    let order = Order::from(query.reverse);

    // Now pass the enum to storage layer
    let paged = store.get_links(
        &query.target,
        &query.collection,
        &query.path,
        order,  // ← Clean enum instead of mysterious boolean
        limit,
        until,
        &filter_dids,
    )?;
    // ...
}

// In rocks_store.rs get_links() - this replaces the confusing double-reverse!
// Storage is chronological (oldest → newest). `until` is the pagination
// cursor; `next` becomes the cursor for the following page (None when the
// listing is exhausted). Each branch computes the [begin, end) window and
// only the newest-first branch reverses.
let did_id_rkeys = match order {
    Order::OldestToNewest => {
        // NOTE(review): `u - 1` underflows if `until == Some(0)`. Cursors
        // produced below are always >= 1, but confirm user-supplied cursor
        // input is validated before reaching this point.
        begin = until.map(|u| (u - 1) as usize).unwrap_or(0);
        end = std::cmp::min(begin + limit as usize, total as usize);
        next = if end < total as usize {
            Some(end as u64 + 1)
        } else {
            None
        };
        // No reversal - storage is already chronological
        linkers.0[begin..end].iter().collect::<Vec<_>>()
    }
    Order::NewestToOldest => {
        // Walk backwards from the cursor: this page covers [begin, end),
        // and the next page ends where this one begins.
        end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
        begin = end.saturating_sub(limit as usize);
        next = if begin == 0 { None } else { Some(begin as u64) };
        // Reverse chronological storage to get newest-to-oldest
        linkers.0[begin..end].iter().rev().collect::<Vec<_>>()
    }
};

Thanks for the follow-up!

For the API, I agree with where you landed: keep it newest-first by default, optional reverse param being true makes it chronological. Small nit would be to skip the From<bool> impl since a bool itself doesn't inherently carry directional meaning; the endpoint gives it that meaning, so we can put the let order = if query.reverse { Order::OldestToNewest } ... directly in the endpoint.

i think you're right to just put the whole construction of the vec and cursor into branches on Order. part of me wants to try and encapsulate it a bit and switch the indexing to use linkers.0.iter().skip(begin).take(end) or whatever but the .rev() in there will still require a branch and/or make type things annoying and probably all of it less clear. i like your code.

comments about order are excellent thank you :)

Rebased the PR to match the current state of main and introduced the Order enum to make the sorting of items a bit clearer throughout the handlers that use the reverse parameter already (followed your suggestion regarding the From<bool> and put the definition of the order parameter directly in the endpoint itself before passing it as a parameter to the underlying handler).