Elixir ATProtocol ingestion and sync library.
at main 78 lines 2.1 kB view raw
defmodule Drinkup.Jetstream.Event.Commit do
  @moduledoc """
  A commit event received from Jetstream.

  Each commit describes a single create, update, or delete operation applied
  to a record in a repository. In contrast to Firehose commit events, Jetstream
  delivers these as simplified JSON structures with no CAR/CBOR encoding.
  """

  use TypedStruct

  @typedoc """
  The kind of operation a commit performs.

  - `:create` - A new record was created
  - `:update` - An existing record was updated
  - `:delete` - An existing record was deleted
  """
  @type operation() :: :create | :update | :delete

  typedstruct enforce: true do
    field :did, String.t()
    field :time_us, integer()
    field :kind, :commit, default: :commit
    field :operation, operation()
    field :collection, String.t()
    field :rkey, String.t()
    field :rev, String.t()
    field :record, map() | nil
    field :cid, String.t() | nil
  end

  @doc """
  Builds a `Commit` struct from a decoded Jetstream commit payload.

  `did` and `time_us` are passed in separately and stored as-is. The payload
  map must contain the `"rev"`, `"operation"`, `"collection"`, and `"rkey"`
  keys. The `"record"` and `"cid"` keys are optional; when they are missing,
  the corresponding struct fields are `nil`.

  Raises `FunctionClauseError` when one of the required keys is absent or when
  `"operation"` is not one of `"create"`, `"update"`, or `"delete"`.

  ## Example Payload

      %{
        "rev" => "3l3qo2vutsw2b",
        "operation" => "create",
        "collection" => "app.bsky.feed.like",
        "rkey" => "3l3qo2vuowo2b",
        "record" => %{
          "$type" => "app.bsky.feed.like",
          "createdAt" => "2024-09-09T19:46:02.102Z",
          "subject" => %{...}
        },
        "cid" => "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
      }
  """
  @spec from(String.t(), integer(), map()) :: t()
  def from(
        did,
        time_us,
        %{
          "rev" => rev,
          "operation" => raw_operation,
          "collection" => collection,
          "rkey" => rkey
        } = commit
      ) do
    operation = parse_operation(raw_operation)

    # These two keys are optional in the payload, so they are looked up
    # leniently instead of being matched in the function head.
    record = Map.get(commit, "record", nil)
    cid = Map.get(commit, "cid", nil)

    %__MODULE__{
      did: did,
      time_us: time_us,
      rev: rev,
      operation: operation,
      collection: collection,
      rkey: rkey,
      record: record,
      cid: cid
    }
  end

  # Maps the wire-format operation name onto its atom. There is intentionally
  # no catch-all clause: an unrecognised name raises `FunctionClauseError`.
  @spec parse_operation(String.t()) :: operation()
  defp parse_operation("create"), do: :create
  defp parse_operation("update"), do: :update
  defp parse_operation("delete"), do: :delete
end