(* Persistent store with Git semantics: lazy reads, delayed writes,
   content-addressing.
   Source listing header: at inode, 490 lines, 18 kB. *)
(** Storage backend interface.

    A backend is a record of closures, parameterised by the hash type used to
    address objects:
    - [read h] returns the data stored under [h], if any.
    - [write h data] stores [data] under [h].
    - [exists h] tells whether an object is stored under [h].
    - [get_ref name] / [set_ref name h] read and write a named reference.
    - [test_and_set_ref name ~test ~set] updates [name] to [set] (removing it
      when [set] is [None]) only if its current value matches [test]; returns
      whether the update happened.
    - [list_refs ()] returns the names of all references.
    - [write_batch objs] stores every [(hash, data)] pair of [objs].
    - [flush ()] and [close ()] are backend-specific (no-ops for {!Memory}). *)
type 'hash t = {
  read : 'hash -> string option;
  write : 'hash -> string -> unit;
  exists : 'hash -> bool;
  get_ref : string -> 'hash option;
  set_ref : string -> 'hash -> unit;
  test_and_set_ref : string -> test:'hash option -> set:'hash option -> bool;
  list_refs : unit -> string list;
  write_batch : ('hash * string) list -> unit;
  flush : unit -> unit;
  close : unit -> unit;
}

(** Counters describing backend activity. Nothing in this file fills them in;
    see {!stats}. *)
type stats = { reads : int; writes : int; cache_hits : int; cache_misses : int }

(** In-memory backend: objects and refs live in immutable maps held in mutable
    fields. Nothing is persisted. *)
module Memory = struct
  module StringMap = Map.Make (String)

  type 'hash state = {
    mutable objects : string StringMap.t;  (** keyed by the hex form of the hash *)
    mutable refs : 'hash StringMap.t;  (** keyed by ref name *)
    to_hex : 'hash -> string;
    equal : 'hash -> 'hash -> bool;
  }

  (** [create_with_hash to_hex equal] builds an empty in-memory backend.
      [to_hex] produces the object-map key for a hash; [equal] is used to
      compare hashes in [test_and_set_ref]. *)
  let create_with_hash (type h) (to_hex : h -> string) (equal : h -> h -> bool)
      : h t =
    let state =
      { objects = StringMap.empty; refs = StringMap.empty; to_hex; equal }
    in
    (* Shared by [write] and [write_batch]. *)
    let store h data =
      state.objects <- StringMap.add (state.to_hex h) data state.objects
    in
    {
      read = (fun h -> StringMap.find_opt (state.to_hex h) state.objects);
      write = store;
      exists = (fun h -> StringMap.mem (state.to_hex h) state.objects);
      get_ref = (fun name -> StringMap.find_opt name state.refs);
      set_ref =
        (fun name hash -> state.refs <- StringMap.add name hash state.refs);
      test_and_set_ref =
        (fun name ~test ~set ->
          let current = StringMap.find_opt name state.refs in
          let matches =
            match (test, current) with
            | None, None -> true
            | Some t, Some c -> state.equal t c
            | None, Some _ | Some _, None -> false
          in
          if matches then (
            (match set with
            | None -> state.refs <- StringMap.remove name state.refs
            | Some h -> state.refs <- StringMap.add name h state.refs);
            true)
          else false);
      list_refs = (fun () -> StringMap.bindings state.refs |> List.map fst);
      write_batch =
        (fun objects -> List.iter (fun (h, data) -> store h data) objects);
      flush = (fun () -> ());
      close = (fun () -> ());
    }

  (* Both constructors pass the same [Hash.to_hex] / [Hash.equal]; in this
     file they differ only by name. *)
  let create_sha1 () = create_with_hash Hash.to_hex Hash.equal
  let create_sha256 () = create_with_hash Hash.to_hex Hash.equal
end

(** [cached ?capacity backend] wraps [backend] with an [Lru] cache of object
    data created with [capacity] (default [100_000]).

    Successful reads and all writes populate the cache; a cache hit does not
    touch [backend]. All other operations are passed through unchanged. *)
let cached ?(capacity = 100_000) (type h) (backend : h t) : h t =
  let cache : (h, string) Lru.t = Lru.create capacity in
  {
    backend with
    read =
      (fun h ->
        match Lru.find cache h with
        | Some v -> Some v
        | None ->
            let result = backend.read h in
            Option.iter (fun v -> Lru.add cache h v) result;
            result);
    write =
      (fun h data ->
        backend.write h data;
        Lru.add cache h data);
    write_batch =
      (fun objects ->
        backend.write_batch objects;
        List.iter (fun (h, data) -> Lru.add cache h data) objects);
  }

(** [readonly backend] is [backend] with every mutating operation ([write],
    [set_ref], [test_and_set_ref], [write_batch]) replaced by a function that
    fails. [flush] and [close] are still forwarded to [backend].

    @raise Invalid_argument when a mutating operation is called. *)
let readonly (backend : 'h t) : 'h t =
  let fail () = invalid_arg "Backend is read-only" in
  {
    backend with
    write = (fun _ _ -> fail ());
    set_ref = (fun _ _ -> fail ());
    test_and_set_ref = (fun _ ~test:_ ~set:_ -> fail ());
    write_batch = (fun _ -> fail ());
  }

(** [layered ~upper ~lower] combines two backends.

    Lookups ([read], [exists], [get_ref]) try [upper] first and fall back to
    [lower]; [list_refs] is the sorted, de-duplicated union of both. All
    mutations ([write], [write_batch], [set_ref], [test_and_set_ref]) go to
    [upper] only, so [test_and_set_ref] compares against [upper]'s value of the
    ref, not the layered view. [flush] and [close] are applied to [upper] then
    [lower]. *)
let layered ~(upper : 'h t) ~(lower : 'h t) : 'h t =
  {
    read =
      (fun h ->
        match upper.read h with Some v -> Some v | None -> lower.read h);
    write = upper.write;
    exists = (fun h -> upper.exists h || lower.exists h);
    get_ref =
      (fun name ->
        match upper.get_ref name with
        | Some v -> Some v
        | None -> lower.get_ref name);
    set_ref = upper.set_ref;
    test_and_set_ref = upper.test_and_set_ref;
    list_refs =
      (fun () ->
        let upper_refs = upper.list_refs () in
        let lower_refs = lower.list_refs () in
        List.sort_uniq String.compare (upper_refs @ lower_refs));
    write_batch = upper.write_batch;
    flush =
      (fun () ->
        upper.flush ();
        lower.flush ());
    close =
      (fun () ->
        upper.close ();
        lower.close ());
  }

(** [stats backend] always returns [None]: no backend in this file records
    statistics. *)
let stats _ = None

(** Disk-based backend using append-only storage with WAL and bloom filter.

    Storage layout:
    - objects.wal: write-ahead log for crash recovery (uses ocaml-wal)
    - objects.data: append-only file containing all objects
    - objects.idx: index file mapping hex hash -> (offset, length)
    - objects.bloom: serialized bloom filter for fast negative lookups
    - refs/: directory with one file per ref containing hex hash

    Write path:
    + Write to WAL (crash-safe with CRC)
    + Write to data file
    + Update in-memory index and bloom filter
    + On flush: save bloom and index, then clear and reopen the WAL

    Recovery:
    + Load index and bloom from disk
    + Replay any entries in WAL not yet in index

    Inspired by lavyek's append-only design and LevelDB's WAL pattern. *)
module Disk = struct
  module StringMap = Map.Make (String)

  (** Location of one object inside objects.data, in bytes. *)
  type index_entry = { offset : int; length : int }

  type 'hash state = {
    root : Eio.Fs.dir_ty Eio.Path.t;
    mutable wal : Wal.t option;  (** [None] once the backend is closed *)
    mutable data_file : Eio.File.rw_ty Eio.Resource.t option;
        (** [None] once the backend is closed *)
    mutable data_offset : int;  (** offset at which the next object is written *)
    mutable index : index_entry StringMap.t;  (** keyed by hex hash *)
    bloom : string Bloom.t;
    mutable refs : 'hash StringMap.t;
    to_hex : 'hash -> string;
    equal : 'hash -> 'hash -> bool;
    mutex : Eio.Mutex.t;  (** serialises every mutating operation *)
  }

  let data_path root = Eio.Path.(root / "objects.data")
  let index_path root = Eio.Path.(root / "objects.idx")
  let bloom_path root = Eio.Path.(root / "objects.bloom")
  let wal_path root = Eio.Path.(root / "objects.wal")
  let refs_path root = Eio.Path.(root / "refs")

  (* Expected number of objects for bloom filter sizing *)
  let bloom_expected_size = 100_000

  (** [load_index root] parses objects.idx, or returns an empty index when the
      file does not exist.

      Index file format: one line per entry ["hex_hash offset length\n"].
      Lines that do not have exactly three space-separated fields, or whose
      offset/length are not integers, are skipped rather than aborting the
      load. *)
  let load_index root =
    let path = index_path root in
    if Eio.Path.is_file path then
      Eio.Path.load path |> String.split_on_char '\n'
      |> List.fold_left
           (fun idx line ->
             (* An empty line splits to [""] and lands in the last case. *)
             match String.split_on_char ' ' line with
             | [ hex; off_s; len_s ] -> (
                 match (int_of_string_opt off_s, int_of_string_opt len_s) with
                 | Some offset, Some length ->
                     StringMap.add hex { offset; length } idx
                 | None, _ | _, None -> idx)
             | _ -> idx)
           StringMap.empty
    else StringMap.empty

  (** [save_index root index] writes [index] to a temporary file and renames
      it over objects.idx, so the previous index stays in place until the new
      one is fully written. *)
  let save_index root index =
    let path = index_path root in
    let tmp_path = Eio.Path.(root / "objects.idx.tmp") in
    let content =
      StringMap.fold
        (fun hex entry acc ->
          Printf.sprintf "%s %d %d\n" hex entry.offset entry.length :: acc)
        index []
      |> String.concat ""
    in
    Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path content;
    Eio.Path.rename tmp_path path

  (** [load_bloom root] loads objects.bloom. A missing or undecodable file
      yields a fresh filter sized for [bloom_expected_size]. *)
  let load_bloom root =
    let path = bloom_path root in
    if Eio.Path.is_file path then
      match Bloom.of_bytes (Bytes.of_string (Eio.Path.load path)) with
      | Ok bloom -> bloom
      | Error _ -> Bloom.create bloom_expected_size
    else Bloom.create bloom_expected_size

  (** [save_bloom root bloom] serialises [bloom] to a temporary file and
      renames it over objects.bloom. *)
  let save_bloom root bloom =
    let path = bloom_path root in
    let tmp_path = Eio.Path.(root / "objects.bloom.tmp") in
    Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path
      (Bytes.to_string (Bloom.to_bytes bloom));
    Eio.Path.rename tmp_path path

  (** [load_refs root of_hex] walks the refs/ directory recursively and
      returns a map from ref name (path relative to refs/, ['/']-separated) to
      hash. Files whose trimmed content [of_hex] rejects are ignored. Returns
      an empty map when refs/ does not exist. *)
  let load_refs root of_hex =
    let refs_root = refs_path root in
    if Eio.Path.is_directory refs_root then
      let rec scan_dir prefix path acc =
        let entries = Eio.Path.read_dir path in
        List.fold_left
          (fun acc name ->
            let entry_path = Eio.Path.(path / name) in
            let full_name = if prefix = "" then name else prefix ^ "/" ^ name in
            if Eio.Path.is_file entry_path then
              let hex = String.trim (Eio.Path.load entry_path) in
              match of_hex hex with
              | Ok hash -> StringMap.add full_name hash acc
              | Error _ -> acc
            else if Eio.Path.is_directory entry_path then
              scan_dir full_name entry_path acc
            else acc)
          acc entries
      in
      scan_dir "" refs_root StringMap.empty
    else StringMap.empty

  (** [save_ref root name hash to_hex] writes the hex form of [hash] followed
      by a newline to refs/[name], creating refs/ and any intermediate
      directories first. *)
  let save_ref root name hash to_hex =
    let path = refs_path root in
    if not (Eio.Path.is_directory path) then Eio.Path.mkdir ~perm:0o755 path;
    let ref_path = Eio.Path.(path / name) in
    (* Handle nested paths like refs/heads/main *)
    let dir = Filename.dirname name in
    if dir <> "." && dir <> "" then begin
      let dir_path = Eio.Path.(path / dir) in
      if not (Eio.Path.is_directory dir_path) then
        Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 dir_path
    end;
    Eio.Path.save ~create:(`Or_truncate 0o644) ref_path (to_hex hash ^ "\n")

  (** [delete_ref root name] removes refs/[name] if it is a regular file.
      Parent directories are left in place. *)
  let delete_ref root name =
    let ref_path = Eio.Path.(refs_path root / name) in
    if Eio.Path.is_file ref_path then Eio.Path.unlink ref_path

  (** [open_data_file ~sw root] opens (creating if missing) objects.data and
      returns it together with its current size, which is where the next
      object will be written.

      NOTE(review): the file is opened with [~append:true] while callers
      write with explicit offsets; the two agree only while this process is
      the sole writer of the file -- confirm that is an intended invariant. *)
  let open_data_file ~sw root =
    let path = data_path root in
    let file =
      Eio.Path.open_out ~sw ~append:true ~create:(`If_missing 0o644) path
    in
    let offset =
      if Eio.Path.is_file path then
        let stat = Eio.Path.stat ~follow:true path in
        Optint.Int63.to_int stat.size
      else 0
    in
    (file, offset)

  (* WAL record format: "hex_hash\x00data" *)
  let encode_wal_record hex data = hex ^ "\x00" ^ data

  (** Inverse of [encode_wal_record]: splits at the first NUL byte. Returns
      [None] when the record contains no NUL. *)
  let decode_wal_record record =
    match String.index_opt record '\x00' with
    | None -> None
    | Some i ->
        let hex = String.sub record 0 i in
        let data = String.sub record (i + 1) (String.length record - i - 1) in
        Some (hex, data)

  (** [replay_wal root index bloom data_file data_offset] re-applies WAL
      entries that aren't in the index yet: each such object is written to
      [data_file] at the running offset and added to the index and to [bloom]
      (which is mutated in place). Undecodable records and records whose hash
      is already indexed are skipped. Returns the updated
      [(index, bloom, data_offset)]. *)
  let replay_wal root index bloom data_file data_offset =
    let wal_p = wal_path root in
    if not (Eio.Path.is_file wal_p) then (index, bloom, data_offset)
    else
      let records = Wal.read_all wal_p in
      List.fold_left
        (fun (idx, blm, offset) record ->
          match decode_wal_record record with
          | None -> (idx, blm, offset)
          | Some (hex, data) ->
              if StringMap.mem hex idx then (idx, blm, offset)
              else begin
                (* Write to data file *)
                let len = String.length data in
                Eio.File.pwrite_all data_file
                  ~file_offset:(Optint.Int63.of_int offset)
                  [ Cstruct.of_string data ];
                let idx' = StringMap.add hex { offset; length = len } idx in
                Bloom.add blm hex;
                (idx', blm, offset + len)
              end)
        (index, bloom, data_offset)
        records

  (** [create_with_hash ~sw root to_hex of_hex equal] opens (or initialises)
      a disk store rooted at [root].

      On open it loads the index, bloom filter and refs, replays the WAL, then
      opens a WAL for new writes. Files are opened under switch [sw].

      Behaviour of the returned backend:
      - [read] is not serialised by the mutex; it returns [None] for unknown
        hashes and after [close].
      - [write] / [write_batch] skip objects that are already indexed, and
        silently do nothing after [close].
      - [flush] syncs the data file, persists bloom filter and index, and
        resets the WAL.
      - [close] does the same and releases the WAL and data file handles. *)
  let create_with_hash (type h) ~sw (root : Eio.Fs.dir_ty Eio.Path.t)
      (to_hex : h -> string) (of_hex : string -> (h, [ `Msg of string ]) result)
      (equal : h -> h -> bool) : h t =
    (* Create root directory if needed *)
    if not (Eio.Path.is_directory root) then
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 root;
    let index = load_index root in
    let bloom = load_bloom root in
    (* Populate bloom from index if empty (first load after upgrade) *)
    if Bloom.size_estimate bloom = 0 then
      StringMap.iter (fun hex _ -> Bloom.add bloom hex) index;
    let refs = load_refs root of_hex in
    let file, offset = open_data_file ~sw root in
    let data_file = (file :> Eio.File.rw_ty Eio.Resource.t) in
    (* Replay any uncommitted WAL entries *)
    let index, bloom, offset = replay_wal root index bloom data_file offset in
    (* Open WAL for new writes *)
    let wal = Wal.create ~sw (wal_path root) in
    let state =
      {
        root;
        wal = Some wal;
        data_file = Some data_file;
        data_offset = offset;
        index;
        bloom;
        refs;
        to_hex;
        equal;
        mutex = Eio.Mutex.create ();
      }
    in
    let locked f = Eio.Mutex.use_rw ~protect:true state.mutex f in
    (* Bloom filter first for negative lookups, then the authoritative index. *)
    let known key =
      Bloom.mem state.bloom key && StringMap.mem key state.index
    in
    (* Append [data] to the data file and record it in the index and bloom
       filter. Must be called with the mutex held. *)
    let append_object file key data =
      let len = String.length data in
      let offset = state.data_offset in
      Eio.File.pwrite_all file
        ~file_offset:(Optint.Int63.of_int offset)
        [ Cstruct.of_string data ];
      state.data_offset <- offset + len;
      state.index <- StringMap.add key { offset; length = len } state.index;
      Bloom.add state.bloom key
    in
    (* The bloom filter is saved BEFORE the index. If we crash between the
       two, the on-disk filter is a superset of the on-disk index, which only
       costs false positives. The opposite order could leave an indexed object
       missing from the filter; WAL replay skips already-indexed entries, so
       [exists] would then wrongly report such an object as absent. *)
    let persist_metadata () =
      save_bloom state.root state.bloom;
      save_index state.root state.index
    in
    let remove_wal_file () =
      let wal_p = wal_path state.root in
      if Eio.Path.is_file wal_p then Eio.Path.unlink wal_p
    in
    {
      read =
        (fun h ->
          let key = state.to_hex h in
          match StringMap.find_opt key state.index with
          | None -> None
          | Some entry -> (
              match state.data_file with
              | None -> None
              | Some file ->
                  let buf = Cstruct.create entry.length in
                  Eio.File.pread_exact file
                    ~file_offset:(Optint.Int63.of_int entry.offset)
                    [ buf ];
                  Some (Cstruct.to_string buf)));
      write =
        (fun h data ->
          locked (fun () ->
              let key = state.to_hex h in
              if known key then ()
              else
                match (state.wal, state.data_file) with
                | Some wal, Some file ->
                    (* Write to WAL first for crash safety *)
                    Wal.append wal (encode_wal_record key data);
                    Wal.sync wal;
                    (* Then write to data file *)
                    append_object file key data
                | None, _ | _, None -> ()));
      exists = (fun h -> known (state.to_hex h));
      get_ref = (fun name -> StringMap.find_opt name state.refs);
      set_ref =
        (fun name hash ->
          locked (fun () ->
              state.refs <- StringMap.add name hash state.refs;
              save_ref state.root name hash state.to_hex));
      test_and_set_ref =
        (fun name ~test ~set ->
          locked (fun () ->
              let current = StringMap.find_opt name state.refs in
              let matches =
                match (test, current) with
                | None, None -> true
                | Some t, Some c -> state.equal t c
                | None, Some _ | Some _, None -> false
              in
              if matches then begin
                (match set with
                | None ->
                    state.refs <- StringMap.remove name state.refs;
                    delete_ref state.root name
                | Some h ->
                    state.refs <- StringMap.add name h state.refs;
                    save_ref state.root name h state.to_hex);
                true
              end
              else false));
      list_refs = (fun () -> StringMap.bindings state.refs |> List.map fst);
      write_batch =
        (fun objects ->
          locked (fun () ->
              match (state.wal, state.data_file) with
              | Some wal, Some file ->
                  (* Write all to WAL first, with a single sync *)
                  List.iter
                    (fun (h, data) ->
                      let key = state.to_hex h in
                      if not (StringMap.mem key state.index) then
                        Wal.append wal (encode_wal_record key data))
                    objects;
                  Wal.sync wal;
                  (* Then write to data file. The index is re-checked per
                     object, so a hash repeated within the batch is stored
                     only once. *)
                  List.iter
                    (fun (h, data) ->
                      let key = state.to_hex h in
                      if not (StringMap.mem key state.index) then
                        append_object file key data)
                    objects
              | None, _ | _, None -> ()));
      flush =
        (fun () ->
          locked (fun () ->
              (match state.data_file with
              | Some file -> Eio.File.sync file
              | None -> ());
              persist_metadata ();
              (* Clear WAL after persisting index - entries are now
                 recoverable from index + data file. The open handle must be
                 replaced as well: appending through a handle whose file has
                 been unlinked would write records that no longer exist on
                 disk under objects.wal, so writes made after a flush could
                 not be replayed after a crash. *)
              match state.wal with
              | Some wal ->
                  Wal.close wal;
                  state.wal <- None;
                  remove_wal_file ();
                  state.wal <- Some (Wal.create ~sw (wal_path state.root))
              | None -> remove_wal_file ()));
      close =
        (fun () ->
          locked (fun () ->
              (match state.wal with Some wal -> Wal.close wal | None -> ());
              (match state.data_file with
              | Some file ->
                  Eio.File.sync file;
                  Eio.Resource.close file
              | None -> ());
              persist_metadata ();
              (* Clear WAL *)
              remove_wal_file ();
              state.wal <- None;
              state.data_file <- None));
    }

  let create_sha1 ~sw root =
    create_with_hash ~sw root Hash.to_hex Hash.sha1_of_hex Hash.equal

  let create_sha256 ~sw root =
    create_with_hash ~sw root Hash.to_hex Hash.sha256_of_hex Hash.equal
end