Persistent store with Git semantics: lazy reads, delayed writes, content-addressing
at perf 516 lines 19 kB view raw
(** A content-addressed object store plus a namespace of named refs,
    parameterised by the hash type ['hash].

    Every backend in this file is a record of closures of this type. That is
    what lets wrappers such as {!cached}, {!readonly}, {!layered} and
    {!thread_safe} override individual operations with
    [{ backend with ... }]. *)
type 'hash t = {
  read : 'hash -> string option;
      (** [read h] is the object stored under [h], or [None] if absent. *)
  write : 'hash -> string -> unit;
      (** [write h data] stores [data] under [h]. *)
  exists : 'hash -> bool;  (** Whether an object is stored under the hash. *)
  get_ref : string -> 'hash option;  (** Current target of a named ref. *)
  set_ref : string -> 'hash -> unit;
      (** Point a ref at a hash, creating or overwriting it. *)
  test_and_set_ref : string -> test:'hash option -> set:'hash option -> bool;
      (** [test_and_set_ref name ~test ~set] replaces the value of [name] with
          [set] ([None] deletes the ref) only if its current value equals
          [test] ([None] meaning "absent"), and returns whether it did. *)
  list_refs : unit -> string list;  (** Names of all refs. *)
  write_batch : ('hash * string) list -> unit;
      (** Store several [(hash, data)] pairs in one call. *)
  flush : unit -> unit;
      (** Make buffered state durable; a no-op for in-memory backends. *)
  close : unit -> unit;  (** Release the backend's resources. *)
}

(** Backend access counters. Nothing in this file produces a value of this
    type yet (see {!stats}). *)
type stats = { reads : int; writes : int; cache_hits : int; cache_misses : int }

(** Purely in-memory backend. Objects are keyed by the hex rendering of their
    hash. Nothing is persisted, so [flush] and [close] are no-ops. *)
module Memory = struct
  module String_map = Map.Make (String)

  type 'hash state = {
    mutable objects : string String_map.t;  (* hex hash -> object bytes *)
    mutable refs : 'hash String_map.t;  (* ref name -> target hash *)
    to_hex : 'hash -> string;
    equal : 'hash -> 'hash -> bool;
  }

  (** [create_with_hash to_hex equal] is an empty store for any hash type.
      [to_hex] renders a hash as the object key; [equal] compares hashes in
      [test_and_set_ref]. *)
  let create_with_hash (type h) (to_hex : h -> string) (equal : h -> h -> bool)
      : h t =
    let state =
      { objects = String_map.empty; refs = String_map.empty; to_hex; equal }
    in
    let store h data =
      state.objects <- String_map.add (state.to_hex h) data state.objects
    in
    {
      read = (fun h -> String_map.find_opt (state.to_hex h) state.objects);
      write = store;
      exists = (fun h -> String_map.mem (state.to_hex h) state.objects);
      get_ref = (fun name -> String_map.find_opt name state.refs);
      set_ref =
        (fun name hash -> state.refs <- String_map.add name hash state.refs);
      test_and_set_ref =
        (fun name ~test ~set ->
          let current = String_map.find_opt name state.refs in
          let matches =
            match (test, current) with
            | None, None -> true
            | Some t, Some c -> state.equal t c
            | Some _, None | None, Some _ -> false
          in
          if matches then
            state.refs <-
              (match set with
              | None -> String_map.remove name state.refs
              | Some h -> String_map.add name h state.refs);
          matches);
      list_refs = (fun () -> String_map.bindings state.refs |> List.map fst);
      write_batch = List.iter (fun (h, data) -> store h data);
      flush = (fun () -> ());
      close = (fun () -> ());
    }

  (* Both constructors use the same [Hash.to_hex] / [Hash.equal]. Only the
     Disk backend needs algorithm-specific parsing ([Hash.sha1_of_hex] /
     [Hash.sha256_of_hex]). *)
  let create_sha1 () = create_with_hash Hash.to_hex Hash.equal
  let create_sha256 () = create_with_hash Hash.to_hex Hash.equal
end

(** [cached ?capacity backend] adds an [Lru] read cache in front of [backend].

    [capacity] (default [100_000]) is passed to [Lru.create]; presumably it is
    the maximum number of cached objects (confirm against [Lru]).

    Successful reads, [write] and [write_batch] populate the cache. Every
    other operation is delegated unchanged. Entries are never invalidated:
    objects are content-addressed and are only added to the cache after the
    backend write. *)
let cached ?(capacity = 100_000) (type h) (backend : h t) : h t =
  let cache : (h, string) Lru.t = Lru.create capacity in
  {
    backend with
    read =
      (fun h ->
        match Lru.find cache h with
        | Some v -> Some v
        | None ->
            let result = backend.read h in
            Option.iter (fun v -> Lru.add cache h v) result;
            result);
    write =
      (fun h data ->
        backend.write h data;
        Lru.add cache h data);
    write_batch =
      (fun objects ->
        backend.write_batch objects;
        List.iter (fun (h, data) -> Lru.add cache h data) objects);
  }

(** [readonly backend] rejects every mutation ([write], [set_ref],
    [test_and_set_ref], [write_batch]) with
    [Invalid_argument "Backend is read-only"]. Reads, [flush] and [close] are
    delegated to [backend]. *)
let readonly (backend : 'h t) : 'h t =
  let fail () = invalid_arg "Backend is read-only" in
  {
    backend with
    write = (fun _ _ -> fail ());
    set_ref = (fun _ _ -> fail ());
    test_and_set_ref = (fun _ ~test:_ ~set:_ -> fail ());
    write_batch = (fun _ -> fail ());
  }

(** [layered ~upper ~lower] overlays two backends.

    Lookups ([read], [exists], [get_ref]) try [upper] first and fall back to
    [lower]. All mutations go to [upper] only. [list_refs] is the sorted,
    de-duplicated union of both. [flush] and [close] apply to [upper] and then
    [lower]. *)
let layered ~(upper : 'h t) ~(lower : 'h t) : 'h t =
  {
    read =
      (fun h ->
        match upper.read h with Some v -> Some v | None -> lower.read h);
    write = upper.write;
    exists = (fun h -> upper.exists h || lower.exists h);
    get_ref =
      (fun name ->
        match upper.get_ref name with
        | Some v -> Some v
        | None -> lower.get_ref name);
    set_ref = upper.set_ref;
    (* NOTE(review): [get_ref] falls back to [lower], but this only consults
       [upper]. A compare-and-set on a ref that exists only in [lower]
       therefore sees it as absent. *)
    test_and_set_ref = upper.test_and_set_ref;
    list_refs =
      (fun () ->
        let upper_refs = upper.list_refs () in
        let lower_refs = lower.list_refs () in
        List.sort_uniq String.compare (upper_refs @ lower_refs));
    write_batch = upper.write_batch;
    flush =
      (fun () ->
        upper.flush ();
        lower.flush ());
    close =
      (fun () ->
        upper.close ();
        lower.close ());
  }

(** [thread_safe backend] serialises every operation of [backend] behind a
    single stdlib [Mutex]. The lock is released even when the operation
    raises (via [Fun.protect]). *)
let thread_safe (backend : 'h t) : 'h t =
  let m = Mutex.create () in
  let with_lock f =
    Mutex.lock m;
    Fun.protect ~finally:(fun () -> Mutex.unlock m) f
  in
  {
    read = (fun h -> with_lock (fun () -> backend.read h));
    write = (fun h data -> with_lock (fun () -> backend.write h data));
    exists = (fun h -> with_lock (fun () -> backend.exists h));
    get_ref = (fun name -> with_lock (fun () -> backend.get_ref name));
    set_ref = (fun name hash -> with_lock (fun () -> backend.set_ref name hash));
    test_and_set_ref =
      (fun name ~test ~set ->
        with_lock (fun () -> backend.test_and_set_ref name ~test ~set));
    list_refs = (fun () -> with_lock (fun () -> backend.list_refs ()));
    write_batch =
      (fun objects -> with_lock (fun () -> backend.write_batch objects));
    flush = (fun () -> with_lock (fun () -> backend.flush ()));
    close = (fun () -> with_lock (fun () -> backend.close ()));
  }

(** Placeholder: no backend exposes statistics yet, so this is always
    [None]. *)
let stats _ = None

(** Disk-based backend using append-only storage with a WAL and a bloom
    filter.

    Storage layout (all under [root]):
    - [objects.wal]: write-ahead log for crash recovery (uses ocaml-wal)
    - [objects.data]: append-only file containing all object payloads
    - [objects.idx]: index mapping hex hash -> (offset, length) in the data
      file
    - [objects.bloom]: serialized bloom filter for fast negative lookups
    - [refs/]: one file per ref, containing the target's hex hash

    Write path:
    + Append the record to the WAL and sync it (crash-safe with CRC).
    + Write the payload at the end of the data file.
    + Update the in-memory index and bloom filter.

    On [flush]: sync the data file and save the index and bloom filter. Then
    discard the WAL and start a fresh one; everything it covered is now
    recoverable from the index and data file.

    Recovery (on open):
    + Load the index and bloom filter from disk.
    + Replay any WAL entries not yet in the index.
    + Checkpoint the result (as [flush] does) before accepting new writes.

    Inspired by lavyek's append-only design and LevelDB's WAL pattern. *)
module Disk = struct
  module String_map = Map.Make (String)

  (** Location of one object's payload inside [objects.data]. *)
  type index_entry = { offset : int; length : int }

  type 'hash state = {
    root : Eio.Fs.dir_ty Eio.Path.t;
    (* Both handles become [None] once the backend is closed. *)
    mutable wal : Wal.t option;
    mutable data_file : Eio.File.rw_ty Eio.Resource.t option;
    (* Offset at which the next payload is written. *)
    mutable data_offset : int;
    mutable index : index_entry String_map.t;  (* hex hash -> location *)
    bloom : string Bloom.t;  (* over the hex hashes in [index] *)
    mutable refs : 'hash String_map.t;  (* in-memory mirror of refs/ *)
    to_hex : 'hash -> string;
    equal : 'hash -> 'hash -> bool;
    (* Serialises mutations; reads do not take it. *)
    mutex : Eio.Mutex.t;
  }

  let data_path root = Eio.Path.(root / "objects.data")
  let index_path root = Eio.Path.(root / "objects.idx")
  let bloom_path root = Eio.Path.(root / "objects.bloom")
  let wal_path root = Eio.Path.(root / "objects.wal")
  let refs_path root = Eio.Path.(root / "refs")

  (* Expected number of objects, used to size a freshly created bloom
     filter. *)
  let bloom_expected_size = 100_000

  (** Load [objects.idx]: one ["hex_hash offset length"] entry per line.

      Blank or malformed lines (wrong field count, non-numeric or negative
      numbers) are skipped rather than aborting the open. A missing file
      yields an empty index. *)
  let load_index root =
    let path = index_path root in
    if not (Eio.Path.is_file path) then String_map.empty
    else
      Eio.Path.load path |> String.split_on_char '\n'
      |> List.fold_left
           (fun idx line ->
             match String.split_on_char ' ' line with
             | [ hex; off_s; len_s ] -> (
                 match (int_of_string_opt off_s, int_of_string_opt len_s) with
                 | Some offset, Some length ->
                     if offset >= 0 && length >= 0 then
                       String_map.add hex { offset; length } idx
                     else idx
                 | _ -> idx)
             | _ -> idx)
           String_map.empty

  (** Atomically replace [objects.idx]: write a temp file, then rename it over
      the old index. *)
  let save_index root index =
    let path = index_path root in
    let tmp_path = Eio.Path.(root / "objects.idx.tmp") in
    let content =
      String_map.fold
        (fun hex entry acc ->
          Fmt.str "%s %d %d\n" hex entry.offset entry.length :: acc)
        index []
      |> String.concat ""
    in
    Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path content;
    Eio.Path.rename tmp_path path

  (** Load [objects.bloom]. A missing or undecodable file yields an empty
      filter sized for [bloom_expected_size]. *)
  let load_bloom root =
    let path = bloom_path root in
    if Eio.Path.is_file path then
      match Bloom.of_bytes (Bytes.of_string (Eio.Path.load path)) with
      | Ok bloom -> bloom
      | Error _ -> Bloom.v bloom_expected_size
    else Bloom.v bloom_expected_size

  (** Atomically replace [objects.bloom] (temp file + rename). *)
  let save_bloom root bloom =
    let path = bloom_path root in
    let tmp_path = Eio.Path.(root / "objects.bloom.tmp") in
    Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path
      (Bytes.to_string (Bloom.to_bytes bloom));
    Eio.Path.rename tmp_path path

  (* Read one ref file and add it to [acc] under [full_name]. A file whose
     trimmed contents [of_hex] rejects is skipped. *)
  let load_ref of_hex acc full_name entry_path =
    let hex = String.trim (Eio.Path.load entry_path) in
    match of_hex hex with
    | Ok hash -> String_map.add full_name hash acc
    | Error _ -> acc

  (** Recursively load every file under [refs/]. A ref's name is its path
      relative to [refs/], with components joined by ["/"]. *)
  let load_refs root of_hex =
    let refs_root = refs_path root in
    if not (Eio.Path.is_directory refs_root) then String_map.empty
    else
      let rec scan_dir prefix path acc =
        let entries = Eio.Path.read_dir path in
        List.fold_left
          (fun acc name ->
            let entry_path = Eio.Path.(path / name) in
            let full_name = if prefix = "" then name else prefix ^ "/" ^ name in
            if Eio.Path.is_file entry_path then
              load_ref of_hex acc full_name entry_path
            else if Eio.Path.is_directory entry_path then
              scan_dir full_name entry_path acc
            else acc)
          acc entries
      in
      scan_dir "" refs_root String_map.empty

  (** Write [refs/<name>] containing the hex hash and a newline, creating
      intermediate directories for nested names such as ["heads/main"].

      The file is written to a temp file and renamed into place, so a crash
      never leaves a truncated ref (which [load_ref] would silently drop). The
      temp file lives in [root], outside [refs/], so [load_refs] never picks
      it up. Callers hold the state mutex, so one temp name suffices. *)
  let save_ref root name hash to_hex =
    let refs_root = refs_path root in
    if not (Eio.Path.is_directory refs_root) then
      Eio.Path.mkdir ~perm:0o755 refs_root;
    let dir = Filename.dirname name in
    if dir <> "." && dir <> "" then begin
      let dir_path = Eio.Path.(refs_root / dir) in
      if not (Eio.Path.is_directory dir_path) then
        Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 dir_path
    end;
    let tmp_path = Eio.Path.(root / "ref.tmp") in
    Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path (to_hex hash ^ "\n");
    Eio.Path.rename tmp_path Eio.Path.(refs_root / name)

  (** Remove [refs/<name>] if it exists. Empty parent directories are left in
      place. *)
  let delete_ref root name =
    let ref_path = Eio.Path.(refs_path root / name) in
    if Eio.Path.is_file ref_path then Eio.Path.unlink ref_path

  (** Open [objects.data] for appending (creating it if needed). Returns the
      handle and the file's current size, which is the offset of the next
      write.

      Writes use explicit offsets equal to the tracked end of file. With
      [~append:true] some platforms append regardless of the offset, so the
      tracked offset must stay equal to the file size. *)
  let open_data_file ~sw root =
    let path = data_path root in
    let file =
      Eio.Path.open_out ~sw ~append:true ~create:(`If_missing 0o644) path
    in
    let offset =
      if Eio.Path.is_file path then
        let stat = Eio.Path.stat ~follow:true path in
        Optint.Int63.to_int stat.size
      else 0
    in
    (file, offset)

  (* WAL record format: "hex_hash\x00data". *)
  let encode_wal_record hex data = hex ^ "\x00" ^ data

  (* Inverse of [encode_wal_record]. It splits at the first NUL, so the
     payload itself may contain NUL bytes (hex hashes never do). *)
  let decode_wal_record record =
    match String.index_opt record '\x00' with
    | None -> None
    | Some i ->
        let hex = String.sub record 0 i in
        let data = String.sub record (i + 1) (String.length record - i - 1) in
        Some (hex, data)

  (** Replay WAL records whose hash is not yet in [index] by appending their
      payloads to [data_file].

      Returns the updated index, the same (mutated) bloom filter, and the new
      end-of-data offset. Undecodable records are skipped. Does nothing if the
      WAL file is absent. *)
  let replay_wal root index bloom data_file data_offset =
    let wal_p = wal_path root in
    if not (Eio.Path.is_file wal_p) then (index, bloom, data_offset)
    else
      let records = Wal.read_all wal_p in
      List.fold_left
        (fun (idx, blm, offset) record ->
          match decode_wal_record record with
          | None -> (idx, blm, offset)
          | Some (hex, data) ->
              if String_map.mem hex idx then (idx, blm, offset)
              else begin
                let len = String.length data in
                Eio.File.pwrite_all data_file
                  ~file_offset:(Optint.Int63.of_int offset)
                  [ Cstruct.of_string data ];
                let idx' = String_map.add hex { offset; length = len } idx in
                Bloom.add blm hex;
                (idx', blm, offset + len)
              end)
        (index, bloom, data_offset)
        records

  (* Make everything the WAL protects durable without it. First sync the data
     file, then atomically replace the index and bloom files. After this
     returns, the WAL may be discarded. *)
  let persist root (data_file : Eio.File.rw_ty Eio.Resource.t option) index
      bloom =
    (match data_file with Some file -> Eio.File.sync file | None -> ());
    save_index root index;
    save_bloom root bloom

  (* Delete the WAL file if present. *)
  let remove_wal root =
    let path = wal_path root in
    if Eio.Path.is_file path then Eio.Path.unlink path

  (** [create_with_hash ~sw root to_hex of_hex equal] opens (creating if
      needed) a store under [root].

      It recovers from any WAL left by a previous run and opens a fresh WAL.
      The data file and WAL are attached to [sw]. [to_hex]/[of_hex] convert
      hashes to and from the hex keys used on disk; [equal] compares hashes
      in [test_and_set_ref].

      Mutations hold an [Eio.Mutex]; [read], [exists], [get_ref] and
      [list_refs] do not. *)
  let create_with_hash (type h) ~sw (root : Eio.Fs.dir_ty Eio.Path.t)
      (to_hex : h -> string) (of_hex : string -> (h, [ `Msg of string ]) result)
      (equal : h -> h -> bool) : h t =
    if not (Eio.Path.is_directory root) then
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 root;
    let index = load_index root in
    let bloom = load_bloom root in
    (* A missing/unreadable bloom file yields an empty filter (e.g. first load
       after upgrade). Rebuild it from the index so [exists] does not report
       false negatives for indexed objects. *)
    if Bloom.size_estimate bloom = 0 then
      String_map.iter (fun hex _ -> Bloom.add bloom hex) index;
    let refs = load_refs root of_hex in
    let file, offset = open_data_file ~sw root in
    let data_file = (file :> Eio.File.rw_ty Eio.Resource.t) in
    let had_wal = Eio.Path.is_file (wal_path root) in
    let index, bloom, offset = replay_wal root index bloom data_file offset in
    (* Checkpoint what was replayed before starting a new log. Otherwise a
       crash before the next [flush] would depend on the old WAL records
       surviving [Wal.create], and they would be replayed (and re-appended)
       again on every open. *)
    if had_wal then begin
      persist root (Some data_file) index bloom;
      remove_wal root
    end;
    let wal = Wal.create ~sw (wal_path root) in
    let state =
      {
        root;
        wal = Some wal;
        data_file = Some data_file;
        data_offset = offset;
        index;
        bloom;
        refs;
        to_hex;
        equal;
        mutex = Eio.Mutex.create ();
      }
    in
    (* Append [data] at the end of the data file and index it under [key].
       The caller must hold [state.mutex] and have already logged the record
       in the WAL. *)
    let append_object file key data =
      let length = String.length data in
      let offset = state.data_offset in
      Eio.File.pwrite_all file
        ~file_offset:(Optint.Int63.of_int offset)
        [ Cstruct.of_string data ];
      state.data_offset <- offset + length;
      state.index <- String_map.add key { offset; length } state.index;
      Bloom.add state.bloom key
    in
    {
      read =
        (fun h ->
          match
            (String_map.find_opt (state.to_hex h) state.index, state.data_file)
          with
          | Some { offset; length }, Some file ->
              let buf = Cstruct.create length in
              Eio.File.pread_exact file
                ~file_offset:(Optint.Int63.of_int offset)
                [ buf ];
              Some (Cstruct.to_string buf)
          | None, _ | _, None -> None);
      write =
        (fun h data ->
          Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
              let key = state.to_hex h in
              (* Content-addressed: an already-indexed object is never
                 rewritten. The bloom check skips the index lookup for keys
                 that are definitely new. *)
              if Bloom.mem state.bloom key && String_map.mem key state.index
              then ()
              else
                match (state.wal, state.data_file) with
                | Some wal, Some file ->
                    (* Log first so the write can be replayed after a crash. *)
                    Wal.append wal (encode_wal_record key data);
                    Wal.sync wal;
                    append_object file key data
                (* NOTE(review): after [close] both handles are [None] and the
                   write is silently dropped. *)
                | _ -> ()));
      exists =
        (fun h ->
          let key = state.to_hex h in
          Bloom.mem state.bloom key && String_map.mem key state.index);
      get_ref = (fun name -> String_map.find_opt name state.refs);
      set_ref =
        (fun name hash ->
          Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
              state.refs <- String_map.add name hash state.refs;
              save_ref state.root name hash state.to_hex));
      test_and_set_ref =
        (fun name ~test ~set ->
          Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
              let current = String_map.find_opt name state.refs in
              let matches =
                match (test, current) with
                | None, None -> true
                | Some t, Some c -> state.equal t c
                | Some _, None | None, Some _ -> false
              in
              if matches then begin
                match set with
                | None ->
                    state.refs <- String_map.remove name state.refs;
                    delete_ref state.root name
                | Some h ->
                    state.refs <- String_map.add name h state.refs;
                    save_ref state.root name h state.to_hex
              end;
              matches));
      list_refs = (fun () -> String_map.bindings state.refs |> List.map fst);
      write_batch =
        (fun objects ->
          Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
              match (state.wal, state.data_file) with
              | Some wal, Some file ->
                  let keyed =
                    List.map (fun (h, data) -> (state.to_hex h, data)) objects
                  in
                  let is_new key = not (String_map.mem key state.index) in
                  (* Log every new object and sync once, before touching the
                     data file. *)
                  List.iter
                    (fun (key, data) ->
                      if is_new key then
                        Wal.append wal (encode_wal_record key data))
                    keyed;
                  Wal.sync wal;
                  (* [is_new] is re-evaluated here, so a key repeated within
                     [objects] is stored once, from its first occurrence. *)
                  List.iter
                    (fun (key, data) ->
                      if is_new key then append_object file key data)
                    keyed
              | _ -> ()));
      flush =
        (fun () ->
          Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
              persist state.root state.data_file state.index state.bloom;
              match state.wal with
              | None -> remove_wal state.root
              | Some wal ->
                  (* The WAL handle presumably holds the log file open (it is
                     created under a switch and has [Wal.close]). Unlinking
                     it while open would send later appends to an orphaned
                     file that recovery never sees. So close it, remove it,
                     and start a fresh log. *)
                  Wal.close wal;
                  remove_wal state.root;
                  state.wal <- Some (Wal.create ~sw (wal_path state.root))));
      close =
        (fun () ->
          Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
              (* Persist while the data file is still open so it can be
                 synced, then release both handles. *)
              persist state.root state.data_file state.index state.bloom;
              (match state.wal with Some wal -> Wal.close wal | None -> ());
              state.wal <- None;
              (match state.data_file with
              | Some file -> Eio.Resource.close file
              | None -> ());
              state.data_file <- None;
              remove_wal state.root));
    }

  let create_sha1 ~sw root =
    create_with_hash ~sw root Hash.to_hex Hash.sha1_of_hex Hash.equal

  let create_sha256 ~sw root =
    create_with_hash ~sw root Hash.to_hex Hash.sha256_of_hex Hash.equal
end