(* Forked from gazagnaire.org/irmin.
   Persistent store with Git semantics: lazy reads, delayed writes,
   content-addressing. *)
(** A storage backend: a record of closures over content-addressed objects
    and named references. ['hash] is the type of object identifiers. The
    per-field notes describe what the backends defined in this file do. *)
type 'hash t = {
  read : 'hash -> string option;
      (** [read h] is the data stored under [h], or [None] if there is
          none. *)
  write : 'hash -> string -> unit;
      (** [write h data] stores [data] under [h]. *)
  exists : 'hash -> bool;
      (** [exists h] tells whether some data is stored under [h]. *)
  get_ref : string -> 'hash option;
      (** [get_ref name] is the hash bound to reference [name], if any. *)
  set_ref : string -> 'hash -> unit;
      (** [set_ref name h] binds reference [name] to [h] unconditionally. *)
  test_and_set_ref : string -> test:'hash option -> set:'hash option -> bool;
      (** [test_and_set_ref name ~test ~set] updates reference [name] only
          if its current value matches [test] ([None] meaning "no such
          reference"). [set = None] removes the reference. The result tells
          whether the update was applied. *)
  list_refs : unit -> string list;
      (** [list_refs ()] is the list of all reference names. *)
  write_batch : ('hash * string) list -> unit;
      (** [write_batch objects] stores every [(hash, data)] pair of
          [objects]. *)
  flush : unit -> unit;
      (** Persists buffered state for backends that have any; a no-op for
          the in-memory backend. *)
  close : unit -> unit;
      (** Releases the resources held by the backend; a no-op for the
          in-memory backend. *)
}
13
(** Integer counters for reads, writes, cache hits and cache misses.
    NOTE(review): nothing in the visible code fills this record in; confirm
    where (and whether) it is produced. *)
type stats = { reads : int; writes : int; cache_hits : int; cache_misses : int }
15
(** In-memory backend: objects and references live in persistent maps held
    in mutable fields. Nothing is ever written to disk. *)
module Memory = struct
  module String_map = Map.Make (String)

  (** Store contents. Objects are keyed by the hexadecimal form of their
      hash, references by name. *)
  type 'hash state = {
    mutable objects : string String_map.t;
    mutable refs : 'hash String_map.t;
    to_hex : 'hash -> string;
    equal : 'hash -> 'hash -> bool;
  }

  (** [create_with_hash to_hex equal] is a fresh, empty in-memory backend.
      [to_hex] turns a hash into the key objects are stored under; [equal]
      compares hashes in [test_and_set_ref]. *)
  let create_with_hash (type h) (to_hex : h -> string) (equal : h -> h -> bool)
      : h t =
    let st =
      { objects = String_map.empty; refs = String_map.empty; to_hex; equal }
    in
    (* Single-object store, shared by [write] and [write_batch]. *)
    let put h data =
      st.objects <- String_map.add (st.to_hex h) data st.objects
    in
    (* [Some h] binds the reference, [None] removes it. *)
    let update_ref name = function
      | Some h -> st.refs <- String_map.add name h st.refs
      | None -> st.refs <- String_map.remove name st.refs
    in
    let read h = String_map.find_opt (st.to_hex h) st.objects in
    let exists h = String_map.mem (st.to_hex h) st.objects in
    let get_ref name = String_map.find_opt name st.refs in
    let set_ref name hash = update_ref name (Some hash) in
    let test_and_set_ref name ~test ~set =
      let matches =
        match (test, get_ref name) with
        | None, None -> true
        | Some expected, Some current -> st.equal expected current
        | None, Some _ | Some _, None -> false
      in
      if matches then update_ref name set;
      matches
    in
    let list_refs () =
      String_map.fold (fun name _ names -> name :: names) st.refs []
      |> List.rev
    in
    let write_batch objects =
      List.iter (fun (h, data) -> put h data) objects
    in
    {
      read;
      write = put;
      exists;
      get_ref;
      set_ref;
      test_and_set_ref;
      list_refs;
      write_batch;
      flush = (fun () -> ());
      close = (fun () -> ());
    }

  (** Fresh in-memory backend built from [Hash.to_hex] and [Hash.equal]. *)
  let create_sha1 () = create_with_hash Hash.to_hex Hash.equal

  (** Same construction as {!create_sha1}. *)
  let create_sha256 () = create_with_hash Hash.to_hex Hash.equal
end
77
(** [cached ?capacity backend] wraps [backend] with an LRU cache of object
    data created with [capacity] (default [100_000]).

    Reads are served from the cache when possible; a read that goes to the
    backend and finds data records it in the cache. [write] and
    [write_batch] go to the backend first and then record the data in the
    cache. Every other operation is [backend]'s own. *)
let cached ?(capacity = 100_000) (type h) (backend : h t) : h t =
  let cache : (h, string) Lru.t = Lru.create capacity in
  let remember h data = Lru.add cache h data in
  let read h =
    match Lru.find cache h with
    | Some _ as hit -> hit
    | None ->
        let fetched = backend.read h in
        Option.iter (remember h) fetched;
        fetched
  in
  let write h data =
    backend.write h data;
    remember h data
  in
  let write_batch objects =
    backend.write_batch objects;
    List.iter (fun (h, data) -> remember h data) objects
  in
  { backend with read; write; write_batch }
99
(** [readonly backend] behaves like [backend] except that every mutating
    operation ([write], [write_batch], [set_ref], [test_and_set_ref]) is
    refused.

    @raise Invalid_argument when one of those operations is called. *)
let readonly (backend : 'h t) : 'h t =
  let reject () = invalid_arg "Backend is read-only" in
  let write _hash _data = reject () in
  let set_ref _name _hash = reject () in
  let test_and_set_ref _name ~test:_ ~set:_ = reject () in
  let write_batch _objects = reject () in
  { backend with write; set_ref; test_and_set_ref; write_batch }
109
(** [layered ~upper ~lower] stacks two backends.

    - Lookups ([read], [exists], [get_ref]) try [upper] first and fall back
      to [lower].
    - [list_refs] is the sorted, duplicate-free union of both layers.
    - All mutations ([write], [write_batch], [set_ref], [test_and_set_ref])
      go to [upper] only.
    - [flush] flushes [upper] then [lower].
    - [close] closes both layers; [lower] is closed even when closing
      [upper] raises, and the first exception is the one re-raised.

    NOTE(review): [test_and_set_ref] consults [upper] alone, so a reference
    that exists only in [lower] is visible through [get_ref] but is treated
    as absent by the test. Confirm this is intended. *)
let layered ~(upper : 'h t) ~(lower : 'h t) : 'h t =
  let close () =
    match upper.close () with
    | () -> lower.close ()
    | exception exn ->
        let backtrace = Printexc.get_raw_backtrace () in
        (* Closing [upper] failed: still release [lower] so it does not
           leak. Any secondary failure is dropped on purpose, because the
           first error is the one being reported. *)
        (try lower.close () with _ -> ());
        Printexc.raise_with_backtrace exn backtrace
  in
  {
    read =
      (fun h ->
        match upper.read h with Some _ as found -> found | None -> lower.read h);
    write = upper.write;
    exists = (fun h -> upper.exists h || lower.exists h);
    get_ref =
      (fun name ->
        match upper.get_ref name with
        | Some _ as found -> found
        | None -> lower.get_ref name);
    set_ref = upper.set_ref;
    test_and_set_ref = upper.test_and_set_ref;
    list_refs =
      (fun () ->
        let upper_refs = upper.list_refs () in
        let lower_refs = lower.list_refs () in
        List.sort_uniq String.compare (upper_refs @ lower_refs));
    write_batch = upper.write_batch;
    flush =
      (fun () ->
        upper.flush ();
        lower.flush ());
    close;
  }
139
(** [thread_safe backend] runs every operation of [backend] while holding a
    single [Mutex.t] created for this wrapper. The mutex is released when
    the operation returns or raises. *)
let thread_safe (backend : 'h t) : 'h t =
  let lock = Mutex.create () in
  (* Run [thunk] with [lock] held, unlocking on both exit paths. *)
  let locked thunk =
    Mutex.lock lock;
    Fun.protect ~finally:(fun () -> Mutex.unlock lock) thunk
  in
  let read h = locked (fun () -> backend.read h) in
  let write h data = locked (fun () -> backend.write h data) in
  let exists h = locked (fun () -> backend.exists h) in
  let get_ref name = locked (fun () -> backend.get_ref name) in
  let set_ref name hash = locked (fun () -> backend.set_ref name hash) in
  let test_and_set_ref name ~test ~set =
    locked (fun () -> backend.test_and_set_ref name ~test ~set)
  in
  let list_refs () = locked (fun () -> backend.list_refs ()) in
  let write_batch objects = locked (fun () -> backend.write_batch objects) in
  let flush () = locked (fun () -> backend.flush ()) in
  let close () = locked (fun () -> backend.close ()) in
  {
    read;
    write;
    exists;
    get_ref;
    set_ref;
    test_and_set_ref;
    list_refs;
    write_batch;
    flush;
    close;
  }
161
(** [stats backend] always returns [None]; its argument is ignored. *)
let stats _backend = None
163
(** Disk-based backend using append-only storage with WAL and bloom filter.

    Storage layout:
    - objects.wal: write-ahead log for crash recovery (uses ocaml-wal)
    - objects.data: append-only file containing all objects
    - objects.idx: index file mapping hex hash -> (offset, length)
    - objects.bloom: serialized bloom filter for fast negative lookups
    - refs/: directory with one file per ref containing hex hash

    Write path:
    1. Write to WAL (crash-safe with CRC)
    2. Write to data file
    3. Update in-memory index and bloom filter
    4. On flush: save index and bloom, then clear WAL

    Recovery:
    1. Load index and bloom from disk
    2. Replay any entries in WAL not yet in index

    Inspired by lavyek's append-only design and LevelDB's WAL pattern. *)
181module Disk = struct
182 module String_map = Map.Make (String)
183
(** Where one object lives in the data file: it starts at byte [offset] and
    is [length] bytes long. *)
type index_entry = { offset : int; length : int }
185
(** Mutable state behind one disk backend. *)
type 'hash state = {
  root : Eio.Fs.dir_ty Eio.Path.t;  (** Directory holding all store files. *)
  mutable wal : Wal.t option;
      (** Open write-ahead log; [None] once the backend is closed. *)
  mutable data_file : Eio.File.rw_ty Eio.Resource.t option;
      (** Open data file; [None] once the backend is closed. *)
  mutable data_offset : int;
      (** Offset in the data file at which the next object is written. *)
  mutable index : index_entry String_map.t;
      (** Hex hash -> location of the object in the data file. *)
  bloom : string Bloom.t;
      (** Bloom filter over hex hashes, consulted before the index. *)
  mutable refs : 'hash String_map.t;  (** Reference name -> hash. *)
  to_hex : 'hash -> string;  (** Hex rendering used as index key. *)
  equal : 'hash -> 'hash -> bool;
      (** Hash equality, used by [test_and_set_ref]. *)
  mutex : Eio.Mutex.t;  (** Held by every mutating operation. *)
}
198
(* Locations of the store's files, all relative to the [root] directory. *)

(** Append-only file holding the object data. *)
let data_path root = Eio.Path.( / ) root "objects.data"

(** Persisted index (hex hash -> offset, length). *)
let index_path root = Eio.Path.( / ) root "objects.idx"

(** Serialized bloom filter. *)
let bloom_path root = Eio.Path.( / ) root "objects.bloom"

(** Write-ahead log. *)
let wal_path root = Eio.Path.( / ) root "objects.wal"

(** Directory with one file per reference. *)
let refs_path root = Eio.Path.( / ) root "refs"
204
(* Expected number of objects for bloom filter sizing: the value handed to
   [Bloom.v] whenever a fresh filter has to be created. *)
let bloom_expected_size = 100_000
207
(* Index file format: one line per entry "hex_hash offset length\n" *)

(** [load_index root] reads the persisted index, or returns the empty map
    when there is no index file.

    Lines that do not match the format are skipped: wrong number of fields,
    or an offset/length that is not a non-negative integer. A corrupt
    numeric field therefore no longer raises [Failure] while the store is
    being opened; it is handled like any other malformed line. *)
let load_index root =
  let parse_line line =
    match String.split_on_char ' ' line with
    | [ hex; off_s; len_s ] -> (
        match (int_of_string_opt off_s, int_of_string_opt len_s) with
        | Some offset, Some length when offset >= 0 && length >= 0 ->
            Some (hex, { offset; length })
        | _ -> None)
    | _ -> None (* also covers the empty line after the final newline *)
  in
  let path = index_path root in
  if not (Eio.Path.is_file path) then String_map.empty
  else
    Eio.Path.load path
    |> String.split_on_char '\n'
    |> List.fold_left
         (fun idx line ->
           match parse_line line with
           | Some (hex, entry) -> String_map.add hex entry idx
           | None -> idx)
         String_map.empty
225
(** [save_index root index] writes [index] as one "hex offset length" line
    per entry. The content goes to [objects.idx.tmp] first and is then
    renamed over the index file. *)
let save_index root index =
  let render (hex, entry) =
    Fmt.str "%s %d %d\n" hex entry.offset entry.length
  in
  (* [bindings] is in ascending key order; [rev_map] flips it, which gives
     the same descending line order as consing inside a fold. *)
  let content =
    String_map.bindings index |> List.rev_map render |> String.concat ""
  in
  let final_path = index_path root in
  let tmp_path = Eio.Path.(root / "objects.idx.tmp") in
  Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path content;
  Eio.Path.rename tmp_path final_path
238
(** [load_bloom root] is the bloom filter stored on disk. A fresh filter
    sized for [bloom_expected_size] is returned instead when the file is
    missing or cannot be decoded. *)
let load_bloom root =
  let fresh () = Bloom.v bloom_expected_size in
  let path = bloom_path root in
  if not (Eio.Path.is_file path) then fresh ()
  else
    match Eio.Path.load path |> Bytes.of_string |> Bloom.of_bytes with
    | Ok bloom -> bloom
    | Error _ -> fresh ()
246
(** [save_bloom root bloom] serializes [bloom] to [objects.bloom.tmp] and
    then renames that file over the bloom file. *)
let save_bloom root bloom =
  let serialized = Bloom.to_bytes bloom |> Bytes.to_string in
  let final_path = bloom_path root in
  let tmp_path = Eio.Path.(root / "objects.bloom.tmp") in
  Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path serialized;
  Eio.Path.rename tmp_path final_path
253
(** [load_ref of_hex acc full_name entry_path] reads the reference file at
    [entry_path], trims it and decodes it with [of_hex]. On success
    [full_name] is bound to the hash in [acc]; a file that does not decode
    leaves [acc] unchanged. *)
let load_ref of_hex acc full_name entry_path =
  Eio.Path.load entry_path |> String.trim |> of_hex
  |> Result.fold
       ~ok:(fun hash -> String_map.add full_name hash acc)
       ~error:(fun _ -> acc)
259
(** [load_refs root of_hex] walks the refs directory recursively and
    returns every reference it can decode. Names are paths relative to the
    refs directory, joined with "/". The map is empty when the directory
    does not exist. *)
let load_refs root of_hex =
  let rec scan_dir prefix dir acc =
    let visit acc name =
      let entry_path = Eio.Path.(dir / name) in
      let full_name =
        match prefix with "" -> name | _ -> prefix ^ "/" ^ name
      in
      if Eio.Path.is_file entry_path then
        load_ref of_hex acc full_name entry_path
      else if Eio.Path.is_directory entry_path then
        scan_dir full_name entry_path acc
      else acc
    in
    List.fold_left visit acc (Eio.Path.read_dir dir)
  in
  let refs_root = refs_path root in
  if Eio.Path.is_directory refs_root then
    scan_dir "" refs_root String_map.empty
  else String_map.empty
278
(** [save_ref root name hash to_hex] writes [to_hex hash] followed by a
    newline to the file for reference [name], creating the refs directory
    and any intermediate directories of [name] that are missing. *)
let save_ref root name hash to_hex =
  let refs_dir = refs_path root in
  if not (Eio.Path.is_directory refs_dir) then
    Eio.Path.mkdir ~perm:0o755 refs_dir;
  (* Handle nested names such as heads/main. *)
  (match Filename.dirname name with
  | "." | "" -> ()
  | parent ->
      let parent_dir = Eio.Path.(refs_dir / parent) in
      if not (Eio.Path.is_directory parent_dir) then
        Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 parent_dir);
  Eio.Path.save
    ~create:(`Or_truncate 0o644)
    Eio.Path.(refs_dir / name)
    (to_hex hash ^ "\n")
291
(** [delete_ref root name] removes the file of reference [name] if it
    exists; otherwise it does nothing. *)
let delete_ref root name =
  let target = Eio.Path.(refs_path root / name) in
  match Eio.Path.is_file target with
  | true -> Eio.Path.unlink target
  | false -> ()
295
(** [open_data_file ~sw root] opens the data file in append mode, creating
    it if missing, and returns the handle together with the file's current
    size. The size is the offset at which the next object will land.

    The size is read from the open handle rather than by a second lookup of
    the path: the file is guaranteed to exist once [open_out] has returned,
    so a "missing file" fallback would be dead code, and asking the handle
    cannot observe a different file than the one just opened. *)
let open_data_file ~sw root =
  let file =
    Eio.Path.open_out ~sw ~append:true
      ~create:(`If_missing 0o644)
      (data_path root)
  in
  (file, Optint.Int63.to_int (Eio.File.size file))
308
(* WAL record format: "hex_hash\x00data" *)

(** [encode_wal_record hex data] joins [hex] and [data] with a single NUL
    byte. *)
let encode_wal_record hex data = String.concat "\x00" [ hex; data ]
311
(** [decode_wal_record record] splits [record] at its first NUL byte into
    [(hex, data)]. It is [None] when [record] contains no NUL byte. Later
    NUL bytes belong to [data]. *)
let decode_wal_record record =
  String.index_opt record '\x00'
  |> Option.map (fun sep ->
         let hex = String.sub record 0 sep in
         let rest = sep + 1 in
         let data = String.sub record rest (String.length record - rest) in
         (hex, data))
319
(* Replay WAL entries that aren't in the index yet *)

(** [replay_wal root index bloom data_file data_offset] re-applies the
    records of the WAL file, if there is one. Each decodable record whose
    key is not yet in the index is written to [data_file] at the running
    offset, added to the index and to [bloom]. Undecodable records and
    already-indexed keys are skipped.

    Returns the updated index, [bloom] (updated in place) and the offset
    following the last byte written. *)
let replay_wal root index bloom data_file data_offset =
  let wal_p = wal_path root in
  if not (Eio.Path.is_file wal_p) then (index, bloom, data_offset)
  else
    let apply ((idx, blm, offset) as unchanged) record =
      match decode_wal_record record with
      | Some (hex, data) when not (String_map.mem hex idx) ->
          Eio.File.pwrite_all data_file
            ~file_offset:(Optint.Int63.of_int offset)
            [ Cstruct.of_string data ];
          Bloom.add blm hex;
          let length = String.length data in
          (String_map.add hex { offset; length } idx, blm, offset + length)
      | Some _ | None -> unchanged
    in
    List.fold_left apply (index, bloom, data_offset) (Wal.read_all wal_p)
344
(** [create_with_hash ~sw root to_hex of_hex equal] opens (creating it if
    needed) the disk store rooted at [root] and returns it as a backend.

    - [sw]: switch the data file and the WAL are attached to.
    - [to_hex]: hex rendering of a hash, used as index key and in ref files.
    - [of_hex]: decoder used when loading ref files.
    - [equal]: hash equality used by [test_and_set_ref].

    Opening loads the index, bloom filter and refs, then replays any WAL
    records that are not in the index yet.

    Mutating operations run under the store's mutex. After [close], [read]
    returns [None] and [write]/[write_batch] do nothing. *)
let create_with_hash (type h) ~sw (root : Eio.Fs.dir_ty Eio.Path.t)
    (to_hex : h -> string) (of_hex : string -> (h, [ `Msg of string ]) result)
    (equal : h -> h -> bool) : h t =
  (* Create root directory if needed *)
  if not (Eio.Path.is_directory root) then
    Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 root;
  let index = load_index root in
  let bloom = load_bloom root in
  (* Make sure every indexed key is known to the bloom filter. Checking only
     for an empty filter is not enough: the index is saved before the bloom
     filter, so a crash between the two leaves a non-empty filter that lacks
     the newest keys, and [exists] would then deny objects that are
     indexed. This also covers the first load after an upgrade. *)
  String_map.iter
    (fun hex _ -> if not (Bloom.mem bloom hex) then Bloom.add bloom hex)
    index;
  let refs = load_refs root of_hex in
  let file, offset = open_data_file ~sw root in
  let data_file = (file :> Eio.File.rw_ty Eio.Resource.t) in
  (* Replay any uncommitted WAL entries *)
  let index, bloom, offset = replay_wal root index bloom data_file offset in
  (* Open WAL for new writes *)
  let wal = Wal.create ~sw (wal_path root) in
  let state =
    {
      root;
      wal = Some wal;
      data_file = Some data_file;
      data_offset = offset;
      index;
      bloom;
      refs;
      to_hex;
      equal;
      mutex = Eio.Mutex.create ();
    }
  in
  let locked f = Eio.Mutex.use_rw ~protect:true state.mutex f in
  (* The bloom filter answers "definitely absent" cheaply; the index
     confirms a positive answer. *)
  let is_stored key =
    Bloom.mem state.bloom key && String_map.mem key state.index
  in
  (* Append [data] to the data file and record it in the index and bloom
     filter. Callers must have logged the record in the WAL first. *)
  let append_data file key data =
    let length = String.length data in
    let offset = state.data_offset in
    Eio.File.pwrite_all file
      ~file_offset:(Optint.Int63.of_int offset)
      [ Cstruct.of_string data ];
    state.data_offset <- offset + length;
    state.index <- String_map.add key { offset; length } state.index;
    Bloom.add state.bloom key
  in
  (* Update a ref in memory and on disk; [None] deletes it. *)
  let store_ref name = function
    | Some h ->
        state.refs <- String_map.add name h state.refs;
        save_ref state.root name h state.to_hex
    | None ->
        state.refs <- String_map.remove name state.refs;
        delete_ref state.root name
  in
  let remove_wal_file () =
    let wal_p = wal_path state.root in
    if Eio.Path.is_file wal_p then Eio.Path.unlink wal_p
  in
  let read h =
    match
      (String_map.find_opt (state.to_hex h) state.index, state.data_file)
    with
    | None, _ | Some _, None -> None
    | Some entry, Some file ->
        let buf = Cstruct.create entry.length in
        Eio.File.pread_exact file
          ~file_offset:(Optint.Int63.of_int entry.offset)
          [ buf ];
        Some (Cstruct.to_string buf)
  in
  let write h data =
    locked (fun () ->
        let key = state.to_hex h in
        (* Objects are content-addressed: an already stored key needs no
           second copy. *)
        if not (is_stored key) then
          match (state.wal, state.data_file) with
          | Some wal, Some file ->
              (* Write to WAL first for crash safety *)
              Wal.append wal (encode_wal_record key data);
              Wal.sync wal;
              (* Then write to data file *)
              append_data file key data
          | _ -> ())
  in
  let exists h = is_stored (state.to_hex h) in
  let get_ref name = String_map.find_opt name state.refs in
  let set_ref name hash = locked (fun () -> store_ref name (Some hash)) in
  let test_and_set_ref name ~test ~set =
    locked (fun () ->
        let matches =
          match (test, String_map.find_opt name state.refs) with
          | None, None -> true
          | Some expected, Some current -> state.equal expected current
          | None, Some _ | Some _, None -> false
        in
        if matches then store_ref name set;
        matches)
  in
  let list_refs () = String_map.bindings state.refs |> List.map fst in
  let write_batch objects =
    locked (fun () ->
        match (state.wal, state.data_file) with
        | Some wal, Some file ->
            (* Log every not-yet-stored object, then sync the WAL once. *)
            let pending =
              List.filter_map
                (fun (h, data) ->
                  let key = state.to_hex h in
                  if is_stored key then None else Some (key, data))
                objects
            in
            List.iter
              (fun (key, data) -> Wal.append wal (encode_wal_record key data))
              pending;
            Wal.sync wal;
            (* Then write to data file. The check is repeated because the
               same key may occur several times in one batch. *)
            List.iter
              (fun (key, data) ->
                if not (is_stored key) then append_data file key data)
              pending
        | _ -> ())
  in
  let flush () =
    locked (fun () ->
        Option.iter Eio.File.sync state.data_file;
        save_index state.root state.index;
        save_bloom state.root state.bloom;
        (* Everything logged so far is now recoverable from index + data
           file, so the log can be dropped. The log is closed before its
           file is removed and a new one is opened afterwards, so that later
           writes are logged to a file that exists on disk.
           NOTE(review): this assumes [Wal.t] keeps its file open; unlinking
           under an open handle would send later appends to a file that
           recovery can no longer find. Confirm against ocaml-wal. *)
        match state.wal with
        | None -> remove_wal_file ()
        | Some wal ->
            Wal.close wal;
            state.wal <- None;
            remove_wal_file ();
            state.wal <- Some (Wal.create ~sw (wal_path state.root)))
  in
  let close () =
    locked (fun () ->
        Option.iter Wal.close state.wal;
        Option.iter
          (fun file ->
            Eio.File.sync file;
            Eio.Resource.close file)
          state.data_file;
        save_index state.root state.index;
        save_bloom state.root state.bloom;
        (* Clear WAL *)
        remove_wal_file ();
        state.wal <- None;
        state.data_file <- None)
  in
  {
    read;
    write;
    exists;
    get_ref;
    set_ref;
    test_and_set_ref;
    list_refs;
    write_batch;
    flush;
    close;
  }
510
(** [create_sha1 ~sw root] opens the disk store at [root], decoding ref
    files with [Hash.sha1_of_hex]. *)
let create_sha1 ~sw root =
  let of_hex = Hash.sha1_of_hex in
  create_with_hash ~sw root Hash.to_hex of_hex Hash.equal
513
(** [create_sha256 ~sw root] opens the disk store at [root], decoding ref
    files with [Hash.sha256_of_hex]. *)
let create_sha256 ~sw root =
  let of_hex = Hash.sha256_of_hex in
  create_with_hash ~sw root Hash.to_hex of_hex Hash.equal
516end