+5
-3
go.mod
+5
-3
go.mod
···
5
5
require (
6
6
github.com/gorilla/mux v1.8.1
7
7
github.com/lib/pq v1.10.9
8
-
github.com/mattn/go-sqlite3 v1.14.18
9
8
gopkg.in/yaml.v3 v3.0.1
10
9
)
11
10
12
-
require github.com/klauspost/compress v1.18.0
11
+
require github.com/klauspost/compress v1.18.1 // indirect
13
12
14
13
require (
15
14
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
16
15
github.com/gorilla/handlers v1.5.2
16
+
github.com/jackc/pgx/v5 v5.7.6
17
17
)
18
18
19
19
require (
20
+
github.com/atscan/plcbundle v0.0.0-20251027193653-3678d57c1dee // indirect
20
21
github.com/felixge/httpsnoop v1.0.3 // indirect
21
22
github.com/jackc/pgpassfile v1.0.0 // indirect
22
23
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
23
-
github.com/jackc/pgx/v5 v5.7.6 // indirect
24
24
github.com/jackc/puddle/v2 v2.2.2 // indirect
25
+
github.com/kr/text v0.2.0 // indirect
26
+
github.com/rogpeppe/go-internal v1.14.1 // indirect
25
27
golang.org/x/crypto v0.37.0 // indirect
26
28
golang.org/x/sync v0.13.0 // indirect
27
29
golang.org/x/text v0.24.0 // indirect
+19
-5
go.sum
+19
-5
go.sum
···
1
1
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
2
2
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
3
+
github.com/atscan/plcbundle v0.0.0-20251027192009-9350d30fd185 h1:E/fQ1jsaydY6x5JRv+gBiMZVHxKEGD4cK+JxZUZuskU=
4
+
github.com/atscan/plcbundle v0.0.0-20251027192009-9350d30fd185/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs=
5
+
github.com/atscan/plcbundle v0.0.0-20251027193653-3678d57c1dee h1:wepjgNZxBJGuWmVpplG2BTcoICGafaHALiQoXJV1Iwk=
6
+
github.com/atscan/plcbundle v0.0.0-20251027193653-3678d57c1dee/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs=
7
+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
3
8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9
+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
10
+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4
11
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
5
12
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
6
13
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
···
15
22
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
16
23
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
17
24
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
18
-
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
19
-
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
25
+
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
26
+
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
27
+
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
28
+
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
29
+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
30
+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
20
31
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
21
32
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
22
-
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
23
-
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
33
+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
24
34
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
35
+
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
36
+
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
25
37
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
26
38
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
27
39
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
40
+
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
41
+
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
28
42
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
29
43
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
30
44
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
31
45
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
32
46
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
33
47
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
34
-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
35
48
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
36
49
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
50
+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
37
51
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
38
52
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
39
53
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+4
-2
internal/api/server.go
+4
-2
internal/api/server.go
···
25
25
}
26
26
27
27
func NewServer(db storage.Database, apiCfg config.APIConfig, plcCfg config.PLCConfig) *Server {
28
-
bundleManager, _ := plc.NewBundleManager(plcCfg.BundleDir, plcCfg.UseCache, db, plcCfg.IndexDIDs)
28
+
bundleManager, err := plc.NewBundleManager(plcCfg.BundleDir, plcCfg.DirectoryURL, db, plcCfg.IndexDIDs)
29
+
if err != nil {
30
+
log.Fatal("Failed to create bundle manager: %v", err)
31
+
}
29
32
30
33
s := &Server{
31
34
router: mux.NewRouter(),
32
35
db: db,
33
-
plcClient: plc.NewClient(plcCfg.DirectoryURL),
34
36
plcBundleDir: plcCfg.BundleDir,
35
37
bundleManager: bundleManager,
36
38
plcIndexDIDs: plcCfg.IndexDIDs,
-696
internal/plc/bundle.go
-696
internal/plc/bundle.go
···
1
-
package plc
2
-
3
-
import (
4
-
"bufio"
5
-
"bytes"
6
-
"context"
7
-
"crypto/sha256"
8
-
"encoding/hex"
9
-
"encoding/json"
10
-
"fmt"
11
-
"os"
12
-
"path/filepath"
13
-
"time"
14
-
15
-
"github.com/atscan/atscanner/internal/log"
16
-
"github.com/atscan/atscanner/internal/storage"
17
-
"github.com/klauspost/compress/zstd"
18
-
)
19
-
20
-
// BUNDLE_SIZE is the number of operations a complete bundle holds: it is the
// fetch target when a bundle is built from the PLC directory and the exact
// length required when a bundle is created from the mempool.
const BUNDLE_SIZE = 10000
21
-
22
-
// BundleManager reads, writes and indexes zstd-compressed JSONL bundle files.
type BundleManager struct {
23
-
dir string // directory the bundle files are created in
24
-
enabled bool // when false, most exported methods return early or report "bundle manager disabled"
25
-
encoder *zstd.Encoder // compresses serialized bundles before they are written
26
-
decoder *zstd.Decoder // decompresses bundle files when they are loaded
27
-
db storage.Database // bundle metadata and DID index storage
28
-
indexDIDs bool // whether indexBundle also upserts the bundle's DIDs
29
-
}
30
-
31
-
// ===== INITIALIZATION =====
32
-
33
-
func NewBundleManager(dir string, enabled bool, db storage.Database, indexDIDs bool) (*BundleManager, error) {
34
-
if !enabled {
35
-
log.Verbose("BundleManager disabled (enabled=false)")
36
-
return &BundleManager{enabled: false}, nil
37
-
}
38
-
39
-
if err := os.MkdirAll(dir, 0755); err != nil {
40
-
return nil, fmt.Errorf("failed to create bundle dir: %w", err)
41
-
}
42
-
43
-
encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
44
-
if err != nil {
45
-
return nil, err
46
-
}
47
-
48
-
decoder, err := zstd.NewReader(nil)
49
-
if err != nil {
50
-
return nil, err
51
-
}
52
-
53
-
log.Verbose("BundleManager initialized: enabled=%v, indexDIDs=%v, dir=%s", enabled, indexDIDs, dir)
54
-
55
-
return &BundleManager{
56
-
dir: dir,
57
-
enabled: enabled,
58
-
encoder: encoder,
59
-
decoder: decoder,
60
-
db: db,
61
-
indexDIDs: indexDIDs,
62
-
}, nil
63
-
}
64
-
65
-
func (bm *BundleManager) Close() {
66
-
if bm.encoder != nil {
67
-
bm.encoder.Close()
68
-
}
69
-
if bm.decoder != nil {
70
-
bm.decoder.Close()
71
-
}
72
-
}
73
-
74
-
// ===== BUNDLE FILE ABSTRACTION =====
75
-
76
-
// bundleFile pairs a bundle's on-disk path with its in-memory operations and hashes.
type bundleFile struct {
77
-
path string // location of the compressed bundle file
78
-
operations []PLCOperation // operations loaded from, or to be written to, the file
79
-
uncompressedHash string // hex SHA-256 of the serialized JSONL data
80
-
compressedHash string // hex SHA-256 of the compressed file contents
81
-
}
82
-
83
-
func (bm *BundleManager) newBundleFile(bundleNum int) *bundleFile {
84
-
return &bundleFile{
85
-
path: filepath.Join(bm.dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)),
86
-
}
87
-
}
88
-
89
-
func (bf *bundleFile) exists() bool {
90
-
_, err := os.Stat(bf.path)
91
-
return err == nil
92
-
}
93
-
94
-
func (bm *BundleManager) load(bf *bundleFile) error {
95
-
compressed, err := os.ReadFile(bf.path)
96
-
if err != nil {
97
-
return fmt.Errorf("read failed: %w", err)
98
-
}
99
-
100
-
decompressed, err := bm.decoder.DecodeAll(compressed, nil)
101
-
if err != nil {
102
-
return fmt.Errorf("decompress failed: %w", err)
103
-
}
104
-
105
-
bf.operations = bm.parseJSONL(decompressed)
106
-
return nil
107
-
}
108
-
109
-
func (bm *BundleManager) save(bf *bundleFile) error {
110
-
jsonlData := bm.serializeJSONL(bf.operations)
111
-
bf.uncompressedHash = bm.hash(jsonlData)
112
-
113
-
compressed := bm.encoder.EncodeAll(jsonlData, nil)
114
-
bf.compressedHash = bm.hash(compressed)
115
-
116
-
return os.WriteFile(bf.path, compressed, 0644)
117
-
}
118
-
119
-
func (bm *BundleManager) parseJSONL(data []byte) []PLCOperation {
120
-
var ops []PLCOperation
121
-
scanner := bufio.NewScanner(bytes.NewReader(data))
122
-
123
-
for scanner.Scan() {
124
-
line := scanner.Bytes()
125
-
if len(line) == 0 {
126
-
continue
127
-
}
128
-
129
-
var op PLCOperation
130
-
if err := json.Unmarshal(line, &op); err == nil {
131
-
op.RawJSON = append([]byte(nil), line...)
132
-
ops = append(ops, op)
133
-
}
134
-
}
135
-
136
-
return ops
137
-
}
138
-
139
-
func (bm *BundleManager) serializeJSONL(ops []PLCOperation) []byte {
140
-
var buf []byte
141
-
for _, op := range ops {
142
-
buf = append(buf, op.RawJSON...)
143
-
buf = append(buf, '\n')
144
-
}
145
-
return buf
146
-
}
147
-
148
-
// ===== BUNDLE FETCHING =====
149
-
150
-
// bundleFetcher collects de-duplicated operations from the PLC export endpoint.
type bundleFetcher struct {
151
-
client *Client // PLC client used for Export calls
152
-
seenCIDs map[string]bool // CIDs already collected or carried over from the previous bundle's boundary
153
-
currentAfter string // "after" value sent with the next export request
154
-
fetchCount int // number of fetch rounds started so far
155
-
}
156
-
157
-
func newBundleFetcher(client *Client, afterTime string, prevBoundaryCIDs map[string]bool) *bundleFetcher {
158
-
seen := make(map[string]bool)
159
-
for cid := range prevBoundaryCIDs {
160
-
seen[cid] = true
161
-
}
162
-
163
-
return &bundleFetcher{
164
-
client: client,
165
-
seenCIDs: seen,
166
-
currentAfter: afterTime,
167
-
}
168
-
}
169
-
170
-
func (bf *bundleFetcher) fetchUntilComplete(ctx context.Context, target int) ([]PLCOperation, bool) {
171
-
var ops []PLCOperation
172
-
maxFetches := (target / 900) + 5
173
-
174
-
for len(ops) < target && bf.fetchCount < maxFetches {
175
-
bf.fetchCount++
176
-
batchSize := bf.calculateBatchSize(target - len(ops))
177
-
178
-
log.Verbose(" Fetch #%d: need %d more, requesting %d", bf.fetchCount, target-len(ops), batchSize)
179
-
180
-
batch, shouldContinue := bf.fetchBatch(ctx, batchSize)
181
-
182
-
for _, op := range batch {
183
-
if !bf.seenCIDs[op.CID] {
184
-
bf.seenCIDs[op.CID] = true
185
-
ops = append(ops, op)
186
-
187
-
if len(ops) >= target {
188
-
return ops[:target], true
189
-
}
190
-
}
191
-
}
192
-
193
-
if !shouldContinue {
194
-
break
195
-
}
196
-
}
197
-
198
-
return ops, len(ops) >= target
199
-
}
200
-
201
-
func (bf *bundleFetcher) calculateBatchSize(remaining int) int {
202
-
if bf.fetchCount == 0 {
203
-
return 1000
204
-
}
205
-
if remaining < 100 {
206
-
return 50
207
-
}
208
-
if remaining < 500 {
209
-
return 200
210
-
}
211
-
return 1000
212
-
}
213
-
214
-
func (bf *bundleFetcher) fetchBatch(ctx context.Context, size int) ([]PLCOperation, bool) {
215
-
ops, err := bf.client.Export(ctx, ExportOptions{
216
-
Count: size,
217
-
After: bf.currentAfter,
218
-
})
219
-
220
-
if err != nil || len(ops) == 0 {
221
-
return nil, false
222
-
}
223
-
224
-
if len(ops) > 0 {
225
-
bf.currentAfter = ops[len(ops)-1].CreatedAt.Format(time.RFC3339Nano)
226
-
}
227
-
228
-
return ops, len(ops) >= size
229
-
}
230
-
231
-
// ===== MAIN BUNDLE LOADING =====
232
-
233
-
func (bm *BundleManager) LoadBundle(ctx context.Context, bundleNum int, plcClient *Client) ([]PLCOperation, bool, error) {
234
-
if !bm.enabled {
235
-
return nil, false, fmt.Errorf("bundle manager disabled")
236
-
}
237
-
238
-
bf := bm.newBundleFile(bundleNum)
239
-
240
-
// Try local file first
241
-
if bf.exists() {
242
-
return bm.loadFromFile(ctx, bundleNum, bf)
243
-
}
244
-
245
-
// Fetch from PLC
246
-
return bm.fetchFromPLC(ctx, bundleNum, bf, plcClient)
247
-
}
248
-
249
-
func (bm *BundleManager) loadFromFile(ctx context.Context, bundleNum int, bf *bundleFile) ([]PLCOperation, bool, error) {
250
-
log.Verbose("→ Loading bundle %06d from local file", bundleNum)
251
-
252
-
// Verify hash if bundle is in DB
253
-
if dbBundle, err := bm.db.GetBundleByNumber(ctx, bundleNum); err == nil && dbBundle != nil {
254
-
if err := bm.verifyHash(bf.path, dbBundle.CompressedHash); err != nil {
255
-
log.Error("⚠ Hash mismatch for bundle %06d! Re-fetching...", bundleNum)
256
-
os.Remove(bf.path)
257
-
return nil, false, fmt.Errorf("hash mismatch")
258
-
}
259
-
log.Verbose("✓ Hash verified for bundle %06d", bundleNum)
260
-
}
261
-
262
-
if err := bm.load(bf); err != nil {
263
-
return nil, false, err
264
-
}
265
-
266
-
// Index if not in DB
267
-
if _, err := bm.db.GetBundleByNumber(ctx, bundleNum); err != nil {
268
-
bf.compressedHash = bm.hashFile(bf.path)
269
-
bf.uncompressedHash = bm.hash(bm.serializeJSONL(bf.operations))
270
-
271
-
// Calculate cursor from previous bundle
272
-
cursor := bm.calculateCursor(ctx, bundleNum)
273
-
274
-
bm.indexBundle(ctx, bundleNum, bf, cursor)
275
-
}
276
-
277
-
return bf.operations, true, nil
278
-
}
279
-
280
-
func (bm *BundleManager) fetchFromPLC(ctx context.Context, bundleNum int, bf *bundleFile, client *Client) ([]PLCOperation, bool, error) {
281
-
log.Info("→ Bundle %06d not found locally, fetching from PLC directory...", bundleNum)
282
-
283
-
afterTime, prevCIDs := bm.getBoundaryInfo(ctx, bundleNum)
284
-
fetcher := newBundleFetcher(client, afterTime, prevCIDs)
285
-
286
-
ops, isComplete := fetcher.fetchUntilComplete(ctx, BUNDLE_SIZE)
287
-
288
-
log.Info(" Collected %d unique operations after %d fetches (complete=%v)",
289
-
len(ops), fetcher.fetchCount, isComplete)
290
-
291
-
if isComplete {
292
-
bf.operations = ops
293
-
if err := bm.save(bf); err != nil {
294
-
log.Error("Warning: failed to save bundle: %v", err)
295
-
} else {
296
-
// The cursor is the afterTime that was used to fetch this bundle
297
-
cursor := afterTime
298
-
bm.indexBundle(ctx, bundleNum, bf, cursor)
299
-
log.Info("✓ Bundle %06d saved [%d ops, hash: %s..., cursor: %s]",
300
-
bundleNum, len(ops), bf.uncompressedHash[:16], cursor)
301
-
}
302
-
}
303
-
304
-
return ops, isComplete, nil
305
-
}
306
-
307
-
func (bm *BundleManager) getBoundaryInfo(ctx context.Context, bundleNum int) (string, map[string]bool) {
308
-
if bundleNum == 1 {
309
-
return "", nil
310
-
}
311
-
312
-
prevBundle, err := bm.db.GetBundleByNumber(ctx, bundleNum-1)
313
-
if err != nil {
314
-
return "", nil
315
-
}
316
-
317
-
afterTime := prevBundle.EndTime.Format(time.RFC3339Nano)
318
-
319
-
// Return stored boundary CIDs if available
320
-
if len(prevBundle.BoundaryCIDs) > 0 {
321
-
cids := make(map[string]bool)
322
-
for _, cid := range prevBundle.BoundaryCIDs {
323
-
cids[cid] = true
324
-
}
325
-
return afterTime, cids
326
-
}
327
-
328
-
// Fallback: compute from file
329
-
bf := bm.newBundleFile(bundleNum - 1)
330
-
if bf.exists() {
331
-
if err := bm.load(bf); err == nil {
332
-
_, cids := GetBoundaryCIDs(bf.operations)
333
-
return afterTime, cids
334
-
}
335
-
}
336
-
337
-
return afterTime, nil
338
-
}
339
-
340
-
// ===== BUNDLE INDEXING =====
341
-
342
-
func (bm *BundleManager) indexBundle(ctx context.Context, bundleNum int, bf *bundleFile, cursor string) error {
343
-
log.Verbose("indexBundle called for bundle %06d: indexDIDs=%v", bundleNum, bm.indexDIDs)
344
-
345
-
prevHash := ""
346
-
if bundleNum > 1 {
347
-
if prev, err := bm.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
348
-
prevHash = prev.Hash
349
-
}
350
-
}
351
-
352
-
dids := bm.extractUniqueDIDs(bf.operations)
353
-
log.Verbose("Extracted %d unique DIDs from bundle %06d", len(dids), bundleNum)
354
-
355
-
compressedFileSize := bm.getFileSize(bf.path)
356
-
357
-
// Calculate uncompressed size
358
-
uncompressedSize := int64(0)
359
-
for _, op := range bf.operations {
360
-
uncompressedSize += int64(len(op.RawJSON)) + 1
361
-
}
362
-
363
-
// Get time range from operations
364
-
firstSeenAt := bf.operations[0].CreatedAt
365
-
lastSeenAt := bf.operations[len(bf.operations)-1].CreatedAt
366
-
367
-
bundle := &storage.PLCBundle{
368
-
BundleNumber: bundleNum,
369
-
StartTime: firstSeenAt,
370
-
EndTime: lastSeenAt,
371
-
DIDCount: len(dids),
372
-
Hash: bf.uncompressedHash,
373
-
CompressedHash: bf.compressedHash,
374
-
CompressedSize: compressedFileSize,
375
-
UncompressedSize: uncompressedSize,
376
-
Cursor: cursor,
377
-
PrevBundleHash: prevHash,
378
-
Compressed: true,
379
-
CreatedAt: time.Now().UTC(),
380
-
}
381
-
382
-
log.Verbose("About to create bundle %06d in database (DIDCount=%d)", bundleNum, bundle.DIDCount)
383
-
384
-
// Create bundle first
385
-
if err := bm.db.CreateBundle(ctx, bundle); err != nil {
386
-
log.Error("Failed to create bundle %06d in database: %v", bundleNum, err)
387
-
return err
388
-
}
389
-
390
-
log.Verbose("Bundle %06d created successfully in database", bundleNum)
391
-
392
-
// Index DIDs if enabled
393
-
if bm.indexDIDs {
394
-
start := time.Now()
395
-
log.Verbose("Starting DID indexing for bundle %06d: %d unique DIDs", bundleNum, len(dids))
396
-
397
-
// Extract handle and PDS for each DID
398
-
didInfoMap := ExtractDIDInfoMap(bf.operations)
399
-
log.Verbose("Extracted info for %d DIDs from operations", len(didInfoMap))
400
-
401
-
successCount := 0
402
-
errorCount := 0
403
-
invalidHandleCount := 0
404
-
405
-
// Upsert each DID with handle, pds, and bundle number
406
-
for did, info := range didInfoMap {
407
-
validHandle := ValidateHandle(info.Handle)
408
-
if info.Handle != "" && validHandle == "" {
409
-
//log.Verbose("Bundle %06d: Skipping invalid handle for DID %s (length: %d)", bundleNum, did, len(info.Handle))
410
-
invalidHandleCount++
411
-
}
412
-
413
-
if err := bm.db.UpsertDID(ctx, did, bundleNum, validHandle, info.PDS); err != nil {
414
-
log.Error("Failed to index DID %s for bundle %06d: %v", did, bundleNum, err)
415
-
errorCount++
416
-
} else {
417
-
successCount++
418
-
}
419
-
}
420
-
421
-
elapsed := time.Since(start)
422
-
log.Info("✓ Indexed bundle %06d: %d DIDs succeeded, %d errors, %d invalid handles in %v",
423
-
bundleNum, successCount, errorCount, invalidHandleCount, elapsed)
424
-
} else {
425
-
log.Verbose("⊘ Skipped DID indexing for bundle %06d (disabled in config)", bundleNum)
426
-
}
427
-
428
-
return nil
429
-
}
430
-
431
-
func (bm *BundleManager) extractUniqueDIDs(ops []PLCOperation) []string {
432
-
didSet := make(map[string]bool)
433
-
for _, op := range ops {
434
-
didSet[op.DID] = true
435
-
}
436
-
437
-
dids := make([]string, 0, len(didSet))
438
-
for did := range didSet {
439
-
dids = append(dids, did)
440
-
}
441
-
return dids
442
-
}
443
-
444
-
// ===== MEMPOOL BUNDLE CREATION =====
445
-
446
-
func (bm *BundleManager) CreateBundleFromMempool(ctx context.Context, operations []PLCOperation, cursor string) (int, error) {
447
-
if !bm.enabled {
448
-
return 0, fmt.Errorf("bundle manager disabled")
449
-
}
450
-
451
-
if len(operations) != BUNDLE_SIZE {
452
-
return 0, fmt.Errorf("bundle must have exactly %d operations, got %d", BUNDLE_SIZE, len(operations))
453
-
}
454
-
455
-
lastBundle, err := bm.db.GetLastBundleNumber(ctx)
456
-
if err != nil {
457
-
return 0, err
458
-
}
459
-
bundleNum := lastBundle + 1
460
-
461
-
bf := bm.newBundleFile(bundleNum)
462
-
bf.operations = operations
463
-
464
-
if err := bm.save(bf); err != nil {
465
-
return 0, err
466
-
}
467
-
468
-
if err := bm.indexBundle(ctx, bundleNum, bf, cursor); err != nil {
469
-
return 0, err
470
-
}
471
-
472
-
log.Info("✓ Created bundle %06d from mempool (hash: %s...)",
473
-
bundleNum, bf.uncompressedHash[:16])
474
-
475
-
return bundleNum, nil
476
-
}
477
-
478
-
// ===== VERIFICATION =====
479
-
480
-
func (bm *BundleManager) VerifyChain(ctx context.Context, endBundle int) error {
481
-
if !bm.enabled {
482
-
return fmt.Errorf("bundle manager disabled")
483
-
}
484
-
485
-
log.Info("Verifying bundle chain from 1 to %06d...", endBundle)
486
-
487
-
for i := 1; i <= endBundle; i++ {
488
-
bundle, err := bm.db.GetBundleByNumber(ctx, i)
489
-
if err != nil {
490
-
return fmt.Errorf("bundle %06d not found: %w", i, err)
491
-
}
492
-
493
-
// Verify file hash
494
-
path := bm.newBundleFile(i).path
495
-
if err := bm.verifyHash(path, bundle.CompressedHash); err != nil {
496
-
return fmt.Errorf("bundle %06d hash verification failed: %w", i, err)
497
-
}
498
-
499
-
// Verify chain link
500
-
if i > 1 {
501
-
prevBundle, err := bm.db.GetBundleByNumber(ctx, i-1)
502
-
if err != nil {
503
-
return fmt.Errorf("bundle %06d missing (required by %06d)", i-1, i)
504
-
}
505
-
506
-
if bundle.PrevBundleHash != prevBundle.Hash {
507
-
return fmt.Errorf("bundle %06d chain broken! Expected prev_hash=%s, got=%s",
508
-
i, prevBundle.Hash[:16], bundle.PrevBundleHash[:16])
509
-
}
510
-
}
511
-
512
-
if i%100 == 0 {
513
-
log.Verbose(" ✓ Verified bundles 1-%06d", i)
514
-
}
515
-
}
516
-
517
-
log.Info("✓ Chain verification complete: bundles 1-%06d are valid and continuous", endBundle)
518
-
return nil
519
-
}
520
-
521
-
func (bm *BundleManager) EnsureBundleContinuity(ctx context.Context, targetBundle int) error {
522
-
if !bm.enabled {
523
-
return nil
524
-
}
525
-
526
-
for i := 1; i < targetBundle; i++ {
527
-
if !bm.newBundleFile(i).exists() {
528
-
if _, err := bm.db.GetBundleByNumber(ctx, i); err != nil {
529
-
return fmt.Errorf("bundle %06d is missing (required for continuity)", i)
530
-
}
531
-
}
532
-
}
533
-
534
-
return nil
535
-
}
536
-
537
-
// ===== UTILITY METHODS =====
538
-
539
-
func (bm *BundleManager) hash(data []byte) string {
540
-
h := sha256.Sum256(data)
541
-
return hex.EncodeToString(h[:])
542
-
}
543
-
544
-
func (bm *BundleManager) hashFile(path string) string {
545
-
data, _ := os.ReadFile(path)
546
-
return bm.hash(data)
547
-
}
548
-
549
-
func (bm *BundleManager) verifyHash(path, expectedHash string) error {
550
-
if expectedHash == "" {
551
-
return nil
552
-
}
553
-
554
-
actualHash := bm.hashFile(path)
555
-
if actualHash != expectedHash {
556
-
return fmt.Errorf("hash mismatch")
557
-
}
558
-
return nil
559
-
}
560
-
561
-
func (bm *BundleManager) getFileSize(path string) int64 {
562
-
if info, err := os.Stat(path); err == nil {
563
-
return info.Size()
564
-
}
565
-
return 0
566
-
}
567
-
568
-
func (bm *BundleManager) GetStats(ctx context.Context) (int64, int64, int64, int64, error) {
569
-
if !bm.enabled {
570
-
return 0, 0, 0, 0, nil
571
-
}
572
-
return bm.db.GetBundleStats(ctx)
573
-
}
574
-
575
-
func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
576
-
lastBundle, err := bm.db.GetLastBundleNumber(ctx)
577
-
if err != nil {
578
-
return nil, err
579
-
}
580
-
581
-
if lastBundle == 0 {
582
-
return map[string]interface{}{
583
-
"chain_length": 0,
584
-
"status": "empty",
585
-
}, nil
586
-
}
587
-
588
-
firstBundle, _ := bm.db.GetBundleByNumber(ctx, 1)
589
-
lastBundleData, _ := bm.db.GetBundleByNumber(ctx, lastBundle)
590
-
591
-
return map[string]interface{}{
592
-
"chain_length": lastBundle,
593
-
"first_bundle": 1,
594
-
"last_bundle": lastBundle,
595
-
"chain_start_time": firstBundle.StartTime,
596
-
"chain_end_time": lastBundleData.EndTime,
597
-
"chain_head_hash": lastBundleData.Hash,
598
-
}, nil
599
-
}
600
-
601
-
// ===== EXPORTED HELPERS =====
602
-
603
-
func GetBoundaryCIDs(operations []PLCOperation) (time.Time, map[string]bool) {
604
-
if len(operations) == 0 {
605
-
return time.Time{}, nil
606
-
}
607
-
608
-
lastOp := operations[len(operations)-1]
609
-
boundaryTime := lastOp.CreatedAt
610
-
cidSet := make(map[string]bool)
611
-
612
-
for i := len(operations) - 1; i >= 0; i-- {
613
-
op := operations[i]
614
-
if op.CreatedAt.Equal(boundaryTime) {
615
-
cidSet[op.CID] = true
616
-
} else {
617
-
break
618
-
}
619
-
}
620
-
621
-
return boundaryTime, cidSet
622
-
}
623
-
624
-
func StripBoundaryDuplicates(operations []PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []PLCOperation {
625
-
if len(operations) == 0 {
626
-
return operations
627
-
}
628
-
629
-
boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
630
-
if err != nil {
631
-
return operations
632
-
}
633
-
634
-
startIdx := 0
635
-
for startIdx < len(operations) {
636
-
op := operations[startIdx]
637
-
638
-
if op.CreatedAt.After(boundaryTime) {
639
-
break
640
-
}
641
-
642
-
if op.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[op.CID] {
643
-
startIdx++
644
-
continue
645
-
}
646
-
647
-
break
648
-
}
649
-
650
-
return operations[startIdx:]
651
-
}
652
-
653
-
// LoadBundleOperations is a public method for external access (e.g., API handlers)
654
-
func (bm *BundleManager) LoadBundleOperations(ctx context.Context, bundleNum int) ([]PLCOperation, error) {
655
-
if !bm.enabled {
656
-
return nil, fmt.Errorf("bundle manager disabled")
657
-
}
658
-
659
-
bf := bm.newBundleFile(bundleNum)
660
-
661
-
if !bf.exists() {
662
-
return nil, fmt.Errorf("bundle %06d not found", bundleNum)
663
-
}
664
-
665
-
if err := bm.load(bf); err != nil {
666
-
return nil, err
667
-
}
668
-
669
-
return bf.operations, nil
670
-
}
671
-
672
-
// calculateCursor determines the cursor value for a given bundle
673
-
// For bundle 1: returns empty string
674
-
// For bundle N: returns the end_time of bundle N-1 in RFC3339Nano format
675
-
func (bm *BundleManager) calculateCursor(ctx context.Context, bundleNum int) string {
676
-
if bundleNum == 1 {
677
-
return ""
678
-
}
679
-
680
-
// Try to get cursor from previous bundle in DB
681
-
if prevBundle, err := bm.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
682
-
return prevBundle.EndTime.Format(time.RFC3339Nano)
683
-
}
684
-
685
-
// If previous bundle not in DB, try to load it from file
686
-
prevBf := bm.newBundleFile(bundleNum - 1)
687
-
if prevBf.exists() {
688
-
if err := bm.load(prevBf); err == nil && len(prevBf.operations) > 0 {
689
-
// Return the createdAt of the last operation in previous bundle
690
-
lastOp := prevBf.operations[len(prevBf.operations)-1]
691
-
return lastOp.CreatedAt.Format(time.RFC3339Nano)
692
-
}
693
-
}
694
-
695
-
return ""
696
-
}
-237
internal/plc/client.go
-237
internal/plc/client.go
···
1
-
package plc
2
-
3
-
import (
4
-
"bufio"
5
-
"context"
6
-
"encoding/json"
7
-
"fmt"
8
-
"io"
9
-
"net/http"
10
-
"strconv"
11
-
"time"
12
-
13
-
"github.com/atscan/atscanner/internal/log"
14
-
)
15
-
16
-
// Client talks to a PLC directory over HTTP, throttled by a rate limiter.
type Client struct {
17
-
baseURL string // prefix for request URLs, e.g. baseURL + "/export"
18
-
httpClient *http.Client // HTTP client used for every request
19
-
rateLimiter *RateLimiter // waited on before each request is sent
20
-
}
21
-
22
-
func NewClient(baseURL string) *Client {
23
-
// Rate limit: 90 requests per minute (leaving buffer below 100/min limit)
24
-
rateLimiter := NewRateLimiter(90, time.Minute)
25
-
26
-
return &Client{
27
-
baseURL: baseURL,
28
-
httpClient: &http.Client{
29
-
Timeout: 60 * time.Second,
30
-
},
31
-
rateLimiter: rateLimiter,
32
-
}
33
-
}
34
-
35
-
func (c *Client) Close() {
36
-
if c.rateLimiter != nil {
37
-
c.rateLimiter.Stop()
38
-
}
39
-
}
40
-
41
-
// ExportOptions selects which slice of the PLC export to request.
type ExportOptions struct {
42
-
Count int // sent as the "count" query parameter when greater than zero
43
-
After string // ISO 8601 datetime string; sent as the "after" query parameter when non-empty
44
-
}
45
-
46
-
// Export fetches export data from PLC directory with rate limiting and retry
47
-
func (c *Client) Export(ctx context.Context, opts ExportOptions) ([]PLCOperation, error) {
48
-
return c.exportWithRetry(ctx, opts, 5)
49
-
}
50
-
51
-
// exportWithRetry implements retry logic with exponential backoff for rate limits
52
-
func (c *Client) exportWithRetry(ctx context.Context, opts ExportOptions, maxRetries int) ([]PLCOperation, error) {
53
-
var lastErr error
54
-
backoff := 1 * time.Second
55
-
56
-
for attempt := 1; attempt <= maxRetries; attempt++ {
57
-
// Wait for rate limiter token
58
-
if err := c.rateLimiter.Wait(ctx); err != nil {
59
-
return nil, err
60
-
}
61
-
62
-
operations, retryAfter, err := c.doExport(ctx, opts)
63
-
64
-
if err == nil {
65
-
return operations, nil
66
-
}
67
-
68
-
lastErr = err
69
-
70
-
// Check if it's a rate limit error (429)
71
-
if retryAfter > 0 {
72
-
log.Info("⚠ Rate limited by PLC directory, waiting %v before retry %d/%d",
73
-
retryAfter, attempt, maxRetries)
74
-
75
-
select {
76
-
case <-time.After(retryAfter):
77
-
continue
78
-
case <-ctx.Done():
79
-
return nil, ctx.Err()
80
-
}
81
-
}
82
-
83
-
// Other errors - exponential backoff
84
-
if attempt < maxRetries {
85
-
log.Verbose("Request failed (attempt %d/%d): %v, retrying in %v",
86
-
attempt, maxRetries, err, backoff)
87
-
88
-
select {
89
-
case <-time.After(backoff):
90
-
backoff *= 2 // Exponential backoff
91
-
case <-ctx.Done():
92
-
return nil, ctx.Err()
93
-
}
94
-
}
95
-
}
96
-
97
-
return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
98
-
}
99
-
100
-
// doExport performs the actual HTTP request
101
-
func (c *Client) doExport(ctx context.Context, opts ExportOptions) ([]PLCOperation, time.Duration, error) {
102
-
url := fmt.Sprintf("%s/export", c.baseURL)
103
-
104
-
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
105
-
if err != nil {
106
-
return nil, 0, err
107
-
}
108
-
109
-
// Add query parameters
110
-
q := req.URL.Query()
111
-
if opts.Count > 0 {
112
-
q.Add("count", fmt.Sprintf("%d", opts.Count))
113
-
}
114
-
if opts.After != "" {
115
-
q.Add("after", opts.After)
116
-
}
117
-
req.URL.RawQuery = q.Encode()
118
-
119
-
resp, err := c.httpClient.Do(req)
120
-
if err != nil {
121
-
return nil, 0, fmt.Errorf("request failed: %w", err)
122
-
}
123
-
defer resp.Body.Close()
124
-
125
-
// Handle rate limiting (429)
126
-
if resp.StatusCode == http.StatusTooManyRequests {
127
-
retryAfter := parseRetryAfter(resp)
128
-
129
-
// Also check x-ratelimit headers for info
130
-
if limit := resp.Header.Get("x-ratelimit-limit"); limit != "" {
131
-
log.Verbose("Rate limit: %s", limit)
132
-
}
133
-
134
-
return nil, retryAfter, fmt.Errorf("rate limited (429)")
135
-
}
136
-
137
-
if resp.StatusCode != http.StatusOK {
138
-
body, _ := io.ReadAll(resp.Body)
139
-
return nil, 0, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
140
-
}
141
-
142
-
var operations []PLCOperation
143
-
144
-
// PLC export returns newline-delimited JSON
145
-
scanner := bufio.NewScanner(resp.Body)
146
-
buf := make([]byte, 0, 64*1024)
147
-
scanner.Buffer(buf, 1024*1024)
148
-
149
-
lineCount := 0
150
-
for scanner.Scan() {
151
-
lineCount++
152
-
line := scanner.Bytes()
153
-
154
-
if len(line) == 0 {
155
-
continue
156
-
}
157
-
158
-
var op PLCOperation
159
-
if err := json.Unmarshal(line, &op); err != nil {
160
-
log.Error("Warning: failed to parse operation on line %d: %v", lineCount, err)
161
-
continue
162
-
}
163
-
164
-
// CRITICAL: Store the original raw JSON bytes
165
-
op.RawJSON = make([]byte, len(line))
166
-
copy(op.RawJSON, line)
167
-
168
-
operations = append(operations, op)
169
-
}
170
-
171
-
if err := scanner.Err(); err != nil {
172
-
return nil, 0, fmt.Errorf("error reading response: %w", err)
173
-
}
174
-
175
-
return operations, 0, nil
176
-
177
-
}
178
-
179
-
// parseRetryAfter parses the Retry-After header
180
-
func parseRetryAfter(resp *http.Response) time.Duration {
181
-
retryAfter := resp.Header.Get("Retry-After")
182
-
if retryAfter == "" {
183
-
// Default to 5 minutes if no header
184
-
return 5 * time.Minute
185
-
}
186
-
187
-
// Try parsing as seconds
188
-
if seconds, err := strconv.Atoi(retryAfter); err == nil {
189
-
return time.Duration(seconds) * time.Second
190
-
}
191
-
192
-
// Try parsing as HTTP date
193
-
if t, err := http.ParseTime(retryAfter); err == nil {
194
-
return time.Until(t)
195
-
}
196
-
197
-
// Default
198
-
return 5 * time.Minute
199
-
}
200
-
201
-
// GetDID fetches a specific DID document from PLC
202
-
func (c *Client) GetDID(ctx context.Context, did string) (*DIDDocument, error) {
203
-
// Wait for rate limiter
204
-
if err := c.rateLimiter.Wait(ctx); err != nil {
205
-
return nil, err
206
-
}
207
-
208
-
url := fmt.Sprintf("%s/%s", c.baseURL, did)
209
-
210
-
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
211
-
if err != nil {
212
-
return nil, err
213
-
}
214
-
215
-
resp, err := c.httpClient.Do(req)
216
-
if err != nil {
217
-
return nil, err
218
-
}
219
-
defer resp.Body.Close()
220
-
221
-
if resp.StatusCode == http.StatusTooManyRequests {
222
-
retryAfter := parseRetryAfter(resp)
223
-
return nil, fmt.Errorf("rate limited, retry after %v", retryAfter)
224
-
}
225
-
226
-
if resp.StatusCode != http.StatusOK {
227
-
body, _ := io.ReadAll(resp.Body)
228
-
return nil, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
229
-
}
230
-
231
-
var doc DIDDocument
232
-
if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
233
-
return nil, err
234
-
}
235
-
236
-
return &doc, nil
237
-
}
+173
internal/plc/manager.go
+173
internal/plc/manager.go
···
1
+
package plc
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"time"
7
+
8
+
"github.com/atscan/atscanner/internal/log"
9
+
"github.com/atscan/atscanner/internal/storage"
10
+
plcbundle "github.com/atscan/plcbundle"
11
+
)
12
+
13
+
// BundleManager wraps the library's manager with database integration
14
+
type BundleManager struct {
15
+
libManager *plcbundle.Manager
16
+
db storage.Database
17
+
bundleDir string
18
+
indexDIDs bool
19
+
}
20
+
21
+
func NewBundleManager(bundleDir string, plcURL string, db storage.Database, indexDIDs bool) (*BundleManager, error) {
22
+
// Create library config
23
+
config := plcbundle.DefaultConfig(bundleDir)
24
+
25
+
// Create PLC client
26
+
var client *plcbundle.PLCClient
27
+
if plcURL != "" {
28
+
client = plcbundle.NewPLCClient(plcURL)
29
+
}
30
+
31
+
// Create library manager
32
+
libMgr, err := plcbundle.NewManager(config, client)
33
+
if err != nil {
34
+
return nil, fmt.Errorf("failed to create library manager: %w", err)
35
+
}
36
+
37
+
return &BundleManager{
38
+
libManager: libMgr,
39
+
db: db,
40
+
bundleDir: bundleDir,
41
+
indexDIDs: indexDIDs,
42
+
}, nil
43
+
}
44
+
45
+
func (bm *BundleManager) Close() {
46
+
if bm.libManager != nil {
47
+
bm.libManager.Close()
48
+
}
49
+
}
50
+
51
+
// LoadBundle loads a bundle (from library) and returns operations
52
+
func (bm *BundleManager) LoadBundleOperations(ctx context.Context, bundleNum int) ([]PLCOperation, error) {
53
+
bundle, err := bm.libManager.LoadBundle(ctx, bundleNum)
54
+
if err != nil {
55
+
return nil, err
56
+
}
57
+
return bundle.Operations, nil
58
+
}
59
+
60
+
// LoadBundle loads a full bundle with metadata
61
+
func (bm *BundleManager) LoadBundle(ctx context.Context, bundleNum int) (*plcbundle.Bundle, error) {
62
+
return bm.libManager.LoadBundle(ctx, bundleNum)
63
+
}
64
+
65
+
// FetchAndSaveBundle fetches next bundle from PLC and saves to both disk and DB
66
+
func (bm *BundleManager) FetchAndSaveBundle(ctx context.Context) (*plcbundle.Bundle, error) {
67
+
// Fetch from PLC using library
68
+
bundle, err := bm.libManager.FetchNextBundle(ctx)
69
+
if err != nil {
70
+
return nil, err
71
+
}
72
+
73
+
// Save to disk (library)
74
+
if err := bm.libManager.SaveBundle(ctx, bundle); err != nil {
75
+
return nil, fmt.Errorf("failed to save bundle to disk: %w", err)
76
+
}
77
+
78
+
// Save to database
79
+
if err := bm.saveBundleToDatabase(ctx, bundle); err != nil {
80
+
return nil, fmt.Errorf("failed to save bundle to database: %w", err)
81
+
}
82
+
83
+
log.Info("✓ Saved bundle %06d (disk + database)", bundle.BundleNumber)
84
+
85
+
return bundle, nil
86
+
}
87
+
88
+
// saveBundleToDatabase saves bundle metadata to PostgreSQL
89
+
func (bm *BundleManager) saveBundleToDatabase(ctx context.Context, bundle *plcbundle.Bundle) error {
90
+
// Convert library bundle to storage bundle
91
+
dbBundle := &storage.PLCBundle{
92
+
BundleNumber: bundle.BundleNumber,
93
+
StartTime: bundle.StartTime,
94
+
EndTime: bundle.EndTime,
95
+
DIDCount: bundle.DIDCount,
96
+
Hash: bundle.Hash,
97
+
CompressedHash: bundle.CompressedHash,
98
+
CompressedSize: bundle.CompressedSize,
99
+
UncompressedSize: bundle.UncompressedSize,
100
+
Cursor: bundle.Cursor,
101
+
PrevBundleHash: bundle.PrevBundleHash,
102
+
Compressed: bundle.Compressed,
103
+
CreatedAt: bundle.CreatedAt,
104
+
}
105
+
106
+
// Save to database
107
+
if err := bm.db.CreateBundle(ctx, dbBundle); err != nil {
108
+
return err
109
+
}
110
+
111
+
// Index DIDs if enabled
112
+
if bm.indexDIDs && len(bundle.Operations) > 0 {
113
+
if err := bm.indexBundleDIDs(ctx, bundle); err != nil {
114
+
log.Error("Failed to index DIDs for bundle %d: %v", bundle.BundleNumber, err)
115
+
// Don't fail the entire operation
116
+
}
117
+
}
118
+
119
+
return nil
120
+
}
121
+
122
+
// indexBundleDIDs indexes DIDs from a bundle into the database
123
+
func (bm *BundleManager) indexBundleDIDs(ctx context.Context, bundle *plcbundle.Bundle) error {
124
+
start := time.Now()
125
+
log.Verbose("Indexing DIDs for bundle %06d...", bundle.BundleNumber)
126
+
127
+
// Extract DID info from operations
128
+
didInfoMap := ExtractDIDInfoMap(bundle.Operations)
129
+
130
+
successCount := 0
131
+
errorCount := 0
132
+
invalidHandleCount := 0
133
+
134
+
// Upsert each DID
135
+
for did, info := range didInfoMap {
136
+
validHandle := ValidateHandle(info.Handle)
137
+
if info.Handle != "" && validHandle == "" {
138
+
invalidHandleCount++
139
+
}
140
+
141
+
if err := bm.db.UpsertDID(ctx, did, bundle.BundleNumber, validHandle, info.PDS); err != nil {
142
+
log.Error("Failed to index DID %s: %v", did, err)
143
+
errorCount++
144
+
} else {
145
+
successCount++
146
+
}
147
+
}
148
+
149
+
elapsed := time.Since(start)
150
+
log.Info("✓ Indexed %d DIDs for bundle %06d (%d errors, %d invalid handles) in %v",
151
+
successCount, bundle.BundleNumber, errorCount, invalidHandleCount, elapsed)
152
+
153
+
return nil
154
+
}
155
+
156
+
// VerifyChain verifies bundle chain integrity
157
+
func (bm *BundleManager) VerifyChain(ctx context.Context, endBundle int) error {
158
+
result, err := bm.libManager.VerifyChain(ctx)
159
+
if err != nil {
160
+
return err
161
+
}
162
+
163
+
if !result.Valid {
164
+
return fmt.Errorf("chain verification failed at bundle %d: %s", result.BrokenAt, result.Error)
165
+
}
166
+
167
+
return nil
168
+
}
169
+
170
+
// GetChainInfo returns chain information
171
+
func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
172
+
return bm.libManager.GetInfo(), nil
173
+
}
-70
internal/plc/ratelimiter.go
-70
internal/plc/ratelimiter.go
···
1
-
package plc
2
-
3
-
import (
4
-
"context"
5
-
"time"
6
-
)
7
-
8
-
// RateLimiter implements a token bucket rate limiter
9
-
type RateLimiter struct {
10
-
tokens chan struct{}
11
-
refillRate time.Duration
12
-
maxTokens int
13
-
stopRefill chan struct{}
14
-
}
15
-
16
-
// NewRateLimiter creates a new rate limiter
17
-
// Example: NewRateLimiter(90, time.Minute) = 90 requests per minute
18
-
func NewRateLimiter(requestsPerPeriod int, period time.Duration) *RateLimiter {
19
-
rl := &RateLimiter{
20
-
tokens: make(chan struct{}, requestsPerPeriod),
21
-
refillRate: period / time.Duration(requestsPerPeriod),
22
-
maxTokens: requestsPerPeriod,
23
-
stopRefill: make(chan struct{}),
24
-
}
25
-
26
-
// Fill initially
27
-
for i := 0; i < requestsPerPeriod; i++ {
28
-
rl.tokens <- struct{}{}
29
-
}
30
-
31
-
// Start refill goroutine
32
-
go rl.refill()
33
-
34
-
return rl
35
-
}
36
-
37
-
// refill adds tokens at the specified rate
38
-
func (rl *RateLimiter) refill() {
39
-
ticker := time.NewTicker(rl.refillRate)
40
-
defer ticker.Stop()
41
-
42
-
for {
43
-
select {
44
-
case <-ticker.C:
45
-
select {
46
-
case rl.tokens <- struct{}{}:
47
-
// Token added
48
-
default:
49
-
// Buffer full, skip
50
-
}
51
-
case <-rl.stopRefill:
52
-
return
53
-
}
54
-
}
55
-
}
56
-
57
-
// Wait blocks until a token is available
58
-
func (rl *RateLimiter) Wait(ctx context.Context) error {
59
-
select {
60
-
case <-rl.tokens:
61
-
return nil
62
-
case <-ctx.Done():
63
-
return ctx.Err()
64
-
}
65
-
}
66
-
67
-
// Stop stops the rate limiter
68
-
func (rl *RateLimiter) Stop() {
69
-
close(rl.stopRefill)
70
-
}
+88
-422
internal/plc/scanner.go
+88
-422
internal/plc/scanner.go
···
2
2
3
3
import (
4
4
"context"
5
-
"encoding/json"
6
5
"fmt"
7
6
"strings"
8
7
"time"
9
8
10
-
"github.com/acarl005/stripansi"
11
9
"github.com/atscan/atscanner/internal/config"
12
10
"github.com/atscan/atscanner/internal/log"
13
11
"github.com/atscan/atscanner/internal/storage"
14
12
)
15
13
16
14
type Scanner struct {
17
-
client *Client
15
+
bundleManager *BundleManager
18
16
db storage.Database
19
17
config config.PLCConfig
20
-
bundleManager *BundleManager
21
18
}
22
19
23
20
func NewScanner(db storage.Database, cfg config.PLCConfig) *Scanner {
24
21
log.Verbose("NewScanner: IndexDIDs config = %v", cfg.IndexDIDs)
25
22
26
-
bundleManager, err := NewBundleManager(cfg.BundleDir, cfg.UseCache, db, cfg.IndexDIDs)
23
+
bundleManager, err := NewBundleManager(cfg.BundleDir, cfg.DirectoryURL, db, cfg.IndexDIDs)
27
24
if err != nil {
28
-
log.Error("Warning: failed to initialize bundle manager: %v", err)
29
-
bundleManager = &BundleManager{enabled: false}
25
+
log.Error("Failed to initialize bundle manager: %v", err)
26
+
return nil
30
27
}
31
28
32
29
return &Scanner{
33
-
client: NewClient(cfg.DirectoryURL),
30
+
bundleManager: bundleManager,
34
31
db: db,
35
32
config: cfg,
36
-
bundleManager: bundleManager,
37
33
}
38
34
}
39
35
···
43
39
}
44
40
}
45
41
46
-
// ScanMetrics tracks scan progress
47
-
type ScanMetrics struct {
48
-
totalFetched int64 // Total ops fetched from PLC/bundles
49
-
totalProcessed int64 // Unique ops processed (after dedup)
50
-
newEndpoints int64 // New endpoints discovered
51
-
endpointCounts map[string]int64
52
-
currentBundle int
53
-
startTime time.Time
54
-
}
55
-
56
-
func newMetrics(startBundle int) *ScanMetrics {
57
-
return &ScanMetrics{
58
-
endpointCounts: make(map[string]int64),
59
-
currentBundle: startBundle,
60
-
startTime: time.Now(),
61
-
}
62
-
}
63
-
64
-
func (m *ScanMetrics) logSummary() {
65
-
summary := formatEndpointCounts(m.endpointCounts)
66
-
if m.newEndpoints > 0 {
67
-
log.Info("PLC scan completed: %d operations processed (%d fetched), %s in %v",
68
-
m.totalProcessed, m.totalFetched, summary, time.Since(m.startTime))
69
-
} else {
70
-
log.Info("PLC scan completed: %d operations processed (%d fetched), 0 new endpoints in %v",
71
-
m.totalProcessed, m.totalFetched, time.Since(m.startTime))
72
-
}
73
-
}
74
-
75
42
func (s *Scanner) Scan(ctx context.Context) error {
76
43
log.Info("Starting PLC directory scan...")
77
-
log.Info("⚠ Note: PLC directory has rate limit of 500 requests per 5 minutes")
78
44
79
45
cursor, err := s.db.GetScanCursor(ctx, "plc_directory")
80
46
if err != nil {
81
47
return fmt.Errorf("failed to get scan cursor: %w", err)
82
48
}
83
49
84
-
startBundle := s.calculateStartBundle(cursor.LastBundleNumber)
85
-
metrics := newMetrics(startBundle)
86
-
87
-
if startBundle > 1 {
88
-
if err := s.ensureContinuity(ctx, startBundle); err != nil {
89
-
return err
90
-
}
91
-
}
92
-
93
-
// Handle existing mempool first
94
-
if hasMempool, _ := s.hasSufficientMempool(ctx); hasMempool {
95
-
return s.handleMempoolOnly(ctx, metrics)
96
-
}
50
+
metrics := newMetrics(cursor.LastBundleNumber + 1)
97
51
98
-
// Process bundles until incomplete or error
52
+
// Main processing loop
99
53
for {
100
54
if err := ctx.Err(); err != nil {
101
55
return err
102
56
}
103
57
104
-
if err := s.processSingleBundle(ctx, metrics); err != nil {
105
-
if s.shouldRetry(err) {
106
-
continue
107
-
}
108
-
break
109
-
}
110
-
111
-
if err := s.updateCursor(ctx, cursor, metrics); err != nil {
112
-
log.Error("Warning: failed to update cursor: %v", err)
113
-
}
114
-
}
115
-
116
-
// Try to finalize mempool
117
-
s.finalizeMempool(ctx, metrics)
118
-
119
-
metrics.logSummary()
120
-
return nil
121
-
}
122
-
123
-
func (s *Scanner) calculateStartBundle(lastBundle int) int {
124
-
if lastBundle == 0 {
125
-
return 1
126
-
}
127
-
return lastBundle + 1
128
-
}
129
-
130
-
func (s *Scanner) ensureContinuity(ctx context.Context, bundle int) error {
131
-
log.Info("Checking bundle continuity...")
132
-
if err := s.bundleManager.EnsureBundleContinuity(ctx, bundle); err != nil {
133
-
return fmt.Errorf("bundle continuity check failed: %w", err)
134
-
}
135
-
return nil
136
-
}
137
-
138
-
func (s *Scanner) hasSufficientMempool(ctx context.Context) (bool, error) {
139
-
count, err := s.db.GetMempoolCount(ctx)
140
-
if err != nil {
141
-
return false, err
142
-
}
143
-
return count > 0, nil
144
-
}
145
-
146
-
func (s *Scanner) handleMempoolOnly(ctx context.Context, m *ScanMetrics) error {
147
-
count, _ := s.db.GetMempoolCount(ctx)
148
-
log.Info("→ Mempool has %d operations, continuing to fill it before fetching new bundles", count)
149
-
150
-
if err := s.fillMempool(ctx, m); err != nil {
151
-
return err
152
-
}
153
-
154
-
if err := s.processMempool(ctx, m); err != nil {
155
-
log.Error("Error processing mempool: %v", err)
156
-
}
157
-
158
-
m.logSummary()
159
-
return nil
160
-
}
161
-
162
-
func (s *Scanner) processSingleBundle(ctx context.Context, m *ScanMetrics) error {
163
-
log.Verbose("→ Processing bundle %06d...", m.currentBundle)
164
-
165
-
ops, isComplete, err := s.bundleManager.LoadBundle(ctx, m.currentBundle, s.client)
166
-
if err != nil {
167
-
return s.handleBundleError(err, m)
168
-
}
169
-
170
-
if isComplete {
171
-
return s.handleCompleteBundle(ctx, ops, m)
172
-
}
173
-
return s.handleIncompleteBundle(ctx, ops, m)
174
-
}
175
-
176
-
func (s *Scanner) handleBundleError(err error, m *ScanMetrics) error {
177
-
log.Error("Failed to load bundle %06d: %v", m.currentBundle, err)
178
-
179
-
if strings.Contains(err.Error(), "rate limited") {
180
-
log.Info("⚠ Rate limit hit, pausing for 5 minutes...")
181
-
time.Sleep(5 * time.Minute)
182
-
return fmt.Errorf("retry")
183
-
}
184
-
185
-
if m.currentBundle > 1 {
186
-
log.Info("→ Reached end of available data")
187
-
}
188
-
return err
189
-
}
190
-
191
-
func (s *Scanner) shouldRetry(err error) bool {
192
-
return err != nil && err.Error() == "retry"
193
-
}
194
-
195
-
func (s *Scanner) handleCompleteBundle(ctx context.Context, ops []PLCOperation, m *ScanMetrics) error {
196
-
counts, err := s.processBatch(ctx, ops)
197
-
if err != nil {
198
-
return err
199
-
}
200
-
201
-
s.mergeCounts(m.endpointCounts, counts)
202
-
m.totalProcessed += int64(len(ops)) // Unique ops after dedup
203
-
m.newEndpoints += sumCounts(counts) // NEW: Track new endpoints
204
-
205
-
batchTotal := sumCounts(counts)
206
-
log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new endpoints",
207
-
m.currentBundle, len(ops), batchTotal)
208
-
209
-
m.currentBundle++
210
-
return nil
211
-
}
212
-
213
-
func (s *Scanner) handleIncompleteBundle(ctx context.Context, ops []PLCOperation, m *ScanMetrics) error {
214
-
log.Info("→ Bundle %06d incomplete (%d ops), adding to mempool", m.currentBundle, len(ops))
215
-
216
-
if err := s.addToMempool(ctx, ops, m.endpointCounts); err != nil {
217
-
return err
218
-
}
219
-
220
-
s.finalizeMempool(ctx, m)
221
-
return fmt.Errorf("incomplete") // Signal end of processing
222
-
}
223
-
224
-
func (s *Scanner) finalizeMempool(ctx context.Context, m *ScanMetrics) {
225
-
if err := s.fillMempool(ctx, m); err != nil {
226
-
log.Error("Error filling mempool: %v", err)
227
-
}
228
-
if err := s.processMempool(ctx, m); err != nil {
229
-
log.Error("Error processing mempool: %v", err)
230
-
}
231
-
}
232
-
233
-
func (s *Scanner) fillMempool(ctx context.Context, m *ScanMetrics) error {
234
-
const fetchLimit = 1000
235
-
236
-
for {
237
-
count, err := s.db.GetMempoolCount(ctx)
58
+
// Fetch and save bundle (library handles mempool internally)
59
+
bundle, err := s.bundleManager.FetchAndSaveBundle(ctx)
238
60
if err != nil {
239
-
return err
240
-
}
241
-
242
-
if count >= BUNDLE_SIZE {
243
-
log.Info("✓ Mempool filled to %d operations (target: %d)", count, BUNDLE_SIZE)
244
-
return nil
245
-
}
246
-
247
-
log.Info("→ Mempool has %d/%d operations, fetching more from PLC directory...", count, BUNDLE_SIZE)
248
-
249
-
// ✅ Fix: Don't capture unused 'ops' variable
250
-
shouldContinue, err := s.fetchNextBatch(ctx, fetchLimit, m)
251
-
if err != nil {
252
-
return err
253
-
}
254
-
255
-
if !shouldContinue {
256
-
finalCount, _ := s.db.GetMempoolCount(ctx)
257
-
log.Info("→ Stopping fill, mempool has %d/%d operations", finalCount, BUNDLE_SIZE)
258
-
return nil
259
-
}
260
-
}
261
-
}
262
-
263
-
func (s *Scanner) fetchNextBatch(ctx context.Context, limit int, m *ScanMetrics) (bool, error) {
264
-
lastOp, err := s.db.GetLastMempoolOperation(ctx)
265
-
if err != nil {
266
-
return false, err
267
-
}
268
-
269
-
var after string
270
-
if lastOp != nil {
271
-
after = lastOp.CreatedAt.Format(time.RFC3339Nano)
272
-
log.Verbose(" Using cursor: %s", after)
273
-
}
274
-
275
-
ops, err := s.client.Export(ctx, ExportOptions{Count: limit, After: after})
276
-
if err != nil {
277
-
return false, fmt.Errorf("failed to fetch from PLC: %w", err)
278
-
}
279
-
280
-
fetchedCount := len(ops)
281
-
m.totalFetched += int64(fetchedCount) // Track all fetched
282
-
log.Verbose(" Fetched %d operations from PLC", fetchedCount)
283
-
284
-
if fetchedCount == 0 {
285
-
count, _ := s.db.GetMempoolCount(ctx)
286
-
log.Info("→ No more data available from PLC directory (mempool has %d/%d)", count, BUNDLE_SIZE)
287
-
return false, nil
288
-
}
289
-
290
-
beforeCount, err := s.db.GetMempoolCount(ctx)
291
-
if err != nil {
292
-
return false, err
293
-
}
294
-
295
-
endpointsBefore := sumCounts(m.endpointCounts)
296
-
if err := s.addToMempool(ctx, ops, m.endpointCounts); err != nil {
297
-
return false, err
298
-
}
299
-
endpointsAfter := sumCounts(m.endpointCounts)
300
-
m.newEndpoints += (endpointsAfter - endpointsBefore) // Add new endpoints found
301
-
302
-
afterCount, err := s.db.GetMempoolCount(ctx)
303
-
if err != nil {
304
-
return false, err
305
-
}
306
-
307
-
uniqueAdded := int64(afterCount - beforeCount) // Cast to int64
308
-
m.totalProcessed += uniqueAdded // Track unique ops processed
309
-
310
-
log.Verbose(" Added %d new unique operations to mempool (%d were duplicates)",
311
-
uniqueAdded, int64(fetchedCount)-uniqueAdded)
312
-
313
-
// Continue only if got full batch
314
-
shouldContinue := fetchedCount >= limit
315
-
if !shouldContinue {
316
-
log.Info("→ Received incomplete batch (%d/%d), caught up to latest data", fetchedCount, limit)
317
-
}
318
-
319
-
return shouldContinue, nil
320
-
}
321
-
322
-
func (s *Scanner) addToMempool(ctx context.Context, ops []PLCOperation, counts map[string]int64) error {
323
-
mempoolOps := make([]storage.MempoolOperation, len(ops))
324
-
for i, op := range ops {
325
-
mempoolOps[i] = storage.MempoolOperation{
326
-
DID: op.DID,
327
-
Operation: string(op.RawJSON),
328
-
CID: op.CID,
329
-
CreatedAt: op.CreatedAt,
330
-
}
331
-
}
332
-
333
-
if err := s.db.AddToMempool(ctx, mempoolOps); err != nil {
334
-
return err
335
-
}
336
-
337
-
// NEW: Create/update DID records immediately when adding to mempool
338
-
for _, op := range ops {
339
-
info := ExtractDIDInfo(&op)
340
-
341
-
// Validate handle length before saving
342
-
validHandle := ValidateHandle(info.Handle)
343
-
if info.Handle != "" && validHandle == "" {
344
-
log.Verbose("Skipping invalid handle for DID %s (length: %d)", op.DID, len(info.Handle))
345
-
}
346
-
347
-
if err := s.db.UpsertDIDFromMempool(ctx, op.DID, validHandle, info.PDS); err != nil {
348
-
log.Error("Failed to upsert DID %s in mempool: %v", op.DID, err)
349
-
// Don't fail the whole operation, just log
350
-
}
351
-
}
352
-
353
-
// Process for endpoint discovery
354
-
batchCounts, err := s.processBatch(ctx, ops)
355
-
s.mergeCounts(counts, batchCounts)
356
-
return err
357
-
}
61
+
if isInsufficientOpsError(err) {
62
+
// Show mempool status
63
+
stats := s.bundleManager.libManager.GetMempoolStats()
64
+
mempoolCount := stats["count"].(int)
358
65
359
-
func (s *Scanner) processMempool(ctx context.Context, m *ScanMetrics) error {
360
-
for {
361
-
count, err := s.db.GetMempoolCount(ctx)
362
-
if err != nil {
363
-
return err
364
-
}
66
+
if mempoolCount > 0 {
67
+
log.Info("→ Waiting for more operations (mempool has %d/%d ops)",
68
+
mempoolCount, BUNDLE_SIZE)
69
+
} else {
70
+
log.Info("→ Caught up! No operations available")
71
+
}
72
+
break
73
+
}
365
74
366
-
log.Verbose("Mempool contains %d operations", count)
75
+
if strings.Contains(err.Error(), "rate limited") {
76
+
log.Info("⚠ Rate limited, pausing for 5 minutes...")
77
+
time.Sleep(5 * time.Minute)
78
+
continue
79
+
}
367
80
368
-
if count < BUNDLE_SIZE {
369
-
log.Info("Mempool has %d/%d operations, cannot create bundle yet", count, BUNDLE_SIZE)
370
-
return nil
81
+
return fmt.Errorf("failed to fetch bundle: %w", err)
371
82
}
372
83
373
-
log.Info("→ Creating bundle from mempool (%d operations available)...", count)
374
-
375
-
// Updated to receive 4 values instead of 3
376
-
bundleNum, ops, cursor, err := s.createBundleFromMempool(ctx)
84
+
// Process operations for endpoint discovery
85
+
counts, err := s.processBatch(ctx, bundle.Operations)
377
86
if err != nil {
378
-
return err
87
+
log.Error("Failed to process batch: %v", err)
88
+
// Continue anyway
379
89
}
380
90
381
-
// Process and update metrics
382
-
countsBefore := sumCounts(m.endpointCounts)
383
-
counts, _ := s.processBatch(ctx, ops)
384
-
s.mergeCounts(m.endpointCounts, counts)
385
-
newEndpointsFound := sumCounts(m.endpointCounts) - countsBefore
91
+
// Update metrics
92
+
s.mergeCounts(metrics.endpointCounts, counts)
93
+
metrics.totalProcessed += int64(len(bundle.Operations))
94
+
metrics.newEndpoints += sumCounts(counts)
95
+
metrics.currentBundle = bundle.BundleNumber
386
96
387
-
m.totalProcessed += int64(len(ops))
388
-
m.newEndpoints += newEndpointsFound
389
-
m.currentBundle = bundleNum
97
+
log.Info("✓ Processed bundle %06d: %d operations, %d new endpoints",
98
+
bundle.BundleNumber, len(bundle.Operations), sumCounts(counts))
390
99
391
-
if err := s.updateCursorForBundle(ctx, bundleNum, m.totalProcessed); err != nil {
100
+
// Update cursor
101
+
if err := s.updateCursorForBundle(ctx, bundle.BundleNumber, metrics.totalProcessed); err != nil {
392
102
log.Error("Warning: failed to update cursor: %v", err)
393
103
}
394
-
395
-
log.Info("✓ Created bundle %06d from mempool (cursor: %s)", bundleNum, cursor)
396
104
}
397
-
}
398
105
399
-
func (s *Scanner) createBundleFromMempool(ctx context.Context) (int, []PLCOperation, string, error) {
400
-
mempoolOps, err := s.db.GetMempoolOperations(ctx, BUNDLE_SIZE)
401
-
if err != nil {
402
-
return 0, nil, "", err
106
+
// Show final mempool status
107
+
stats := s.bundleManager.libManager.GetMempoolStats()
108
+
if count, ok := stats["count"].(int); ok && count > 0 {
109
+
log.Info("Mempool contains %d operations (%.1f%% of next bundle)",
110
+
count, float64(count)/float64(BUNDLE_SIZE)*100)
403
111
}
404
112
405
-
ops, ids := s.deduplicateMempool(mempoolOps)
406
-
if len(ops) < BUNDLE_SIZE {
407
-
return 0, nil, "", fmt.Errorf("only got %d unique operations from mempool, need %d", len(ops), BUNDLE_SIZE)
408
-
}
409
-
410
-
// Determine cursor from last bundle
411
-
cursor := ""
412
-
lastBundle, err := s.db.GetLastBundleNumber(ctx)
413
-
if err == nil && lastBundle > 0 {
414
-
if bundle, err := s.db.GetBundleByNumber(ctx, lastBundle); err == nil {
415
-
cursor = bundle.EndTime.Format(time.RFC3339Nano)
416
-
}
417
-
}
418
-
419
-
bundleNum, err := s.bundleManager.CreateBundleFromMempool(ctx, ops, cursor)
420
-
if err != nil {
421
-
return 0, nil, "", err
422
-
}
423
-
424
-
if err := s.db.DeleteFromMempool(ctx, ids[:len(ops)]); err != nil {
425
-
return 0, nil, "", err
426
-
}
427
-
428
-
return bundleNum, ops, cursor, nil
429
-
}
430
-
431
-
func (s *Scanner) deduplicateMempool(mempoolOps []storage.MempoolOperation) ([]PLCOperation, []int64) {
432
-
ops := make([]PLCOperation, 0, BUNDLE_SIZE)
433
-
ids := make([]int64, 0, BUNDLE_SIZE)
434
-
seenCIDs := make(map[string]bool)
435
-
436
-
for _, mop := range mempoolOps {
437
-
if seenCIDs[mop.CID] {
438
-
ids = append(ids, mop.ID)
439
-
continue
440
-
}
441
-
seenCIDs[mop.CID] = true
442
-
443
-
var op PLCOperation
444
-
json.Unmarshal([]byte(mop.Operation), &op)
445
-
op.RawJSON = []byte(mop.Operation)
446
-
447
-
ops = append(ops, op)
448
-
ids = append(ids, mop.ID)
449
-
450
-
if len(ops) >= BUNDLE_SIZE {
451
-
break
452
-
}
453
-
}
454
-
455
-
return ops, ids
113
+
metrics.logSummary()
114
+
return nil
456
115
}
457
116
117
+
// processBatch extracts endpoints from operations
458
118
func (s *Scanner) processBatch(ctx context.Context, ops []PLCOperation) (map[string]int64, error) {
459
119
counts := make(map[string]int64)
460
120
seen := make(map[string]*PLCOperation)
461
121
462
122
// Collect unique endpoints
463
-
for _, op := range ops {
123
+
for i := range ops {
124
+
op := &ops[i]
125
+
464
126
if op.IsNullified() {
465
127
continue
466
128
}
467
-
for _, ep := range s.extractEndpointsFromOperation(op) {
129
+
130
+
for _, ep := range s.extractEndpointsFromOperation(*op) {
468
131
key := fmt.Sprintf("%s:%s", ep.Type, ep.Endpoint)
469
132
if _, exists := seen[key]; !exists {
470
-
seen[key] = &op
133
+
seen[key] = op
471
134
}
472
135
}
473
136
}
···
483
146
}
484
147
485
148
if err := s.storeEndpoint(ctx, epType, endpoint, firstOp.CreatedAt); err != nil {
486
-
log.Error("Error storing %s endpoint %s: %v", epType, stripansi.Strip(endpoint), err)
149
+
log.Error("Error storing %s endpoint %s: %v", epType, endpoint, err)
487
150
continue
488
151
}
489
152
490
-
log.Info("✓ Discovered new %s endpoint: %s", epType, stripansi.Strip(endpoint))
153
+
log.Info("✓ Discovered new %s endpoint: %s", epType, endpoint)
491
154
counts[epType]++
492
155
}
493
156
494
157
return counts, nil
495
-
}
496
-
497
-
func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error {
498
-
return s.db.UpsertEndpoint(ctx, &storage.Endpoint{
499
-
EndpointType: epType,
500
-
Endpoint: endpoint,
501
-
DiscoveredAt: discoveredAt,
502
-
LastChecked: time.Time{},
503
-
Status: storage.EndpointStatusUnknown,
504
-
})
505
158
}
506
159
507
160
func (s *Scanner) extractEndpointsFromOperation(op PLCOperation) []EndpointInfo {
···
544
197
return nil
545
198
}
546
199
547
-
func (s *Scanner) updateCursor(ctx context.Context, cursor *storage.ScanCursor, m *ScanMetrics) error {
548
-
return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
549
-
Source: "plc_directory",
550
-
LastBundleNumber: m.currentBundle - 1,
551
-
LastScanTime: time.Now().UTC(),
552
-
RecordsProcessed: cursor.RecordsProcessed + m.totalProcessed,
200
+
func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error {
201
+
return s.db.UpsertEndpoint(ctx, &storage.Endpoint{
202
+
EndpointType: epType,
203
+
Endpoint: endpoint,
204
+
DiscoveredAt: discoveredAt,
205
+
LastChecked: time.Time{},
206
+
Status: storage.EndpointStatusUnknown,
553
207
})
554
208
}
555
209
···
577
231
return total
578
232
}
579
233
580
-
func formatEndpointCounts(counts map[string]int64) string {
581
-
if len(counts) == 0 {
582
-
return "0 new endpoints"
583
-
}
234
+
func isInsufficientOpsError(err error) bool {
235
+
return err != nil && strings.Contains(err.Error(), "insufficient operations")
236
+
}
584
237
585
-
total := sumCounts(counts)
238
+
// ScanMetrics tracks scan progress
239
+
type ScanMetrics struct {
240
+
totalFetched int64
241
+
totalProcessed int64
242
+
newEndpoints int64
243
+
endpointCounts map[string]int64
244
+
currentBundle int
245
+
startTime time.Time
246
+
}
586
247
587
-
if len(counts) == 1 {
588
-
for typ, count := range counts {
589
-
return fmt.Sprintf("%d new %s endpoint(s)", count, typ)
590
-
}
248
+
func newMetrics(startBundle int) *ScanMetrics {
249
+
return &ScanMetrics{
250
+
endpointCounts: make(map[string]int64),
251
+
currentBundle: startBundle,
252
+
startTime: time.Now(),
591
253
}
254
+
}
592
255
593
-
parts := make([]string, 0, len(counts))
594
-
for typ, count := range counts {
595
-
parts = append(parts, fmt.Sprintf("%d %s", count, typ))
256
+
func (m *ScanMetrics) logSummary() {
257
+
if m.newEndpoints > 0 {
258
+
log.Info("PLC scan completed: %d operations processed, %d new endpoints in %v",
259
+
m.totalProcessed, m.newEndpoints, time.Since(m.startTime))
260
+
} else {
261
+
log.Info("PLC scan completed: %d operations processed, 0 new endpoints in %v",
262
+
m.totalProcessed, time.Since(m.startTime))
596
263
}
597
-
return fmt.Sprintf("%d new endpoints (%s)", total, strings.Join(parts, ", "))
598
264
}
+10
-56
internal/plc/types.go
+10
-56
internal/plc/types.go
···
1
1
package plc
2
2
3
-
import "time"
4
-
5
-
type PLCOperation struct {
6
-
DID string `json:"did"`
7
-
Operation map[string]interface{} `json:"operation"`
8
-
CID string `json:"cid"`
9
-
Nullified interface{} `json:"nullified,omitempty"`
10
-
CreatedAt time.Time `json:"createdAt"`
11
-
12
-
RawJSON []byte `json:"-"` // ✅ Exported (capital R)
13
-
}
14
-
15
-
// Helper method to check if nullified
16
-
func (op *PLCOperation) IsNullified() bool {
17
-
if op.Nullified == nil {
18
-
return false
19
-
}
20
-
21
-
switch v := op.Nullified.(type) {
22
-
case bool:
23
-
return v
24
-
case string:
25
-
return v != ""
26
-
default:
27
-
return false
28
-
}
29
-
}
30
-
31
-
// Get nullifying CID if available
32
-
func (op *PLCOperation) GetNullifyingCID() string {
33
-
if s, ok := op.Nullified.(string); ok {
34
-
return s
35
-
}
36
-
return ""
37
-
}
3
+
import (
4
+
plclib "github.com/atscan/plcbundle/plc"
5
+
)
38
6
39
-
type DIDDocument struct {
40
-
Context []string `json:"@context"`
41
-
ID string `json:"id"`
42
-
AlsoKnownAs []string `json:"alsoKnownAs"`
43
-
VerificationMethod []VerificationMethod `json:"verificationMethod"`
44
-
Service []Service `json:"service"`
45
-
}
7
+
// Re-export library types
8
+
type PLCOperation = plclib.PLCOperation
9
+
type DIDDocument = plclib.DIDDocument
10
+
type Client = plclib.Client
11
+
type ExportOptions = plclib.ExportOptions
46
12
47
-
type VerificationMethod struct {
48
-
ID string `json:"id"`
49
-
Type string `json:"type"`
50
-
Controller string `json:"controller"`
51
-
PublicKeyMultibase string `json:"publicKeyMultibase"`
52
-
}
53
-
54
-
type Service struct {
55
-
ID string `json:"id"`
56
-
Type string `json:"type"`
57
-
ServiceEndpoint string `json:"serviceEndpoint"`
58
-
}
13
+
// Keep your custom types
14
+
const BUNDLE_SIZE = 10000
59
15
60
-
// DIDHistoryEntry represents a single operation in DID history
61
16
type DIDHistoryEntry struct {
62
17
Operation PLCOperation `json:"operation"`
63
18
PLCBundle string `json:"plc_bundle,omitempty"`
64
19
}
65
20
66
-
// DIDHistory represents the full history of a DID
67
21
type DIDHistory struct {
68
22
DID string `json:"did"`
69
23
Current *PLCOperation `json:"current"`