+3
-3
cmd/atscanner.go
+3
-3
cmd/atscanner.go
···
36
36
// Initialize logger
37
37
log.Init(cfg.API.Verbose)
38
38
39
-
// Initialize database
40
-
db, err := storage.NewSQLiteDB(cfg.Database.Path)
39
+
// Initialize database using factory pattern
40
+
db, err := storage.NewDatabase(cfg.Database.Type, cfg.Database.Path)
41
41
if err != nil {
42
42
log.Fatal("Failed to initialize database: %v", err)
43
43
}
···
53
53
54
54
// Initialize workers
55
55
plcScanner := plc.NewScanner(db, cfg.PLC)
56
-
defer plcScanner.Close() // Close scanner to cleanup cache
56
+
defer plcScanner.Close()
57
57
58
58
pdsScanner := pds.NewScanner(db, cfg.PDS)
59
59
+3
-2
config.yaml
+3
-2
config.yaml
+5
-1
go.mod
+5
-1
go.mod
···
4
4
5
5
require (
6
6
github.com/gorilla/mux v1.8.1
7
+
github.com/lib/pq v1.10.9
7
8
github.com/mattn/go-sqlite3 v1.14.18
8
9
gopkg.in/yaml.v3 v3.0.1
9
10
)
···
15
16
github.com/gorilla/handlers v1.5.2
16
17
)
17
18
18
-
require github.com/felixge/httpsnoop v1.0.3 // indirect
19
+
require (
20
+
github.com/felixge/httpsnoop v1.0.3 // indirect
21
+
github.com/lib/pq v1.10.9 // indirect
22
+
)
+2
go.sum
+2
go.sum
···
8
8
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
9
9
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
10
10
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
11
+
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
12
+
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
11
13
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
12
14
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
13
15
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+14
-1
internal/storage/db.go
+14
-1
internal/storage/db.go
···
2
2
3
3
import (
4
4
"context"
5
+
"fmt"
5
6
"time"
6
7
)
7
8
9
+
// NewDatabase creates a database connection based on type
10
+
func NewDatabase(dbType, connectionString string) (Database, error) {
11
+
switch dbType {
12
+
case "sqlite":
13
+
return NewSQLiteDB(connectionString)
14
+
case "postgres", "postgresql":
15
+
return NewPostgresDB(connectionString)
16
+
default:
17
+
return nil, fmt.Errorf("unsupported database type: %s (supported: sqlite, postgres)", dbType)
18
+
}
19
+
}
20
+
8
21
type Database interface {
9
22
Close() error
10
23
Migrate() error
11
24
12
-
// Endpoint operations (renamed from PDS)
25
+
// Endpoint operations
13
26
UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error
14
27
GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error)
15
28
GetEndpointByID(ctx context.Context, id int64) (*Endpoint, error)
+1062
internal/storage/postgres.go
+1062
internal/storage/postgres.go
···
1
+
package storage
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"encoding/json"
7
+
"fmt"
8
+
"strings"
9
+
"time"
10
+
11
+
_ "github.com/lib/pq"
12
+
)
// PostgresDB implements the Database interface on top of a PostgreSQL
// server, using the lib/pq driver through database/sql.
type PostgresDB struct {
	db *sql.DB // shared connection pool; safe for concurrent use
}
17
+
18
+
func NewPostgresDB(connString string) (*PostgresDB, error) {
19
+
db, err := sql.Open("postgres", connString)
20
+
if err != nil {
21
+
return nil, err
22
+
}
23
+
24
+
// Connection pool settings
25
+
db.SetMaxOpenConns(25)
26
+
db.SetMaxIdleConns(10)
27
+
db.SetConnMaxLifetime(5 * time.Minute)
28
+
db.SetConnMaxIdleTime(2 * time.Minute)
29
+
30
+
// Test connection
31
+
if err := db.Ping(); err != nil {
32
+
return nil, fmt.Errorf("failed to ping database: %w", err)
33
+
}
34
+
35
+
return &PostgresDB{db: db}, nil
36
+
}
// Close releases the underlying connection pool.
func (p *PostgresDB) Close() error {
	return p.db.Close()
}
// Migrate creates every table and index the scanner needs. All
// statements use IF NOT EXISTS, so running it at every startup is safe;
// it does not alter columns of pre-existing tables.
func (p *PostgresDB) Migrate() error {
	schema := `
	-- Endpoints table
	CREATE TABLE IF NOT EXISTS endpoints (
		id BIGSERIAL PRIMARY KEY,
		endpoint_type TEXT NOT NULL DEFAULT 'pds',
		endpoint TEXT NOT NULL,
		discovered_at TIMESTAMP NOT NULL,
		last_checked TIMESTAMP,
		status INTEGER DEFAULT 0,
		user_count BIGINT DEFAULT 0,
		updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		UNIQUE(endpoint_type, endpoint)
	);

	CREATE INDEX IF NOT EXISTS idx_endpoints_type_endpoint ON endpoints(endpoint_type, endpoint);
	CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status);
	CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type);
	CREATE INDEX IF NOT EXISTS idx_endpoints_user_count ON endpoints(user_count);

	CREATE TABLE IF NOT EXISTS pds_scans (
		id BIGSERIAL PRIMARY KEY,
		pds_id BIGINT NOT NULL,
		status INTEGER NOT NULL,
		response_time DOUBLE PRECISION,
		scan_data JSONB,
		scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		FOREIGN KEY (pds_id) REFERENCES endpoints(id) ON DELETE CASCADE
	);

	CREATE INDEX IF NOT EXISTS idx_pds_scans_pds_id ON pds_scans(pds_id);
	CREATE INDEX IF NOT EXISTS idx_pds_scans_scanned_at ON pds_scans(scanned_at);
	CREATE INDEX IF NOT EXISTS idx_pds_scans_scan_data ON pds_scans USING gin(scan_data);

	CREATE TABLE IF NOT EXISTS plc_metrics (
		id BIGSERIAL PRIMARY KEY,
		total_dids BIGINT,
		total_pds BIGINT,
		unique_pds BIGINT,
		scan_duration_ms BIGINT,
		error_count INTEGER,
		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
	);

	CREATE TABLE IF NOT EXISTS scan_cursors (
		source TEXT PRIMARY KEY,
		last_bundle_number INTEGER DEFAULT 0,
		last_scan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		records_processed BIGINT DEFAULT 0
	);

	CREATE TABLE IF NOT EXISTS plc_bundles (
		bundle_number INTEGER PRIMARY KEY,
		start_time TIMESTAMP NOT NULL,
		end_time TIMESTAMP NOT NULL,
		dids JSONB NOT NULL,
		hash TEXT NOT NULL,
		compressed_hash TEXT NOT NULL,
		compressed_size BIGINT NOT NULL,
		uncompressed_size BIGINT NOT NULL,
		cumulative_compressed_size BIGINT NOT NULL,
		cumulative_uncompressed_size BIGINT NOT NULL,
		cursor TEXT,
		prev_bundle_hash TEXT,
		compressed BOOLEAN DEFAULT true,
		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
	);

	CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time);
	CREATE INDEX IF NOT EXISTS idx_plc_bundles_hash ON plc_bundles(hash);
	CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash);
	CREATE INDEX IF NOT EXISTS idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC);
	CREATE INDEX IF NOT EXISTS idx_plc_bundles_dids ON plc_bundles USING gin(dids);

	CREATE TABLE IF NOT EXISTS plc_mempool (
		id BIGSERIAL PRIMARY KEY,
		did TEXT NOT NULL,
		operation TEXT NOT NULL,
		cid TEXT NOT NULL UNIQUE,
		created_at TIMESTAMP NOT NULL,
		added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
	);

	CREATE INDEX IF NOT EXISTS idx_mempool_created_at ON plc_mempool(created_at);
	CREATE INDEX IF NOT EXISTS idx_mempool_did ON plc_mempool(did);
	CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid);

	CREATE TABLE IF NOT EXISTS dids (
		did TEXT PRIMARY KEY,
		first_seen_bundle INTEGER NOT NULL,
		last_seen_bundle INTEGER NOT NULL,
		bundle_numbers JSONB NOT NULL,
		operation_count INTEGER DEFAULT 1,
		first_seen_at TIMESTAMP NOT NULL,
		last_seen_at TIMESTAMP NOT NULL
	);

	CREATE INDEX IF NOT EXISTS idx_dids_did ON dids(did);
	CREATE INDEX IF NOT EXISTS idx_dids_last_bundle ON dids(last_seen_bundle);
	CREATE INDEX IF NOT EXISTS idx_dids_first_seen ON dids(first_seen_at);
	CREATE INDEX IF NOT EXISTS idx_dids_last_seen ON dids(last_seen_at);
	CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers);
	`

	_, err := p.db.Exec(schema)
	return err
}
149
+
150
+
// ===== BUNDLE OPERATIONS =====
151
+
152
+
func (p *PostgresDB) CreateBundle(ctx context.Context, bundle *PLCBundle) error {
153
+
didsJSON, err := json.Marshal(bundle.DIDs)
154
+
if err != nil {
155
+
return err
156
+
}
157
+
158
+
// Calculate cumulative sizes from previous bundle
159
+
if bundle.BundleNumber > 1 {
160
+
prevBundle, err := p.GetBundleByNumber(ctx, bundle.BundleNumber-1)
161
+
if err == nil && prevBundle != nil {
162
+
bundle.CumulativeCompressedSize = prevBundle.CumulativeCompressedSize + bundle.CompressedSize
163
+
bundle.CumulativeUncompressedSize = prevBundle.CumulativeUncompressedSize + bundle.UncompressedSize
164
+
} else {
165
+
bundle.CumulativeCompressedSize = bundle.CompressedSize
166
+
bundle.CumulativeUncompressedSize = bundle.UncompressedSize
167
+
}
168
+
} else {
169
+
bundle.CumulativeCompressedSize = bundle.CompressedSize
170
+
bundle.CumulativeUncompressedSize = bundle.UncompressedSize
171
+
}
172
+
173
+
query := `
174
+
INSERT INTO plc_bundles (
175
+
bundle_number, start_time, end_time, dids,
176
+
hash, compressed_hash, compressed_size, uncompressed_size,
177
+
cumulative_compressed_size, cumulative_uncompressed_size,
178
+
cursor, prev_bundle_hash, compressed
179
+
)
180
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
181
+
ON CONFLICT(bundle_number) DO UPDATE SET
182
+
start_time = EXCLUDED.start_time,
183
+
end_time = EXCLUDED.end_time,
184
+
dids = EXCLUDED.dids,
185
+
hash = EXCLUDED.hash,
186
+
compressed_hash = EXCLUDED.compressed_hash,
187
+
compressed_size = EXCLUDED.compressed_size,
188
+
uncompressed_size = EXCLUDED.uncompressed_size,
189
+
cumulative_compressed_size = EXCLUDED.cumulative_compressed_size,
190
+
cumulative_uncompressed_size = EXCLUDED.cumulative_uncompressed_size,
191
+
cursor = EXCLUDED.cursor,
192
+
prev_bundle_hash = EXCLUDED.prev_bundle_hash,
193
+
compressed = EXCLUDED.compressed
194
+
`
195
+
_, err = p.db.ExecContext(ctx, query,
196
+
bundle.BundleNumber, bundle.StartTime, bundle.EndTime,
197
+
didsJSON, bundle.Hash, bundle.CompressedHash,
198
+
bundle.CompressedSize, bundle.UncompressedSize,
199
+
bundle.CumulativeCompressedSize, bundle.CumulativeUncompressedSize,
200
+
bundle.Cursor, bundle.PrevBundleHash, bundle.Compressed,
201
+
)
202
+
203
+
return err
204
+
}
205
+
206
+
func (p *PostgresDB) GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error) {
207
+
query := `
208
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
209
+
compressed_size, uncompressed_size, cumulative_compressed_size,
210
+
cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
211
+
FROM plc_bundles
212
+
WHERE bundle_number = $1
213
+
`
214
+
215
+
var bundle PLCBundle
216
+
var didsJSON []byte
217
+
var prevHash sql.NullString
218
+
var cursor sql.NullString
219
+
220
+
err := p.db.QueryRowContext(ctx, query, bundleNumber).Scan(
221
+
&bundle.BundleNumber, &bundle.StartTime, &bundle.EndTime,
222
+
&didsJSON, &bundle.Hash, &bundle.CompressedHash,
223
+
&bundle.CompressedSize, &bundle.UncompressedSize,
224
+
&bundle.CumulativeCompressedSize, &bundle.CumulativeUncompressedSize,
225
+
&cursor, &prevHash, &bundle.Compressed, &bundle.CreatedAt,
226
+
)
227
+
if err != nil {
228
+
return nil, err
229
+
}
230
+
231
+
if prevHash.Valid {
232
+
bundle.PrevBundleHash = prevHash.String
233
+
}
234
+
if cursor.Valid {
235
+
bundle.Cursor = cursor.String
236
+
}
237
+
238
+
json.Unmarshal(didsJSON, &bundle.DIDs)
239
+
return &bundle, nil
240
+
}
241
+
242
+
func (p *PostgresDB) GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) {
243
+
query := `
244
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
245
+
compressed_size, uncompressed_size, cumulative_compressed_size,
246
+
cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
247
+
FROM plc_bundles
248
+
ORDER BY bundle_number DESC
249
+
LIMIT $1
250
+
`
251
+
252
+
rows, err := p.db.QueryContext(ctx, query, limit)
253
+
if err != nil {
254
+
return nil, err
255
+
}
256
+
defer rows.Close()
257
+
258
+
return p.scanBundles(rows)
259
+
}
// GetBundlesForDID returns, oldest first, every bundle whose dids JSONB
// array contains the given DID. The `?` is PostgreSQL's JSONB
// element-containment operator — lib/pq uses $N placeholders, so the `?`
// passes through to the server literally. Served by the GIN index on dids.
func (p *PostgresDB) GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) {
	query := `
		SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
		       compressed_size, uncompressed_size, cumulative_compressed_size,
		       cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
		FROM plc_bundles
		WHERE dids ? $1
		ORDER BY bundle_number ASC
	`

	rows, err := p.db.QueryContext(ctx, query, did)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	return p.scanBundles(rows)
}
279
+
280
+
func (p *PostgresDB) scanBundles(rows *sql.Rows) ([]*PLCBundle, error) {
281
+
var bundles []*PLCBundle
282
+
283
+
for rows.Next() {
284
+
var bundle PLCBundle
285
+
var didsJSON []byte
286
+
var prevHash sql.NullString
287
+
var cursor sql.NullString
288
+
289
+
if err := rows.Scan(
290
+
&bundle.BundleNumber,
291
+
&bundle.StartTime,
292
+
&bundle.EndTime,
293
+
&didsJSON,
294
+
&bundle.Hash,
295
+
&bundle.CompressedHash,
296
+
&bundle.CompressedSize,
297
+
&bundle.UncompressedSize,
298
+
&bundle.CumulativeCompressedSize,
299
+
&bundle.CumulativeUncompressedSize,
300
+
&cursor,
301
+
&prevHash,
302
+
&bundle.Compressed,
303
+
&bundle.CreatedAt,
304
+
); err != nil {
305
+
return nil, err
306
+
}
307
+
308
+
if prevHash.Valid {
309
+
bundle.PrevBundleHash = prevHash.String
310
+
}
311
+
if cursor.Valid {
312
+
bundle.Cursor = cursor.String
313
+
}
314
+
315
+
json.Unmarshal(didsJSON, &bundle.DIDs)
316
+
bundles = append(bundles, &bundle)
317
+
}
318
+
319
+
return bundles, rows.Err()
320
+
}
// GetBundleStats returns (bundle count, cumulative compressed size,
// cumulative uncompressed size, last bundle number). The sizes are read
// from the running totals stored on the newest bundle rather than
// summed across all rows.
func (p *PostgresDB) GetBundleStats(ctx context.Context) (int64, int64, int64, int64, error) {
	var count, lastBundleNum int64
	err := p.db.QueryRowContext(ctx, `
		SELECT COUNT(*), COALESCE(MAX(bundle_number), 0)
		FROM plc_bundles
	`).Scan(&count, &lastBundleNum)
	if err != nil {
		return 0, 0, 0, 0, err
	}

	// Empty table: nothing to report.
	if lastBundleNum == 0 {
		return 0, 0, 0, 0, nil
	}

	var compressedSize, uncompressedSize int64
	err = p.db.QueryRowContext(ctx, `
		SELECT cumulative_compressed_size, cumulative_uncompressed_size
		FROM plc_bundles
		WHERE bundle_number = $1
	`, lastBundleNum).Scan(&compressedSize, &uncompressedSize)
	if err != nil {
		return 0, 0, 0, 0, err
	}

	return count, compressedSize, uncompressedSize, lastBundleNum, nil
}
348
+
349
+
func (p *PostgresDB) GetLastBundleNumber(ctx context.Context) (int, error) {
350
+
query := "SELECT COALESCE(MAX(bundle_number), 0) FROM plc_bundles"
351
+
var num int
352
+
err := p.db.QueryRowContext(ctx, query).Scan(&num)
353
+
return num, err
354
+
}
// GetBundleForTimestamp resolves the bundle number relevant to
// afterTime. It first looks for a bundle whose [start_time, end_time]
// range covers the timestamp; failing that, it falls back to the most
// recent bundle that ended before the timestamp; when no bundle
// qualifies at all it returns bundle 1 (the beginning of history).
func (p *PostgresDB) GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error) {
	query := `
		SELECT bundle_number
		FROM plc_bundles
		WHERE start_time <= $1 AND end_time >= $1
		ORDER BY bundle_number ASC
		LIMIT 1
	`

	var bundleNum int
	err := p.db.QueryRowContext(ctx, query, afterTime).Scan(&bundleNum)
	if err == sql.ErrNoRows {
		// No covering bundle: take the latest one fully before the timestamp.
		query = `
			SELECT bundle_number
			FROM plc_bundles
			WHERE end_time < $1
			ORDER BY bundle_number DESC
			LIMIT 1
		`
		err = p.db.QueryRowContext(ctx, query, afterTime).Scan(&bundleNum)
		if err == sql.ErrNoRows {
			// Timestamp predates every stored bundle.
			return 1, nil
		}
		if err != nil {
			return 0, err
		}
		return bundleNum, nil
	}
	if err != nil {
		return 0, err
	}

	return bundleNum, nil
}
// ===== MEMPOOL OPERATIONS =====

// AddToMempool inserts a batch of pending PLC operations in a single
// transaction using one prepared statement. Duplicates (same cid) are
// silently skipped via ON CONFLICT DO NOTHING, so replaying a batch is
// safe. An empty batch is a no-op.
func (p *PostgresDB) AddToMempool(ctx context.Context, ops []MempoolOperation) error {
	if len(ops) == 0 {
		return nil
	}

	tx, err := p.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, `
		INSERT INTO plc_mempool (did, operation, cid, created_at)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT(cid) DO NOTHING
	`)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, op := range ops {
		_, err := stmt.ExecContext(ctx, op.DID, op.Operation, op.CID, op.CreatedAt)
		if err != nil {
			// Abort on the first failure; the deferred Rollback undoes
			// everything inserted so far.
			return err
		}
	}

	return tx.Commit()
}
423
+
424
+
func (p *PostgresDB) GetMempoolCount(ctx context.Context) (int, error) {
425
+
query := "SELECT COUNT(*) FROM plc_mempool"
426
+
var count int
427
+
err := p.db.QueryRowContext(ctx, query).Scan(&count)
428
+
return count, err
429
+
}
430
+
431
+
func (p *PostgresDB) GetMempoolOperations(ctx context.Context, limit int) ([]MempoolOperation, error) {
432
+
query := `
433
+
SELECT id, did, operation, cid, created_at, added_at
434
+
FROM plc_mempool
435
+
ORDER BY created_at ASC
436
+
LIMIT $1
437
+
`
438
+
439
+
rows, err := p.db.QueryContext(ctx, query, limit)
440
+
if err != nil {
441
+
return nil, err
442
+
}
443
+
defer rows.Close()
444
+
445
+
var ops []MempoolOperation
446
+
for rows.Next() {
447
+
var op MempoolOperation
448
+
err := rows.Scan(&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt)
449
+
if err != nil {
450
+
return nil, err
451
+
}
452
+
ops = append(ops, op)
453
+
}
454
+
455
+
return ops, rows.Err()
456
+
}
// DeleteFromMempool removes the given mempool rows by primary key.
// A nil or empty id list is a no-op.
func (p *PostgresDB) DeleteFromMempool(ctx context.Context, ids []int64) error {
	if len(ids) == 0 {
		return nil
	}

	// Build one positional placeholder per id ($1, $2, ...).
	// NOTE(review): PostgreSQL caps a statement at 65535 bind parameters —
	// confirm callers never pass id batches near that size, or chunk here.
	placeholders := make([]string, len(ids))
	args := make([]interface{}, len(ids))
	for i, id := range ids {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args[i] = id
	}

	query := fmt.Sprintf("DELETE FROM plc_mempool WHERE id IN (%s)",
		strings.Join(placeholders, ","))

	_, err := p.db.ExecContext(ctx, query, args...)
	return err
}
476
+
477
+
func (p *PostgresDB) GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error) {
478
+
query := `
479
+
SELECT id, did, operation, cid, created_at, added_at
480
+
FROM plc_mempool
481
+
ORDER BY created_at ASC, id ASC
482
+
LIMIT 1
483
+
`
484
+
485
+
var op MempoolOperation
486
+
err := p.db.QueryRowContext(ctx, query).Scan(
487
+
&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt,
488
+
)
489
+
if err == sql.ErrNoRows {
490
+
return nil, nil
491
+
}
492
+
if err != nil {
493
+
return nil, err
494
+
}
495
+
496
+
return &op, nil
497
+
}
498
+
499
+
func (p *PostgresDB) GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error) {
500
+
query := `
501
+
SELECT id, did, operation, cid, created_at, added_at
502
+
FROM plc_mempool
503
+
ORDER BY created_at DESC, id DESC
504
+
LIMIT 1
505
+
`
506
+
507
+
var op MempoolOperation
508
+
err := p.db.QueryRowContext(ctx, query).Scan(
509
+
&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt,
510
+
)
511
+
if err == sql.ErrNoRows {
512
+
return nil, nil
513
+
}
514
+
if err != nil {
515
+
return nil, err
516
+
}
517
+
518
+
return &op, nil
519
+
}
520
+
521
+
func (p *PostgresDB) GetMempoolUniqueDIDCount(ctx context.Context) (int, error) {
522
+
query := "SELECT COUNT(DISTINCT did) FROM plc_mempool"
523
+
var count int
524
+
err := p.db.QueryRowContext(ctx, query).Scan(&count)
525
+
return count, err
526
+
}
// GetMempoolUncompressedSize approximates the uncompressed payload size
// of the mempool by summing the length of every stored operation.
// NOTE(review): LENGTH() on a text column counts characters, not bytes;
// for multi-byte UTF-8 payloads OCTET_LENGTH would give the byte count —
// confirm which unit callers expect.
func (p *PostgresDB) GetMempoolUncompressedSize(ctx context.Context) (int64, error) {
	query := "SELECT COALESCE(SUM(LENGTH(operation)), 0) FROM plc_mempool"
	var size int64
	err := p.db.QueryRowContext(ctx, query).Scan(&size)
	return size, err
}
// ===== ENDPOINT OPERATIONS =====

// UpsertEndpoint inserts an endpoint row or, when the (type, endpoint)
// pair already exists, refreshes only its last_checked timestamp. The
// row's id is written back into endpoint.ID via RETURNING in both cases.
// NOTE(review): on conflict, status and discovered_at keep their stored
// values even though they are supplied — confirm that is intended.
func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error {
	query := `
		INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status)
		VALUES ($1, $2, $3, $4, $5)
		ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET
			last_checked = EXCLUDED.last_checked
		RETURNING id
	`
	err := p.db.QueryRowContext(ctx, query,
		endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt,
		endpoint.LastChecked, endpoint.Status).Scan(&endpoint.ID)
	return err
}
550
+
551
+
func (p *PostgresDB) EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) {
552
+
query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2)"
553
+
var exists bool
554
+
err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists)
555
+
return exists, err
556
+
}
557
+
558
+
func (p *PostgresDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) {
559
+
query := "SELECT id FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2"
560
+
var id int64
561
+
err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id)
562
+
return id, err
563
+
}
564
+
565
+
func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) {
566
+
query := `
567
+
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at
568
+
FROM endpoints
569
+
WHERE endpoint = $1 AND endpoint_type = $2
570
+
`
571
+
572
+
var ep Endpoint
573
+
var lastChecked sql.NullTime
574
+
575
+
err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(
576
+
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
577
+
&ep.Status, &ep.UserCount, &ep.UpdatedAt,
578
+
)
579
+
if err != nil {
580
+
return nil, err
581
+
}
582
+
583
+
if lastChecked.Valid {
584
+
ep.LastChecked = lastChecked.Time
585
+
}
586
+
587
+
return &ep, nil
588
+
}
589
+
590
+
func (p *PostgresDB) GetEndpointByID(ctx context.Context, id int64) (*Endpoint, error) {
591
+
query := `
592
+
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at
593
+
FROM endpoints
594
+
WHERE id = $1
595
+
`
596
+
597
+
var ep Endpoint
598
+
var lastChecked sql.NullTime
599
+
600
+
err := p.db.QueryRowContext(ctx, query, id).Scan(
601
+
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
602
+
&ep.Status, &ep.UserCount, &ep.UpdatedAt,
603
+
)
604
+
if err != nil {
605
+
return nil, err
606
+
}
607
+
608
+
if lastChecked.Valid {
609
+
ep.LastChecked = lastChecked.Time
610
+
}
611
+
612
+
return &ep, nil
613
+
}
// GetEndpoints lists endpoints, optionally filtered by type, status and
// minimum user count, ordered by user_count descending. LIMIT/OFFSET
// are applied only when filter.Limit > 0 (an Offset without a Limit is
// ignored). A nil filter returns all endpoints.
func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) {
	query := `
		SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at
		FROM endpoints
		WHERE 1=1
	`
	args := []interface{}{}
	// argIdx tracks the next positional placeholder number ($1, $2, ...)
	// as optional predicates are appended.
	argIdx := 1

	if filter != nil {
		if filter.Type != "" {
			query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx)
			args = append(args, filter.Type)
			argIdx++
		}
		if filter.Status != "" {
			// Map the textual status to its stored numeric code.
			// NOTE(review): any unrecognized string falls through to
			// EndpointStatusUnknown and filters on that — confirm intended.
			statusInt := EndpointStatusUnknown
			switch filter.Status {
			case "online":
				statusInt = EndpointStatusOnline
			case "offline":
				statusInt = EndpointStatusOffline
			}
			query += fmt.Sprintf(" AND status = $%d", argIdx)
			args = append(args, statusInt)
			argIdx++
		}
		if filter.MinUserCount > 0 {
			query += fmt.Sprintf(" AND user_count >= $%d", argIdx)
			args = append(args, filter.MinUserCount)
			argIdx++
		}
	}

	query += " ORDER BY user_count DESC"

	if filter != nil && filter.Limit > 0 {
		query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
		args = append(args, filter.Limit, filter.Offset)
	}

	rows, err := p.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var endpoints []*Endpoint
	for rows.Next() {
		var ep Endpoint
		var lastChecked sql.NullTime

		err := rows.Scan(
			&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
			&ep.Status, &ep.UserCount, &ep.UpdatedAt,
		)
		if err != nil {
			return nil, err
		}

		// last_checked is nullable; leave the zero time when absent.
		if lastChecked.Valid {
			ep.LastChecked = lastChecked.Time
		}

		endpoints = append(endpoints, &ep)
	}

	return endpoints, rows.Err()
}
// UpdateEndpointStatus updates an endpoint's status, last-checked time
// and user count, and appends a scan-history row to pds_scans — both in
// one transaction so the endpoint row and its history stay consistent.
func (p *PostgresDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error {
	tx, err := p.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	// DID count comes from the scan payload when present; otherwise 0.
	userCount := 0
	if update.ScanData != nil {
		userCount = update.ScanData.DIDCount
	}

	query := `
		UPDATE endpoints
		SET status = $1, last_checked = $2, user_count = $3, updated_at = $4
		WHERE id = $5
	`
	_, err = tx.ExecContext(ctx, query, update.Status, update.LastChecked, userCount, time.Now(), endpointID)
	if err != nil {
		return err
	}

	// Serialize the scan payload for the history row.
	// NOTE(review): the marshal error is deliberately ignored — a failure
	// stores NULL scan_data rather than aborting the update; confirm intended.
	var scanDataJSON []byte
	if update.ScanData != nil {
		scanDataJSON, _ = json.Marshal(update.ScanData)
	}

	scanQuery := `
		INSERT INTO pds_scans (pds_id, status, response_time, scan_data)
		VALUES ($1, $2, $3, $4)
	`
	_, err = tx.ExecContext(ctx, scanQuery, endpointID, update.Status, update.ResponseTime, scanDataJSON)
	if err != nil {
		return err
	}

	return tx.Commit()
}
// GetEndpointScans returns up to limit scan-history rows for an
// endpoint, newest first. Rows with malformed scan_data JSON are kept
// but their ScanData field stays nil (decode failures are tolerated
// rather than failing the whole listing).
func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) {
	query := `
		SELECT id, pds_id, status, response_time, scan_data, scanned_at
		FROM pds_scans
		WHERE pds_id = $1
		ORDER BY scanned_at DESC
		LIMIT $2
	`

	rows, err := p.db.QueryContext(ctx, query, endpointID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var scans []*EndpointScan
	for rows.Next() {
		var scan EndpointScan
		var responseTime sql.NullFloat64
		var scanDataJSON []byte

		err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt)
		if err != nil {
			return nil, err
		}

		// response_time is nullable; leave 0 when absent.
		if responseTime.Valid {
			scan.ResponseTime = responseTime.Float64
		}

		// scan_data may be NULL when no payload was recorded for the scan.
		if len(scanDataJSON) > 0 {
			var scanData EndpointScanData
			if err := json.Unmarshal(scanDataJSON, &scanData); err == nil {
				scan.ScanData = &scanData
			}
		}

		scans = append(scans, &scan)
	}

	return scans, rows.Err()
}
766
+
767
+
func (p *PostgresDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) {
768
+
query := `
769
+
SELECT
770
+
COUNT(*) as total_endpoints,
771
+
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints,
772
+
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints,
773
+
(SELECT AVG(response_time) FROM pds_scans WHERE response_time > 0
774
+
AND scanned_at > NOW() - INTERVAL '1 hour') as avg_response_time,
775
+
SUM(user_count) as total_dids
776
+
FROM endpoints
777
+
`
778
+
779
+
var stats EndpointStats
780
+
var avgResponseTime sql.NullFloat64
781
+
782
+
err := p.db.QueryRowContext(ctx, query).Scan(
783
+
&stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints,
784
+
&avgResponseTime, &stats.TotalDIDs,
785
+
)
786
+
787
+
if avgResponseTime.Valid {
788
+
stats.AvgResponseTime = avgResponseTime.Float64
789
+
}
790
+
791
+
typeQuery := `
792
+
SELECT endpoint_type, COUNT(*)
793
+
FROM endpoints
794
+
GROUP BY endpoint_type
795
+
`
796
+
rows, err := p.db.QueryContext(ctx, typeQuery)
797
+
if err == nil {
798
+
defer rows.Close()
799
+
stats.ByType = make(map[string]int64)
800
+
for rows.Next() {
801
+
var typ string
802
+
var count int64
803
+
if err := rows.Scan(&typ, &count); err == nil {
804
+
stats.ByType[typ] = count
805
+
}
806
+
}
807
+
}
808
+
809
+
return &stats, err
810
+
}
// ===== CURSOR OPERATIONS =====

// GetScanCursor loads the persisted cursor for a scan source. When no
// cursor has been stored yet it returns a fresh zero-valued cursor for
// that source instead of an error, so callers can start from the top.
func (p *PostgresDB) GetScanCursor(ctx context.Context, source string) (*ScanCursor, error) {
	query := "SELECT source, last_bundle_number, last_scan_time, records_processed FROM scan_cursors WHERE source = $1"

	var cursor ScanCursor
	err := p.db.QueryRowContext(ctx, query, source).Scan(
		&cursor.Source, &cursor.LastBundleNumber, &cursor.LastScanTime, &cursor.RecordsProcessed,
	)
	if err == sql.ErrNoRows {
		// First run for this source: start from bundle 0.
		return &ScanCursor{
			Source:           source,
			LastBundleNumber: 0,
			LastScanTime:     time.Time{},
		}, nil
	}
	return &cursor, err
}
830
+
831
+
func (p *PostgresDB) UpdateScanCursor(ctx context.Context, cursor *ScanCursor) error {
832
+
query := `
833
+
INSERT INTO scan_cursors (source, last_bundle_number, last_scan_time, records_processed)
834
+
VALUES ($1, $2, $3, $4)
835
+
ON CONFLICT(source) DO UPDATE SET
836
+
last_bundle_number = EXCLUDED.last_bundle_number,
837
+
last_scan_time = EXCLUDED.last_scan_time,
838
+
records_processed = EXCLUDED.records_processed
839
+
`
840
+
_, err := p.db.ExecContext(ctx, query, cursor.Source, cursor.LastBundleNumber, cursor.LastScanTime, cursor.RecordsProcessed)
841
+
return err
842
+
}
843
+
844
+
// ===== METRICS OPERATIONS =====
845
+
846
+
func (p *PostgresDB) StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error {
847
+
query := `
848
+
INSERT INTO plc_metrics (total_dids, total_pds, unique_pds, scan_duration_ms, error_count)
849
+
VALUES ($1, $2, $3, $4, $5)
850
+
`
851
+
_, err := p.db.ExecContext(ctx, query, metrics.TotalDIDs, metrics.TotalPDS,
852
+
metrics.UniquePDS, metrics.ScanDuration, metrics.ErrorCount)
853
+
return err
854
+
}
// GetPLCMetrics returns up to limit metric snapshots, newest first.
// The row's created_at column is scanned into LastScanTime.
func (p *PostgresDB) GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error) {
	query := `
		SELECT total_dids, total_pds, unique_pds, scan_duration_ms, error_count, created_at
		FROM plc_metrics
		ORDER BY created_at DESC
		LIMIT $1
	`

	rows, err := p.db.QueryContext(ctx, query, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var metrics []*PLCMetrics
	for rows.Next() {
		var m PLCMetrics
		if err := rows.Scan(&m.TotalDIDs, &m.TotalPDS, &m.UniquePDS, &m.ScanDuration, &m.ErrorCount, &m.LastScanTime); err != nil {
			return nil, err
		}
		metrics = append(metrics, &m)
	}

	return metrics, rows.Err()
}
// ===== DID OPERATIONS =====

// UpsertDID inserts a DID tracking record or, when the DID already
// exists, refreshes its last-seen fields, bundle membership and
// operation count. first_seen_bundle / first_seen_at are preserved on
// conflict.
func (p *PostgresDB) UpsertDID(ctx context.Context, did *DIDRecord) error {
	// Bundle membership is stored as a JSONB array.
	bundleNumbersJSON, err := json.Marshal(did.BundleNumbers)
	if err != nil {
		return err
	}

	query := `
		INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at)
		VALUES ($1, $2, $3, $4, $5, $6, $7)
		ON CONFLICT(did) DO UPDATE SET
			last_seen_bundle = EXCLUDED.last_seen_bundle,
			bundle_numbers = EXCLUDED.bundle_numbers,
			operation_count = EXCLUDED.operation_count,
			last_seen_at = EXCLUDED.last_seen_at
	`
	_, err = p.db.ExecContext(ctx, query,
		did.DID, did.FirstSeenBundle, did.LastSeenBundle,
		bundleNumbersJSON, did.OperationCount,
		did.FirstSeenAt, did.LastSeenAt)
	return err
}
// GetDIDRecord loads the tracking record for a DID, decoding the JSONB
// list of bundle numbers it appears in. A missing DID surfaces as
// sql.ErrNoRows from the underlying Scan.
func (p *PostgresDB) GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) {
	query := `
		SELECT did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at
		FROM dids
		WHERE did = $1
	`

	var record DIDRecord
	var bundleNumbersJSON []byte

	err := p.db.QueryRowContext(ctx, query, did).Scan(
		&record.DID, &record.FirstSeenBundle, &record.LastSeenBundle,
		&bundleNumbersJSON, &record.OperationCount,
		&record.FirstSeenAt, &record.LastSeenAt,
	)
	if err != nil {
		return nil, err
	}

	if err := json.Unmarshal(bundleNumbersJSON, &record.BundleNumbers); err != nil {
		return nil, err
	}

	return &record, nil
}
931
+
932
+
func (p *PostgresDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string, firstSeenAt, lastSeenAt time.Time) error {
933
+
if len(dids) == 0 {
934
+
return nil
935
+
}
936
+
937
+
tx, err := p.db.BeginTx(ctx, nil)
938
+
if err != nil {
939
+
return err
940
+
}
941
+
defer tx.Rollback()
942
+
943
+
// Get existing DIDs
944
+
existingDIDs := make(map[string][]int)
945
+
if len(dids) > 0 {
946
+
placeholders := make([]string, len(dids))
947
+
args := make([]interface{}, len(dids))
948
+
for i, did := range dids {
949
+
placeholders[i] = fmt.Sprintf("$%d", i+1)
950
+
args[i] = did
951
+
}
952
+
953
+
query := fmt.Sprintf(`
954
+
SELECT did, bundle_numbers
955
+
FROM dids
956
+
WHERE did IN (%s)
957
+
`, strings.Join(placeholders, ","))
958
+
959
+
rows, err := tx.QueryContext(ctx, query, args...)
960
+
if err != nil {
961
+
return err
962
+
}
963
+
964
+
for rows.Next() {
965
+
var did string
966
+
var bundleNumbersJSON []byte
967
+
if err := rows.Scan(&did, &bundleNumbersJSON); err != nil {
968
+
rows.Close()
969
+
return err
970
+
}
971
+
972
+
var bundles []int
973
+
if err := json.Unmarshal(bundleNumbersJSON, &bundles); err != nil {
974
+
rows.Close()
975
+
return err
976
+
}
977
+
978
+
existingDIDs[did] = bundles
979
+
}
980
+
rows.Close()
981
+
982
+
if err := rows.Err(); err != nil {
983
+
return err
984
+
}
985
+
}
986
+
987
+
// Batch upsert
988
+
batchSize := 500
989
+
for i := 0; i < len(dids); i += batchSize {
990
+
end := i + batchSize
991
+
if end > len(dids) {
992
+
end = len(dids)
993
+
}
994
+
batch := dids[i:end]
995
+
996
+
if err := p.bulkUpsertDIDsSimplified(ctx, tx, bundleNum, batch, existingDIDs, firstSeenAt, lastSeenAt); err != nil {
997
+
return err
998
+
}
999
+
}
1000
+
1001
+
return tx.Commit()
1002
+
}
1003
+
1004
+
func (p *PostgresDB) bulkUpsertDIDsSimplified(ctx context.Context, tx *sql.Tx, bundleNum int, dids []string, existingDIDs map[string][]int, firstSeenAt, lastSeenAt time.Time) error {
1005
+
if len(dids) == 0 {
1006
+
return nil
1007
+
}
1008
+
1009
+
var values []string
1010
+
var args []interface{}
1011
+
argIdx := 1
1012
+
1013
+
for _, did := range dids {
1014
+
bundles := existingDIDs[did]
1015
+
1016
+
alreadyHas := false
1017
+
for _, b := range bundles {
1018
+
if b == bundleNum {
1019
+
alreadyHas = true
1020
+
break
1021
+
}
1022
+
}
1023
+
1024
+
if !alreadyHas {
1025
+
bundles = append(bundles, bundleNum)
1026
+
}
1027
+
1028
+
bundleNumbersJSON, _ := json.Marshal(bundles)
1029
+
1030
+
if len(existingDIDs[did]) > 0 {
1031
+
values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)",
1032
+
argIdx, argIdx+1, argIdx+2, argIdx+3, argIdx+4, argIdx+5, argIdx+6))
1033
+
args = append(args, did, bundleNum, bundleNum, bundleNumbersJSON, len(bundles), firstSeenAt, lastSeenAt)
1034
+
argIdx += 7
1035
+
} else {
1036
+
values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, 1, $%d, $%d)",
1037
+
argIdx, argIdx+1, argIdx+2, argIdx+3, argIdx+4, argIdx+5))
1038
+
args = append(args, did, bundleNum, bundleNum, bundleNumbersJSON, firstSeenAt, lastSeenAt)
1039
+
argIdx += 6
1040
+
}
1041
+
}
1042
+
1043
+
query := fmt.Sprintf(`
1044
+
INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at)
1045
+
VALUES %s
1046
+
ON CONFLICT(did) DO UPDATE SET
1047
+
last_seen_bundle = EXCLUDED.last_seen_bundle,
1048
+
bundle_numbers = EXCLUDED.bundle_numbers,
1049
+
operation_count = EXCLUDED.operation_count,
1050
+
last_seen_at = EXCLUDED.last_seen_at
1051
+
`, strings.Join(values, ","))
1052
+
1053
+
_, err := tx.ExecContext(ctx, query, args...)
1054
+
return err
1055
+
}
1056
+
1057
+
func (p *PostgresDB) GetTotalDIDCount(ctx context.Context) (int64, error) {
1058
+
query := "SELECT COUNT(*) FROM dids"
1059
+
var count int64
1060
+
err := p.db.QueryRowContext(ctx, query).Scan(&count)
1061
+
return count, err
1062
+
}