forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/sotangled/tangled/api/tangled"
14 "github.com/sotangled/tangled/knotserver/db"
15 "github.com/sotangled/tangled/log"
16)
17
18func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
19 l := log.FromContext(ctx)
20 pk := db.PublicKey{
21 Did: did,
22 PublicKey: record,
23 }
24 if err := h.db.AddPublicKey(pk); err != nil {
25 l.Error("failed to add public key", "error", err)
26 return fmt.Errorf("failed to add public key: %w", err)
27 }
28 l.Info("added public key from firehose", "did", did)
29 return nil
30}
31
32func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember, eventTime int64) error {
33 l := log.FromContext(ctx)
34
35 if record.Domain != h.c.Server.Hostname {
36 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
37 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
38 }
39
40 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
41 if err != nil || !ok {
42 l.Error("failed to add member", "did", did)
43 return fmt.Errorf("failed to enforce permissions: %w", err)
44 }
45
46 if err := h.e.AddMember(ThisServer, record.Member); err != nil {
47 l.Error("failed to add member", "error", err)
48 return fmt.Errorf("failed to add member: %w", err)
49 }
50 l.Info("added member from firehose", "member", record.Member)
51
52 if err := h.db.AddDid(did); err != nil {
53 l.Error("failed to add did", "error", err)
54 return fmt.Errorf("failed to add did: %w", err)
55 }
56
57 if err := h.fetchAndAddKeys(ctx, did); err != nil {
58 return fmt.Errorf("failed to fetch and add keys: %w", err)
59 }
60
61 lastTimeUs := eventTime + 1
62 fmt.Println("lastTimeUs", lastTimeUs)
63 if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil {
64 return fmt.Errorf("failed to save last time us: %w", err)
65 }
66 h.jc.UpdateDids([]string{did})
67 return nil
68}
69
70func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
71 l := log.FromContext(ctx)
72
73 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
74 if err != nil {
75 l.Error("error building endpoint url", "did", did, "error", err.Error())
76 return fmt.Errorf("error building endpoint url: %w", err)
77 }
78
79 resp, err := http.Get(keysEndpoint)
80 if err != nil {
81 l.Error("error getting keys", "did", did, "error", err)
82 return fmt.Errorf("error getting keys: %w", err)
83 }
84 defer resp.Body.Close()
85
86 if resp.StatusCode == http.StatusNotFound {
87 l.Info("no keys found for did", "did", did)
88 return nil
89 }
90
91 plaintext, err := io.ReadAll(resp.Body)
92 if err != nil {
93 l.Error("error reading response body", "error", err)
94 return fmt.Errorf("error reading response body: %w", err)
95 }
96
97 for _, key := range strings.Split(string(plaintext), "\n") {
98 if key == "" {
99 continue
100 }
101 pk := db.PublicKey{
102 Did: did,
103 }
104 pk.Key = key
105 if err := h.db.AddPublicKey(pk); err != nil {
106 l.Error("failed to add public key", "error", err)
107 return fmt.Errorf("failed to add public key: %w", err)
108 }
109 }
110 return nil
111}
112
113func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
114 did := event.Did
115 if event.Kind != models.EventKindCommit {
116 return nil
117 }
118
119 raw := json.RawMessage(event.Commit.Record)
120
121 switch event.Commit.Collection {
122 case tangled.PublicKeyNSID:
123 var record tangled.PublicKey
124 if err := json.Unmarshal(raw, &record); err != nil {
125 return fmt.Errorf("failed to unmarshal record: %w", err)
126 }
127 if err := h.processPublicKey(ctx, did, record); err != nil {
128 return fmt.Errorf("failed to process public key: %w", err)
129 }
130
131 case tangled.KnotMemberNSID:
132 var record tangled.KnotMember
133 if err := json.Unmarshal(raw, &record); err != nil {
134 return fmt.Errorf("failed to unmarshal record: %w", err)
135 }
136 if err := h.processKnotMember(ctx, did, record, event.TimeUS); err != nil {
137 return fmt.Errorf("failed to process knot member: %w", err)
138 }
139 }
140
141 return nil
142}