defmodule Statusphere.Consumer do
  @moduledoc """
  Record consumer for the `xyz.statusphere.status` collection.

  Create and update events are decoded with `Xyz.Statusphere.Status.from_json/1`
  and upserted into `Statusphere.Status`, keyed by the record's AT-URI. Delete
  events remove the row with the matching AT-URI.

  Records that cannot be decoded, carry an unparseable `createdAt`, or fail
  changeset validation are logged and skipped instead of raising, because the
  payload originates from other users' repositories.
  """

  use Drinkup.RecordConsumer, collections: ["xyz.statusphere.status"]

  import Ecto.Query, only: [from: 2]

  alias Statusphere.Repo

  require Logger

  @doc """
  Handles a record-created event by upserting the status it carries.

  Returns `:ok` when a row was written and `nil` when the event was skipped.
  """
  def handle_create(record), do: upsert(record)

  @doc """
  Handles a record-updated event by upserting the status it carries.

  Returns `:ok` when a row was written and `nil` when the event was skipped.
  """
  def handle_update(record), do: upsert(record)

  @doc """
  Handles a record-deleted event by removing the indexed status with the same
  AT-URI, if one exists.

  Returns `:ok` for status events and `nil` for anything else.
  """
  # NOTE(review): assumes delete events expose `did`, `type` and `rkey` the same
  # way create/update events do - confirm against Drinkup's record struct.
  def handle_delete(%{type: "xyz.statusphere.status"} = evt) do
    uri = status_uri(evt)

    # `delete_all` with a query avoids depending on the schema's primary key and
    # is a no-op when the status was never indexed.
    {count, _} = Repo.delete_all(from(s in Statusphere.Status, where: s.uri == ^uri))

    Logger.debug("deleted status #{uri} (#{count} row(s) removed)")
  end

  def handle_delete(_), do: nil

  # Decodes the event payload and writes it to the database. Every failure on the
  # way is logged and turned into `nil` so one bad record does not raise.
  defp upsert(%{type: "xyz.statusphere.status", record: raw} = evt) do
    with {:ok, parsed} <- Xyz.Statusphere.Status.from_json(raw),
         {:ok, created_at} <- parse_created_at(parsed.createdAt),
         {:ok, status} <- insert_status(evt, parsed, created_at) do
      Logger.debug("ingested status: #{inspect(status)}")
    else
      reason ->
        Logger.warning(
          "skipping status record did=#{inspect(Map.get(evt, :did))} " <>
            "rkey=#{inspect(Map.get(evt, :rkey))}: #{inspect(reason)}"
        )

        nil
    end
  end

  defp upsert(_), do: nil

  # Inserts the status, or refreshes `status`/`indexed_at` when the URI is
  # already indexed. Returns the `Repo.insert/2` result tuple.
  defp insert_status(evt, parsed, created_at) do
    # Captured once so the insert path and the conflict-update path agree.
    now = NaiveDateTime.utc_now()

    %Statusphere.Status{}
    |> Statusphere.Status.changeset(%{
      uri: status_uri(evt),
      author_did: evt.did,
      status: parsed.status,
      created_at: created_at,
      indexed_at: now
    })
    |> Repo.insert(
      on_conflict: [set: [status: parsed.status, indexed_at: now]],
      conflict_target: :uri
    )
  end

  # Builds the AT-URI string that identifies the event's record.
  defp status_uri(evt) do
    Atex.AtURI.to_string(%Atex.AtURI{
      authority: evt.did,
      collection: evt.type,
      rkey: evt.rkey
    })
  end

  # Parses an ISO 8601 timestamp into a naive datetime expressed in UTC.
  #
  # `NaiveDateTime.from_iso8601/1` discards any offset, which would store
  # "12:00+02:00" as 12:00 instead of 10:00 UTC. Going through `DateTime`
  # normalises to UTC first. Timestamps without an offset are still accepted
  # as-is, matching what was accepted before.
  defp parse_created_at(value) when is_binary(value) do
    case DateTime.from_iso8601(value) do
      {:ok, datetime, _offset} -> {:ok, DateTime.to_naive(datetime)}
      {:error, :missing_offset} -> NaiveDateTime.from_iso8601(value)
      {:error, _reason} = error -> error
    end
  end

  defp parse_created_at(_), do: {:error, :invalid_format}
end