tangled
alpha
login
or
join now
atscan.net
/
plcbundle
A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
14
fork
atom
overview
issues
2
pulls
pipelines
cmd mempool
tree.fail
3 months ago
99fb2e58
f66a7546
+259
-103
8 changed files
expand all
collapse all
unified
split
cmd
plcbundle
commands
did.go
diff.go
mempool.go
server.go
status.go
stream.go
sync.go
main.go
-1
cmd/plcbundle/commands/did.go
···
1
-
// repo/cmd/plcbundle/commands/did.go
2
package commands
3
4
import (
···
0
1
package commands
2
3
import (
-1
cmd/plcbundle/commands/diff.go
···
1
-
// repo/cmd/plcbundle/commands/diff.go
2
package commands
3
4
import (
···
0
1
package commands
2
3
import (
+257
-95
cmd/plcbundle/commands/mempool.go
···
7
"strings"
8
"time"
9
10
-
flag "github.com/spf13/pflag"
11
-
12
"tangled.org/atscan.net/plcbundle/internal/types"
13
)
14
15
-
// MempoolCommand handles the mempool subcommand
16
-
func MempoolCommand(args []string) error {
17
-
fs := flag.NewFlagSet("mempool", flag.ExitOnError)
18
-
clear := fs.Bool("clear", false, "clear the mempool")
19
-
export := fs.Bool("export", false, "export mempool operations as JSONL to stdout")
20
-
refresh := fs.Bool("refresh", false, "reload mempool from disk")
21
-
validate := fs.Bool("validate", false, "validate chronological order")
22
-
verbose := fs.Bool("v", false, "verbose output")
23
24
-
if err := fs.Parse(args); err != nil {
25
-
return err
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
26
}
27
28
-
mgr, dir, err := getManager("")
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
29
if err != nil {
30
return err
31
}
32
defer mgr.Close()
33
34
-
fmt.Printf("Working in: %s\n\n", dir)
0
35
36
-
// Handle validate
37
-
if *validate {
38
-
fmt.Printf("Validating mempool chronological order...\n")
39
-
if err := mgr.ValidateMempool(); err != nil {
40
-
return fmt.Errorf("validation failed: %w", err)
41
-
}
42
-
fmt.Printf("✓ Mempool validation passed\n")
43
-
return nil
44
-
}
45
46
-
// Handle refresh
47
-
if *refresh {
48
-
fmt.Printf("Refreshing mempool from disk...\n")
49
-
if err := mgr.RefreshMempool(); err != nil {
50
-
return fmt.Errorf("refresh failed: %w", err)
51
-
}
52
53
-
if err := mgr.ValidateMempool(); err != nil {
54
-
fmt.Fprintf(os.Stderr, "⚠️ Warning: mempool validation failed after refresh: %v\n", err)
55
-
} else {
56
-
fmt.Printf("✓ Mempool refreshed and validated\n\n")
57
-
}
58
-
}
59
60
-
// Handle clear
61
-
if *clear {
62
-
stats := mgr.GetMempoolStats()
63
-
count := stats["count"].(int)
64
65
-
if count == 0 {
66
-
fmt.Println("Mempool is already empty")
67
-
return nil
68
-
}
69
70
-
fmt.Printf("⚠️ This will clear %d operations from the mempool.\n", count)
71
-
fmt.Printf("Are you sure? [y/N]: ")
72
-
var response string
73
-
fmt.Scanln(&response)
74
-
if strings.ToLower(strings.TrimSpace(response)) != "y" {
75
-
fmt.Println("Cancelled")
76
-
return nil
77
-
}
78
79
-
if err := mgr.ClearMempool(); err != nil {
80
-
return fmt.Errorf("clear failed: %w", err)
81
-
}
82
83
-
fmt.Printf("✓ Mempool cleared (%d operations removed)\n", count)
84
-
return nil
85
-
}
0
0
0
86
87
-
// Handle export
88
-
if *export {
89
-
ops, err := mgr.GetMempoolOperations()
90
-
if err != nil {
91
-
return fmt.Errorf("failed to get mempool operations: %w", err)
92
-
}
93
94
-
if len(ops) == 0 {
95
-
fmt.Fprintf(os.Stderr, "Mempool is empty\n")
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
96
return nil
97
-
}
0
0
0
98
99
-
for _, op := range ops {
100
-
if len(op.RawJSON) > 0 {
101
-
fmt.Println(string(op.RawJSON))
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
102
}
103
-
}
104
105
-
fmt.Fprintf(os.Stderr, "Exported %d operations from mempool\n", len(ops))
106
-
return nil
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
107
}
108
109
-
// Default: Show mempool stats
110
-
return showMempoolStats(mgr, dir, *verbose)
0
111
}
112
113
-
func showMempoolStats(mgr BundleManager, dir string, verbose bool) error {
0
0
0
0
114
stats := mgr.GetMempoolStats()
115
count := stats["count"].(int)
116
canCreate := stats["can_create_bundle"].(bool)
···
118
minTimestamp := stats["min_timestamp"].(time.Time)
119
validated := stats["validated"].(bool)
120
121
-
fmt.Printf("Mempool Status:\n")
122
-
fmt.Printf(" Target bundle: %06d\n", targetBundle)
123
-
fmt.Printf(" Operations: %d\n", count)
124
-
fmt.Printf(" Can create bundle: %v (need %d)\n", canCreate, types.BUNDLE_SIZE)
125
-
fmt.Printf(" Min timestamp: %s\n", minTimestamp.Format("2006-01-02 15:04:05"))
0
126
0
127
validationIcon := "✓"
128
if !validated {
129
validationIcon = "⚠️"
130
}
131
-
fmt.Printf(" Validated: %s %v\n", validationIcon, validated)
132
133
if count > 0 {
0
134
if sizeBytes, ok := stats["size_bytes"].(int); ok {
135
-
fmt.Printf(" Size: %.2f KB\n", float64(sizeBytes)/1024)
136
}
137
0
138
if firstTime, ok := stats["first_time"].(time.Time); ok {
139
-
fmt.Printf(" First operation: %s\n", firstTime.Format("2006-01-02 15:04:05"))
140
}
141
-
142
if lastTime, ok := stats["last_time"].(time.Time); ok {
143
-
fmt.Printf(" Last operation: %s\n", lastTime.Format("2006-01-02 15:04:05"))
144
}
145
0
0
0
146
progress := float64(count) / float64(types.BUNDLE_SIZE) * 100
147
-
fmt.Printf(" Progress: %.1f%% (%d/%d)\n", progress, count, types.BUNDLE_SIZE)
148
149
-
// Progress bar
150
barWidth := 40
151
filled := int(float64(barWidth) * float64(count) / float64(types.BUNDLE_SIZE))
152
if filled > barWidth {
153
filled = barWidth
154
}
155
bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled)
156
-
fmt.Printf(" [%s]\n", bar)
0
0
0
0
0
0
0
0
157
} else {
158
-
fmt.Printf(" (empty)\n")
159
}
0
0
160
161
// Verbose: Show sample operations
162
if verbose && count > 0 {
163
-
fmt.Println()
164
-
fmt.Printf("Sample operations (showing up to 10):\n")
165
166
ops, err := mgr.GetMempoolOperations()
167
if err != nil {
···
178
fmt.Printf(" %d. DID: %s\n", i+1, op.DID)
179
fmt.Printf(" CID: %s\n", op.CID)
180
fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05.000"))
0
181
}
182
183
if len(ops) > showCount {
184
-
fmt.Printf(" ... and %d more\n", len(ops)-showCount)
185
}
186
}
187
188
-
fmt.Println()
189
-
190
-
// Show mempool file
191
mempoolFilename := fmt.Sprintf("plc_mempool_%06d.jsonl", targetBundle)
192
fmt.Printf("File: %s\n", filepath.Join(dir, mempoolFilename))
193
···
7
"strings"
8
"time"
9
10
+
"github.com/goccy/go-json"
11
+
"github.com/spf13/cobra"
12
"tangled.org/atscan.net/plcbundle/internal/types"
13
)
14
15
+
func NewMempoolCommand() *cobra.Command {
16
+
cmd := &cobra.Command{
17
+
Use: "mempool [status]",
18
+
Aliases: []string{"mp"},
19
+
Short: "Manage mempool operations",
20
+
Long: `Manage mempool operations
0
0
21
22
+
The mempool stores operations waiting to be bundled. It maintains
23
+
strict chronological order and automatically validates consistency.`,
24
+
25
+
Example: ` # Show mempool status
26
+
plcbundle mempool
27
+
plcbundle mempool status
28
+
29
+
# Clear all operations
30
+
plcbundle mempool clear
31
+
32
+
# Export operations as JSONL
33
+
plcbundle mempool dump
34
+
plcbundle mempool dump > operations.jsonl
35
+
36
+
# Using alias
37
+
plcbundle mp status`,
38
+
39
+
RunE: func(cmd *cobra.Command, args []string) error {
40
+
// Default to status subcommand
41
+
return mempoolStatus(cmd, args)
42
+
},
43
}
44
45
+
// Add subcommands
46
+
cmd.AddCommand(newMempoolStatusCommand())
47
+
cmd.AddCommand(newMempoolClearCommand())
48
+
cmd.AddCommand(newMempoolDumpCommand())
49
+
50
+
return cmd
51
+
}
52
+
53
+
// ============================================================================
54
+
// MEMPOOL STATUS - Show current state
55
+
// ============================================================================
56
+
57
+
func newMempoolStatusCommand() *cobra.Command {
58
+
var verbose bool
59
+
60
+
cmd := &cobra.Command{
61
+
Use: "status",
62
+
Aliases: []string{"s", "info"},
63
+
Short: "Show mempool status",
64
+
Long: `Show mempool status and statistics
65
+
66
+
Displays current mempool state including operation count, progress toward
67
+
next bundle, validation status, and memory usage.`,
68
+
69
+
Example: ` # Show status
70
+
plcbundle mempool status
71
+
plcbundle mempool
72
+
73
+
# Verbose output with samples
74
+
plcbundle mempool status -v`,
75
+
76
+
RunE: func(cmd *cobra.Command, args []string) error {
77
+
return mempoolStatus(cmd, args)
78
+
},
79
+
}
80
+
81
+
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Show sample operations")
82
+
83
+
return cmd
84
+
}
85
+
86
+
func mempoolStatus(cmd *cobra.Command, args []string) error {
87
+
verbose, _ := cmd.Flags().GetBool("verbose")
88
+
if cmd.Parent() != nil {
89
+
// Called as subcommand, check parent's verbose flag
90
+
if v, err := cmd.Parent().Flags().GetBool("verbose"); err == nil && v {
91
+
verbose = true
92
+
}
93
+
}
94
+
95
+
mgr, dir, err := getManagerFromCommand(cmd, "")
96
if err != nil {
97
return err
98
}
99
defer mgr.Close()
100
101
+
return showMempoolStatus(mgr, dir, verbose)
102
+
}
103
104
+
// ============================================================================
105
+
// MEMPOOL CLEAR - Remove all operations
106
+
// ============================================================================
0
0
0
0
0
0
107
108
+
func newMempoolClearCommand() *cobra.Command {
109
+
var force bool
0
0
0
0
110
111
+
cmd := &cobra.Command{
112
+
Use: "clear",
113
+
Aliases: []string{"c", "reset"},
114
+
Short: "Clear mempool operations",
115
+
Long: `Clear all operations from mempool
0
116
117
+
Removes all operations from the mempool and saves the empty state.
118
+
This is a destructive operation that cannot be undone.
0
0
119
120
+
Use cases:
121
+
• Reset after testing
122
+
• Clear corrupted data
123
+
• Force fresh start`,
124
125
+
Example: ` # Clear with confirmation
126
+
plcbundle mempool clear
0
0
0
0
0
0
127
128
+
# Force clear without confirmation
129
+
plcbundle mempool clear --force
130
+
plcbundle mempool clear -f`,
131
132
+
RunE: func(cmd *cobra.Command, args []string) error {
133
+
mgr, dir, err := getManagerFromCommand(cmd, "")
134
+
if err != nil {
135
+
return err
136
+
}
137
+
defer mgr.Close()
138
139
+
stats := mgr.GetMempoolStats()
140
+
count := stats["count"].(int)
0
0
0
0
141
142
+
if count == 0 {
143
+
fmt.Println("Mempool is already empty")
144
+
return nil
145
+
}
146
+
147
+
fmt.Printf("Working in: %s\n\n", dir)
148
+
149
+
if !force {
150
+
fmt.Printf("⚠️ This will clear %d operations from the mempool.\n", count)
151
+
fmt.Printf("Are you sure? [y/N]: ")
152
+
var response string
153
+
fmt.Scanln(&response)
154
+
if strings.ToLower(strings.TrimSpace(response)) != "y" {
155
+
fmt.Println("Cancelled")
156
+
return nil
157
+
}
158
+
}
159
+
160
+
if err := mgr.ClearMempool(); err != nil {
161
+
return fmt.Errorf("clear failed: %w", err)
162
+
}
163
+
164
+
fmt.Printf("\n✓ Mempool cleared (%d operations removed)\n", count)
165
return nil
166
+
},
167
+
}
168
+
169
+
cmd.Flags().BoolVarP(&force, "force", "f", false, "Skip confirmation prompt")
170
171
+
return cmd
172
+
}
173
+
174
+
// ============================================================================
175
+
// MEMPOOL DUMP - Export operations as JSONL
176
+
// ============================================================================
177
+
178
+
func newMempoolDumpCommand() *cobra.Command {
179
+
var outputFile string
180
+
181
+
cmd := &cobra.Command{
182
+
Use: "dump",
183
+
Aliases: []string{"export", "d"},
184
+
Short: "Export mempool as JSONL",
185
+
Long: `Export mempool operations as JSONL
186
+
187
+
Outputs all operations in the mempool as newline-delimited JSON.
188
+
Perfect for backup, analysis, or piping to other tools.`,
189
+
190
+
Example: ` # Dump to stdout
191
+
plcbundle mempool dump
192
+
193
+
# Save to file
194
+
plcbundle mempool dump > mempool.jsonl
195
+
plcbundle mempool dump -o mempool.jsonl
196
+
197
+
# Pipe to jq
198
+
plcbundle mempool dump | jq -r .did
199
+
200
+
# Count operations
201
+
plcbundle mempool dump | wc -l
202
+
203
+
# Using alias
204
+
plcbundle mempool export`,
205
+
206
+
RunE: func(cmd *cobra.Command, args []string) error {
207
+
mgr, _, err := getManagerFromCommand(cmd, "")
208
+
if err != nil {
209
+
return err
210
}
211
+
defer mgr.Close()
212
213
+
ops, err := mgr.GetMempoolOperations()
214
+
if err != nil {
215
+
return fmt.Errorf("failed to get mempool operations: %w", err)
216
+
}
217
+
218
+
if len(ops) == 0 {
219
+
fmt.Fprintf(os.Stderr, "Mempool is empty\n")
220
+
return nil
221
+
}
222
+
223
+
// Determine output destination
224
+
var output *os.File
225
+
if outputFile != "" {
226
+
output, err = os.Create(outputFile)
227
+
if err != nil {
228
+
return fmt.Errorf("failed to create output file: %w", err)
229
+
}
230
+
defer output.Close()
231
+
fmt.Fprintf(os.Stderr, "Exporting to: %s\n", outputFile)
232
+
} else {
233
+
output = os.Stdout
234
+
}
235
+
236
+
// Write JSONL
237
+
for _, op := range ops {
238
+
if len(op.RawJSON) > 0 {
239
+
output.Write(op.RawJSON)
240
+
} else {
241
+
data, _ := json.Marshal(op)
242
+
output.Write(data)
243
+
}
244
+
output.Write([]byte("\n"))
245
+
}
246
+
247
+
fmt.Fprintf(os.Stderr, "Exported %d operations from mempool\n", len(ops))
248
+
return nil
249
+
},
250
}
251
252
+
cmd.Flags().StringVarP(&outputFile, "output", "o", "", "Output file (default: stdout)")
253
+
254
+
return cmd
255
}
256
257
+
// ============================================================================
258
+
// HELPER FUNCTIONS
259
+
// ============================================================================
260
+
261
+
func showMempoolStatus(mgr BundleManager, dir string, verbose bool) error {
262
stats := mgr.GetMempoolStats()
263
count := stats["count"].(int)
264
canCreate := stats["can_create_bundle"].(bool)
···
266
minTimestamp := stats["min_timestamp"].(time.Time)
267
validated := stats["validated"].(bool)
268
269
+
fmt.Printf("Mempool Status\n")
270
+
fmt.Printf("══════════════\n\n")
271
+
fmt.Printf(" Directory: %s\n", dir)
272
+
fmt.Printf(" Target bundle: %06d\n", targetBundle)
273
+
fmt.Printf(" Operations: %d / %d\n", count, types.BUNDLE_SIZE)
274
+
fmt.Printf(" Min timestamp: %s\n\n", minTimestamp.Format("2006-01-02 15:04:05"))
275
276
+
// Validation status
277
validationIcon := "✓"
278
if !validated {
279
validationIcon = "⚠️"
280
}
281
+
fmt.Printf(" Validated: %s %v\n", validationIcon, validated)
282
283
if count > 0 {
284
+
// Size information
285
if sizeBytes, ok := stats["size_bytes"].(int); ok {
286
+
fmt.Printf(" Size: %.2f KB\n", float64(sizeBytes)/1024)
287
}
288
289
+
// Time range
290
if firstTime, ok := stats["first_time"].(time.Time); ok {
291
+
fmt.Printf(" First op: %s\n", firstTime.Format("2006-01-02 15:04:05"))
292
}
0
293
if lastTime, ok := stats["last_time"].(time.Time); ok {
294
+
fmt.Printf(" Last op: %s\n", lastTime.Format("2006-01-02 15:04:05"))
295
}
296
297
+
fmt.Printf("\n")
298
+
299
+
// Progress bar
300
progress := float64(count) / float64(types.BUNDLE_SIZE) * 100
301
+
fmt.Printf(" Progress: %.1f%% (%d/%d)\n", progress, count, types.BUNDLE_SIZE)
302
0
303
barWidth := 40
304
filled := int(float64(barWidth) * float64(count) / float64(types.BUNDLE_SIZE))
305
if filled > barWidth {
306
filled = barWidth
307
}
308
bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled)
309
+
fmt.Printf(" [%s]\n\n", bar)
310
+
311
+
// Bundle creation status
312
+
if canCreate {
313
+
fmt.Printf(" ✓ Ready to create bundle\n")
314
+
} else {
315
+
remaining := types.BUNDLE_SIZE - count
316
+
fmt.Printf(" Need %s more operations\n", formatNumber(remaining))
317
+
}
318
} else {
319
+
fmt.Printf("\n (empty)\n")
320
}
321
+
322
+
fmt.Printf("\n")
323
324
// Verbose: Show sample operations
325
if verbose && count > 0 {
326
+
fmt.Printf("Sample Operations (first 10)\n")
327
+
fmt.Printf("────────────────────────────\n\n")
328
329
ops, err := mgr.GetMempoolOperations()
330
if err != nil {
···
341
fmt.Printf(" %d. DID: %s\n", i+1, op.DID)
342
fmt.Printf(" CID: %s\n", op.CID)
343
fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05.000"))
344
+
fmt.Printf("\n")
345
}
346
347
if len(ops) > showCount {
348
+
fmt.Printf(" ... and %d more\n\n", len(ops)-showCount)
349
}
350
}
351
352
+
// Show mempool file location
0
0
353
mempoolFilename := fmt.Sprintf("plc_mempool_%06d.jsonl", targetBundle)
354
fmt.Printf("File: %s\n", filepath.Join(dir, mempoolFilename))
355
-1
cmd/plcbundle/commands/server.go
···
1
-
// cmd/plcbundle/commands/server.go
2
package commands
3
4
import (
···
0
1
package commands
2
3
import (
-1
cmd/plcbundle/commands/status.go
···
1
-
// repo/cmd/plcbundle/commands/status.go
2
package commands
3
4
import (
···
0
1
package commands
2
3
import (
-1
cmd/plcbundle/commands/stream.go
···
1
-
// repo/cmd/plcbundle/commands/stream.go
2
package commands
3
4
import (
···
0
1
package commands
2
3
import (
-1
cmd/plcbundle/commands/sync.go
···
1
-
// cmd/plcbundle/commands/sync.go
2
package commands
3
4
import (
···
0
1
package commands
2
3
import (
+2
-2
cmd/plcbundle/main.go
···
64
65
// Namespaced commands
66
cmd.AddCommand(commands.NewDIDCommand())
67
-
/*cmd.AddCommand(commands.NewIndexCommand())
68
cmd.AddCommand(commands.NewMempoolCommand())
69
-
cmd.AddCommand(commands.NewDetectorCommand())
70
71
// Monitoring & maintenance
72
cmd.AddCommand(commands.NewWatchCommand())
···
64
65
// Namespaced commands
66
cmd.AddCommand(commands.NewDIDCommand())
67
+
//cmd.AddCommand(commands.NewIndexCommand())
68
cmd.AddCommand(commands.NewMempoolCommand())
69
+
/*cmd.AddCommand(commands.NewDetectorCommand())
70
71
// Monitoring & maintenance
72
cmd.AddCommand(commands.NewWatchCommand())