forked from tangled.org/core
this repo has no description

knotserver: replace oplog with generic event store

the `db.Op` event is now replaced by the `refUpdate` event. all knot
generated events will be stored in the events db as raw json.

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li a4794cdf ecb2a9fe

verified
Changed files
+131 -97
knotserver
+4
go.mod
··· 30 github.com/posthog/posthog-go v1.5.5 31 github.com/resend/resend-go/v2 v2.15.0 32 github.com/sethvargo/go-envconfig v1.1.0 33 github.com/urfave/cli/v3 v3.3.3 34 github.com/whyrusleeping/cbor-gen v0.3.1 35 github.com/yuin/goldmark v1.4.13 36 golang.org/x/crypto v0.38.0 37 golang.org/x/net v0.39.0 38 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 39 tangled.sh/icyphox.sh/atproto-oauth v0.0.0-20250526154904-3906c5336421 40 ) 41 ··· 50 github.com/casbin/govaluate v1.3.0 // indirect 51 github.com/cespare/xxhash/v2 v2.3.0 // indirect 52 github.com/cloudflare/circl v1.6.0 // indirect 53 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect 54 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 55 github.com/dlclark/regexp2 v1.11.5 // indirect ··· 102 github.com/opentracing/opentracing-go v1.2.0 // indirect 103 github.com/pjbgf/sha1cd v0.3.2 // indirect 104 github.com/pkg/errors v0.9.1 // indirect 105 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 106 github.com/prometheus/client_golang v1.22.0 // indirect 107 github.com/prometheus/client_model v0.6.2 // indirect
··· 30 github.com/posthog/posthog-go v1.5.5 31 github.com/resend/resend-go/v2 v2.15.0 32 github.com/sethvargo/go-envconfig v1.1.0 33 + github.com/stretchr/testify v1.10.0 34 github.com/urfave/cli/v3 v3.3.3 35 github.com/whyrusleeping/cbor-gen v0.3.1 36 github.com/yuin/goldmark v1.4.13 37 golang.org/x/crypto v0.38.0 38 golang.org/x/net v0.39.0 39 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 40 + gopkg.in/yaml.v3 v3.0.1 41 tangled.sh/icyphox.sh/atproto-oauth v0.0.0-20250526154904-3906c5336421 42 ) 43 ··· 52 github.com/casbin/govaluate v1.3.0 // indirect 53 github.com/cespare/xxhash/v2 v2.3.0 // indirect 54 github.com/cloudflare/circl v1.6.0 // indirect 55 + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect 56 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect 57 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 58 github.com/dlclark/regexp2 v1.11.5 // indirect ··· 105 github.com/opentracing/opentracing-go v1.2.0 // indirect 106 github.com/pjbgf/sha1cd v0.3.2 // indirect 107 github.com/pkg/errors v0.9.1 // indirect 108 + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 109 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 110 github.com/prometheus/client_golang v1.22.0 // indirect 111 github.com/prometheus/client_model v0.6.2 // indirect
+62
knotserver/db/events.go
···
··· 1 + package db 2 + 3 + import ( 4 + "fmt" 5 + 6 + "tangled.sh/tangled.sh/core/knotserver/notifier" 7 + ) 8 + 9 + type Event struct { 10 + Rkey string `json:"rkey"` 11 + Nsid string `json:"nsid"` 12 + EventJson string `json:"event"` 13 + } 14 + 15 + func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 16 + _, err := d.db.Exec( 17 + `insert into events (rkey, nsid, event) values (?, ?, ?)`, 18 + event.Rkey, 19 + event.Nsid, 20 + event.EventJson, 21 + ) 22 + 23 + notifier.NotifyAll() 24 + 25 + return err 26 + } 27 + 28 + func (d *DB) GetEvents(cursor string) ([]Event, error) { 29 + whereClause := "" 30 + args := []any{} 31 + if cursor != "" { 32 + whereClause = "where rkey > ?" 33 + args = append(args, cursor) 34 + } 35 + 36 + query := fmt.Sprintf(` 37 + select rkey, nsid, event 38 + from events 39 + %s 40 + order by rkey asc 41 + limit 100 42 + `, whereClause) 43 + 44 + rows, err := d.db.Query(query, args...) 45 + if err != nil { 46 + return nil, err 47 + } 48 + defer rows.Close() 49 + 50 + var evts []Event 51 + for rows.Next() { 52 + var ev Event 53 + rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson) 54 + evts = append(evts, ev) 55 + } 56 + 57 + if err := rows.Err(); err != nil { 58 + return nil, err 59 + } 60 + 61 + return evts, nil 62 + }
+5 -7
knotserver/db/init.go
··· 44 last_time_us integer not null 45 ); 46 47 - create table if not exists oplog ( 48 - tid text primary key, 49 - did text not null, 50 - repo text not null, 51 - old_sha text not null, 52 - new_sha text not null, 53 - ref text not null 54 ); 55 `) 56 if err != nil {
··· 44 last_time_us integer not null 45 ); 46 47 + create table if not exists events ( 48 + rkey text not null, 49 + nsid text not null, 50 + event text not null, -- json 51 + primary key (rkey, nsid) 52 ); 53 `) 54 if err != nil {
-70
knotserver/db/oplog.go
··· 1 - package db 2 - 3 - import ( 4 - "fmt" 5 - 6 - "tangled.sh/tangled.sh/core/knotserver/notifier" 7 - ) 8 - 9 - type Op struct { 10 - Tid string // time based ID, easy to enumerate & monotonic 11 - Did string // did of pusher 12 - Repo string // <did/repo> fully qualified repo 13 - OldSha string // old sha of reference being updated 14 - NewSha string // new sha of reference being updated 15 - Ref string // the reference being updated 16 - } 17 - 18 - func (d *DB) InsertOp(op Op, notifier *notifier.Notifier) error { 19 - _, err := d.db.Exec( 20 - `insert into oplog (tid, did, repo, old_sha, new_sha, ref) values (?, ?, ?, ?, ?, ?)`, 21 - op.Tid, 22 - op.Did, 23 - op.Repo, 24 - op.OldSha, 25 - op.NewSha, 26 - op.Ref, 27 - ) 28 - if err != nil { 29 - return err 30 - } 31 - 32 - notifier.NotifyAll() 33 - return nil 34 - } 35 - 36 - func (d *DB) GetOps(cursor string) ([]Op, error) { 37 - whereClause := "" 38 - args := []any{} 39 - if cursor != "" { 40 - whereClause = "where tid > ?" 41 - args = append(args, cursor) 42 - } 43 - 44 - query := fmt.Sprintf(` 45 - select tid, did, repo, old_sha, new_sha, ref 46 - from oplog 47 - %s 48 - order by tid asc 49 - limit 100 50 - `, whereClause) 51 - 52 - rows, err := d.db.Query(query, args...) 53 - if err != nil { 54 - return nil, err 55 - } 56 - defer rows.Close() 57 - 58 - var ops []Op 59 - for rows.Next() { 60 - var op Op 61 - rows.Scan(&op.Tid, &op.Did, &op.Repo, &op.OldSha, &op.NewSha, &op.Ref) 62 - ops = append(ops, op) 63 - } 64 - 65 - if err := rows.Err(); err != nil { 66 - return nil, err 67 - } 68 - 69 - return ops, nil 70 - }
···
+26 -7
knotserver/events.go
··· 2 3 import ( 4 "context" 5 "net/http" 6 "time" 7 ··· 13 WriteBufferSize: 1024, 14 } 15 16 - func (h *Handle) OpLog(w http.ResponseWriter, r *http.Request) { 17 l := h.l.With("handler", "OpLog") 18 l.Info("received new connection") 19 ··· 74 } 75 76 func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error { 77 - ops, err := h.db.GetOps(*cursor) 78 if err != nil { 79 - h.l.Debug("err", "err", err) 80 return err 81 } 82 - h.l.Debug("ops", "ops", ops) 83 84 - for _, op := range ops { 85 - if err := conn.WriteJSON(op); err != nil { 86 h.l.Debug("err", "err", err) 87 return err 88 } 89 - *cursor = op.Tid 90 } 91 92 return nil
··· 2 3 import ( 4 "context" 5 + "encoding/json" 6 "net/http" 7 "time" 8 ··· 14 WriteBufferSize: 1024, 15 } 16 17 + func (h *Handle) Events(w http.ResponseWriter, r *http.Request) { 18 l := h.l.With("handler", "OpLog") 19 l.Info("received new connection") 20 ··· 75 } 76 77 func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error { 78 + events, err := h.db.GetEvents(*cursor) 79 if err != nil { 80 + h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 81 return err 82 } 83 + h.l.Debug("ops", "ops", events) 84 85 + for _, event := range events { 86 + // first extract the inner json into a map 87 + var eventJson map[string]any 88 + err := json.Unmarshal([]byte(event.EventJson), &eventJson) 89 + if err != nil { 90 + h.l.Error("failed to unmarshal event", "err", err) 91 + return err 92 + } 93 + 94 + jsonMsg, err := json.Marshal(map[string]any{ 95 + "rkey": event.Rkey, 96 + "nsid": event.Nsid, 97 + "event": eventJson, 98 + }) 99 + if err != nil { 100 + h.l.Error("failed to marshal record", "err", err) 101 + return err 102 + } 103 + 104 + if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 105 h.l.Debug("err", "err", err) 106 return err 107 } 108 + *cursor = event.Rkey 109 } 110 111 return nil
+1 -1
knotserver/handler.go
··· 149 }) 150 151 // Socket that streams git oplogs 152 - r.Get("/oplog", h.OpLog) 153 154 // Initialize the knot with an owner and public key. 155 r.With(h.VerifySignature).Post("/init", h.Init)
··· 149 }) 150 151 // Socket that streams git oplogs 152 + r.Get("/events", h.Events) 153 154 // Initialize the knot with an owner and public key. 155 r.With(h.VerifySignature).Post("/init", h.Init)
+33 -12
knotserver/internal.go
··· 2 3 import ( 4 "context" 5 "log/slog" 6 "net/http" 7 "path/filepath" 8 9 "github.com/go-chi/chi/v5" 10 "github.com/go-chi/chi/v5/middleware" 11 "tangled.sh/tangled.sh/core/knotserver/config" 12 "tangled.sh/tangled.sh/core/knotserver/db" 13 "tangled.sh/tangled.sh/core/knotserver/git" ··· 67 l.Error("failed to calculate relative git dir", "scanPath", h.c.Repo.ScanPath, "gitAbsoluteDir", gitAbsoluteDir) 68 return 69 } 70 gitUserDid := r.Header.Get("X-Git-User-Did") 71 72 lines, err := git.ParsePostReceive(r.Body) ··· 76 } 77 78 for _, line := range lines { 79 - err := h.updateOpLog(line, gitUserDid, gitRelativeDir) 80 if err != nil { 81 l.Error("failed to insert op", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 82 } 83 } 84 85 - return 86 - } 87 88 - func (h *InternalHandle) updateOpLog(line git.PostReceiveLine, did, repo string) error { 89 - op := db.Op{ 90 - Tid: TID(), 91 - Did: did, 92 - Repo: repo, 93 - OldSha: line.OldSha, 94 - NewSha: line.NewSha, 95 - Ref: line.Ref, 96 } 97 - return h.db.InsertOp(op, h.n) 98 } 99 100 func Internal(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, l *slog.Logger, n *notifier.Notifier) http.Handler {
··· 2 3 import ( 4 "context" 5 + "encoding/json" 6 "log/slog" 7 "net/http" 8 "path/filepath" 9 10 "github.com/go-chi/chi/v5" 11 "github.com/go-chi/chi/v5/middleware" 12 + "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/knotserver/config" 14 "tangled.sh/tangled.sh/core/knotserver/db" 15 "tangled.sh/tangled.sh/core/knotserver/git" ··· 69 l.Error("failed to calculate relative git dir", "scanPath", h.c.Repo.ScanPath, "gitAbsoluteDir", gitAbsoluteDir) 70 return 71 } 72 + 73 + parts := strings.SplitN(gitRelativeDir, "/", 2) 74 + if len(parts) != 2 { 75 + l.Error("invalid git dir", "gitRelativeDir", gitRelativeDir) 76 + return 77 + } 78 + repoDid := parts[0] 79 + repoName := parts[1] 80 + 81 gitUserDid := r.Header.Get("X-Git-User-Did") 82 83 lines, err := git.ParsePostReceive(r.Body) ··· 87 } 88 89 for _, line := range lines { 90 + err := h.insertRefUpdate(line, gitUserDid, repoDid, repoName) 91 if err != nil { 92 l.Error("failed to insert op", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 93 + // non-fatal 94 } 95 } 96 + } 97 98 + func (h *InternalHandle) insertRefUpdate(line git.PostReceiveLine, gitUserDid, repoDid, repoName string) error { 99 + refUpdate := tangled.GitRefUpdate{ 100 + OldSha: line.OldSha, 101 + NewSha: line.NewSha, 102 + Ref: line.Ref, 103 + CommitterDid: gitUserDid, 104 + RepoDid: repoDid, 105 + RepoName: repoName, 106 + } 107 + eventJson, err := json.Marshal(refUpdate) 108 + if err != nil { 109 + return err 110 + } 111 112 + event := db.Event{ 113 + Rkey: TID(), 114 + Nsid: tangled.GitRefUpdateNSID, 115 + EventJson: string(eventJson), 116 } 117 + 118 + return h.db.InsertEvent(event, h.n) 119 } 120 121 func Internal(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, l *slog.Logger, n *notifier.Notifier) http.Handler {