a tool for shared writing and social publishing
1import { PgTransaction } from "drizzle-orm/pg-core"; 2import { Fact, PermissionToken } from "."; 3import { MutationContext } from "./mutations"; 4import { supabaseServerClient } from "supabase/serverClient"; 5import { entities, facts } from "drizzle/schema"; 6import * as driz from "drizzle-orm"; 7import { Attribute, Attributes, FilterAttributes } from "./attributes"; 8import { v7 } from "uuid"; 9import { DeepReadonly } from "replicache"; 10 11type WriteCacheEntry = 12 | { type: "put"; fact: Fact<any> } 13 | { type: "del"; fact: { id: string } }; 14 15export function cachedServerMutationContext( 16 tx: PgTransaction<any, any, any>, 17 permission_token_id: string, 18 token_rights: PermissionToken["permission_token_rights"], 19) { 20 let writeCache: WriteCacheEntry[] = []; 21 let eavCache = new Map<string, DeepReadonly<Fact<Attribute>>[]>(); 22 let permissionsCache: { [key: string]: boolean } = {}; 23 let entitiesCache: { set: string; id: string }[] = []; 24 let deleteEntitiesCache: string[] = []; 25 let textAttributeWriteCache = {} as { 26 [entityAttribute: string]: { [clientID: string]: string }; 27 }; 28 29 const scanIndex = { 30 async eav<A extends Attribute>(entity: string, attribute: A) { 31 let cached = eavCache.get(`${entity}-${attribute}`) as DeepReadonly< 32 Fact<A> 33 >[]; 34 let baseFacts: DeepReadonly<Fact<A>>[]; 35 if (deleteEntitiesCache.includes(entity)) return []; 36 if (cached) baseFacts = cached; 37 else { 38 cached = (await tx 39 .select({ 40 id: facts.id, 41 data: facts.data, 42 entity: facts.entity, 43 attribute: facts.attribute, 44 }) 45 .from(facts) 46 .where( 47 driz.and( 48 driz.eq(facts.attribute, attribute), 49 driz.eq(facts.entity, entity), 50 ), 51 )) as DeepReadonly<Fact<A>>[]; 52 } 53 cached = cached.filter( 54 (f) => 55 !writeCache.find((wc) => wc.type === "del" && wc.fact.id === f.id), 56 ); 57 let newlyWrittenFacts = writeCache.filter( 58 (f) => 59 f.type === "put" && 60 f.fact.attribute === attribute && 61 f.fact.entity === entity, 62 ); 63 return [ 64 ...cached, 65 ...newlyWrittenFacts.map((f) => f.fact as Fact<A>), 66 ].filter( 67 (f) => 68 !( 69 (f.data.type === "reference" || 70 f.data.type === "ordered-reference" || 71 f.data.type === "spatial-reference") && 72 deleteEntitiesCache.includes(f.data.value) 73 ), 74 ) as DeepReadonly<Fact<A>>[]; 75 }, 76 }; 77 let getContext = (clientID: string, mutationID: number) => { 78 let ctx: MutationContext & { 79 checkPermission: (entity: string) => Promise<boolean>; 80 } = { 81 scanIndex, 82 permission_token_id, 83 async runOnServer(cb) { 84 return cb({ supabase: supabaseServerClient }); 85 }, 86 async checkPermission(entity: string) { 87 if (deleteEntitiesCache.includes(entity)) return false; 88 let cachedEntity = entitiesCache.find((e) => e.id === entity); 89 if (cachedEntity) { 90 return !!token_rights.find( 91 (r) => r.entity_set === cachedEntity?.set && r.write === true, 92 ); 93 } 94 if (permissionsCache[entity] !== undefined) 95 return permissionsCache[entity]; 96 let [permission_set] = await tx 97 .select({ entity_set: entities.set }) 98 .from(entities) 99 .where(driz.eq(entities.id, entity)); 100 let hasPermission = 101 !!permission_set && 102 !!token_rights.find( 103 (r) => 104 r.entity_set === permission_set.entity_set && r.write == true, 105 ); 106 permissionsCache[entity] = hasPermission; 107 return hasPermission; 108 }, 109 async runOnClient(_cb) {}, 110 async createEntity({ entityID, permission_set }) { 111 if ( 112 !token_rights.find( 113 (r) => r.entity_set === permission_set && r.write === true, 114 ) 115 ) { 116 return false; 117 } 118 if (!entitiesCache.find((e) => e.id === entityID)) 119 entitiesCache.push({ set: permission_set, id: entityID }); 120 deleteEntitiesCache = deleteEntitiesCache.filter((e) => e !== entityID); 121 return true; 122 }, 123 async deleteEntity(entity) { 124 if (!(await this.checkPermission(entity))) return; 125 deleteEntitiesCache.push(entity); 126 entitiesCache = entitiesCache.filter((e) => e.id !== entity); 127 writeCache = writeCache.filter( 128 (f) => 129 f.type !== "put" || 130 (f.fact.entity !== entity && f.fact.data.value !== entity), 131 ); 132 }, 133 async assertFact(f) { 134 if (!f.entity) return; 135 let attribute = Attributes[f.attribute as Attribute]; 136 if (!attribute) return; 137 let id = f.id || v7(); 138 let data = { ...f.data }; 139 if (!(await this.checkPermission(f.entity))) return; 140 if (attribute.cardinality === "one") { 141 let existingFact = await scanIndex.eav(f.entity, f.attribute); 142 if (existingFact[0]) { 143 id = existingFact[0].id; 144 } 145 } 146 writeCache = writeCache.filter((f) => f.fact.id !== id); 147 writeCache.push({ 148 type: "put", 149 fact: { 150 id: id, 151 entity: f.entity, 152 data: data, 153 attribute: f.attribute, 154 }, 155 }); 156 }, 157 async retractFact(factID) { 158 writeCache = writeCache.filter((f) => f.fact.id !== factID); 159 writeCache.push({ type: "del", fact: { id: factID } }); 160 }, 161 }; 162 return ctx; 163 }; 164 let flush = async () => { 165 let flushStart = performance.now(); 166 let timeInsertingEntities = 0; 167 let timeProcessingFactWrites = 0; 168 let timeDeletingEntities = 0; 169 let timeDeletingFacts = 0; 170 let timeCacheCleanup = 0; 171 172 // Insert entities 173 let entityInsertStart = performance.now(); 174 if (entitiesCache.length > 0) 175 await tx 176 .insert(entities) 177 .values(entitiesCache.map((e) => ({ set: e.set, id: e.id }))) 178 .onConflictDoNothing({ target: entities.id }); 179 timeInsertingEntities = performance.now() - entityInsertStart; 180 181 // Process fact writes 182 let factWritesStart = performance.now(); 183 let factWrites = writeCache.flatMap((f) => 184 f.type === "del" ? [] : [f.fact], 185 ); 186 if (factWrites.length > 0) { 187 await tx 188 .insert(facts) 189 .values( 190 factWrites.map((f) => ({ 191 id: f.id, 192 entity: f.entity, 193 data: driz.sql`${f.data}::jsonb`, 194 attribute: f.attribute, 195 })), 196 ) 197 .onConflictDoUpdate({ 198 target: facts.id, 199 set: { 200 data: driz.sql`excluded.data`, 201 entity: driz.sql`excluded.entity`, 202 }, 203 }); 204 } 205 timeProcessingFactWrites = performance.now() - factWritesStart; 206 207 // Delete entities 208 let entityDeleteStart = performance.now(); 209 if (deleteEntitiesCache.length > 0) 210 await tx 211 .delete(entities) 212 .where(driz.inArray(entities.id, deleteEntitiesCache)); 213 timeDeletingEntities = performance.now() - entityDeleteStart; 214 215 // Delete facts 216 let factDeleteStart = performance.now(); 217 let factDeletes = writeCache.flatMap((f) => 218 f.type === "put" ? [] : [f.fact.id], 219 ); 220 if (factDeletes.length > 0 || deleteEntitiesCache.length > 0) { 221 const conditions = []; 222 if (factDeletes.length > 0) { 223 conditions.push(driz.inArray(facts.id, factDeletes)); 224 } 225 if (deleteEntitiesCache.length > 0) { 226 conditions.push( 227 driz.and( 228 driz.sql`(data->>'type' = 'ordered-reference' or data->>'type' = 'reference' or data->>'type' = 'spatial-reference')`, 229 driz.inArray(driz.sql`data->>'value'`, deleteEntitiesCache), 230 ), 231 ); 232 } 233 if (conditions.length > 0) { 234 await tx.delete(facts).where(driz.or(...conditions)); 235 } 236 } 237 timeDeletingFacts = performance.now() - factDeleteStart; 238 239 // Cache cleanup 240 let cacheCleanupStart = performance.now(); 241 writeCache = []; 242 eavCache.clear(); 243 permissionsCache = {}; 244 entitiesCache = []; 245 permissionsCache = {}; 246 deleteEntitiesCache = []; 247 timeCacheCleanup = performance.now() - cacheCleanupStart; 248 249 let totalFlushTime = performance.now() - flushStart; 250 console.log(` 251Flush Performance Breakdown (${totalFlushTime.toFixed(2)}ms): 252========================================== 253Entity Insertions (${entitiesCache.length} entities): ${timeInsertingEntities.toFixed(2)}ms 254Fact Processing (${factWrites.length} facts): ${timeProcessingFactWrites.toFixed(2)}ms 255Entity Deletions (${deleteEntitiesCache.length} entities): ${timeDeletingEntities.toFixed(2)}ms 256Fact Deletions: ${timeDeletingFacts.toFixed(2)}ms 257Cache Cleanup: ${timeCacheCleanup.toFixed(2)}ms 258 `); 259 }; 260 261 return { 262 getContext, 263 flush, 264 }; 265}