forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "tangled.sh/tangled.sh/core/api/tangled"
14 "tangled.sh/tangled.sh/core/knotserver/db"
15 "tangled.sh/tangled.sh/core/log"
16)
17
18func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
19 l := log.FromContext(ctx)
20 pk := db.PublicKey{
21 Did: did,
22 PublicKey: record,
23 }
24 if err := h.db.AddPublicKey(pk); err != nil {
25 l.Error("failed to add public key", "error", err)
26 return fmt.Errorf("failed to add public key: %w", err)
27 }
28 l.Info("added public key from firehose", "did", did)
29 return nil
30}
31
32func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
33 l := log.FromContext(ctx)
34
35 if record.Domain != h.c.Server.Hostname {
36 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
37 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
38 }
39
40 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
41 if err != nil || !ok {
42 l.Error("failed to add member", "did", did)
43 return fmt.Errorf("failed to enforce permissions: %w", err)
44 }
45
46 if err := h.e.AddMember(ThisServer, record.Subject); err != nil {
47 l.Error("failed to add member", "error", err)
48 return fmt.Errorf("failed to add member: %w", err)
49 }
50 l.Info("added member from firehose", "member", record.Subject)
51
52 if err := h.db.AddDid(did); err != nil {
53 l.Error("failed to add did", "error", err)
54 return fmt.Errorf("failed to add did: %w", err)
55 }
56 h.jc.AddDid(did)
57
58 if err := h.fetchAndAddKeys(ctx, did); err != nil {
59 return fmt.Errorf("failed to fetch and add keys: %w", err)
60 }
61
62 return nil
63}
64
65func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
66 l := log.FromContext(ctx)
67
68 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
69 if err != nil {
70 l.Error("error building endpoint url", "did", did, "error", err.Error())
71 return fmt.Errorf("error building endpoint url: %w", err)
72 }
73
74 resp, err := http.Get(keysEndpoint)
75 if err != nil {
76 l.Error("error getting keys", "did", did, "error", err)
77 return fmt.Errorf("error getting keys: %w", err)
78 }
79 defer resp.Body.Close()
80
81 if resp.StatusCode == http.StatusNotFound {
82 l.Info("no keys found for did", "did", did)
83 return nil
84 }
85
86 plaintext, err := io.ReadAll(resp.Body)
87 if err != nil {
88 l.Error("error reading response body", "error", err)
89 return fmt.Errorf("error reading response body: %w", err)
90 }
91
92 for _, key := range strings.Split(string(plaintext), "\n") {
93 if key == "" {
94 continue
95 }
96 pk := db.PublicKey{
97 Did: did,
98 }
99 pk.Key = key
100 if err := h.db.AddPublicKey(pk); err != nil {
101 l.Error("failed to add public key", "error", err)
102 return fmt.Errorf("failed to add public key: %w", err)
103 }
104 }
105 return nil
106}
107
108func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
109 did := event.Did
110 if event.Kind != models.EventKindCommit {
111 return nil
112 }
113
114 var err error
115 defer func() {
116 eventTime := event.TimeUS
117 lastTimeUs := eventTime + 1
118 fmt.Println("lastTimeUs", lastTimeUs)
119 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
120 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
121 }
122 }()
123
124 raw := json.RawMessage(event.Commit.Record)
125
126 switch event.Commit.Collection {
127 case tangled.PublicKeyNSID:
128 var record tangled.PublicKey
129 if err := json.Unmarshal(raw, &record); err != nil {
130 return fmt.Errorf("failed to unmarshal record: %w", err)
131 }
132 if err := h.processPublicKey(ctx, did, record); err != nil {
133 return fmt.Errorf("failed to process public key: %w", err)
134 }
135
136 case tangled.KnotMemberNSID:
137 var record tangled.KnotMember
138 if err := json.Unmarshal(raw, &record); err != nil {
139 return fmt.Errorf("failed to unmarshal record: %w", err)
140 }
141 if err := h.processKnotMember(ctx, did, record); err != nil {
142 return fmt.Errorf("failed to process knot member: %w", err)
143 }
144 }
145
146 return err
147}