// Forked from tangled.org/core, the monorepo for Tangled.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/netip"
10 "net/url"
11 "strings"
12 "time"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/knotmirror/config"
16 "tangled.org/core/knotmirror/db"
17 "tangled.org/core/knotmirror/knotstream"
18 "tangled.org/core/knotmirror/models"
19 "tangled.org/core/log"
20 "tangled.org/core/tapc"
21)
22
23type Tap struct {
24 logger *slog.Logger
25 cfg *config.Config
26 tap tapc.Client
27 db *sql.DB
28 gitm GitMirrorManager
29 ks *knotstream.KnotStream
30}
31
32func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap {
33 return &Tap{
34 logger: log.SubLogger(l, "tapclient"),
35 cfg: cfg,
36 tap: tapc.NewClient(cfg.TapUrl, ""),
37 db: db,
38 gitm: gitm,
39 ks: ks,
40 }
41}
42
43func (t *Tap) Start(ctx context.Context) {
44 // TODO: better reconnect logic
45 go func() {
46 for {
47 t.tap.Connect(ctx, &tapc.SimpleIndexer{
48 EventHandler: t.processEvent,
49 })
50 time.Sleep(time.Second)
51 }
52 }()
53}
54
55func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
56 l := t.logger.With("component", "tapIndexer")
57
58 var err error
59 switch evt.Type {
60 case tapc.EvtRecord:
61 switch evt.Record.Collection.String() {
62 case tangled.RepoNSID:
63 err = t.processRepo(ctx, evt.Record)
64 }
65 }
66
67 if err != nil {
68 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
69 return err
70 }
71 return nil
72}
73
74func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
75 switch evt.Action {
76 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
77 record := tangled.Repo{}
78 if err := json.Unmarshal(evt.Record, &record); err != nil {
79 return fmt.Errorf("parsing record: %w", err)
80 }
81
82 knotUrl := record.Knot
83 if !strings.Contains(record.Knot, "://") {
84 if host, _ := db.GetHost(ctx, t.db, record.Knot); host != nil {
85 knotUrl = host.URL()
86 } else {
87 t.logger.Warn("repo is from unknown knot")
88 if t.cfg.KnotUseSSL {
89 knotUrl = "https://" + knotUrl
90 } else {
91 knotUrl = "http://" + knotUrl
92 }
93 }
94 }
95
96 status := models.RepoStatePending
97 errMsg := ""
98 u, err := url.Parse(knotUrl)
99 if err != nil {
100 status = models.RepoStateSuspended
101 errMsg = "failed to parse knot url"
102 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) {
103 status = models.RepoStateSuspended
104 errMsg = "suspending non-public knot"
105 }
106
107 repo := &models.Repo{
108 Did: evt.Did,
109 Rkey: evt.Rkey,
110 Cid: evt.CID,
111 Name: record.Name,
112 KnotDomain: knotUrl,
113 State: status,
114 ErrorMsg: errMsg,
115 RetryAfter: 0, // clear retry info
116 RetryCount: 0,
117 }
118
119 if evt.Action == tapc.RecordUpdateAction {
120 exist, err := t.gitm.Exist(repo)
121 if err != nil {
122 return fmt.Errorf("checking git repo existance: %w", err)
123 }
124 if exist {
125 // update git repo remote url
126 if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil {
127 return fmt.Errorf("updating git repo remote url: %w", err)
128 }
129 }
130 }
131
132 if err := db.UpsertRepo(ctx, t.db, repo); err != nil {
133 return fmt.Errorf("upserting repo to db: %w", err)
134 }
135
136 if !t.ks.CheckIfSubscribed(record.Knot) {
137 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil {
138 return fmt.Errorf("subscribing to knot: %w", err)
139 }
140 }
141
142 case tapc.RecordDeleteAction:
143 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil {
144 return fmt.Errorf("deleting repo from db: %w", err)
145 }
146 }
147 return nil
148}
149
// isPrivate reports whether host names a non-public destination. It doesn't
// perform DNS resolution, so a public-looking name that resolves to a private
// address is NOT detected here.
//
// host is expected to be a bare hostname or IP literal (no port, no brackets).
// It is considered private when it is "localhost" or a name under
// ".localhost" (compared case-insensitively, ignoring one trailing dot), or
// when it is an IP literal for which isPrivateAddr returns true. Anything
// else, including the empty string, is reported as not private.
func isPrivate(host string) bool {
	// Hostnames are case-insensitive and may be written fully qualified.
	name := strings.ToLower(strings.TrimSuffix(host, "."))
	if name == "localhost" || strings.HasSuffix(name, ".localhost") {
		return true
	}

	addr, err := netip.ParseAddr(host)
	if err != nil {
		// Not an IP literal; without DNS resolution nothing more can be said.
		return false
	}
	return isPrivateAddr(addr)
}

// isPrivateAddr reports whether addr is loopback, private, link-local
// (unicast or multicast) or unspecified. IPv4-mapped IPv6 addresses are
// classified by the IPv4 address they carry.
func isPrivateAddr(addr netip.Addr) bool {
	// Unmap explicitly so "::ffff:127.0.0.1" is judged as 127.0.0.1 even on
	// toolchains whose Is* methods do not unmap on their own.
	addr = addr.Unmap()

	return addr.IsLoopback() ||
		addr.IsPrivate() ||
		addr.IsLinkLocalUnicast() ||
		addr.IsLinkLocalMulticast() ||
		// 0.0.0.0 and :: are not routable destinations and are commonly
		// treated as the local host when dialed.
		addr.IsUnspecified()
}