+8
cmd/photocopy/main.go
+8
cmd/photocopy/main.go
···
70
70
&cli.BoolFlag{
71
71
Name: "with-backfill",
72
72
},
73
+
&cli.StringFlag{
74
+
Name: "nervana-endpoint",
75
+
},
76
+
&cli.StringFlag{
77
+
Name: "nervana-api-key",
78
+
},
73
79
},
74
80
Commands: cli.Commands{
75
81
&cli.Command{
···
121
127
ClickhouseUser: cmd.String("clickhouse-user"),
122
128
ClickhousePass: cmd.String("clickhouse-pass"),
123
129
RatelimitBypassKey: cmd.String("ratelimit-bypass-key"),
130
+
NervanaEndpoint: cmd.String("nervana-endpoint"),
131
+
NervanaApiKey: cmd.String("nervana-api-key"),
124
132
})
125
133
if err != nil {
126
134
panic(err)
+27
handle_create.go
+27
handle_create.go
···
84
84
IndexedAt: indexedAt,
85
85
Did: did,
86
86
Lang: lang,
87
+
Text: rec.Text,
87
88
}
88
89
89
90
if rec.Reply != nil {
···
127
128
128
129
if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
129
130
return err
131
+
}
132
+
133
+
if rec.Text != "" {
134
+
go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) {
135
+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
136
+
defer cancel()
137
+
138
+
nervanaItems, err := p.makeNervanaRequest(ctx, rec.Text)
139
+
if err != nil {
140
+
p.logger.Error("error making nervana items request", "error", err)
141
+
return
142
+
}
143
+
144
+
for _, ni := range nervanaItems {
145
+
postLabel := models.PostLabel{
146
+
Did: did,
147
+
Rkey: rkey,
148
+
Text: ni.Text,
149
+
Label: ni.Label,
150
+
EntityId: ni.EntityId,
151
+
Description: ni.Description,
152
+
Topic: "",
153
+
}
154
+
p.inserters.labelsInserter.Insert(ctx, postLabel)
155
+
}
156
+
}(ctx, rec, did, rkey)
130
157
}
131
158
132
159
return nil
+1
models/post.go
+1
models/post.go
+14
models/post_label.go
+14
models/post_label.go
···
1
+
package models
2
+
3
+
import "time"
4
+
5
+
// PostLabel is one labelled item attached to a post. The post is identified
// by Did and Rkey; the remaining string fields carry the item's text, label,
// entity id, description and topic.
//
// The `ch` tags name the column each field corresponds to (presumably consumed
// by the ClickHouse inserter — confirm against the inserter implementation).
type PostLabel struct {
	// Identity of the post the label belongs to.
	Did  string `ch:"did"`
	Rkey string `ch:"rkey"`

	// CreatedAt is stored in the created_at column.
	CreatedAt time.Time `ch:"created_at"`

	// Label payload.
	Text        string `ch:"text"`
	Label       string `ch:"label"`
	EntityId    string `ch:"entity_id"`
	Description string `ch:"description"`
	Topic       string `ch:"topic"`
}
+60
nervana.go
+60
nervana.go
···
1
+
package photocopy
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"encoding/json"
7
+
"fmt"
8
+
"io"
9
+
"net/http"
10
+
)
11
+
12
+
// NervanaItem is a single element of the JSON array decoded from a Nervana
// response body. The json tags give the key each field is read from.
type NervanaItem struct {
	Text        string `json:"text"`
	Label       string `json:"label"`
	EntityId    string `json:"entityId"`
	Description string `json:"description"`
}
18
+
19
+
func (p *Photocopy) newNervanaRequest(ctx context.Context, text string) (*http.Request, error) {
20
+
payload := map[string]string{
21
+
"text": text,
22
+
"language": "en",
23
+
}
24
+
25
+
b, err := json.Marshal(payload)
26
+
if err != nil {
27
+
return nil, err
28
+
}
29
+
30
+
req, err := http.NewRequestWithContext(ctx, "GET", p.nervanaEndpoint, bytes.NewReader(b))
31
+
32
+
req.Header.Set("Authorization", "Bearer "+p.nervanaApiKey)
33
+
34
+
return req, err
35
+
}
36
+
37
+
func (p *Photocopy) makeNervanaRequest(ctx context.Context, text string) ([]NervanaItem, error) {
38
+
req, err := p.newNervanaRequest(ctx, text)
39
+
if err != nil {
40
+
return nil, err
41
+
}
42
+
43
+
resp, err := p.nervanaClient.Do(req)
44
+
if err != nil {
45
+
return nil, err
46
+
}
47
+
defer resp.Body.Close()
48
+
49
+
if resp.StatusCode != 200 {
50
+
io.Copy(io.Discard, resp.Body)
51
+
return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
52
+
}
53
+
54
+
var nervanaResp []NervanaItem
55
+
if err := json.NewDecoder(resp.Body).Decode(&nervanaResp); err != nil {
56
+
return nil, err
57
+
}
58
+
59
+
return nervanaResp, nil
60
+
}
+29
photocopy.go
+29
photocopy.go
···
32
32
ratelimitBypassKey string
33
33
34
34
conn driver.Conn
35
+
36
+
nervanaClient *http.Client
37
+
nervanaEndpoint string
38
+
nervanaApiKey string
35
39
}
36
40
37
41
type Inserters struct {
···
41
45
plcInserter *clickhouse_inserter.Inserter
42
46
recordsInserter *clickhouse_inserter.Inserter
43
47
deletesInserter *clickhouse_inserter.Inserter
48
+
labelsInserter *clickhouse_inserter.Inserter
44
49
}
45
50
46
51
type Args struct {
···
54
59
ClickhouseUser string
55
60
ClickhousePass string
56
61
RatelimitBypassKey string
62
+
NervanaEndpoint string
63
+
NervanaApiKey string
57
64
}
58
65
59
66
func New(ctx context.Context, args *Args) (*Photocopy, error) {
···
150
157
return nil, err
151
158
}
152
159
160
+
li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
161
+
PrometheusCounterPrefix: "photocopy_labels",
162
+
Histogram: insertionsHist,
163
+
BatchSize: 100,
164
+
Logger: p.logger,
165
+
Conn: conn,
166
+
Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)",
167
+
RateLimit: 3,
168
+
})
169
+
if err != nil {
170
+
return nil, err
171
+
}
172
+
153
173
is := &Inserters{
154
174
followsInserter: fi,
155
175
postsInserter: pi,
156
176
interactionsInserter: ii,
157
177
recordsInserter: ri,
158
178
deletesInserter: di,
179
+
labelsInserter: li,
159
180
}
160
181
161
182
p.inserters = is
···
189
210
190
211
p.inserters.plcInserter = plci
191
212
p.plcScraper = plcs
213
+
214
+
if args.NervanaApiKey != "" && args.NervanaEndpoint != "" {
215
+
p.nervanaClient = &http.Client{
216
+
Timeout: 5 * time.Second,
217
+
}
218
+
p.nervanaEndpoint = args.NervanaEndpoint
219
+
p.nervanaApiKey = args.NervanaApiKey
220
+
}
192
221
193
222
return p, nil
194
223
}