(*---------------------------------------------------------------------------
  Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
  SPDX-License-Identifier: MIT
  ---------------------------------------------------------------------------*)

(* Shared helpers *)

(** [temp_db_path env ~prefix] makes sure [_build/test_sqlite] exists under
    the current working directory of [env] and returns a path
    [<prefix>_<n>.db] inside it, with [n] drawn from [Random.int 1_000_000].
    Directory-creation errors are deliberately ignored (best effort).

    NOTE(review): nothing in this file seeds [Random] or removes these files;
    confirm that reusing a name from an earlier run is harmless. *)
let temp_db_path env ~prefix =
  let cwd = Eio.Stdenv.cwd env in
  let tmp_dir = Eio.Path.(cwd / "_build" / "test_sqlite") in
  (try Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 tmp_dir
   with Eio.Io _ -> ());
  Eio.Path.(tmp_dir / Fmt.str "%s_%d.db" prefix (Random.int 1_000_000))

(** [with_db open_db f] opens a database with [open_db sw] inside a fresh
    switch, runs [f] on it, and closes it whether [f] returns or raises. *)
let with_db open_db f =
  Eio.Switch.run @@ fun sw ->
  let db = open_db sw in
  Fun.protect ~finally:(fun () -> Sqlite.close db) (fun () -> f db)

(** [with_new_db path f] is {!with_db} using [Sqlite.v] on [path]. *)
let with_new_db path f = with_db (fun sw -> Sqlite.v ~sw path) f

(** [with_existing_db path f] is {!with_db} using [Sqlite.open_] on [path]. *)
let with_existing_db path f = with_db (fun sw -> Sqlite.open_ ~sw path) f

(** [with_temp_db f] runs [f fs db] inside an Eio main loop, where [db] is
    created with [Sqlite.v] at a temporary path and closed afterwards. *)
let with_temp_db f =
  Eio_main.run @@ fun env ->
  let fs = Eio.Stdenv.fs env in
  let path = temp_db_path env ~prefix:"test" in
  with_new_db path (fun db -> f fs db)

(** [check_find db msg expected key] checks that [Sqlite.find db key] equals
    [expected]. *)
let check_find db msg expected key =
  Alcotest.(check (option string)) msg expected (Sqlite.find db key)

(** [check_roundtrip db msg key value] stores [value] under [key] and checks
    that looking [key] up returns it. *)
let check_roundtrip db msg key value =
  Sqlite.put db key value;
  check_find db msg (Some value) key

(* Basic operations *)

let test_put_get () =
  with_temp_db @@ fun _fs db ->
  check_roundtrip db "get returns put value" "key1" "value1"

let test_get_missing () =
  with_temp_db @@ fun _fs db ->
  check_find db "missing key returns None" None "nonexistent"

let test_put_overwrite () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  check_roundtrip db "overwrite works" "key1" "value2"

let test_delete () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  Sqlite.delete db "key1";
  check_find db "delete removes key" None "key1"

let test_delete_missing () =
  with_temp_db @@ fun _fs db ->
  (* Should not raise *)
  Sqlite.delete db "nonexistent";
  (* ... and must leave the key absent. *)
  Alcotest.(check bool)
    "delete missing key is no-op" false
    (Sqlite.mem db "nonexistent")

let test_mem () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  Alcotest.(check bool) "mem finds existing key" true (Sqlite.mem db "key1");
  Alcotest.(check bool)
    "mem returns false for missing" false (Sqlite.mem db "missing")

let test_iter () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "a" "1";
  Sqlite.put db "b" "2";
  Sqlite.put db "c" "3";
  let items = ref [] in
  Sqlite.iter db ~f:(fun k v -> items := (k, v) :: !items);
  (* Sort before comparing: the check does not depend on visiting order. *)
  let sorted = List.sort compare !items in
  Alcotest.(check (list (pair string string)))
    "iter visits all entries"
    [ ("a", "1"); ("b", "2"); ("c", "3") ]
    sorted

let test_fold () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "a" "1";
  Sqlite.put db "b" "2";
  let count = Sqlite.fold db ~init:0 ~f:(fun _ _ acc -> acc + 1) in
  Alcotest.(check int) "fold counts entries" 2 count

(* Binary data *)

let test_binary_values () =
  with_temp_db @@ fun _fs db ->
  check_roundtrip db "binary data preserved" "binary"
    "\x00\x01\x02\xff\xfe\xfd"

let test_empty_value () =
  with_temp_db @@ fun _fs db ->
  check_roundtrip db "empty value works" "empty" ""

let test_large_value () =
  with_temp_db @@ fun _fs db ->
  (* Note: B-tree has page splitting constraints limiting max entry size *)
  check_roundtrip db "large value works" "large" (String.make 1000 'x')

(* Namespaced tables *)

let test_table_basic () =
  with_temp_db @@ fun _fs db ->
  let table = Sqlite.Table.create db ~name:"blocks" in
  Sqlite.Table.put table "cid1" "data1";
  Alcotest.(check (option string))
    "table get/put works" (Some "data1")
    (Sqlite.Table.find table "cid1")

let test_table_isolation () =
  with_temp_db @@ fun _fs db ->
  let t1 = Sqlite.Table.create db ~name:"table1" in
  let t2 = Sqlite.Table.create db ~name:"table2" in
  Sqlite.Table.put t1 "key" "value1";
  Sqlite.Table.put t2 "key" "value2";
  (* Also put in default table *)
  Sqlite.put db "key" "default";
  Alcotest.(check (option string))
    "t1 isolated" (Some "value1")
    (Sqlite.Table.find t1 "key");
  Alcotest.(check (option string))
    "t2 isolated" (Some "value2")
    (Sqlite.Table.find t2 "key");
  check_find db "default isolated" (Some "default") "key"

let test_table_mem_delete () =
  with_temp_db @@ fun _fs db ->
  let table = Sqlite.Table.create db ~name:"test" in
  Sqlite.Table.put table "key1" "value1";
  Alcotest.(check bool) "mem works" true (Sqlite.Table.mem table "key1");
  Sqlite.Table.delete table "key1";
  Alcotest.(check bool) "delete works" false (Sqlite.Table.mem table "key1")

let test_table_iter () =
  with_temp_db @@ fun _fs db ->
  let table = Sqlite.Table.create db ~name:"iter_test" in
  Sqlite.Table.put table "a" "1";
  Sqlite.Table.put table "b" "2";
  let items = ref [] in
  Sqlite.Table.iter table ~f:(fun k v -> items := (k, v) :: !items);
  let sorted = List.sort compare !items in
  Alcotest.(check (list (pair string string)))
    "table iter works"
    [ ("a", "1"); ("b", "2") ]
    sorted

(* Security tests - SQL injection resistance *)

let test_sql_injection_key () =
  with_temp_db @@ fun _fs db ->
  (* These malicious keys should be treated as literal strings *)
  let malicious_keys =
    [
      "'; DROP TABLE kv; --";
      "key' OR '1'='1";
      "key\"; DELETE FROM kv; --";
      "key\x00injection";
      "Robert'); DROP TABLE Students;--";
    ]
  in
  List.iter
    (fun key ->
      check_roundtrip db (Fmt.str "injection key %S safe" key) key "value")
    malicious_keys

let test_sql_injection_value () =
  with_temp_db @@ fun _fs db ->
  let malicious_values =
    [ "'; DROP TABLE kv; --"; "value' OR '1'='1"; "\x00\x00\x00" ]
  in
  List.iter
    (fun value ->
      (* Name the offending value in the label so a failure is traceable. *)
      check_roundtrip db (Fmt.str "injection value %S safe" value) "key" value)
    malicious_values

let test_table_name_validation () =
  with_temp_db @@ fun _fs db ->
  let invalid_names =
    [
      "";
      "table; DROP TABLE kv;";
      "table'";
      "table\"";
      "table\x00";
      "table name";
      "123start";
    ]
  in
  List.iter
    (fun name ->
      (* Each of these names is expected to raise [Invalid_argument]. *)
      match Sqlite.Table.create db ~name with
      | _table -> Alcotest.failf "should reject invalid name: %S" name
      | exception Invalid_argument _ -> ())
    invalid_names

let test_valid_table_names () =
  with_temp_db @@ fun _fs db ->
  let valid_names =
    [ "blocks"; "refs"; "meta"; "Table1"; "my_table"; "a"; "A123_test" ]
  in
  List.iter
    (fun name ->
      let table = Sqlite.Table.create db ~name in
      Sqlite.Table.put table "key" "value";
      Alcotest.(check (option string))
        (Fmt.str "valid table %S works" name)
        (Some "value")
        (Sqlite.Table.find table "key"))
    valid_names

(* Unicode and special characters *)

let test_unicode_keys () =
  with_temp_db @@ fun _fs db ->
  let unicode_keys = [ "café"; "日本語"; "emoji🎉"; "Ω≈ç√∫" ] in
  List.iter
    (fun key -> check_roundtrip db (Fmt.str "unicode key %S" key) key "value")
    unicode_keys

let test_unicode_values () =
  with_temp_db @@ fun _fs db ->
  check_roundtrip db "unicode value" "key" "日本語テスト🎉"

(* Sync *)

let test_sync () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key" "value";
  (* sync should not raise *)
  Sqlite.sync db;
  check_find db "data persists after sync" (Some "value") "key"

(* Persistence - critical for correctness *)

let test_persistence_basic () =
  Eio_main.run @@ fun env ->
  let path = temp_db_path env ~prefix:"persist" in
  (* Create and write *)
  with_new_db path (fun db ->
      Sqlite.put db "key1" "value1";
      Sqlite.put db "key2" "value2");
  (* Reopen and read *)
  with_existing_db path (fun db ->
      check_find db "key1 persisted" (Some "value1") "key1";
      check_find db "key2 persisted" (Some "value2") "key2")

let test_persistence_with_delete () =
  Eio_main.run @@ fun env ->
  let path = temp_db_path env ~prefix:"persist_del" in
  (* Create, write, delete *)
  with_new_db path (fun db ->
      Sqlite.put db "keep" "value1";
      Sqlite.put db "delete" "value2";
      Sqlite.delete db "delete");
  (* Reopen and verify *)
  with_existing_db path (fun db ->
      check_find db "kept key persisted" (Some "value1") "keep";
      check_find db "deleted key gone" None "delete")

let test_persistence_tables () =
  Eio_main.run @@ fun env ->
  let path = temp_db_path env ~prefix:"persist_tbl" in
  (* Create with tables *)
  with_new_db path (fun db ->
      let t1 = Sqlite.Table.create db ~name:"blocks" in
      let t2 = Sqlite.Table.create db ~name:"refs" in
      Sqlite.Table.put t1 "cid1" "data1";
      Sqlite.Table.put t2 "head" "cid123");
  (* Reopen and verify tables *)
  with_existing_db path (fun db ->
      let t1 = Sqlite.Table.create db ~name:"blocks" in
      let t2 = Sqlite.Table.create db ~name:"refs" in
      Alcotest.(check (option string))
        "table1 data persisted" (Some "data1")
        (Sqlite.Table.find t1 "cid1");
      Alcotest.(check (option string))
        "table2 data persisted" (Some "cid123")
        (Sqlite.Table.find t2 "head"))

(* Edge cases *)

let test_empty_key () =
  with_temp_db @@ fun _fs db ->
  check_roundtrip db "empty key works" "" "value_for_empty_key"

let test_key_with_nulls () =
  with_temp_db @@ fun _fs db ->
  check_roundtrip db "null bytes preserved" "key\x00with\x00nulls"
    "value\x00also\x00has\x00nulls"

let test_long_key () =
  with_temp_db @@ fun _fs db ->
  (* Note: B-tree has page splitting constraints limiting max entry size *)
  check_roundtrip db "long key works" (String.make 500 'k') "value"

let test_all_byte_values () =
  with_temp_db @@ fun _fs db ->
  (* Test all possible byte values in keys and values *)
  let all_bytes = String.init 256 Char.chr in
  check_roundtrip db "all byte values preserved" all_bytes all_bytes

let test_max_int_key_length () =
  with_temp_db @@ fun _fs db ->
  (* Test key length near encoding boundaries *)
  let lengths = [ 127; 128; 255; 256; 400 ] in
  List.iter
    (fun len ->
      check_roundtrip db
        (Fmt.str "key length %d" len)
        (String.make len 'x')
        (Fmt.str "value_%d" len))
    lengths

(* Stress tests *)

let test_many_keys () =
  with_temp_db @@ fun _fs db ->
  let n = 1000 in
  (* Insert many keys *)
  for i = 0 to n - 1 do
    Sqlite.put db (Fmt.str "key_%05d" i) (Fmt.str "value_%d" i)
  done;
  (* Verify all present *)
  for i = 0 to n - 1 do
    check_find db
      (Fmt.str "key %d present" i)
      (Some (Fmt.str "value_%d" i))
      (Fmt.str "key_%05d" i)
  done

let test_many_updates () =
  with_temp_db @@ fun _fs db ->
  let n = 100 in
  (* Update same key many times *)
  for i = 0 to n - 1 do
    Sqlite.put db "key" (Fmt.str "value_%d" i)
  done;
  check_find db "final value" (Some (Fmt.str "value_%d" (n - 1))) "key"

let test_interleaved_operations () =
  with_temp_db @@ fun _fs db ->
  (* Mix of puts, gets, deletes *)
  for i = 0 to 99 do
    Sqlite.put db (Fmt.str "a_%d" i) "value";
    Sqlite.put db (Fmt.str "b_%d" i) "value";
    if i mod 2 = 0 then Sqlite.delete db (Fmt.str "a_%d" i)
  done;
  (* Verify state *)
  let a_count = ref 0 in
  let b_count = ref 0 in
  Sqlite.iter db ~f:(fun k _ ->
      if String.length k > 2 && k.[0] = 'a' then incr a_count
      else if String.length k > 2 && k.[0] = 'b' then incr b_count);
  Alcotest.(check int) "a keys (half deleted)" 50 !a_count;
  Alcotest.(check int) "b keys (all present)" 100 !b_count

(* Multiple tables stress *)

let test_many_tables () =
  with_temp_db @@ fun _fs db ->
  let n = 20 in
  (* Create many tables *)
  let tables =
    Array.init n (fun i -> Sqlite.Table.create db ~name:(Fmt.str "table%d" i))
  in
  (* Write to all tables *)
  Array.iteri
    (fun i t -> Sqlite.Table.put t "key" (Fmt.str "value_%d" i))
    tables;
  (* Verify isolation *)
  Array.iteri
    (fun i t ->
      Alcotest.(check (option string))
        (Fmt.str "table %d" i)
        (Some (Fmt.str "value_%d" i))
        (Sqlite.Table.find t "key"))
    tables

(* Regression tests based on SQLite CVE patterns *)

let test_cve_key_overflow () =
  with_temp_db @@ fun _fs db ->
  (* Ensure large key doesn't cause integer overflow in length encoding *)
  check_roundtrip db "large key no overflow" (String.make 500 'x') "value"

let test_cve_like_boundary_conditions () =
  with_temp_db @@ fun _fs db ->
  (* Test boundary conditions within B-tree page constraints *)
  let sizes = [ 100; 200; 300; 400; 500 ] in
  List.iter
    (fun size ->
      check_roundtrip db
        (Fmt.str "boundary size %d" size)
        (Fmt.str "key_%d" size) (String.make size 'v'))
    sizes

(* CREATE TABLE parser tests *)

(** Alcotest testable for [Sqlite.column], comparing name, affinity and the
    rowid-alias flag. *)
let column_testable =
  let pp ppf (c : Sqlite.column) =
    Fmt.pf ppf "{name=%S; affinity=%S; rowid=%b}" c.col_name c.col_affinity
      c.col_is_rowid_alias
  in
  let equal (a : Sqlite.column) (b : Sqlite.column) =
    String.equal a.col_name b.col_name
    && String.equal a.col_affinity b.col_affinity
    && Bool.equal a.col_is_rowid_alias b.col_is_rowid_alias
  in
  Alcotest.testable pp equal

(** [check_columns msg expected actual] checks two column lists for equality. *)
let check_columns msg expected actual =
  Alcotest.(check (list column_testable)) msg expected actual

(** [col ?rowid name affinity] builds an expected column; [rowid] defaults to
    [false]. *)
let col ?(rowid = false) name affinity : Sqlite.column =
  { col_name = name; col_affinity = affinity; col_is_rowid_alias = rowid }

let test_parse_simple () =
  let cols =
    Sqlite.parse_create_table "CREATE TABLE kv (key TEXT, value BLOB)"
  in
  check_columns "simple kv schema"
    [ col "key" "TEXT"; col "value" "BLOB" ]
    cols

let test_parse_integer_primary_key () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"
  in
  check_columns "integer primary key"
    [ col ~rowid:true "id" "INTEGER"; col "name" "TEXT"; col "age" "INTEGER" ]
    cols

let test_parse_if_not_exists () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE IF NOT EXISTS foo (bar TEXT, baz REAL)"
  in
  check_columns "if not exists" [ col "bar" "TEXT"; col "baz" "REAL" ] cols

let test_parse_nested_parens () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE t (a DECIMAL(10,2), b VARCHAR(255) NOT NULL)"
  in
  check_columns "nested parens in types"
    [ col "a" "DECIMAL(10,2)"; col "b" "VARCHAR(255)" ]
    cols

let test_parse_table_constraints () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE t (a INTEGER, b TEXT, PRIMARY KEY(a), UNIQUE(b))"
  in
  check_columns "table-level constraints skipped"
    [ col "a" "INTEGER"; col "b" "TEXT" ]
    cols

let test_parse_no_type () =
  let cols = Sqlite.parse_create_table "CREATE TABLE t (a, b, c)" in
  check_columns "columns without types"
    [ col "a" ""; col "b" ""; col "c" "" ]
    cols

let test_parse_autoincrement () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)"
  in
  check_columns "autoincrement"
    [ col ~rowid:true "id" "INTEGER"; col "name" "TEXT" ]
    cols

let test_parse_invalid () =
  let cols = Sqlite.parse_create_table "not valid sql at all" in
  Alcotest.(check int) "invalid sql returns empty" 0 (List.length cols)

(* Generic table read tests *)

(** [with_temp_path f] runs [f env fpath path] inside an Eio main loop, where
    [fpath] is a fresh file name under [/tmp/test_sqlite] and [path] is the
    same location as an Eio path. The file is removed afterwards if present. *)
let with_temp_path f =
  Eio_main.run @@ fun env ->
  let fs = Eio.Stdenv.fs env in
  let tmp_dir = "/tmp/test_sqlite" in
  (try Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / tmp_dir)
   with Eio.Io _ -> ());
  let fpath = Fmt.str "%s/test_%d.db" tmp_dir (Random.int 1_000_000) in
  let path = Eio.Path.(fs / fpath) in
  Fun.protect
    ~finally:(fun () -> try Sys.remove fpath with Sys_error _ -> ())
    (fun () -> f env fpath path)

(** [sqlite3_or_skip fpath sql] runs [sql] against [fpath] with the [sqlite3]
    command-line tool and skips the current test when the command exits with
    a non-zero status. *)
let sqlite3_or_skip fpath sql =
  let rc = Sys.command (Fmt.str "sqlite3 '%s' \"%s\"" fpath sql) in
  if rc <> 0 then Alcotest.skip ()

let test_open_no_kv () =
  with_temp_path @@ fun _env fpath path ->
  sqlite3_or_skip fpath
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name \ TEXT, age INTEGER)";
  with_existing_db path @@ fun t ->
  let schemas = Sqlite.tables t in
  Alcotest.(check int) "one table" 1 (List.length schemas);
  let s = List.hd schemas in
  Alcotest.(check string) "table name" "users" s.Sqlite.tbl_name;
  Alcotest.(check int) "3 columns" 3 (List.length s.Sqlite.columns);
  (* KV API should fail *)
  match Sqlite.iter t ~f:(fun _ _ -> ()) with
  | () -> Alcotest.fail "should have raised"
  | exception Failure _ -> ()

let test_read_generic_table () =
  with_temp_path @@ fun _env fpath path ->
  sqlite3_or_skip fpath
    ("CREATE TABLE users (id INTEGER PRIMARY KEY, name \ TEXT, age INTEGER); "
   ^ "INSERT INTO users VALUES (1, 'Alice', 30); \ "
   ^ "INSERT INTO users VALUES (2, 'Bob', 25);");
  with_existing_db path @@ fun t ->
  let rows = Sqlite.read_table t "users" in
  Alcotest.(check int) "2 rows" 2 (List.length rows);
  let _rowid1, values1 = List.nth rows 0 in
  (match values1 with
  | [ Sqlite.Vint 1L; Sqlite.Vtext "Alice"; Sqlite.Vint 30L ] -> ()
  | _ ->
      Alcotest.failf "unexpected row 1: %a" Fmt.(list Sqlite.pp_value) values1);
  let _rowid2, values2 = List.nth rows 1 in
  match values2 with
  | [ Sqlite.Vint 2L; Sqlite.Vtext "Bob"; Sqlite.Vint 25L ] -> ()
  | _ ->
      Alcotest.failf "unexpected row 2: %a" Fmt.(list Sqlite.pp_value) values2

let test_integer_primary_key () =
  with_temp_path @@ fun _env fpath path ->
  sqlite3_or_skip fpath
    ("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT); \ "
   ^ "INSERT INTO t VALUES (42, 'hello');");
  with_existing_db path @@ fun t ->
  let rows = Sqlite.read_table t "t" in
  Alcotest.(check int) "1 row" 1 (List.length rows);
  let rowid, values = List.hd rows in
  Alcotest.(check int64) "rowid is 42" 42L rowid;
  match values with
  | [ Sqlite.Vint 42L; Sqlite.Vtext "hello" ] -> ()
  | _ ->
      Alcotest.failf "expected [Vint 42; Vtext hello], got: %a"
        Fmt.(list Sqlite.pp_value)
        values

let test_tables_lists_all () =
  with_temp_path @@ fun _env fpath path ->
  sqlite3_or_skip fpath
    "CREATE TABLE t1 (a TEXT); CREATE TABLE t2 (b INTEGER, \ c REAL);";
  with_existing_db path @@ fun t ->
  let names =
    Sqlite.tables t
    |> List.map (fun (s : Sqlite.schema) -> s.tbl_name)
    |> List.sort String.compare
  in
  Alcotest.(check (list string)) "table names" [ "t1"; "t2" ] names

(** Fold step that adds the value of single-[Vint] rows to [acc] and leaves
    [acc] unchanged for rows of any other shape. *)
let sum_int_values _rowid values acc =
  match values with [ Sqlite.Vint n ] -> Int64.add acc n | _ -> acc

let test_fold_table () =
  with_temp_path @@ fun _env fpath path ->
  sqlite3_or_skip fpath
    ("CREATE TABLE nums (n INTEGER); INSERT INTO nums \ VALUES (10); "
   ^ "INSERT INTO nums VALUES (20); INSERT INTO nums VALUES \ (30);");
  with_existing_db path @@ fun t ->
  let sum = Sqlite.fold_table t "nums" ~init:0L ~f:sum_int_values in
  Alcotest.(check int64) "sum of values" 60L sum

(* ---- SQLite file format spec test vectors ---- *)

(** [with_temp_db_path f] is like {!with_temp_db} but passes the database
    path to [f] (as [f path db]) so tests can inspect the raw file. *)
let with_temp_db_path f =
  Eio_main.run @@ fun env ->
  let path = temp_db_path env ~prefix:"spec" in
  with_new_db path (fun db -> f path db)

(** [byte_at data off] is the byte of [data] at offset [off], as an int. *)
let byte_at data off = Char.code data.[off]

(** [read_u32_be data off] decodes the 4 bytes of [data] starting at [off] as
    a big-endian unsigned integer. *)
let read_u32_be data off =
  (byte_at data off lsl 24)
  lor (byte_at data (off + 1) lsl 16)
  lor (byte_at data (off + 2) lsl 8)
  lor byte_at data (off + 3)

(* Section 1.2: Database header byte-level verification *)

let test_db_header_magic () =
  with_temp_db_path @@ fun path db ->
  Sqlite.sync db;
  let data = Eio.Path.load path in
  let magic = String.sub data 0 16 in
  Alcotest.(check string) "magic" "SQLite format 3\000" magic

let test_db_header_fixed_values () =
  with_temp_db_path @@ fun path db ->
  Sqlite.sync db;
  let data = Eio.Path.load path in
  let check_byte msg expected off =
    Alcotest.(check int) msg expected (byte_at data off)
  in
  (* Offset 16-17: page size (4096 = 0x10 0x00) *)
  check_byte "page size hi" 0x10 16;
  check_byte "page size lo" 0x00 17;
  (* Offset 18: write version = 1 (legacy) *)
  check_byte "write version" 1 18;
  (* Offset 19: read version = 1 (legacy) *)
  check_byte "read version" 1 19;
  (* Offset 20: reserved bytes = 0 *)
  check_byte "reserved" 0 20;
  (* Offset 21: max_embedded_payload_fraction = 64 (MUST be 64) *)
  check_byte "max payload fraction" 64 21;
  (* Offset 22: min_embedded_payload_fraction = 32 (MUST be 32) *)
  check_byte "min payload fraction" 32 22;
  (* Offset 23: leaf_payload_fraction = 32 (MUST be 32) *)
  check_byte "leaf payload fraction" 32 23;
  (* Offset 44: schema format = 4 *)
  Alcotest.(check int) "schema format" 4 (read_u32_be data 44);
  (* Offset 56: text encoding = 1 (UTF-8) *)
  Alcotest.(check int) "text encoding UTF-8" 1 (read_u32_be data 56);
  (* Offset 72-91: reserved for expansion = all zeros *)
  for i = 72 to 91 do
    check_byte (Fmt.str "reserved byte %d" i) 0 i
  done

let test_db_header_change_counter () =
  with_temp_db_path @@ fun path db ->
  Sqlite.put db "key" "value";
  Sqlite.sync db;
  let data = Eio.Path.load path in
  (* Offset 24 is read as the change counter, offset 92 as
     version-valid-for; the two are expected to match after a sync. *)
  let change_counter = read_u32_be data 24 in
  let version_valid_for = read_u32_be data 92 in
  Alcotest.(check int)
    "change_counter == version_valid_for" change_counter version_valid_for

(* Section 1.5: Page 1 B-tree header at offset 100 *)

let test_page1_btree_header () =
  with_temp_db_path @@ fun path db ->
  Sqlite.sync db;
  let data = Eio.Path.load path in
  (* Offset 100: page type = 0x0d (leaf table) *)
  Alcotest.(check int) "page1 type" 0x0d (byte_at data 100);
  (* Offset 107: fragmented bytes <= 60 *)
  Alcotest.(check bool) "fragmented <= 60" true (byte_at data 107 <= 60)

(* Section 2.1: sqlite_schema table format — columns: type, name, tbl_name,
   rootpage, sql *)

let test_sqlite_schema_format () =
  with_temp_db @@ fun _fs db ->
  let table = Sqlite.Table.create db ~name:"test_table" in
  Sqlite.Table.put table "key" "value";
  (* The newly created table must be listed by [Sqlite.tables]. *)
  let listed =
    List.exists
      (fun (s : Sqlite.schema) -> String.equal s.tbl_name "test_table")
      (Sqlite.tables db)
  in
  Alcotest.(check bool) "has test_table" true listed

(* Overflow values in SQLite-compatible files *)

let test_sqlite_overflow_values () =
  with_temp_db @@ fun _fs db ->
  (* Values larger than max_local (4061 for 4096-byte pages) *)
  check_roundtrip db "overflow value roundtrip" "overflow_key"
    (String.make 5000 'X')

let test_sqlite_overflow_persistence () =
  Eio_main.run @@ fun env ->
  let path = temp_db_path env ~prefix:"overflow" in
  let large = String.make 10000 'Y' in
  (* Write *)
  with_new_db path (fun db -> Sqlite.put db "big" large);
  (* Read back *)
  with_existing_db path (fun db ->
      check_find db "overflow persists" (Some large) "big")

(** The Alcotest suite: a name paired with every test case in this file. *)
let suite =
  ( "sqlite",
    List.concat
      [
        [
          Alcotest.test_case "put/get" `Quick test_put_get;
          Alcotest.test_case "get missing" `Quick test_get_missing;
          Alcotest.test_case "put overwrite" `Quick test_put_overwrite;
          Alcotest.test_case "delete" `Quick test_delete;
          Alcotest.test_case "delete missing" `Quick test_delete_missing;
          Alcotest.test_case "mem" `Quick test_mem;
          Alcotest.test_case "iter" `Quick test_iter;
          Alcotest.test_case "fold" `Quick test_fold;
        ];
        [
          Alcotest.test_case "binary values" `Quick test_binary_values;
          Alcotest.test_case "empty value" `Quick test_empty_value;
          Alcotest.test_case "large value" `Quick test_large_value;
        ];
        [
          Alcotest.test_case "table basic" `Quick test_table_basic;
          Alcotest.test_case "table isolation" `Quick test_table_isolation;
          Alcotest.test_case "table mem/delete" `Quick test_table_mem_delete;
          Alcotest.test_case "table iter" `Quick test_table_iter;
        ];
        [
          Alcotest.test_case "sql injection key" `Quick test_sql_injection_key;
          Alcotest.test_case "sql injection value" `Quick
            test_sql_injection_value;
          Alcotest.test_case "table name validation" `Quick
            test_table_name_validation;
          Alcotest.test_case "valid table names" `Quick test_valid_table_names;
        ];
        [
          Alcotest.test_case "unicode keys" `Quick test_unicode_keys;
          Alcotest.test_case "unicode values" `Quick test_unicode_values;
        ];
        [
          Alcotest.test_case "sync" `Quick test_sync;
          Alcotest.test_case "persistence basic" `Quick test_persistence_basic;
          Alcotest.test_case "persistence with delete" `Quick
            test_persistence_with_delete;
          Alcotest.test_case "persistence tables" `Quick
            test_persistence_tables;
        ];
        [
          Alcotest.test_case "empty key" `Quick test_empty_key;
          Alcotest.test_case "key with nulls" `Quick test_key_with_nulls;
          Alcotest.test_case "long key" `Quick test_long_key;
          Alcotest.test_case "all byte values" `Quick test_all_byte_values;
          Alcotest.test_case "max int key length" `Quick
            test_max_int_key_length;
        ];
        [
          Alcotest.test_case "many keys" `Slow test_many_keys;
          Alcotest.test_case "many updates" `Quick test_many_updates;
          Alcotest.test_case "interleaved ops" `Quick
            test_interleaved_operations;
          Alcotest.test_case "many tables" `Quick test_many_tables;
        ];
        [
          Alcotest.test_case "overflow key length" `Quick test_cve_key_overflow;
          Alcotest.test_case "boundary conditions" `Quick
            test_cve_like_boundary_conditions;
        ];
        [
          Alcotest.test_case "parse simple" `Quick test_parse_simple;
          Alcotest.test_case "parse integer pk" `Quick
            test_parse_integer_primary_key;
          Alcotest.test_case "parse if not exists" `Quick
            test_parse_if_not_exists;
          Alcotest.test_case "parse nested parens" `Quick
            test_parse_nested_parens;
          Alcotest.test_case "parse table constraints" `Quick
            test_parse_table_constraints;
          Alcotest.test_case "parse no type" `Quick test_parse_no_type;
          Alcotest.test_case "parse autoincrement" `Quick
            test_parse_autoincrement;
          Alcotest.test_case "parse invalid" `Quick test_parse_invalid;
          Alcotest.test_case "open no kv" `Quick test_open_no_kv;
          Alcotest.test_case "read generic table" `Quick
            test_read_generic_table;
          Alcotest.test_case "integer primary key" `Quick
            test_integer_primary_key;
          Alcotest.test_case "tables lists all" `Quick test_tables_lists_all;
          Alcotest.test_case "fold table" `Quick test_fold_table;
        ];
        [
          Alcotest.test_case "spec header magic" `Quick test_db_header_magic;
          Alcotest.test_case "spec header values" `Quick
            test_db_header_fixed_values;
          Alcotest.test_case "spec change counter" `Quick
            test_db_header_change_counter;
          Alcotest.test_case "spec page1 btree" `Quick test_page1_btree_header;
          Alcotest.test_case "spec schema format" `Quick
            test_sqlite_schema_format;
          Alcotest.test_case "spec overflow values" `Quick
            test_sqlite_overflow_values;
          Alcotest.test_case "spec overflow persist" `Quick
            test_sqlite_overflow_persistence;
        ];
      ] )