+1
-1
config.yaml
+1
-1
config.yaml
+98
-3
internal/api/handlers.go
+98
-3
internal/api/handlers.go
···
251
251
})
252
252
}
253
253
254
+
func (s *Server) handleDownloadPLCBundle(w http.ResponseWriter, r *http.Request) {
255
+
ctx := r.Context()
256
+
vars := mux.Vars(r)
257
+
258
+
bundleNumber, err := strconv.Atoi(vars["number"])
259
+
if err != nil {
260
+
http.Error(w, "invalid bundle number", http.StatusBadRequest)
261
+
return
262
+
}
263
+
264
+
// Verify bundle exists in database
265
+
bundle, err := s.db.GetBundleByNumber(ctx, bundleNumber)
266
+
if err != nil {
267
+
http.Error(w, "bundle not found", http.StatusNotFound)
268
+
return
269
+
}
270
+
271
+
// Build file path
272
+
filePath := filepath.Join(s.plcBundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
273
+
274
+
// Check if file exists
275
+
fileInfo, err := os.Stat(filePath)
276
+
if err != nil {
277
+
if os.IsNotExist(err) {
278
+
http.Error(w, "bundle file not found on disk", http.StatusNotFound)
279
+
return
280
+
}
281
+
http.Error(w, fmt.Sprintf("error accessing bundle file: %v", err), http.StatusInternalServerError)
282
+
return
283
+
}
284
+
285
+
// Open file
286
+
file, err := os.Open(filePath)
287
+
if err != nil {
288
+
http.Error(w, fmt.Sprintf("error opening bundle file: %v", err), http.StatusInternalServerError)
289
+
return
290
+
}
291
+
defer file.Close()
292
+
293
+
// Set headers
294
+
w.Header().Set("Content-Type", "application/zstd")
295
+
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl.zst", bundleNumber))
296
+
w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
297
+
w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", bundleNumber))
298
+
w.Header().Set("X-Bundle-Hash", bundle.Hash)
299
+
w.Header().Set("X-Bundle-Compressed-Hash", bundle.CompressedHash)
300
+
w.Header().Set("X-Bundle-Start-Time", bundle.StartTime.Format(time.RFC3339Nano))
301
+
w.Header().Set("X-Bundle-End-Time", bundle.EndTime.Format(time.RFC3339Nano))
302
+
w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", plc.BUNDLE_SIZE))
303
+
w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", len(bundle.DIDs)))
304
+
305
+
// Stream the file
306
+
http.ServeContent(w, r, filepath.Base(filePath), bundle.CreatedAt, file)
307
+
}
308
+
254
309
func (s *Server) handleGetMempoolStats(w http.ResponseWriter, r *http.Request) {
255
310
ctx := r.Context()
256
311
···
260
315
return
261
316
}
262
317
263
-
respondJSON(w, map[string]interface{}{
318
+
response := map[string]interface{}{
264
319
"operation_count": count,
265
-
"can_create_bundle": count >= 1000,
266
-
})
320
+
"can_create_bundle": count >= plc.BUNDLE_SIZE,
321
+
}
322
+
323
+
// Get mempool start time (first item)
324
+
if count > 0 {
325
+
firstOp, err := s.db.GetFirstMempoolOperation(ctx)
326
+
if err == nil && firstOp != nil {
327
+
response["mempool_start_time"] = firstOp.CreatedAt
328
+
329
+
// Calculate estimated next bundle time
330
+
if count < plc.BUNDLE_SIZE {
331
+
lastOp, err := s.db.GetLastMempoolOperation(ctx)
332
+
if err == nil && lastOp != nil {
333
+
// Calculate rate of operations per second
334
+
timeSpan := lastOp.CreatedAt.Sub(firstOp.CreatedAt).Seconds()
335
+
336
+
if timeSpan > 0 {
337
+
opsPerSecond := float64(count) / timeSpan
338
+
339
+
if opsPerSecond > 0 {
340
+
remainingOps := plc.BUNDLE_SIZE - count
341
+
secondsNeeded := float64(remainingOps) / opsPerSecond
342
+
estimatedTime := time.Now().Add(time.Duration(secondsNeeded) * time.Second)
343
+
344
+
response["estimated_next_bundle_time"] = estimatedTime
345
+
response["operations_needed"] = remainingOps
346
+
response["current_rate_per_second"] = opsPerSecond
347
+
}
348
+
}
349
+
}
350
+
} else {
351
+
// Bundle can be created now
352
+
response["estimated_next_bundle_time"] = time.Now()
353
+
response["operations_needed"] = 0
354
+
}
355
+
}
356
+
} else {
357
+
response["mempool_start_time"] = nil
358
+
response["estimated_next_bundle_time"] = nil
359
+
}
360
+
361
+
respondJSON(w, response)
267
362
}
268
363
269
364
// Helper to load bundle operations - UPDATED FOR JSONL FORMAT
+1
internal/api/server.go
+1
internal/api/server.go
···
66
66
api.HandleFunc("/plc/bundles/chain", s.handleGetChainInfo).Methods("GET")
67
67
api.HandleFunc("/plc/bundles/verify-chain", s.handleVerifyChain).Methods("POST")
68
68
api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET")
69
+
api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET")
69
70
api.HandleFunc("/plc/bundles/{bundleNumber}/verify", s.handleVerifyPLCBundle).Methods("POST")
70
71
api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET")
71
72
api.HandleFunc("/plc/export", s.handlePLCExport).Methods("GET")
+1
internal/storage/db.go
+1
internal/storage/db.go
···
38
38
GetMempoolCount(ctx context.Context) (int, error)
39
39
GetMempoolOperations(ctx context.Context, limit int) ([]MempoolOperation, error)
40
40
DeleteFromMempool(ctx context.Context, ids []int64) error
41
+
GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error)
41
42
GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error)
42
43
43
44
// Metrics
+23
internal/storage/sqlite.go
+23
internal/storage/sqlite.go
···
282
282
return err
283
283
}
284
284
285
+
// GetFirstMempoolOperation retrieves the oldest operation from mempool
286
+
func (s *SQLiteDB) GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error) {
287
+
query := `
288
+
SELECT id, did, operation, cid, created_at, added_at
289
+
FROM plc_mempool
290
+
ORDER BY created_at ASC, id ASC
291
+
LIMIT 1
292
+
`
293
+
294
+
var op MempoolOperation
295
+
err := s.db.QueryRowContext(ctx, query).Scan(
296
+
&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt,
297
+
)
298
+
if err == sql.ErrNoRows {
299
+
return nil, nil // No operations in mempool
300
+
}
301
+
if err != nil {
302
+
return nil, err
303
+
}
304
+
305
+
return &op, nil
306
+
}
307
+
285
308
// GetLastMempoolOperation retrieves the most recent operation from mempool
286
309
func (s *SQLiteDB) GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error) {
287
310
query := `