A database layer inspired by Caqti and Ecto

Transactions

repodb provides two ways to handle transactions: simple transactions via Repo.transaction and composable multi-step transactions via the Multi module.

Simple Transactions

For straightforward transactions with a few operations:

(* Moves [amount] from account [from_id] to account [to_id] inside a single
   [Repo.transaction]; if either statement returns [Error], that error is
   returned from the transaction callback.

   The balances are adjusted with relative SQL updates. Writing the "balance"
   column through [Repo.update] with [-.amount] / [amount] would overwrite the
   stored balance with the transferred amount instead of adjusting it.

   NOTE(review): the SQL assumes [accounts_table] maps to a table named
   "accounts" — confirm against the schema. *)
let transfer_funds conn ~from_id ~to_id ~amount =
  Repo.transaction conn (fun conn ->
    (* Debit from account *)
    match Repo.exec conn
      "UPDATE accounts SET balance = balance - $1 WHERE id = $2"
      ~params:[Driver.Value.float amount; Driver.Value.int from_id]
    with
    | Error e -> Error e
    | Ok () ->
        (* Credit to account *)
        Repo.exec conn
          "UPDATE accounts SET balance = balance + $1 WHERE id = $2"
          ~params:[Driver.Value.float amount; Driver.Value.int to_id]
  )

If any operation returns Error, the entire transaction is rolled back.

Multi: Composable Transactions

The Multi module enables building complex, multi-step transactions with:

  • Named operations for easy debugging
  • Dependencies between operations
  • Access to previous operation results
  • Automatic rollback on failure

Setup

(* Instantiate the Multi functor with the backend module you use.
   The two definitions below are alternatives: keep exactly one of them. *)
module Multi = Repodb.Multi.Make(Repodb_sqlite)
(* or *)
module Multi = Repodb.Multi.Make(Repodb_postgresql)

Basic Multi

(* Builds a one-step Multi that inserts [name] and [email] into [users_table]
   under the operation name "user", then executes it on [conn].

   [~bio] is accepted but not used by this basic example. It is bound to [_]
   so the definition still compiles when unused-variable warnings are treated
   as errors. *)
let create_user_with_profile conn ~name ~email ~bio:_ =
  let m = Multi.(
    empty
    |> insert "user"
         ~table:users_table
         ~columns:["name"; "email"]
         ~values:[Driver.Value.text name; Driver.Value.text email]
  ) in
  Multi.execute conn m

Operations with RETURNING

Get the inserted row back:

(* A Multi with a single "user" step: inserts one row into [users_table] and
   decodes the returned row with [decode_user]. *)
let m =
  let open Multi in
  let columns = ["name"; "email"] in
  let values =
    [Driver.Value.text "Alice"; Driver.Value.text "alice@example.com"]
  in
  empty
  |> insert_returning "user"
       ~table:users_table ~columns ~values ~decode:decode_user

(* Execute the multi and report the outcome on stdout. The expression is
   wrapped in [let () = ...] so it is a valid top-level item when it follows
   another definition without a [;;] separator. *)
let () =
  match Multi.execute conn m with
  | Ok results ->
      (* The "user" entry holds the row decoded by the insert_returning step. *)
      let user : user = Multi.get_exn results "user" in
      Printf.printf "Created user with ID: %d\n" user.id
  | Error { failed_operation; error; _ } ->
      Printf.printf "Failed at %s: %s\n" failed_operation (Error.show_db_error error)

Dependent Operations

Use _fn variants to access results from previous operations:

(* Inserts a user row, then a profile row that references the new user's id.
   The "profile" step is built with [insert_fn] so it can read the decoded
   "user" result produced by the first step. *)
let create_user_with_profile conn ~name ~email ~bio =
  let open Multi in
  (* Builds the (table, columns, values) triple for the profile insert from
     the results accumulated so far. *)
  let profile_row results =
    let user : user = get_exn results "user" in
    let values = [Driver.Value.int user.id; Driver.Value.text bio] in
    (profiles_table, ["user_id"; "bio"], values)
  in
  let user_values = [Driver.Value.text name; Driver.Value.text email] in
  empty
  |> insert_returning "user"
       ~table:users_table
       ~columns:["name"; "email"]
       ~values:user_values
       ~decode:decode_user
  |> insert_fn "profile" ~f:profile_row
  |> execute conn

All Operation Types

(* One Multi that uses every predefined operation kind, in pipeline order.
   Steps "op3", "op4" and "op6" read the user returned by "op2"; step "op8"
   reads the post returned by "op4". *)
let m =
  let open Multi in
  (* Id of the user row decoded by step "op2". *)
  let author_id results =
    let user : user = get_exn results "op2" in
    user.id
  in
  (* Id of the post row decoded by step "op4". *)
  let created_post_id results =
    let post : post = get_exn results "op4" in
    post.id
  in
  (* Column list shared by both post inserts. *)
  let post_columns = ["author_id"; "title"] in
  empty
  (* Insert *)
  |> insert "op1"
       ~table:users_table
       ~columns:["name"]
       ~values:[Driver.Value.text "Alice"]
  (* Insert with RETURNING *)
  |> insert_returning "op2"
       ~table:users_table
       ~columns:["name"]
       ~values:[Driver.Value.text "Bob"]
       ~decode:decode_user
  (* Insert depending on previous result *)
  |> insert_fn "op3" ~f:(fun results ->
       let values =
         [Driver.Value.int (author_id results); Driver.Value.text "Hello"]
       in
       (posts_table, post_columns, values))
  (* Insert with RETURNING depending on previous result *)
  |> insert_returning_fn "op4" ~f:(fun results ->
       let values =
         [Driver.Value.int (author_id results); Driver.Value.text "World"]
       in
       (posts_table, post_columns, values, decode_post))
  (* Update *)
  |> update "op5"
       ~table:users_table
       ~columns:["name"]
       ~values:[Driver.Value.text "Alice Updated"]
       ~where_column:"id"
       ~where_value:(Driver.Value.int 1)
  (* Update depending on previous result *)
  |> update_fn "op6" ~f:(fun results ->
       let where_value = Driver.Value.int (author_id results) in
       (users_table, ["name"], [Driver.Value.text "Updated"], "id", where_value))
  (* Delete *)
  |> delete "op7"
       ~table:posts_table
       ~where_column:"id"
       ~where_value:(Driver.Value.int 999)
  (* Delete depending on previous result *)
  |> delete_fn "op8" ~f:(fun results ->
       (posts_table, "id", Driver.Value.int (created_post_id results)))

Custom Operations

For complex logic that doesn't fit the predefined operations:

(* Two custom steps built with [run] and [run_no_result]. Both read the
   "user" entry from the results of earlier steps, so this multi is meant to
   follow a step that stores a user under that name. *)
let m = Multi.(
  empty
  |> run "custom_op" ~f:(fun conn results ->
       (* Full access to connection and all previous results *)
       let user : user = Multi.get_exn results "user" in

       (* Run any database operation *)
       match Repo.all_query conn
         Query.(from posts_table |> where Expr.(raw "author_id" = int user.id))
         ~decode:decode_post
       with
       | Error e -> Error e
       | Ok posts -> Ok (Multi.Result posts)  (* Return packed result *)
     )

  (* Operation that doesn't return a value. The connection is not needed
     here, so it is bound as [_conn] to avoid an unused-variable warning. *)
  |> run_no_result "side_effect" ~f:(fun _conn results ->
       (* Do something without returning a value *)
       if send_welcome_email (Multi.get_exn results "user") then Ok ()
       else Error (Error.Query_failed "Email failed")
     )
)

Merging Multis

Combine multiple Multi transactions:

(* Multi with a single "user" step: inserts one row into [users_table] and
   decodes the returned row with [decode_user]. *)
let create_user =
  let open Multi in
  let values = [Driver.Value.text "Alice"] in
  empty
  |> insert_returning "user"
       ~table:users_table
       ~columns:["name"]
       ~values
       ~decode:decode_user

(* Multi with a single "profile" step. The step reads a "user" entry from the
   results, so it only works when an earlier step stored one under that name. *)
let create_profile =
  let open Multi in
  (* Builds the (table, columns, values) triple for the profile insert. *)
  let profile_row results =
    let user : user = get_exn results "user" in
    (profiles_table, ["user_id"], [Driver.Value.int user.id])
  in
  empty |> insert_fn "profile" ~f:profile_row

(* Combine the two multis into one. The "profile" step of [create_profile]
   reads the "user" result stored by [create_user].
   NOTE(review): presumably steps run in argument order — confirm. *)
let combined = Multi.merge create_user create_profile

Inspecting Multis

(* Inspect a multi without executing it: list its operation names and check
   whether a given name is present. *)
let names = Multi.names m  (* ["user"; "profile"; ...] *)
let has_user = Multi.has_name "user" m  (* true *)

(* Validate before executing. Both branches of a match must have the same
   type, so the result of [Multi.execute] is handled inside the [Ok ()] branch
   instead of being returned from it; every branch evaluates to [unit]. *)
let () =
  match Multi.validate_multi m with
  | Error msg -> Printf.printf "Invalid multi: %s\n" msg
  | Ok () ->
      (match Multi.execute conn m with
       | Ok _results -> ()
       | Error { failed_operation; error; _ } ->
           Printf.printf "Failed at %s: %s\n" failed_operation
             (Error.show_db_error error))

Error Handling

(* Handle both outcomes of [Multi.execute]. The [...] lines stand for code
   elided from this example; replace them with your own logic. *)
match Multi.execute conn m with
| Ok results ->
    (* Access results by name *)
    let user : user option = Multi.get results "user" in
    let post : post = Multi.get_exn results "post" in  (* raises if missing *)
    ...

| Error { failed_operation; error; completed } ->
    (* [failed_operation] is printed with %s, i.e. it is the operation name. *)
    Printf.printf "Failed at operation: %s\n" failed_operation;
    Printf.printf "Error: %s\n" (Error.show_db_error error);
    (* completed contains results from operations that succeeded before failure *)
    let partial_user : user option = Multi.get completed "user" in
    ...

Working with Results

(* The bindings below are independent illustrations; [results] and [value]
   are assumed to be in scope. *)

(* Safe get - returns option *)
let user : user option = Multi.get results "user"

(* Unsafe get - raises if not found *)
let user : user = Multi.get_exn results "user"

(* Add to results manually (in run functions); the updated results are
   rebound under the same name. *)
let results = Multi.put "key" value results

Complete Example

(* Creates a blog post, inserts each tag together with its post/tag
   association, and increments the author's post count, all as steps of one
   Multi executed on [conn].

   Returns [Ok post] with the decoded row of the inserted post, or [Error e]
   with the database error reported by [Multi.execute]. *)
let create_blog_post conn ~author_id ~title ~body ~tags =
  let m = Multi.(
    empty
    (* Create the post *)
    |> insert_returning "post"
         ~table:posts_table
         ~columns:["author_id"; "title"; "body"]
         ~values:[
           Driver.Value.int author_id;
           Driver.Value.text title;
           Driver.Value.text body;
         ]
         ~decode:decode_post

    (* Create tags and associations *)
    |> run "tags" ~f:(fun conn results ->
         let post : post = get_exn results "post" in

         (* Insert one tag, then link it to the post; stops at the first
            error. *)
         let insert_tag tag_name =
           match Repo.insert_returning conn
             ~table:tags_table
             ~columns:["name"]
             ~values:[Driver.Value.text tag_name]
             ~decode:decode_tag
           with
           | Error e -> Error e
           | Ok tag ->
               Repo.insert conn
                 ~table:post_tags_table
                 ~columns:["post_id"; "tag_id"]
                 ~values:[Driver.Value.int post.id; Driver.Value.int tag.id]
         in

         (* Process tags in order; the step's packed result is the original
            list of tag names. The inner match is parenthesised so later
            edits cannot attach arms to the wrong match. *)
         let rec process = function
           | [] -> Ok (Result tags)
           | tag :: rest ->
               (match insert_tag tag with
                | Error e -> Error e
                | Ok () -> process rest)
         in
         process tags
       )

    (* Update author's post count. Earlier results are not needed here, so
       they are bound as [_results] to avoid an unused-variable warning. *)
    |> run_no_result "update_count" ~f:(fun conn _results ->
         Repo.exec conn
           "UPDATE users SET post_count = post_count + 1 WHERE id = $1"
           ~params:[Driver.Value.int author_id]
       )
  )
  in
  match Multi.execute conn m with
  | Ok results ->
      (* Annotate the lookup, as every other [get_exn] call does, so the
         function's result type is pinned to [post]. *)
      let post : post = Multi.get_exn results "post" in
      Ok post
  | Error { error; _ } -> Error error

Next Steps

  • Repo - Basic database operations
  • Migrations - Database schema versioning
  • Queries - Building complex queries