+44
-25
appview/repo/index.go
+44
-25
appview/repo/index.go
···
123
123
}
124
124
}
125
125
126
-
languageInfo, err := getLanguageInfo(f, signedClient, ref)
126
+
languageInfo, err := rp.getLanguageInfo(f, signedClient, ref)
127
127
if err != nil {
128
128
log.Printf("failed to compute language percentages: %s", err)
129
129
// non-fatal
···
153
153
Languages: languageInfo,
154
154
Pipelines: pipelines,
155
155
})
156
-
return
157
156
}
158
157
159
-
func getLanguageInfo(
158
+
func (rp *Repo) getLanguageInfo(
160
159
f *reporesolver.ResolvedRepo,
161
160
signedClient *knotclient.SignedClient,
162
161
ref string,
163
162
) ([]types.RepoLanguageDetails, error) {
164
-
repoLanguages, err := signedClient.RepoLanguages(f.OwnerDid(), f.RepoName, ref)
165
-
if err != nil {
166
-
return []types.RepoLanguageDetails{}, err
167
-
}
168
-
if repoLanguages == nil {
169
-
repoLanguages = &types.RepoLanguageResponse{Languages: make(map[string]int64)}
163
+
// first attempt to fetch from db
164
+
langs, err := db.GetRepoLanguages(
165
+
rp.db,
166
+
db.FilterEq("repo_at", f.RepoAt),
167
+
db.FilterEq("ref", ref),
168
+
)
169
+
170
+
if err != nil || langs == nil {
171
+
// non-fatal, fetch langs from ks
172
+
ls, err := signedClient.RepoLanguages(f.OwnerDid(), f.RepoName, ref)
173
+
if err != nil {
174
+
return nil, err
175
+
}
176
+
if ls == nil {
177
+
return nil, nil
178
+
}
179
+
for l, s := range ls.Languages {
180
+
langs = append(langs, db.RepoLanguage{
181
+
RepoAt: f.RepoAt,
182
+
Ref: ref,
183
+
Language: l,
184
+
Bytes: s,
185
+
})
186
+
}
187
+
188
+
// update appview's cache
189
+
err = db.InsertRepoLanguages(rp.db, langs)
190
+
if err != nil {
191
+
// non-fatal
192
+
log.Println("failed to cache lang results", err)
193
+
}
170
194
}
171
195
172
-
var totalSize int64
173
-
for _, fileSize := range repoLanguages.Languages {
174
-
totalSize += fileSize
196
+
var total int64
197
+
for _, l := range langs {
198
+
total += l.Bytes
175
199
}
176
200
177
201
var languageStats []types.RepoLanguageDetails
178
-
var otherPercentage float32 = 0
179
-
180
-
for lang, size := range repoLanguages.Languages {
181
-
percentage := (float32(size) / float32(totalSize)) * 100
182
-
183
-
if percentage <= 0.5 {
184
-
otherPercentage += percentage
185
-
continue
186
-
}
187
-
188
-
color := enry.GetColor(lang)
189
-
190
-
languageStats = append(languageStats, types.RepoLanguageDetails{Name: lang, Percentage: percentage, Color: color})
202
+
for _, l := range langs {
203
+
percentage := float32(l.Bytes) / float32(total) * 100
204
+
color := enry.GetColor(l.Language)
205
+
languageStats = append(languageStats, types.RepoLanguageDetails{
206
+
Name: l.Language,
207
+
Percentage: percentage,
208
+
Color: color,
209
+
})
191
210
}
192
211
193
212
sort.Slice(languageStats, func(i, j int) bool {
+56
-12
appview/state/knotstream.go
+56
-12
appview/state/knotstream.go
···
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+
"errors"
6
7
"fmt"
7
8
"slices"
8
9
"time"
···
18
19
"tangled.sh/tangled.sh/core/workflow"
19
20
20
21
"github.com/bluesky-social/indigo/atproto/syntax"
22
+
"github.com/go-git/go-git/v5/plumbing"
21
23
"github.com/posthog/posthog-go"
22
24
)
23
25
···
39
41
40
42
cfg := ec.ConsumerConfig{
41
43
Sources: srcs,
42
-
ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
44
+
ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
43
45
RetryInterval: c.Knotstream.RetryInterval,
44
46
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
45
47
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
53
55
return ec.NewConsumer(cfg), nil
54
56
}
55
57
56
-
func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
58
+
func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
57
59
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
58
60
switch msg.Nsid {
59
61
case tangled.GitRefUpdateNSID:
···
81
83
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
82
84
}
83
85
86
+
err1 := populatePunchcard(d, record)
87
+
err2 := updateRepoLanguages(d, record)
88
+
89
+
var err3 error
90
+
if !dev {
91
+
err3 = pc.Enqueue(posthog.Capture{
92
+
DistinctId: record.CommitterDid,
93
+
Event: "git_ref_update",
94
+
})
95
+
}
96
+
97
+
return errors.Join(err1, err2, err3)
98
+
}
99
+
100
+
func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
84
101
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
85
102
if err != nil {
86
103
return err
87
104
}
105
+
88
106
count := 0
89
107
for _, ke := range knownEmails {
90
108
if record.Meta == nil {
···
108
126
Date: time.Now(),
109
127
Count: count,
110
128
}
111
-
if err := db.AddPunch(d, punch); err != nil {
112
-
return err
129
+
return db.AddPunch(d, punch)
130
+
}
131
+
132
+
func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
133
+
if record.Meta == nil && record.Meta.LangBreakdown == nil {
134
+
return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
113
135
}
114
136
115
-
if !dev {
116
-
err = pc.Enqueue(posthog.Capture{
117
-
DistinctId: record.CommitterDid,
118
-
Event: "git_ref_update",
119
-
})
120
-
if err != nil {
121
-
// non-fatal, TODO: log this
137
+
repos, err := db.GetRepos(
138
+
d,
139
+
db.FilterEq("did", record.RepoDid),
140
+
db.FilterEq("name", record.RepoName),
141
+
)
142
+
if err != nil {
143
+
return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
144
+
}
145
+
if len(repos) != 1 {
146
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
147
+
}
148
+
repo := repos[0]
149
+
150
+
ref := plumbing.ReferenceName(record.Ref)
151
+
if !ref.IsBranch() {
152
+
return fmt.Errorf("%s is not a valid reference name", ref)
153
+
}
154
+
155
+
var langs []db.RepoLanguage
156
+
for _, l := range record.Meta.LangBreakdown.Inputs {
157
+
if l == nil {
158
+
continue
122
159
}
160
+
161
+
langs = append(langs, db.RepoLanguage{
162
+
RepoAt: repo.RepoAt(),
163
+
Ref: ref.Short(),
164
+
Language: l.Lang,
165
+
Bytes: l.Size,
166
+
})
123
167
}
124
168
125
-
return nil
169
+
return db.InsertRepoLanguages(d, langs)
126
170
}
127
171
128
172
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {