a tool for shared writing and social publishing
1import { PgTransaction } from "drizzle-orm/pg-core";
2import { Fact, PermissionToken } from ".";
3import { MutationContext } from "./mutations";
4import { supabaseServerClient } from "supabase/serverClient";
5import { entities, facts } from "drizzle/schema";
6import * as driz from "drizzle-orm";
7import { Attribute, Attributes, FilterAttributes } from "./attributes";
8import { v7 } from "uuid";
9import { DeepReadonly } from "replicache";
10
/** A buffered write that stores (inserts or overwrites) a complete fact. */
type PendingFactPut = { type: "put"; fact: Fact<any> };

/** A buffered write that removes a fact; only the fact's id is kept. */
type PendingFactDelete = { type: "del"; fact: { id: string } };

/**
 * One pending fact write held in memory until it is flushed.
 * Discriminated on `type`; both variants expose `fact.id`.
 */
type WriteCacheEntry = PendingFactPut | PendingFactDelete;
14
/**
 * Creates a server-side mutation context that buffers entity and fact writes
 * in memory and applies them to the database in one batch when `flush` is
 * called.
 *
 * Reads made through the context (`scanIndex.eav`, `checkPermission`) overlay
 * the buffered writes on top of what is stored in the database, so a mutation
 * observes its own earlier writes before they are flushed.
 *
 * @param tx - Transaction used for every read and for the batched writes.
 * @param permission_token_id - Exposed unchanged on each context.
 * @param token_rights - Rights of the calling token; an entity is writable
 *   when a right with `write === true` exists for the entity's set.
 * @returns `getContext`, which builds a mutation context backed by the shared
 *   buffers, and `flush`, which writes the buffers to `tx` and resets them.
 */
export function cachedServerMutationContext(
  tx: PgTransaction<any, any, any>,
  permission_token_id: string,
  token_rights: PermissionToken["permission_token_rights"],
) {
  // Pending fact writes; holds at most one entry per fact id.
  let writeCache: WriteCacheEntry[] = [];
  // Raw database results per `${entity}-${attribute}`, without pending writes.
  const eavCache = new Map<string, DeepReadonly<Fact<Attribute>>[]>();
  // Permission results for entities looked up in the database. A Map is used
  // so that ids such as "constructor" cannot hit inherited object properties.
  const permissionsCache = new Map<string, boolean>();
  // Entities created but not yet inserted.
  let entitiesCache: { set: string; id: string }[] = [];
  // Ids of entities deleted but not yet removed from the database.
  let deleteEntitiesCache: string[] = [];

  /** True when the token holds a write right on the given entity set. */
  const canWriteToSet = (entitySet: string | null) =>
    token_rights.some((r) => r.entity_set === entitySet && r.write === true);

  const scanIndex = {
    /**
     * Returns the facts for `entity`/`attribute` as they will look after the
     * pending writes are flushed: stored facts that have no pending write,
     * plus pending puts, minus references that point at a deleted entity.
     */
    async eav<A extends Attribute>(entity: string, attribute: A) {
      if (deleteEntitiesCache.includes(entity)) return [];

      const cacheKey = `${entity}-${attribute}`;
      let storedFacts = eavCache.get(cacheKey) as
        | DeepReadonly<Fact<A>>[]
        | undefined;
      if (!storedFacts) {
        storedFacts = (await tx
          .select({
            id: facts.id,
            data: facts.data,
            entity: facts.entity,
            attribute: facts.attribute,
          })
          .from(facts)
          .where(
            driz.and(
              driz.eq(facts.attribute, attribute),
              driz.eq(facts.entity, entity),
            ),
          )) as DeepReadonly<Fact<A>>[];
        // Remember the raw rows so repeated scans do not query again; the
        // cache is cleared on flush, the only place this module writes to tx.
        eavCache.set(cacheKey, storedFacts as DeepReadonly<Fact<Attribute>>[]);
      }

      // A stored fact with a pending write is either deleted ("del") or
      // superseded by the pending version ("put"), so hide the stored copy.
      const pendingIDs = new Set(writeCache.map((wc) => wc.fact.id));
      const untouchedFacts = storedFacts.filter((f) => !pendingIDs.has(f.id));

      const pendingPuts = writeCache.flatMap((wc) =>
        wc.type === "put" &&
        wc.fact.attribute === attribute &&
        wc.fact.entity === entity
          ? [wc.fact as Fact<A>]
          : [],
      );

      return [...untouchedFacts, ...pendingPuts].filter(
        (f) =>
          !(
            (f.data.type === "reference" ||
              f.data.type === "ordered-reference" ||
              f.data.type === "spatial-reference") &&
            deleteEntitiesCache.includes(f.data.value)
          ),
      ) as DeepReadonly<Fact<A>>[];
    },
  };

  /**
   * Builds a mutation context that reads and writes through the shared
   * buffers. `clientID` and `mutationID` are accepted but not used here.
   */
  const getContext = (clientID: string, mutationID: number) => {
    const ctx: MutationContext & {
      checkPermission: (entity: string) => Promise<boolean>;
    } = {
      scanIndex,
      permission_token_id,
      async runOnServer(cb) {
        return cb({ supabase: supabaseServerClient });
      },
      /**
       * Resolves whether the token may write to `entity`. Entities pending
       * deletion are never writable; entities created in this batch are
       * checked against their pending set; everything else is looked up in
       * the database once and memoised.
       */
      async checkPermission(entity: string) {
        if (deleteEntitiesCache.includes(entity)) return false;

        const pendingEntity = entitiesCache.find((e) => e.id === entity);
        if (pendingEntity) return canWriteToSet(pendingEntity.set);

        const remembered = permissionsCache.get(entity);
        if (remembered !== undefined) return remembered;

        const [permission_set] = await tx
          .select({ entity_set: entities.set })
          .from(entities)
          .where(driz.eq(entities.id, entity));
        const hasPermission =
          !!permission_set && canWriteToSet(permission_set.entity_set);
        permissionsCache.set(entity, hasPermission);
        return hasPermission;
      },
      // Client-only callbacks are ignored on the server.
      async runOnClient(_cb) {},
      /**
       * Buffers creation of an entity. Returns false (and buffers nothing)
       * when the token cannot write to `permission_set`.
       */
      async createEntity({ entityID, permission_set }) {
        if (!canWriteToSet(permission_set)) return false;
        if (!entitiesCache.some((e) => e.id === entityID))
          entitiesCache.push({ set: permission_set, id: entityID });
        // Re-creating an entity cancels a pending deletion of the same id.
        deleteEntitiesCache = deleteEntitiesCache.filter((e) => e !== entityID);
        return true;
      },
      /**
       * Buffers deletion of an entity and drops pending puts that belong to
       * it or whose data value is its id. No-op without write permission.
       */
      async deleteEntity(entity) {
        if (!(await this.checkPermission(entity))) return;
        deleteEntitiesCache.push(entity);
        entitiesCache = entitiesCache.filter((e) => e.id !== entity);
        writeCache = writeCache.filter(
          (wc) =>
            wc.type !== "put" ||
            (wc.fact.entity !== entity && wc.fact.data.value !== entity),
        );
      },
      /**
       * Buffers an upsert of a fact. Silently ignored when the entity is
       * missing, the attribute is unknown, or the token lacks permission.
       */
      async assertFact(f) {
        if (!f.entity) return;
        const attribute = Attributes[f.attribute as Attribute];
        if (!attribute) return;
        let id = f.id || v7();
        const data = { ...f.data };
        if (!(await this.checkPermission(f.entity))) return;
        if (attribute.cardinality === "one") {
          // Cardinality "one": overwrite the existing fact instead of adding
          // a second one by reusing its id.
          const existingFact = await scanIndex.eav(f.entity, f.attribute);
          if (existingFact[0]) {
            id = existingFact[0].id;
          }
        }
        // Keep a single pending entry per fact id.
        writeCache = writeCache.filter((wc) => wc.fact.id !== id);
        writeCache.push({
          type: "put",
          fact: {
            id: id,
            entity: f.entity,
            data: data,
            attribute: f.attribute,
          },
        });
      },
      /**
       * Buffers deletion of a fact by id, replacing any pending write to it.
       * NOTE(review): unlike assertFact, no permission check happens here;
       * confirm that callers verify access before retracting.
       */
      async retractFact(factID) {
        writeCache = writeCache.filter((wc) => wc.fact.id !== factID);
        writeCache.push({ type: "del", fact: { id: factID } });
      },
    };
    return ctx;
  };

  /**
   * Applies all buffered changes to `tx` (entity inserts, fact upserts,
   * entity deletes, fact deletes, in that order), resets every cache and
   * logs a timing breakdown.
   */
  const flush = async () => {
    const flushStart = performance.now();

    // Captured up front: the caches are emptied before the summary is logged.
    const insertedEntityCount = entitiesCache.length;
    const deletedEntityCount = deleteEntitiesCache.length;

    // Insert entities
    const entityInsertStart = performance.now();
    if (entitiesCache.length > 0)
      await tx
        .insert(entities)
        .values(entitiesCache.map((e) => ({ set: e.set, id: e.id })))
        .onConflictDoNothing({ target: entities.id });
    const timeInsertingEntities = performance.now() - entityInsertStart;

    // Process fact writes
    const factWritesStart = performance.now();
    const factWrites = writeCache.flatMap((wc) =>
      wc.type === "del" ? [] : [wc.fact],
    );
    if (factWrites.length > 0) {
      await tx
        .insert(facts)
        .values(
          factWrites.map((f) => ({
            id: f.id,
            entity: f.entity,
            data: driz.sql`${f.data}::jsonb`,
            attribute: f.attribute,
          })),
        )
        // An existing fact id is overwritten with the new data and entity.
        .onConflictDoUpdate({
          target: facts.id,
          set: {
            data: driz.sql`excluded.data`,
            entity: driz.sql`excluded.entity`,
          },
        });
    }
    const timeProcessingFactWrites = performance.now() - factWritesStart;

    // Delete entities
    // NOTE(review): facts owned by a deleted entity are not deleted
    // explicitly; presumably a cascading foreign key handles them. Verify
    // against the schema.
    const entityDeleteStart = performance.now();
    if (deleteEntitiesCache.length > 0)
      await tx
        .delete(entities)
        .where(driz.inArray(entities.id, deleteEntitiesCache));
    const timeDeletingEntities = performance.now() - entityDeleteStart;

    // Delete facts: explicit retractions plus references to deleted entities
    const factDeleteStart = performance.now();
    const factDeletes = writeCache.flatMap((wc) =>
      wc.type === "put" ? [] : [wc.fact.id],
    );
    const conditions = [];
    if (factDeletes.length > 0) {
      conditions.push(driz.inArray(facts.id, factDeletes));
    }
    if (deleteEntitiesCache.length > 0) {
      conditions.push(
        driz.and(
          driz.sql`(data->>'type' = 'ordered-reference' or data->>'type' = 'reference' or data->>'type' = 'spatial-reference')`,
          driz.inArray(driz.sql`data->>'value'`, deleteEntitiesCache),
        ),
      );
    }
    if (conditions.length > 0) {
      await tx.delete(facts).where(driz.or(...conditions));
    }
    const timeDeletingFacts = performance.now() - factDeleteStart;

    // Cache cleanup
    const cacheCleanupStart = performance.now();
    writeCache = [];
    eavCache.clear();
    permissionsCache.clear();
    entitiesCache = [];
    deleteEntitiesCache = [];
    const timeCacheCleanup = performance.now() - cacheCleanupStart;

    const totalFlushTime = performance.now() - flushStart;
    console.log(`
Flush Performance Breakdown (${totalFlushTime.toFixed(2)}ms):
==========================================
Entity Insertions (${insertedEntityCount} entities): ${timeInsertingEntities.toFixed(2)}ms
Fact Processing (${factWrites.length} facts): ${timeProcessingFactWrites.toFixed(2)}ms
Entity Deletions (${deletedEntityCount} entities): ${timeDeletingEntities.toFixed(2)}ms
Fact Deletions: ${timeDeletingFacts.toFixed(2)}ms
Cache Cleanup: ${timeCacheCleanup.toFixed(2)}ms
    `);
  };

  return {
    getContext,
    flush,
  };
}