check-order.sh
utils/check-order.sh
check-order.sh
utils/check-order.sh
check.sh
utils/check.sh
check.sh
utils/check.sh
-1
config.yaml
-1
config.yaml
+185
-14
internal/api/handlers.go
+185
-14
internal/api/handlers.go
···
11
11
"os"
12
12
"path/filepath"
13
13
"strconv"
14
+
"time"
14
15
15
16
"github.com/atscan/atscanner/internal/log"
16
17
"github.com/atscan/atscanner/internal/plc"
···
215
216
"plc_bundle_number": bundle.BundleNumber,
216
217
"start_time": bundle.StartTime,
217
218
"end_time": bundle.EndTime,
218
-
"operation_count": 1000,
219
+
"operation_count": plc.BUNDLE_SIZE,
219
220
"did_count": len(bundle.DIDs),
220
221
"hash": bundle.Hash, // Uncompressed (verifiable)
221
222
"compressed_hash": bundle.CompressedHash, // File integrity
···
301
302
if err := json.Unmarshal(line, &op); err != nil {
302
303
return nil, fmt.Errorf("failed to parse operation on line %d: %w", lineNum, err)
303
304
}
305
+
306
+
// CRITICAL: Store the original raw JSON bytes
307
+
op.RawJSON = make([]byte, len(line))
308
+
copy(op.RawJSON, line)
304
309
305
310
operations = append(operations, op)
306
311
}
···
425
430
}
426
431
}
427
432
428
-
// Fetch from PLC directory
429
-
remoteOps, err := s.plcClient.Export(ctx, plc.ExportOptions{
430
-
Count: 1000,
431
-
After: after,
432
-
})
433
-
if err != nil {
434
-
http.Error(w, fmt.Sprintf("Failed to fetch from PLC directory: %v", err), http.StatusInternalServerError)
435
-
return
433
+
// Collect remote operations (may need multiple fetches for large bundles)
434
+
var allRemoteOps []plc.PLCOperation
435
+
seenCIDs := make(map[string]bool)
436
+
437
+
// Track boundary CIDs
438
+
for cid := range prevBoundaryCIDs {
439
+
seenCIDs[cid] = true
436
440
}
437
441
438
-
// Apply same deduplication logic as when creating bundles
439
-
if after != "" && len(prevBoundaryCIDs) > 0 {
440
-
remoteOps = plc.StripBoundaryDuplicates(remoteOps, after, prevBoundaryCIDs)
442
+
currentAfter := after
443
+
maxFetches := 20 // Enough for up to 20k operations
444
+
445
+
for fetchNum := 0; fetchNum < maxFetches && len(allRemoteOps) < plc.BUNDLE_SIZE; fetchNum++ {
446
+
// Fetch from PLC directory
447
+
batch, err := s.plcClient.Export(ctx, plc.ExportOptions{
448
+
Count: 1000,
449
+
After: currentAfter,
450
+
})
451
+
if err != nil {
452
+
http.Error(w, fmt.Sprintf("Failed to fetch from PLC directory: %v", err), http.StatusInternalServerError)
453
+
return
454
+
}
455
+
456
+
if len(batch) == 0 {
457
+
break
458
+
}
459
+
460
+
// Deduplicate and add unique operations
461
+
for _, op := range batch {
462
+
if !seenCIDs[op.CID] {
463
+
seenCIDs[op.CID] = true
464
+
allRemoteOps = append(allRemoteOps, op)
465
+
if len(allRemoteOps) >= plc.BUNDLE_SIZE {
466
+
break
467
+
}
468
+
}
469
+
}
470
+
471
+
// Update cursor for next fetch
472
+
if len(batch) > 0 {
473
+
lastOp := batch[len(batch)-1]
474
+
currentAfter = lastOp.CreatedAt.Format("2006-01-02T15:04:05.000Z")
475
+
}
476
+
477
+
// If we got less than 1000, we've reached the end
478
+
if len(batch) < 1000 {
479
+
break
480
+
}
481
+
}
482
+
483
+
// Trim to exact bundle size
484
+
if len(allRemoteOps) > plc.BUNDLE_SIZE {
485
+
allRemoteOps = allRemoteOps[:plc.BUNDLE_SIZE]
441
486
}
442
487
443
488
// Compute remote hash (uncompressed JSONL)
444
-
remoteHash, err := computeRemoteOperationsHash(remoteOps)
489
+
remoteHash, err := computeRemoteOperationsHash(allRemoteOps)
445
490
if err != nil {
446
491
http.Error(w, fmt.Sprintf("Failed to compute remote hash: %v", err), http.StatusInternalServerError)
447
492
return
···
456
501
"local_hash": bundle.Hash,
457
502
"remote_hash": remoteHash,
458
503
"local_op_count": bundle.OperationCount,
459
-
"remote_op_count": len(remoteOps),
504
+
"remote_op_count": len(allRemoteOps),
460
505
"boundary_cids_used": len(prevBoundaryCIDs),
461
506
})
462
507
}
···
557
602
"first_prev_hash": firstBundle.PrevBundleHash, // Should be empty
558
603
"last_prev_hash": lastBundleData.PrevBundleHash,
559
604
})
605
+
}
606
+
607
+
// handlePLCExport simulates PLC directory /export endpoint using cached bundles
608
+
func (s *Server) handlePLCExport(w http.ResponseWriter, r *http.Request) {
609
+
ctx := r.Context()
610
+
611
+
// Parse query parameters
612
+
countStr := r.URL.Query().Get("count")
613
+
afterStr := r.URL.Query().Get("after")
614
+
615
+
count := 1000 // Default
616
+
if countStr != "" {
617
+
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
618
+
count = c
619
+
if count > 10000 {
620
+
count = 10000 // Max limit
621
+
}
622
+
}
623
+
}
624
+
625
+
var afterTime time.Time
626
+
if afterStr != "" {
627
+
// Try multiple timestamp formats (from most specific to least)
628
+
formats := []string{
629
+
time.RFC3339Nano, // 2023-11-09T03:55:00.123456789Z
630
+
time.RFC3339, // 2023-11-09T03:55:00Z
631
+
"2006-01-02T15:04:05.000Z", // 2023-11-09T03:55:00.000Z
632
+
"2006-01-02T15:04:05", // 2023-11-09T03:55:00
633
+
"2006-01-02T15:04", // 2023-11-09T03:55
634
+
"2006-01-02", // 2023-11-09
635
+
}
636
+
637
+
var parsed time.Time
638
+
var parseErr error
639
+
parsed = time.Time{} // zero value
640
+
641
+
for _, format := range formats {
642
+
parsed, parseErr = time.Parse(format, afterStr)
643
+
if parseErr == nil {
644
+
afterTime = parsed
645
+
break
646
+
}
647
+
}
648
+
649
+
if parseErr != nil {
650
+
http.Error(w, fmt.Sprintf("Invalid after parameter: %v", parseErr), http.StatusBadRequest)
651
+
return
652
+
}
653
+
}
654
+
655
+
// Find starting bundle (FAST - single query)
656
+
startBundle := 1
657
+
if !afterTime.IsZero() {
658
+
foundBundle, err := s.db.GetBundleForTimestamp(ctx, afterTime)
659
+
if err != nil {
660
+
log.Error("Failed to find bundle for timestamp: %v", err)
661
+
// Fallback to bundle 1
662
+
} else {
663
+
startBundle = foundBundle
664
+
// Go back one bundle to catch boundary timestamps
665
+
if startBundle > 1 {
666
+
startBundle--
667
+
}
668
+
}
669
+
}
670
+
671
+
// Collect operations from bundles
672
+
var allOps []plc.PLCOperation
673
+
seenCIDs := make(map[string]bool)
674
+
675
+
// Load bundles sequentially until we have enough operations
676
+
lastBundle, _ := s.db.GetLastBundleNumber(ctx)
677
+
678
+
for bundleNum := startBundle; bundleNum <= lastBundle && len(allOps) < count; bundleNum++ {
679
+
bundlePath := filepath.Join(s.plcCacheDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
680
+
681
+
ops, err := s.loadBundleOperations(bundlePath)
682
+
if err != nil {
683
+
log.Error("Warning: failed to load bundle %d: %v", bundleNum, err)
684
+
continue
685
+
}
686
+
687
+
// Filter operations
688
+
for _, op := range ops {
689
+
// Skip if STRICTLY BEFORE "after" timestamp
690
+
// Include operations AT or AFTER the timestamp
691
+
if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
692
+
continue
693
+
}
694
+
695
+
// Skip duplicates (by CID)
696
+
if seenCIDs[op.CID] {
697
+
continue
698
+
}
699
+
700
+
seenCIDs[op.CID] = true
701
+
allOps = append(allOps, op)
702
+
703
+
if len(allOps) >= count {
704
+
break
705
+
}
706
+
}
707
+
}
708
+
709
+
// Set headers for JSONL response
710
+
w.Header().Set("Content-Type", "application/jsonl")
711
+
w.Header().Set("X-Operation-Count", strconv.Itoa(len(allOps)))
712
+
713
+
// Write JSONL response (newline-delimited JSON with trailing newline)
714
+
for _, op := range allOps {
715
+
// Use raw JSON if available
716
+
if len(op.RawJSON) > 0 {
717
+
w.Write(op.RawJSON)
718
+
} else {
719
+
// Fallback: marshal the operation
720
+
jsonData, err := json.Marshal(op)
721
+
if err != nil {
722
+
log.Error("Failed to marshal operation: %v", err)
723
+
continue
724
+
}
725
+
w.Write(jsonData)
726
+
}
727
+
728
+
// Always add newline after each operation (including the last)
729
+
w.Write([]byte("\n"))
730
+
}
560
731
}
561
732
562
733
// computeRemoteOperationsHash - matching format
+1
internal/api/server.go
+1
internal/api/server.go
···
60
60
api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET")
61
61
api.HandleFunc("/plc/bundles/{bundleNumber}/verify", s.handleVerifyPLCBundle).Methods("POST")
62
62
api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET")
63
+
api.HandleFunc("/plc/export", s.handlePLCExport).Methods("GET")
63
64
64
65
// PLC/DID endpoints
65
66
api.HandleFunc("/plc/did/{did}", s.handleGetDID).Methods("GET")
+1
-1
internal/pds/client.go
+1
-1
internal/pds/client.go
···
86
86
func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, error) {
87
87
url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint)
88
88
89
-
fmt.Println(url)
89
+
//fmt.Println(url)
90
90
91
91
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
92
92
if err != nil {
+4
-8
internal/plc/bundle.go
+4
-8
internal/plc/bundle.go
···
302
302
if err := bm.indexBundleWithHash(ctx, bundleNumber, allOperations, path, uncompressedHash, compressedHash); err != nil {
303
303
log.Error("Warning: failed to index bundle: %v", err)
304
304
} else {
305
-
log.Info("✓ Bundle %06d saved [1000 ops, hash: %s, compressed: %s]",
306
-
bundleNumber, uncompressedHash[:16]+"...", compressedHash[:16]+"...")
305
+
log.Info("✓ Bundle %06d saved [%d ops, hash: %s, compressed: %s]",
306
+
bundleNumber, len(allOperations), uncompressedHash[:16]+"...", compressedHash[:16]+"...")
307
307
}
308
308
}
309
309
}
···
322
322
// saveBundleFileWithHash - NO trailing newline
323
323
func (bm *BundleManager) saveBundleFileWithHash(path string, operations []PLCOperation) (string, string, error) {
324
324
var jsonlData []byte
325
-
for i, op := range operations {
325
+
for _, op := range operations {
326
326
jsonlData = append(jsonlData, op.RawJSON...)
327
-
328
-
// Add newline ONLY between operations (not after last)
329
-
if i < len(operations)-1 {
330
-
jsonlData = append(jsonlData, '\n')
331
-
}
327
+
jsonlData = append(jsonlData, '\n')
332
328
}
333
329
334
330
uncompressedHash := bm.calculateHash(jsonlData)
+2
internal/storage/db.go
+2
internal/storage/db.go
···
2
2
3
3
import (
4
4
"context"
5
+
"time"
5
6
)
6
7
7
8
type Database interface {
···
30
31
GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error)
31
32
GetBundleStats(ctx context.Context) (int64, int64, error)
32
33
GetLastBundleNumber(ctx context.Context) (int, error)
34
+
GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error)
33
35
34
36
// Mempool operations
35
37
AddToMempool(ctx context.Context, ops []MempoolOperation) error
+36
-1
internal/storage/sqlite.go
+36
-1
internal/storage/sqlite.go
···
147
147
return &bundle, nil
148
148
}
149
149
150
-
// Update GetBundles, GetBundlesForDID similarly
150
+
// GetBundleForTimestamp finds the bundle that should contain operations at or after the given time
151
+
func (s *SQLiteDB) GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error) {
152
+
query := `
153
+
SELECT bundle_number
154
+
FROM plc_bundles
155
+
WHERE start_time <= ? AND end_time >= ?
156
+
ORDER BY bundle_number ASC
157
+
LIMIT 1
158
+
`
159
+
160
+
var bundleNum int
161
+
err := s.db.QueryRowContext(ctx, query, afterTime, afterTime).Scan(&bundleNum)
162
+
if err == sql.ErrNoRows {
163
+
// No exact match, find the closest bundle before this time
164
+
query = `
165
+
SELECT bundle_number
166
+
FROM plc_bundles
167
+
WHERE end_time < ?
168
+
ORDER BY bundle_number DESC
169
+
LIMIT 1
170
+
`
171
+
err = s.db.QueryRowContext(ctx, query, afterTime).Scan(&bundleNum)
172
+
if err == sql.ErrNoRows {
173
+
return 1, nil // Start from first bundle
174
+
}
175
+
if err != nil {
176
+
return 0, err
177
+
}
178
+
return bundleNum, nil // Return the bundle just before
179
+
}
180
+
if err != nil {
181
+
return 0, err
182
+
}
183
+
184
+
return bundleNum, nil
185
+
}
151
186
152
187
// GetLastBundleNumber gets the highest bundle number
153
188
func (s *SQLiteDB) GetLastBundleNumber(ctx context.Context) (int, error) {
-5
print.sh
-5
print.sh
+12
utils/check-minimal.sh
+12
utils/check-minimal.sh
···
1
+
pv plc_cache/*.jsonl.zst | zstdcat | \
2
+
jq -r '[.createdAt, .cid, .did] | @tsv' | \
3
+
awk '
4
+
NR > 1 {
5
+
if ($2 == prev_cid) {
6
+
printf "Duplicate CID: %s\n", $2
7
+
printf " Prev: time=%s DID=%s\n", prev_time, prev_did
8
+
printf " Curr: time=%s DID=%s\n\n", $1, $3
9
+
}
10
+
}
11
+
{prev_time = $1; prev_cid = $2; prev_did = $3}
12
+
'
+82
utils/verify-export-debug.sh
+82
utils/verify-export-debug.sh
···
1
+
#!/bin/bash
2
+
# verify-export-debug.sh - Deep comparison of export endpoints
3
+
4
+
AFTER="${1:-}"
5
+
COUNT="${2:-1000}"
6
+
LOCAL_URL="http://localhost:8080/api/v1/plc/export"
7
+
REMOTE_URL="https://plc.directory/export"
8
+
9
+
# Build query parameters
10
+
PARAMS="count=$COUNT"
11
+
if [ -n "$AFTER" ]; then
12
+
PARAMS="${PARAMS}&after=${AFTER}"
13
+
fi
14
+
15
+
echo "=== Fetching data ==="
16
+
curl -s "${LOCAL_URL}?${PARAMS}" > /tmp/local_export.jsonl
17
+
curl -s "${REMOTE_URL}?${PARAMS}" > /tmp/remote_export.jsonl
18
+
19
+
echo "Local file size: $(wc -c < /tmp/local_export.jsonl) bytes"
20
+
echo "Remote file size: $(wc -c < /tmp/remote_export.jsonl) bytes"
21
+
echo ""
22
+
23
+
echo "Local lines: $(wc -l < /tmp/local_export.jsonl)"
24
+
echo "Remote lines: $(wc -l < /tmp/remote_export.jsonl)"
25
+
echo ""
26
+
27
+
# Check for trailing newline
28
+
echo "Local ends with newline: $(tail -c 1 /tmp/local_export.jsonl | xxd -p)"
29
+
echo "Remote ends with newline: $(tail -c 1 /tmp/remote_export.jsonl | xxd -p)"
30
+
echo "(0a = newline, other = no trailing newline)"
31
+
echo ""
32
+
33
+
# Compare line by line
34
+
echo "=== Comparing CIDs line by line ==="
35
+
LOCAL_CIDS=$(cat /tmp/local_export.jsonl | jq -r '.cid' 2>/dev/null)
36
+
REMOTE_CIDS=$(cat /tmp/remote_export.jsonl | jq -r '.cid' 2>/dev/null)
37
+
38
+
if [ "$LOCAL_CIDS" = "$REMOTE_CIDS" ]; then
39
+
echo "✅ All CIDs match in order"
40
+
else
41
+
echo "❌ CIDs differ"
42
+
echo ""
43
+
echo "First 5 local CIDs:"
44
+
echo "$LOCAL_CIDS" | head -5
45
+
echo ""
46
+
echo "First 5 remote CIDs:"
47
+
echo "$REMOTE_CIDS" | head -5
48
+
fi
49
+
echo ""
50
+
51
+
# Compare exact JSON of first operation
52
+
echo "=== First operation comparison ==="
53
+
echo "Local:"
54
+
head -1 /tmp/local_export.jsonl | jq . 2>/dev/null || head -1 /tmp/local_export.jsonl
55
+
echo ""
56
+
echo "Remote:"
57
+
head -1 /tmp/remote_export.jsonl | jq . 2>/dev/null || head -1 /tmp/remote_export.jsonl
58
+
echo ""
59
+
60
+
# Check if it's just a trailing newline issue
61
+
echo "=== Testing trailing newline hypothesis ==="
62
+
LOCAL_HASH_NO_TRAIL=$(head -c -1 /tmp/local_export.jsonl | shasum -a 256 | cut -d' ' -f1)
63
+
REMOTE_HASH_NO_TRAIL=$(head -c -1 /tmp/remote_export.jsonl | shasum -a 256 | cut -d' ' -f1)
64
+
65
+
LOCAL_HASH_WITH_TRAIL=$(cat /tmp/local_export.jsonl && echo "" | shasum -a 256 | cut -d' ' -f1)
66
+
67
+
echo "Local hash (as-is): $(shasum -a 256 < /tmp/local_export.jsonl | cut -d' ' -f1)"
68
+
echo "Remote hash (as-is): $(shasum -a 256 < /tmp/remote_export.jsonl | cut -d' ' -f1)"
69
+
echo "Local hash (no trailing \\n): $LOCAL_HASH_NO_TRAIL"
70
+
echo "Remote hash (no trailing \\n): $REMOTE_HASH_NO_TRAIL"
71
+
72
+
if [ "$LOCAL_HASH_NO_TRAIL" = "$(shasum -a 256 < /tmp/remote_export.jsonl | cut -d' ' -f1)" ]; then
73
+
echo ""
74
+
echo "🔍 Found it! Local is missing trailing newline"
75
+
elif [ "$(shasum -a 256 < /tmp/local_export.jsonl | cut -d' ' -f1)" = "$REMOTE_HASH_NO_TRAIL" ]; then
76
+
echo ""
77
+
echo "🔍 Found it! Remote is missing trailing newline"
78
+
fi
79
+
80
+
# Clean up
81
+
rm -f /tmp/local_export.jsonl /tmp/remote_export.jsonl
82
+
+88
utils/verify-export.sh
+88
utils/verify-export.sh
···
1
+
#!/bin/bash
2
+
# verify-export.sh - Verify local PLC export endpoint against plc.directory
3
+
# Usage: ./verify-export.sh [after_timestamp] [count]
4
+
5
+
AFTER="${1:-}"
6
+
COUNT="${2:-50}"
7
+
LOCAL_URL="http://localhost:8080/api/v1/plc/export"
8
+
REMOTE_URL="https://plc.directory/export"
9
+
10
+
echo "=== PLC Export Verification ==="
11
+
echo "Count: $COUNT"
12
+
if [ -n "$AFTER" ]; then
13
+
echo "After: $AFTER"
14
+
else
15
+
echo "After: (none - from beginning)"
16
+
fi
17
+
echo ""
18
+
19
+
# Build query parameters
20
+
PARAMS="count=$COUNT"
21
+
if [ -n "$AFTER" ]; then
22
+
PARAMS="${PARAMS}&after=${AFTER}"
23
+
fi
24
+
25
+
echo "Fetching from local API..."
26
+
echo "curl -s \"${LOCAL_URL}?${PARAMS}\""
27
+
LOCAL_DATA=$(curl -s "${LOCAL_URL}?${PARAMS}")
28
+
LOCAL_COUNT=$(echo "$LOCAL_DATA" | wc -l | tr -d ' ')
29
+
LOCAL_HASH=$(echo "$LOCAL_DATA" | shasum -a 256 | cut -d' ' -f1)
30
+
31
+
echo " Operations: $LOCAL_COUNT"
32
+
echo " Hash: $LOCAL_HASH"
33
+
echo ""
34
+
35
+
echo "Fetching from plc.directory..."
36
+
REMOTE_DATA=$(curl -s "${REMOTE_URL}?${PARAMS}")
37
+
REMOTE_COUNT=$(echo "$REMOTE_DATA" | wc -l | tr -d ' ')
38
+
REMOTE_HASH=$(echo "$REMOTE_DATA" | shasum -a 256 | cut -d' ' -f1)
39
+
40
+
echo " Operations: $REMOTE_COUNT"
41
+
echo " Hash: $REMOTE_HASH"
42
+
echo ""
43
+
44
+
# Compare
45
+
echo "=== COMPARISON ==="
46
+
if [ "$LOCAL_HASH" = "$REMOTE_HASH" ]; then
47
+
echo "✅ MATCH! Hashes are identical"
48
+
echo ""
49
+
echo "Local and remote exports are in sync! 🎯"
50
+
exit 0
51
+
else
52
+
echo "❌ MISMATCH! Hashes differ"
53
+
echo ""
54
+
55
+
# Show counts
56
+
if [ "$LOCAL_COUNT" != "$REMOTE_COUNT" ]; then
57
+
echo "⚠️ Operation count differs:"
58
+
echo " Local: $LOCAL_COUNT operations"
59
+
echo " Remote: $REMOTE_COUNT operations"
60
+
echo " Diff: $((REMOTE_COUNT - LOCAL_COUNT))"
61
+
echo ""
62
+
fi
63
+
64
+
# Sample first and last operations
65
+
echo "First operation (local):"
66
+
echo "$LOCAL_DATA" | head -1 | jq -r '[.did, .cid, .createdAt] | @tsv' 2>/dev/null || echo "(parse error)"
67
+
echo ""
68
+
69
+
echo "First operation (remote):"
70
+
echo "$REMOTE_DATA" | head -1 | jq -r '[.did, .cid, .createdAt] | @tsv' 2>/dev/null || echo "(parse error)"
71
+
echo ""
72
+
73
+
echo "Last operation (local):"
74
+
echo "$LOCAL_DATA" | tail -1 | jq -r '[.did, .cid, .createdAt] | @tsv' 2>/dev/null || echo "(parse error)"
75
+
echo ""
76
+
77
+
echo "Last operation (remote):"
78
+
echo "$REMOTE_DATA" | tail -1 | jq -r '[.did, .cid, .createdAt] | @tsv' 2>/dev/null || echo "(parse error)"
79
+
echo ""
80
+
81
+
# Find first difference
82
+
echo "Finding first difference..."
83
+
diff <(echo "$LOCAL_DATA" | jq -r '.cid' 2>/dev/null | head -20) \
84
+
<(echo "$REMOTE_DATA" | jq -r '.cid' 2>/dev/null | head -20) || true
85
+
86
+
exit 1
87
+
fi
88
+
verify-bundle.sh
utils/verify-bundle.sh
verify-bundle.sh
utils/verify-bundle.sh