Experiments in applying Entity-Component-System patterns to durable data storage APIs.

Compare changes

Choose any two refs to compare.

+532 -11
+5
src/change_tracking_disable.sql
··· 1 + drop trigger if exists components_changes_attach_trigger; 2 + 3 + drop trigger if exists components_changes_update_trigger; 4 + 5 + drop trigger if exists components_changes_detach_trigger;
+40
src/change_tracking_enable.sql
··· 1 + -- Runs when a component is attached 2 + create trigger if not exists components_changes_attach_trigger 3 + after insert 4 + on components for each row 5 + begin 6 + insert into changes (entity, component, change) 7 + select new.entity, new.component, 'create' 8 + where not exists ( 9 + select true from components 10 + where entity = new.entity 11 + and component != new.component 12 + ); 13 + 14 + insert into changes (entity, component, change) 15 + select new.entity, new.component, 'attach'; 16 + end; 17 + 18 + -- Runs when a component is changed 19 + create trigger if not exists components_changes_update_trigger 20 + after update 21 + on components for each row 22 + begin 23 + insert into changes (entity, component, change) 24 + values (new.entity, new.component, 'attach'); 25 + end; 26 + 27 + -- Runs when a component is detached 28 + create trigger if not exists components_changes_detach_trigger 29 + after delete 30 + on components for each row 31 + begin 32 + insert into changes (entity, component, change) 33 + values (old.entity, old.component, 'detach'); 34 + 35 + insert into changes (entity, component, change) 36 + select old.entity, old.component, 'destroy' 37 + where not exists ( 38 + select true from components where entity = old.entity 39 + ); 40 + end;
+1 -1
src/entity.rs
··· 1 1 use std::iter; 2 2 3 3 use rusqlite::params; 4 - use tracing::{debug, trace}; 4 + use tracing::debug; 5 5 6 6 use crate::{ 7 7 component::Bundle,
+209 -8
src/lib.rs
··· 1 1 pub mod component; 2 - 3 2 pub use component::{Component, ComponentRead, ComponentWrite}; 4 3 5 4 pub mod entity; ··· 14 13 15 14 16 15 16 + pub use resource::*; 17 17 18 + pub mod system; 19 + use ::rusqlite::{params, OptionalExtension}; 20 + pub use system::*; 18 21 22 + use std::path::Path; 19 23 20 24 21 25 ··· 68 72 69 73 70 74 75 + } 76 + } 71 77 78 + impl Ecs { 79 + pub fn enable_change_tracking(&mut self) -> Result<(), Error> { 80 + self.conn 81 + .execute_batch(include_str!("change_tracking_enable.sql"))?; 82 + Ok(()) 83 + } 72 84 73 - 74 - 75 - 85 + pub fn disable_change_tracking(&mut self) -> Result<(), Error> { 86 + self.conn 87 + .execute_batch(include_str!("change_tracking_disable.sql"))?; 88 + Ok(()) 76 89 } 77 90 } 78 91 ··· 181 194 } 182 195 } 183 196 184 - pub mod rusqlite { 185 - pub use rusqlite::*; 197 + #[derive(Debug, PartialEq, Eq)] 198 + pub enum Change { 199 + Create { entity: EntityId }, 200 + Attach { entity: EntityId, component: String }, 201 + Detach { entity: EntityId, component: String }, 202 + Destroy { entity: EntityId }, 186 203 } 187 204 205 + impl Ecs { 206 + pub fn latest_change_id(&self) -> Result<Option<i64>, Error> { 207 + let seq: Option<i64> = self.conn.query_row_and_then( 208 + "select max(sequence) from changes", 209 + params![], 210 + |row| row.get(0), 211 + )?; 188 212 213 + Ok(seq) 214 + } 189 215 216 + pub fn clear_changes_up_to(&self, up_to: i64) -> Result<(), Error> { 217 + self.conn 218 + .execute("delete from changes where sequence < ?1", params![up_to])?; 219 + Ok(()) 220 + } 190 221 222 + pub fn changes(&self) -> Result<Vec<Change>, Error> { 223 + let mut stmt = self 224 + .conn 225 + .prepare_cached("select * from changes order by sequence asc")?; 226 + 227 + let changes = stmt 228 + .query_map(params![], |row| { 229 + let entity = row.get("entity")?; 230 + let change: String = row.get("change")?; 231 + 232 + match change.as_str() { 233 + "create" => Ok(Change::Create { entity }), 234 + "attach" => { 235 + let component = row.get("component")?; 236 + Ok(Change::Attach { entity, component }) 237 + } 238 + "detach" => { 239 + let component = row.get("component")?; 240 + Ok(Change::Detach { entity, component }) 241 + } 242 + "destroy" => Ok(Change::Destroy { entity }), 243 + other => { 244 + panic!("Invalid 'changes.change' {other:?}"); 245 + } 246 + } 247 + })? 248 + .collect::<Result<_, _>>()?; 191 249 250 + Ok(changes) 251 + } 192 252 253 + pub fn clear_changes(&self) -> Result<(), Error> { 254 + self.conn.execute("delete from changes", params![])?; 255 + Ok(()) 256 + } 257 + } 193 258 259 + pub mod rusqlite { 260 + pub use rusqlite::*; 261 + } 194 262 195 263 196 264 ··· 207 275 208 276 209 277 278 + impl sea_query::Iden for Components { 279 + fn unquoted(&self, s: &mut dyn std::fmt::Write) { 280 + let v = match self { 281 + Self::Table => "components", 282 + Self::Entity => "entity", 283 + Self::Component => "component", 284 + Self::Data => "data", 285 + }; 286 + write!(s, "{v}").unwrap() 287 + } 288 + } 210 289 290 + #[allow(unused)] 291 + pub enum Changes { 292 + Table, 293 + Sequence, 294 + Entity, 295 + Component, 296 + Change, 297 + } 211 298 212 - 299 + impl sea_query::Iden for Changes { 300 + fn unquoted(&self, s: &mut dyn std::fmt::Write) { 301 + let v = match self { 302 + Self::Table => "changes", 303 + Self::Sequence => "sequence", 304 + Self::Entity => "entity", 305 + Self::Component => "component", 306 + Self::Change => "change", 307 + }; 308 + write!(s, "{v}").unwrap() 309 + } 213 310 214 311 215 312 216 313 #[cfg(test)] 217 314 mod tests { 218 315 // #[derive(Component)] derives `impl ecsdb::Component for ...` 219 - use crate::{self as ecsdb, Ecs}; 316 + use crate::{self as ecsdb, Change, Ecs}; 220 317 use crate::{BelongsTo, Component}; 221 318 222 319 use anyhow::anyhow; ··· 572 669 573 670 Ok(()) 574 671 } 672 + 673 + #[test] 674 + fn change_tracking_enable_disable() -> Result<(), anyhow::Error> { 675 + let mut ecs = super::Ecs::open_in_memory()?; 676 + ecs.enable_change_tracking()?; 677 + 678 + assert_eq!(ecs.changes()?, vec![]); 679 + 680 + ecs.new_entity().attach(A); 681 + assert_eq!(ecs.changes()?.len(), 2); 682 + 683 + ecs.disable_change_tracking()?; 684 + ecs.clear_changes()?; 685 + 686 + ecs.new_entity().attach(A); 687 + assert!(ecs.changes()?.is_empty()); 688 + 689 + Ok(()) 690 + } 691 + 692 + #[test] 693 + fn change_tracking() -> Result<(), anyhow::Error> { 694 + let mut ecs = super::Ecs::open_in_memory()?; 695 + ecs.enable_change_tracking()?; 696 + 697 + let mut changes = vec![]; 698 + 699 + assert_eq!(ecs.changes()?, vec![]); 700 + 701 + let entity = ecs.new_entity().attach(A); 702 + 703 + changes.extend([ 704 + Change::Create { 705 + entity: entity.id(), 706 + }, 707 + Change::Attach { 708 + entity: entity.id(), 709 + component: <A as Component>::component_name().to_owned(), 710 + }, 711 + ]); 712 + 713 + assert_eq!(ecs.changes()?, changes); 714 + 715 + entity.attach(B); 716 + changes.push(Change::Attach { 717 + entity: entity.id(), 718 + component: <B as Component>::component_name().to_owned(), 719 + }); 720 + 721 + assert_eq!(ecs.changes()?, changes); 722 + 723 + entity.detach::<B>(); 724 + changes.push(Change::Detach { 725 + entity: entity.id(), 726 + component: <B as Component>::component_name().to_owned(), 727 + }); 728 + 729 + assert_eq!(ecs.changes()?, changes); 730 + 731 + entity.detach::<A>(); 732 + changes.extend([ 733 + Change::Detach { 734 + entity: entity.id(), 735 + component: <A as Component>::component_name().to_owned(), 736 + }, 737 + Change::Destroy { 738 + entity: entity.id(), 739 + }, 740 + ]); 741 + 742 + assert_eq!(ecs.changes()?, changes); 743 + 744 + ecs.clear_changes()?; 745 + assert!(ecs.changes()?.is_empty()); 746 + 747 + Ok(()) 748 + } 749 + 750 + #[test] 751 + fn changed_system_param() -> Result<(), anyhow::Error> { 752 + #[derive(Debug, Deserialize, Serialize, Component)] 753 + struct Seen; 754 + 755 + let mut ecs = super::Ecs::open_in_memory()?; 756 + ecs.enable_change_tracking()?; 757 + 758 + fn system(query: Query<Attached<B>>) { 759 + assert_eq!(query.iter().map(|e| e.id()).collect::<Vec<_>>(), vec![200]); 760 + 761 + query.iter().for_each(|e| { 762 + e.attach(Seen); 763 + }); 764 + } 765 + 766 + ecs.register(system); 767 + 768 + ecs.entity(100).attach(A); 769 + ecs.entity(200).attach(B); 770 + 771 + ecs.tick(); 772 + assert!(ecs.entity(200).has::<Seen>()); 773 + 774 + Ok(()) 775 + } 575 776 }
+7
src/schema.sql
··· 20 20 21 21 end; 22 22 23 + create table if not exists changes ( 24 + sequence integer primary key autoincrement, 25 + entity integer not null, 26 + component text, 27 + change text not null 28 + ); 29 + 23 30 create table if not exists resources ( 24 31 name text not null unique, 25 32 data blob,
+214 -1
src/system.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 2 use tracing::{debug, error}; 3 3 4 - use crate::{self as ecsdb, query, Component, Ecs, Entity, EntityId}; 4 + use crate::{self as ecsdb, query, Component, Ecs, Entity, Error}; 5 5 6 6 use core::marker::PhantomData; 7 7 use std::borrow::Cow; 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 + 31 + 32 + 33 + 34 + 35 + 36 + 37 + 38 + 39 + 40 + 41 + 42 + 43 + 44 + 45 + 46 + 47 + 48 + 49 + 50 + 51 + 52 + 53 + 54 + 55 + 56 + 57 + 58 + 59 + 60 + 61 + 62 + 63 + 64 + 65 + 66 + 67 + 68 + 69 + 70 + 71 + 72 + 73 + 74 + 75 + 76 + 77 + 78 + 79 + 80 + 81 + 82 + 83 + 84 + 85 + 86 + 87 + 88 + 89 + 90 + 91 + 92 + 93 + 94 + 95 + 96 + 97 + 98 + 99 + 100 + 101 + 102 + 103 + 104 + 105 + 106 + 107 + 108 + 109 + 110 + 111 + 112 + 113 + 114 + 115 + 116 + 117 + 118 + 119 + 120 + 121 + 122 + 123 + 124 + 125 + 126 + 127 + 128 + 129 + 130 + 131 + 132 + 133 + 134 + 135 + 136 + 137 + 138 + 139 + 140 + 141 + 142 + 143 + 144 + 145 + 146 + 147 + 148 + 149 + 150 + 151 + 152 + 153 + 154 + 155 + 156 + 157 + 158 + 159 + 160 + 161 + 162 + 163 + 164 + 165 + 166 + 167 + 168 + 169 + 170 + 171 + 172 + 173 + 174 + 175 + 176 + 177 + 178 + 179 + 180 + 181 + 182 + 183 + 184 + 185 + 186 + 187 + 188 + 189 + 190 + 191 + 192 + 193 + 194 + 195 + 196 + impl Ecs { 197 + pub fn tick(&self) { 198 + let latest_change = self.latest_change_id().unwrap(); 199 + 200 + for system in &self.systems { 201 + let _span = tracing::info_span!("system", name = system.name().as_ref()).entered(); 202 + let started = std::time::Instant::now(); 203 + 204 + 205 + 206 + 207 + 208 + 209 + 210 + 211 + error!(?e); 212 + } 213 + 214 + if let Some(latest_change) = latest_change { 215 + self.clear_changes_up_to(latest_change).unwrap(); 216 + } 217 + 218 + entity.attach(LastRun(chrono::Utc::now())); 219 + 220 + debug!(elapsed_ms = started.elapsed().as_millis(), "Finished",);
+56 -1
src/query.rs
··· 1 - use crate::EntityId; 1 + use crate::{sql::Changes, EntityId}; 2 2 3 3 use super::{sql::Components, Component}; 4 4 use std::{any, marker::PhantomData}; ··· 167 167 } 168 168 169 169 pub struct Without<C>(PhantomData<C>); 170 + 170 171 impl<C: Component> Filter for Without<C> { 171 172 fn sql_query() -> sea_query::SelectStatement { 172 173 use sea_query::*; ··· 178 179 } 179 180 } 180 181 182 + pub struct Attached<C>(PhantomData<C>); 183 + 184 + impl<C: Component> Filter for Attached<C> { 185 + fn sql_query() -> sea_query::SelectStatement { 186 + use sea_query::*; 187 + Query::select() 188 + .column(Changes::Entity) 189 + .from(Changes::Table) 190 + .and_where(Expr::col(Changes::Component).eq(C::component_name())) 191 + .and_where(Expr::col(Changes::Change).eq("attach")) 192 + .take() 193 + } 194 + } 195 + 196 + pub struct Detached<C>(PhantomData<C>); 197 + 198 + impl<C: Component> Filter for Detached<C> { 199 + fn sql_query() -> sea_query::SelectStatement { 200 + use sea_query::*; 201 + Query::select() 202 + .column(Changes::Entity) 203 + .from(Changes::Table) 204 + .and_where(Expr::col(Changes::Component).eq(C::component_name())) 205 + .and_where(Expr::col(Changes::Change).eq("detach")) 206 + .take() 207 + } 208 + } 209 + 210 + pub struct Created; 211 + 212 + impl Filter for Created { 213 + fn sql_query() -> sea_query::SelectStatement { 214 + use sea_query::*; 215 + Query::select() 216 + .column(Changes::Entity) 217 + .from(Changes::Table) 218 + .and_where(Expr::col(Changes::Change).eq("create")) 219 + .take() 220 + } 221 + } 222 + 223 + pub struct Destroyed; 224 + 225 + impl Filter for Destroyed { 226 + fn sql_query() -> sea_query::SelectStatement { 227 + use sea_query::*; 228 + Query::select() 229 + .column(Changes::Entity) 230 + .from(Changes::Table) 231 + .and_where(Expr::col(Changes::Change).eq("destroy")) 232 + .take() 233 + } 234 + } 235 + 181 236 pub struct Or<T>(PhantomData<T>); 182 237 183 238 macro_rules! filter_tuple_impl {