Monorepo for Tangled tangled.org

knotserver: rework knot publish mechanism

knots require owners to publish a record to their own PDS declaring the
knot domain

Signed-off-by: oppiliappan <me@oppi.li>

Changed files
+139 -33
knotserver
+1 -1
go.mod
··· 29 29 github.com/posthog/posthog-go v1.5.5 30 30 github.com/resend/resend-go/v2 v2.15.0 31 31 github.com/sethvargo/go-envconfig v1.1.0 32 - github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e 32 + github.com/whyrusleeping/cbor-gen v0.3.1 33 33 github.com/yuin/goldmark v1.4.13 34 34 golang.org/x/net v0.39.0 35 35 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
+2 -2
go.sum
··· 350 350 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= 351 351 github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= 352 352 github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= 353 - github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e h1:28X54ciEwwUxyHn9yrZfl5ojgF4CBNLWX7LR0rvBkf4= 354 - github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so= 353 + github.com/whyrusleeping/cbor-gen v0.3.1 h1:82ioxmhEYut7LBVGhGq8xoRkXPLElVuh5mV67AFfdv0= 354 + github.com/whyrusleeping/cbor-gen v0.3.1/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so= 355 355 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 356 356 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 357 357 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+6
knotserver/db/init.go
··· 26 26 pragma auto_vacuum = incremental; 27 27 pragma busy_timeout = 5000; 28 28 29 + create table if not exists owner ( 30 + id integer primary key check (id = 0), 31 + did text not null, 32 + createdAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 33 + ); 34 + 29 35 create table if not exists known_dids ( 30 36 did text primary key 31 37 );
+18
knotserver/db/owner.go
··· 1 + package db 2 + 3 + func (d *DB) SetOwner(did string) error { 4 + query := `insert into owner (id, did) values (?, ?)` 5 + _, err := d.db.Exec(query, 0, did) 6 + return err 7 + } 8 + 9 + func (d *DB) Owner() (string, error) { 10 + query := `select did from owner` 11 + 12 + var did string 13 + err := d.db.QueryRow(query).Scan(&did) 14 + if err != nil { 15 + return "", err 16 + } 17 + return did, nil 18 + }
+100 -27
knotserver/handler.go
··· 6 6 "log/slog" 7 7 "net/http" 8 8 "runtime/debug" 9 + "time" 9 10 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + lexutil "github.com/bluesky-social/indigo/lex/util" 14 + "github.com/bluesky-social/indigo/xrpc" 10 15 "github.com/go-chi/chi/v5" 16 + "tangled.sh/tangled.sh/core/api/tangled" 11 17 "tangled.sh/tangled.sh/core/jetstream" 12 18 "tangled.sh/tangled.sh/core/knotserver/config" 13 19 "tangled.sh/tangled.sh/core/knotserver/db" 14 20 "tangled.sh/tangled.sh/core/rbac" 21 + "tangled.sh/tangled.sh/core/resolver" 15 22 ) 16 23 17 24 const ( ··· 19 26 ) 20 27 21 28 type Handle struct { 22 - c *config.Config 23 - db *db.DB 24 - jc *jetstream.JetstreamClient 25 - e *rbac.Enforcer 26 - l *slog.Logger 27 - 28 - // init is a channel that is closed when the knot has been initailized 29 - // i.e. when the first user (knot owner) has been added. 30 - init chan struct{} 31 - knotInitialized bool 29 + c *config.Config 30 + db *db.DB 31 + jc *jetstream.JetstreamClient 32 + e *rbac.Enforcer 33 + l *slog.Logger 34 + clock syntax.TIDClock 32 35 } 33 36 34 37 func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, jc *jetstream.JetstreamClient, l *slog.Logger) (http.Handler, error) { 35 - r := chi.NewRouter() 36 - 37 38 h := Handle{ 38 - c: c, 39 - db: db, 40 - e: e, 41 - l: l, 42 - jc: jc, 43 - init: make(chan struct{}), 39 + c: c, 40 + db: db, 41 + e: e, 42 + l: l, 43 + jc: jc, 44 44 } 45 45 46 46 err := e.AddDomain(ThisServer) ··· 48 48 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 49 49 } 50 50 51 + // if this knot does not already have an owner, publish it 52 + if _, err := h.db.Owner(); err != nil { 53 + l.Info("publishing this knot ...", "owner", h.c.Owner.Did) 54 + err = h.Publish() 55 + if err != nil { 56 + return nil, fmt.Errorf("failed to announce knot: %w", err) 57 + } 58 + } 59 + 60 + l.Info("this knot has been published", "owner", h.c.Owner.Did) 61 + 51 62 err = h.jc.StartJetstream(ctx, h.processMessages) 52 63 if err != nil { 53 64 return nil, fmt.Errorf("failed to start jetstream: %w", err) 54 65 } 55 66 56 - // Check if the knot knows about any Dids; 57 - // if it does, it is already initialized and we can repopulate the 58 - // Jetstream subscriptions. 59 67 dids, err := db.GetAllDids() 60 68 if err != nil { 61 69 return nil, fmt.Errorf("failed to get all Dids: %w", err) 62 70 } 63 71 64 - if len(dids) > 0 { 65 - h.knotInitialized = true 66 - close(h.init) 67 - for _, d := range dids { 68 - h.jc.AddDid(d) 69 - } 72 + for _, d := range dids { 73 + h.jc.AddDid(d) 70 74 } 75 + 76 + r := chi.NewRouter() 71 77 72 78 r.Get("/", h.Index) 73 79 r.Get("/capabilities", h.Capabilities) ··· 184 190 w.Header().Set("Content-Type", "text/plain") 185 191 fmt.Fprintf(w, "knotserver/%s", version) 186 192 } 193 + 194 + func (h *Handle) Publish() error { 195 + ownerDid := h.c.Owner.Did 196 + appPassword := h.c.Owner.AppPassword 197 + 198 + res := resolver.DefaultResolver() 199 + ident, err := res.ResolveIdent(context.Background(), ownerDid) 200 + if err != nil { 201 + return err 202 + } 203 + 204 + client := xrpc.Client{ 205 + Host: ident.PDSEndpoint(), 206 + } 207 + 208 + resp, err := atproto.ServerCreateSession(context.Background(), &client, &atproto.ServerCreateSession_Input{ 209 + Identifier: ownerDid, 210 + Password: appPassword, 211 + }) 212 + if err != nil { 213 + return err 214 + } 215 + 216 + authClient := xrpc.Client{ 217 + Host: ident.PDSEndpoint(), 218 + Auth: &xrpc.AuthInfo{ 219 + AccessJwt: resp.AccessJwt, 220 + RefreshJwt: resp.RefreshJwt, 221 + Handle: resp.Handle, 222 + Did: resp.Did, 223 + }, 224 + } 225 + 226 + rkey := h.TID() 227 + 228 + // write a "knot" record to the owners's pds 229 + _, err = atproto.RepoPutRecord(context.Background(), &authClient, &atproto.RepoPutRecord_Input{ 230 + Collection: tangled.KnotNSID, 231 + Repo: ownerDid, 232 + Rkey: rkey, 233 + Record: &lexutil.LexiconTypeDecoder{ 234 + Val: &tangled.Knot{ 235 + CreatedAt: time.Now().Format(time.RFC3339), 236 + Host: h.c.Server.Hostname, 237 + }, 238 + }, 239 + }) 240 + if err != nil { 241 + return err 242 + } 243 + 244 + err = h.db.SetOwner(ownerDid) 245 + if err != nil { 246 + return err 247 + } 248 + 249 + err = h.db.AddDid(ownerDid) 250 + if err != nil { 251 + return err 252 + } 253 + 254 + return nil 255 + } 256 + 257 + func (h *Handle) TID() string { 258 + return h.clock.Next().String() 259 + }
+1 -3
knotserver/routes.go
··· 1236 1236 func (h *Handle) Init(w http.ResponseWriter, r *http.Request) { 1237 1237 l := h.l.With("handler", "Init") 1238 1238 1239 - if h.knotInitialized { 1239 + if _, err := h.db.Owner(); err == nil { 1240 1240 writeError(w, "knot already initialized", http.StatusConflict) 1241 1241 return 1242 1242 } ··· 1275 1275 writeError(w, err.Error(), http.StatusInternalServerError) 1276 1276 return 1277 1277 } 1278 - 1279 - close(h.init) 1280 1278 1281 1279 mac := hmac.New(sha256.New, []byte(h.c.Server.Secret)) 1282 1280 mac.Write([]byte("ok"))
+11
knotserver/tid.go
··· 1 + package knotserver 2 + 3 + import ( 4 + "github.com/bluesky-social/indigo/atproto/syntax" 5 + ) 6 + 7 + var c syntax.TIDClock = syntax.NewTIDClock(0) 8 + 9 + func TID() string { 10 + return c.Next().String() 11 + }