+241
-6
internal/api/handlers.go
+241
-6
internal/api/handlers.go
···
267
return
268
}
269
270
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
271
if err != nil {
272
resp.error("bundle not found", http.StatusNotFound)
273
return
274
}
275
276
-
resp.json(formatBundleResponse(bundle))
277
}
278
279
func (s *Server) handleGetPLCBundleDIDs(w http.ResponseWriter, r *http.Request) {
···
310
compressed := r.URL.Query().Get("compressed") != "false"
311
312
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
313
if err != nil {
314
resp.error("bundle not found", http.StatusNotFound)
315
return
316
}
317
318
-
resp.bundleHeaders(bundle)
319
320
-
if compressed {
321
-
s.serveCompressedBundle(w, r, bundle)
322
-
} else {
323
-
s.serveUncompressedBundle(w, r, bundle)
324
}
325
}
326
327
func (s *Server) serveCompressedBundle(w http.ResponseWriter, r *http.Request, bundle *storage.PLCBundle) {
···
267
return
268
}
269
270
+
// Try to get existing bundle
271
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
272
+
if err == nil {
273
+
// Bundle exists, return it normally
274
+
resp.json(formatBundleResponse(bundle))
275
+
return
276
+
}
277
+
278
+
// Bundle not found - check if it's the next upcoming bundle
279
+
lastBundle, err := s.db.GetLastBundleNumber(r.Context())
280
if err != nil {
281
resp.error("bundle not found", http.StatusNotFound)
282
return
283
}
284
285
+
if bundleNum == lastBundle+1 {
286
+
// This is the upcoming bundle - return preview based on mempool
287
+
upcomingBundle, err := s.createUpcomingBundlePreview(r.Context(), r, bundleNum)
288
+
if err != nil {
289
+
resp.error(fmt.Sprintf("failed to create upcoming bundle preview: %v", err), http.StatusInternalServerError)
290
+
return
291
+
}
292
+
resp.json(upcomingBundle)
293
+
return
294
+
}
295
+
296
+
// Not an upcoming bundle, just not found
297
+
resp.error("bundle not found", http.StatusNotFound)
298
+
}
299
+
300
+
func (s *Server) createUpcomingBundlePreview(ctx context.Context, r *http.Request, bundleNum int) (map[string]interface{}, error) {
301
+
// Get mempool stats
302
+
mempoolCount, err := s.db.GetMempoolCount(ctx)
303
+
if err != nil {
304
+
return nil, err
305
+
}
306
+
307
+
if mempoolCount == 0 {
308
+
return map[string]interface{}{
309
+
"plc_bundle_number": bundleNum,
310
+
"is_upcoming": true,
311
+
"status": "empty",
312
+
"message": "No operations in mempool yet",
313
+
"operation_count": 0,
314
+
}, nil
315
+
}
316
+
317
+
// Get first and last operations for time range
318
+
firstOp, err := s.db.GetFirstMempoolOperation(ctx)
319
+
if err != nil {
320
+
return nil, err
321
+
}
322
+
323
+
lastOp, err := s.db.GetLastMempoolOperation(ctx)
324
+
if err != nil {
325
+
return nil, err
326
+
}
327
+
328
+
// Get unique DID count
329
+
uniqueDIDCount, err := s.db.GetMempoolUniqueDIDCount(ctx)
330
+
if err != nil {
331
+
return nil, err
332
+
}
333
+
334
+
// Get uncompressed size estimate
335
+
uncompressedSize, err := s.db.GetMempoolUncompressedSize(ctx)
336
+
if err != nil {
337
+
return nil, err
338
+
}
339
+
340
+
// Estimate compressed size (typical ratio is ~0.1-0.15 for PLC data)
341
+
estimatedCompressedSize := int64(float64(uncompressedSize) * 0.12)
342
+
343
+
// Calculate completion estimate
344
+
var estimatedCompletionTime *time.Time
345
+
var operationsNeeded int
346
+
var currentRate float64
347
+
348
+
operationsNeeded = plc.BUNDLE_SIZE - mempoolCount
349
+
350
+
if mempoolCount < plc.BUNDLE_SIZE && mempoolCount > 0 {
351
+
timeSpan := lastOp.CreatedAt.Sub(firstOp.CreatedAt).Seconds()
352
+
if timeSpan > 0 {
353
+
currentRate = float64(mempoolCount) / timeSpan
354
+
if currentRate > 0 {
355
+
secondsNeeded := float64(operationsNeeded) / currentRate
356
+
completionTime := time.Now().Add(time.Duration(secondsNeeded) * time.Second)
357
+
estimatedCompletionTime = &completionTime
358
+
}
359
+
}
360
+
}
361
+
362
+
// Get previous bundle for cursor context
363
+
var prevBundleHash string
364
+
var cursor string
365
+
if bundleNum > 1 {
366
+
prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNum-1)
367
+
if err == nil {
368
+
prevBundleHash = prevBundle.Hash
369
+
cursor = prevBundle.EndTime.Format(time.RFC3339Nano)
370
+
}
371
+
}
372
+
373
+
// Determine bundle status
374
+
status := "filling"
375
+
if mempoolCount >= plc.BUNDLE_SIZE {
376
+
status = "ready"
377
+
}
378
+
379
+
// Build upcoming bundle response
380
+
result := map[string]interface{}{
381
+
"plc_bundle_number": bundleNum,
382
+
"is_upcoming": true,
383
+
"status": status,
384
+
"operation_count": mempoolCount,
385
+
"target_operation_count": plc.BUNDLE_SIZE,
386
+
"progress_percent": float64(mempoolCount) / float64(plc.BUNDLE_SIZE) * 100,
387
+
"operations_needed": operationsNeeded,
388
+
"did_count": uniqueDIDCount,
389
+
"start_time": firstOp.CreatedAt, // This is FIXED once first op exists
390
+
"current_end_time": lastOp.CreatedAt, // This will change as more ops arrive
391
+
"uncompressed_size": uncompressedSize,
392
+
"estimated_compressed_size": estimatedCompressedSize,
393
+
"compression_ratio": float64(uncompressedSize) / float64(estimatedCompressedSize),
394
+
"prev_bundle_hash": prevBundleHash,
395
+
"cursor": cursor,
396
+
}
397
+
398
+
if estimatedCompletionTime != nil {
399
+
result["estimated_completion_time"] = *estimatedCompletionTime
400
+
result["current_rate_per_second"] = currentRate
401
+
}
402
+
403
+
// Get actual mempool operations if requested
404
+
if r.URL.Query().Get("include_dids") == "true" {
405
+
ops, err := s.db.GetMempoolOperations(ctx, plc.BUNDLE_SIZE)
406
+
if err == nil {
407
+
// Extract unique DIDs
408
+
didSet := make(map[string]bool)
409
+
for _, op := range ops {
410
+
didSet[op.DID] = true
411
+
}
412
+
dids := make([]string, 0, len(didSet))
413
+
for did := range didSet {
414
+
dids = append(dids, did)
415
+
}
416
+
result["dids"] = dids
417
+
}
418
+
}
419
+
420
+
return result, nil
421
}
422
423
func (s *Server) handleGetPLCBundleDIDs(w http.ResponseWriter, r *http.Request) {
···
454
compressed := r.URL.Query().Get("compressed") != "false"
455
456
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
457
+
if err == nil {
458
+
// Bundle exists, serve it normally
459
+
resp.bundleHeaders(bundle)
460
+
461
+
if compressed {
462
+
s.serveCompressedBundle(w, r, bundle)
463
+
} else {
464
+
s.serveUncompressedBundle(w, r, bundle)
465
+
}
466
+
return
467
+
}
468
+
469
+
// Bundle not found - check if it's the upcoming bundle
470
+
lastBundle, err := s.db.GetLastBundleNumber(r.Context())
471
if err != nil {
472
resp.error("bundle not found", http.StatusNotFound)
473
return
474
}
475
476
+
if bundleNum == lastBundle+1 {
477
+
// This is the upcoming bundle - serve from mempool
478
+
s.serveUpcomingBundle(w, r, bundleNum)
479
+
return
480
+
}
481
+
482
+
// Not an upcoming bundle, just not found
483
+
resp.error("bundle not found", http.StatusNotFound)
484
+
}
485
+
486
+
func (s *Server) serveUpcomingBundle(w http.ResponseWriter, r *http.Request, bundleNum int) {
487
+
ctx := r.Context()
488
+
489
+
// Get mempool count
490
+
mempoolCount, err := s.db.GetMempoolCount(ctx)
491
+
if err != nil {
492
+
http.Error(w, fmt.Sprintf("failed to get mempool count: %v", err), http.StatusInternalServerError)
493
+
return
494
+
}
495
+
496
+
if mempoolCount == 0 {
497
+
http.Error(w, "upcoming bundle is empty (no operations in mempool)", http.StatusNotFound)
498
+
return
499
+
}
500
+
501
+
// Get mempool operations (up to BUNDLE_SIZE)
502
+
mempoolOps, err := s.db.GetMempoolOperations(ctx, plc.BUNDLE_SIZE)
503
+
if err != nil {
504
+
http.Error(w, fmt.Sprintf("failed to get mempool operations: %v", err), http.StatusInternalServerError)
505
+
return
506
+
}
507
+
508
+
if len(mempoolOps) == 0 {
509
+
http.Error(w, "upcoming bundle is empty", http.StatusNotFound)
510
+
return
511
+
}
512
+
513
+
// Get time range
514
+
firstOp := mempoolOps[0]
515
+
lastOp := mempoolOps[len(mempoolOps)-1]
516
+
517
+
// Extract unique DIDs
518
+
didSet := make(map[string]bool)
519
+
for _, op := range mempoolOps {
520
+
didSet[op.DID] = true
521
+
}
522
+
523
+
// Get previous bundle hash
524
+
prevBundleHash := ""
525
+
if bundleNum > 1 {
526
+
if prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
527
+
prevBundleHash = prevBundle.Hash
528
+
}
529
+
}
530
531
+
// Serialize operations to JSONL
532
+
var buf []byte
533
+
for _, mop := range mempoolOps {
534
+
buf = append(buf, []byte(mop.Operation)...)
535
+
buf = append(buf, '\n')
536
}
537
+
538
+
// Calculate size
539
+
uncompressedSize := int64(len(buf))
540
+
541
+
// Set headers
542
+
w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", bundleNum))
543
+
w.Header().Set("X-Bundle-Is-Upcoming", "true")
544
+
w.Header().Set("X-Bundle-Status", "preview")
545
+
w.Header().Set("X-Bundle-Start-Time", firstOp.CreatedAt.Format(time.RFC3339Nano))
546
+
w.Header().Set("X-Bundle-Current-End-Time", lastOp.CreatedAt.Format(time.RFC3339Nano))
547
+
w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", len(mempoolOps)))
548
+
w.Header().Set("X-Bundle-Target-Count", fmt.Sprintf("%d", plc.BUNDLE_SIZE))
549
+
w.Header().Set("X-Bundle-Progress-Percent", fmt.Sprintf("%.2f", float64(len(mempoolOps))/float64(plc.BUNDLE_SIZE)*100))
550
+
w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", len(didSet)))
551
+
w.Header().Set("X-Bundle-Prev-Hash", prevBundleHash)
552
+
553
+
w.Header().Set("Content-Type", "application/jsonl")
554
+
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d-upcoming.jsonl", bundleNum))
555
+
w.Header().Set("Content-Length", fmt.Sprintf("%d", uncompressedSize))
556
+
w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", uncompressedSize))
557
+
558
+
w.WriteHeader(http.StatusOK)
559
+
w.Write(buf)
560
}
561
562
func (s *Server) serveCompressedBundle(w http.ResponseWriter, r *http.Request, bundle *storage.PLCBundle) {