···1+package knotserver
2+3+import (
4+ "context"
5+ "encoding/json"
6+ "fmt"
7+ "io"
8+ "log"
9+ "net/http"
10+ "path"
11+ "strings"
12+ "time"
13+14+ "github.com/sotangled/tangled/api/tangled"
15+ "github.com/sotangled/tangled/knotserver/db"
16+ "github.com/sotangled/tangled/knotserver/jsclient"
17+)
18+19+func (h *Handle) StartJetstream(ctx context.Context) error {
20+ collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
21+ dids := []string{}
22+23+ lastTimeUs, err := h.getLastTimeUs()
24+ if err != nil {
25+ return err
26+ }
27+28+ h.js = jsclient.NewJetstreamClient(collections, dids)
29+ messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
30+ if err != nil {
31+ return fmt.Errorf("failed to read from jetstream: %w", err)
32+ }
33+34+ go h.processMessages(messages)
35+36+ return nil
37+}
38+39+func (h *Handle) getLastTimeUs() (int64, error) {
40+ lastTimeUs, err := h.db.GetLastTimeUs()
41+ if err != nil {
42+ log.Println("couldn't get last time us, starting from now")
43+ lastTimeUs = time.Now().UnixMicro()
44+ }
45+46+ // If last time is older than a week, start from now
47+ if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
48+ lastTimeUs = time.Now().UnixMicro()
49+ log.Printf("last time us is older than a week. discarding that and starting from now.")
50+ err = h.db.SaveLastTimeUs(lastTimeUs)
51+ if err != nil {
52+ log.Println("failed to save last time us")
53+ }
54+ }
55+56+ log.Printf("found last time_us %d", lastTimeUs)
57+ return lastTimeUs, nil
58+}
59+60+func (h *Handle) processPublicKey(did string, record map[string]interface{}) {
61+ if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
62+ log.Printf("failed to add public key: %v", err)
63+ } else {
64+ log.Printf("added public key from firehose: %s", did)
65+ }
66+}
67+68+func (h *Handle) fetchAndAddKeys(did string) {
69+ resp, err := http.Get(path.Join(h.c.AppViewEndpoint, did))
70+ if err != nil {
71+ log.Printf("error getting keys for %s: %v", did, err)
72+ return
73+ }
74+ defer resp.Body.Close()
75+76+ plaintext, err := io.ReadAll(resp.Body)
77+ if err != nil {
78+ log.Printf("error reading response body: %v", err)
79+ return
80+ }
81+82+ for _, key := range strings.Split(string(plaintext), "\n") {
83+ if key == "" {
84+ continue
85+ }
86+ pk := db.PublicKey{
87+ Did: did,
88+ }
89+ pk.Key = key
90+ if err := h.db.AddPublicKey(pk); err != nil {
91+ log.Printf("failed to add public key: %v", err)
92+ }
93+ }
94+}
95+96+func (h *Handle) processKnotMember(did string, record map[string]interface{}) {
97+ ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
98+ if err != nil || !ok {
99+ log.Printf("failed to add member from did %s", did)
100+ return
101+ }
102+103+ log.Printf("adding member")
104+ if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
105+ log.Printf("failed to add member: %v", err)
106+ } else {
107+ log.Printf("added member from firehose: %s", record["member"])
108+ }
109+110+ h.fetchAndAddKeys(did)
111+ h.js.UpdateDids([]string{did})
112+}
113+114+func (h *Handle) processMessages(messages <-chan []byte) {
115+ log.Println("waiting for knot to be initialized")
116+ <-h.init
117+ log.Println("initalized jetstream watcher")
118+119+ for msg := range messages {
120+ var data map[string]interface{}
121+ if err := json.Unmarshal(msg, &data); err != nil {
122+ log.Printf("error unmarshaling message: %v", err)
123+ continue
124+ }
125+126+ if kind, ok := data["kind"].(string); ok && kind == "commit" {
127+ commit := data["commit"].(map[string]interface{})
128+ did := data["did"].(string)
129+ record := commit["record"].(map[string]interface{})
130+131+ switch commit["collection"].(string) {
132+ case tangled.PublicKeyNSID:
133+ h.processPublicKey(did, record)
134+ case tangled.KnotMemberNSID:
135+ h.processKnotMember(did, record)
136+ }
137+138+ lastTimeUs := int64(data["time_us"].(float64))
139+ if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
140+ log.Printf("failed to save last time us: %v", err)
141+ }
142+ }
143+ }
144+}
+39-2
knotserver/routes.go
···401 w.WriteHeader(http.StatusNoContent)
402}
403404-// TODO: make this set the initial user as the owner
000000000000000000000000000000000405func (h *Handle) Init(w http.ResponseWriter, r *http.Request) {
406 if h.knotInitialized {
407 writeError(w, "knot already initialized", http.StatusConflict)
···441 }
442443 h.js.UpdateDids([]string{data.Did})
444- h.e.AddOwner(ThisServer, data.Did)
0000445 // Signal that the knot is ready
446 close(h.init)
447
···401 w.WriteHeader(http.StatusNoContent)
402}
403404+func (h *Handle) AddMember(w http.ResponseWriter, r *http.Request) {
405+ data := struct {
406+ Did string `json:"did"`
407+ PublicKeys []string `json:"keys"`
408+ }{}
409+410+ if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
411+ writeError(w, "invalid request body", http.StatusBadRequest)
412+ return
413+ }
414+415+ did := data.Did
416+ for _, k := range data.PublicKeys {
417+ pk := db.PublicKey{
418+ Did: did,
419+ }
420+ pk.Key = k
421+ err := h.db.AddPublicKey(pk)
422+ if err != nil {
423+ writeError(w, err.Error(), http.StatusInternalServerError)
424+ return
425+ }
426+ }
427+428+ h.js.UpdateDids([]string{did})
429+ if err := h.e.AddMember(ThisServer, did); err != nil {
430+ log.Println(err)
431+ writeError(w, err.Error(), http.StatusInternalServerError)
432+ return
433+ }
434+435+ w.WriteHeader(http.StatusNoContent)
436+}
437+438func (h *Handle) Init(w http.ResponseWriter, r *http.Request) {
439 if h.knotInitialized {
440 writeError(w, "knot already initialized", http.StatusConflict)
···474 }
475476 h.js.UpdateDids([]string{data.Did})
477+ if err := h.e.AddOwner(ThisServer, data.Did); err != nil {
478+ log.Println(err)
479+ writeError(w, err.Error(), http.StatusInternalServerError)
480+ return
481+ }
482 // Signal that the knot is ready
483 close(h.init)
484