Monorepo for Tangled
at master 167 lines 3.9 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/netip" 10 "net/url" 11 "strings" 12 "time" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/knotmirror/config" 16 "tangled.org/core/knotmirror/db" 17 "tangled.org/core/knotmirror/knotstream" 18 "tangled.org/core/knotmirror/models" 19 "tangled.org/core/log" 20 "tangled.org/core/tapc" 21) 22 23type Tap struct { 24 logger *slog.Logger 25 cfg *config.Config 26 tap tapc.Client 27 db *sql.DB 28 gitm GitMirrorManager 29 ks *knotstream.KnotStream 30} 31 32func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { 33 return &Tap{ 34 logger: log.SubLogger(l, "tapclient"), 35 cfg: cfg, 36 tap: tapc.NewClient(cfg.TapUrl, ""), 37 db: db, 38 gitm: gitm, 39 ks: ks, 40 } 41} 42 43func (t *Tap) Start(ctx context.Context) { 44 // TODO: better reconnect logic 45 go func() { 46 for { 47 t.tap.Connect(ctx, &tapc.SimpleIndexer{ 48 EventHandler: t.processEvent, 49 }) 50 time.Sleep(time.Second) 51 } 52 }() 53} 54 55func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 56 l := t.logger.With("component", "tapIndexer") 57 58 var err error 59 switch evt.Type { 60 case tapc.EvtRecord: 61 switch evt.Record.Collection.String() { 62 case tangled.RepoNSID: 63 err = t.processRepo(ctx, evt.Record) 64 } 65 } 66 67 if err != nil { 68 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 69 return err 70 } 71 return nil 72} 73 74func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 75 switch evt.Action { 76 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 77 record := tangled.Repo{} 78 if err := json.Unmarshal(evt.Record, &record); err != nil { 79 return fmt.Errorf("parsing record: %w", err) 80 } 81 82 knotUrl := record.Knot 83 if !strings.Contains(record.Knot, "://") { 84 if host, _ := db.GetHost(ctx, t.db, record.Knot); host != nil { 85 knotUrl = host.URL() 86 } else { 87 t.logger.Warn("repo is from unknown knot") 88 if t.cfg.KnotUseSSL { 89 knotUrl = "https://" + knotUrl 90 } else { 91 knotUrl = "http://" + knotUrl 92 } 93 } 94 } 95 96 status := models.RepoStatePending 97 errMsg := "" 98 u, err := url.Parse(knotUrl) 99 if err != nil { 100 status = models.RepoStateSuspended 101 errMsg = "failed to parse knot url" 102 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 103 status = models.RepoStateSuspended 104 errMsg = "suspending non-public knot" 105 } 106 107 repo := &models.Repo{ 108 Did: evt.Did, 109 Rkey: evt.Rkey, 110 Cid: evt.CID, 111 Name: record.Name, 112 KnotDomain: knotUrl, 113 State: status, 114 ErrorMsg: errMsg, 115 RetryAfter: 0, // clear retry info 116 RetryCount: 0, 117 } 118 119 if evt.Action == tapc.RecordUpdateAction { 120 exist, err := t.gitm.Exist(repo) 121 if err != nil { 122 return fmt.Errorf("checking git repo existance: %w", err) 123 } 124 if exist { 125 // update git repo remote url 126 if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { 127 return fmt.Errorf("updating git repo remote url: %w", err) 128 } 129 } 130 } 131 132 if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 133 return fmt.Errorf("upserting repo to db: %w", err) 134 } 135 136 if !t.ks.CheckIfSubscribed(record.Knot) { 137 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 138 return fmt.Errorf("subscribing to knot: %w", err) 139 } 140 } 141 142 case tapc.RecordDeleteAction: 143 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 144 return fmt.Errorf("deleting repo from db: %w", err) 145 } 146 } 147 return nil 148} 149 150// isPrivate checks if host is private network. It doesn't perform DNS resolution 151func isPrivate(host string) bool { 152 if host == "localhost" { 153 return true 154 } 155 addr, err := netip.ParseAddr(host) 156 if err != nil { 157 return false 158 } 159 return isPrivateAddr(addr) 160} 161 162func isPrivateAddr(addr netip.Addr) bool { 163 return addr.IsLoopback() || 164 addr.IsPrivate() || 165 addr.IsLinkLocalUnicast() || 166 addr.IsLinkLocalMulticast() 167}