Optimizing a gate-level BCD decoder to the end of the earth and back
1#!/usr/bin/env python3
2"""
3Search for optimal BCD to 7-segment decoder circuit with 2, 3, and 4-input gates.
4"""
5
6import multiprocessing as mp
7from multiprocessing import Manager
8from concurrent.futures import ProcessPoolExecutor, as_completed
9import sys
10import os
11import time
12import threading
13import queue
14import termios
15import tty
16import signal
17import atexit
18
19from bcd_optimization.solver import BCDTo7SegmentSolver
20from bcd_optimization.truth_tables import SEGMENT_MINTERMS, SEGMENT_NAMES
21
# ANSI escape sequences used to drive the terminal.

# Cursor and line control
CLEAR_LINE = "\033[K"
MOVE_UP = "\033[A"
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"

# Text attributes
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"

# Bright foreground colours
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
33
34
class ProgressDisplay:
    """Single-line terminal progress display refreshed by a background thread.

    The main thread reports completed work through ``update()``; the
    background thread drains ``stats_queue`` (dicts with a ``'config'`` key
    and optional ``'conflicts'``, ``'decisions'``, ``'vars'`` and
    ``'clauses'`` counters) and redraws one status line roughly every 0.1 s.
    """

    # Errors that termios calls can raise here: termios.error when stdin is
    # not a terminal, AttributeError when sys.stdin is None, and
    # ValueError/OSError (io.UnsupportedOperation) when stdin has no fileno().
    _TERMIOS_ERRORS = (termios.error, AttributeError, ValueError, OSError)

    def __init__(self, stats_queue=None):
        """Create an idle display.

        Args:
            stats_queue: Optional queue-like object offering ``get_nowait()``;
                may also be assigned later through the attribute.
        """
        self.lock = threading.Lock()
        self.running = False
        self.thread = None
        self.stats_queue = stats_queue
        self.original_termios = None

        # State
        self.completed = 0
        self.total = 0
        self.start_time = 0
        self.last_config = ""
        self.current_cost = 0
        self.message = ""

        # SAT solver stats (aggregated across all running configs)
        self.active_configs = {}  # config_str -> stats dict
        self.total_conflicts = 0
        self.total_decisions = 0
        self.total_vars = 0
        self.total_clauses = 0

    def _disable_input(self):
        """Disable keyboard echo and canonical mode, and hide the cursor."""
        try:
            self.original_termios = termios.tcgetattr(sys.stdin)
            new_termios = termios.tcgetattr(sys.stdin)
            # Index 3 is the local-modes flag word.
            new_termios[3] = new_termios[3] & ~termios.ECHO & ~termios.ICANON
            termios.tcsetattr(sys.stdin, termios.TCSANOW, new_termios)
        except self._TERMIOS_ERRORS:
            # Not an interactive terminal: the display still works without this.
            pass
        sys.stdout.write(HIDE_CURSOR)
        sys.stdout.flush()

    def _restore_input(self):
        """Show the cursor and restore the terminal modes saved earlier."""
        sys.stdout.write(SHOW_CURSOR)
        sys.stdout.flush()
        if self.original_termios:
            try:
                termios.tcsetattr(sys.stdin, termios.TCSANOW, self.original_termios)
            except self._TERMIOS_ERRORS:
                pass
            self.original_termios = None

    def start(self, total, cost):
        """Reset all counters and start the display thread.

        Args:
            total: Number of work items in the current batch.
            cost: Cost label of the batch (stored in ``current_cost``).
        """
        with self.lock:
            self.completed = 0
            self.total = total
            self.start_time = time.time()
            self.last_config = ""
            self.current_cost = cost
            self.message = ""
            self.active_configs = {}
            self.total_conflicts = 0
            self.total_decisions = 0
            # Reset every aggregate so nothing leaks from the previous batch.
            self.total_vars = 0
            self.total_clauses = 0
            self.running = True

        self._disable_input()
        self.thread = threading.Thread(target=self._update_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the display thread, restore the terminal and clear the line."""
        self.running = False
        if self.thread:
            # Bounded wait: the loop re-checks `running` about every 0.1 s.
            self.thread.join(timeout=0.5)
        self._restore_input()
        # Clear the line
        print(f"\r{CLEAR_LINE}", end="", flush=True)

    def update(self, completed, last_config=""):
        """Record progress (called from the main thread).

        Args:
            completed: Number of finished work items.
            last_config: Identifier of the item that just finished; when
                given, its entry is removed from ``active_configs``.
        """
        with self.lock:
            self.completed = completed
            if last_config:
                self.last_config = last_config
                # Remove completed config from active
                self.active_configs.pop(last_config, None)

    def set_message(self, msg):
        """Set a temporary message to display."""
        with self.lock:
            self.message = msg

    def _poll_stats(self):
        """Drain the stats queue and refresh the aggregated counters."""
        if not self.stats_queue:
            return

        pending = []
        try:
            while True:
                pending.append(self.stats_queue.get_nowait())
        except queue.Empty:
            pass  # Queue drained
        except (EOFError, OSError):
            # A queue backed by another process may already be gone;
            # stats are best-effort, so keep what was read so far.
            pass

        if not pending:
            return

        with self.lock:
            for stats in pending:
                self.active_configs[stats.get('config', '')] = stats
            # Aggregate once per poll instead of once per message.
            active = list(self.active_configs.values())
            self.total_conflicts = sum(s.get('conflicts', 0) for s in active)
            self.total_decisions = sum(s.get('decisions', 0) for s in active)
            self.total_vars = sum(s.get('vars', 0) for s in active)
            self.total_clauses = sum(s.get('clauses', 0) for s in active)

    def _update_loop(self):
        """Background thread body: poll stats and redraw until stopped."""
        import shutil

        spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
        spin_idx = 0
        last_conflicts = 0
        conflict_rate = 0

        def fmt_num(n):
            # Compact counter formatting: 1234 -> "1.2K", 2500000 -> "2.5M".
            if n >= 1_000_000:
                return f"{n/1_000_000:.1f}M"
            elif n >= 1_000:
                return f"{n/1_000:.1f}K"
            return str(int(n))

        while self.running:
            self._poll_stats()

            # Get terminal width each iteration (in case of resize)
            try:
                term_width = shutil.get_terminal_size().columns
            except (AttributeError, ValueError, OSError):
                term_width = 80

            with self.lock:
                elapsed = time.time() - self.start_time
                speed = self.completed / elapsed if elapsed > 0 else 0
                remaining = self.total - self.completed
                eta = remaining / speed if speed > 0 else 0

                pct = 100 * self.completed / self.total if self.total > 0 else 0
                spin = spinner[spin_idx % len(spinner)]

                # Smoothed conflicts-per-second: the delta covers one 0.1 s
                # tick, hence the factor of 10.
                conflict_delta = self.total_conflicts - last_conflicts
                conflict_rate = conflict_rate * 0.7 + conflict_delta * 10 * 0.3
                last_conflicts = self.total_conflicts

                # Fixed parts (calculate their length)
                # Format: " ⠹ [BAR] 2/6 (33%) 0.8/s [ACT] 29K"
                prefix = f" {spin} ["
                count_str = f"] {self.completed}/{self.total} ({pct:.0f}%) {speed:.1f}/s"

                if self.total_conflicts > 0:
                    # 50000 conflicts/s maps to a full activity bar.
                    activity_level = min(max(conflict_rate / 50000, 0), 1.0)
                    conflict_str = f" {fmt_num(self.total_conflicts)}"
                else:
                    activity_level = 0
                    conflict_str = ""

                # Calculate available space for bars
                fixed_len = len(prefix) + len(count_str) + len(conflict_str) + 4  # +4 for " []" around activity
                available = term_width - fixed_len - 2  # -2 for safety margin

                if available < 10:
                    # Too narrow, minimal display
                    content = f" {spin} {self.completed}/{self.total} {fmt_num(self.total_conflicts)}"
                else:
                    # Split available space: 65% progress bar, 35% activity bar
                    if self.total_conflicts > 0:
                        progress_width = int(available * 0.65)
                        activity_width = available - progress_width
                    else:
                        progress_width = available
                        activity_width = 0

                    # Progress bar
                    filled = int(progress_width * self.completed / self.total) if self.total > 0 else 0
                    bar = "█" * filled + "░" * (progress_width - filled)

                    # Activity bar
                    if activity_width > 0:
                        activity_filled = int(activity_width * activity_level)
                        activity_bar = f" [{CYAN}{'▮' * activity_filled}{'▯' * (activity_width - activity_filled)}{RESET}]{conflict_str}"
                    else:
                        activity_bar = ""

                    content = f"{prefix}{bar}{count_str}{activity_bar}"

            line = f"\r{content}{CLEAR_LINE}"
            sys.stdout.write(line)
            sys.stdout.flush()

            spin_idx += 1
            time.sleep(0.1)
230
231
def progress_bar(current, total, width=30, label=""):
    """Generate a progress bar string.

    Args:
        current: Number of completed items.
        total: Total number of items; a value <= 0 renders an empty bar at 0%.
        width: Width of the bar in characters.
        label: Text placed directly before the opening bracket.

    Returns:
        A string of the form ``"<label>[<bar>] <current>/<total> (<pct>%)"``.
    """
    filled = int(width * current / total) if total > 0 else 0
    # Clamp so the bar is always exactly `width` cells, even when `current`
    # overshoots `total` or is negative.
    filled = max(0, min(width, filled))
    bar = "█" * filled + "░" * (width - filled)
    pct = 100 * current / total if total > 0 else 0
    return f"{label}[{bar}] {current}/{total} ({pct:.0f}%)"
238
239
def _report_stats(stats_queue, payload):
    """Push a stats dict to the display queue without blocking; never raises."""
    if stats_queue is None:
        return
    try:
        stats_queue.put_nowait(payload)
    except Exception:
        # Progress reporting is best-effort: a full or closed queue must not
        # abort the solve.
        pass


def try_config_with_stats(n2, n3, n4, use_complements, restrict_functions, stats_queue, stop_event=None):
    """Try a single (n2, n3, n4) configuration with stats reporting.

    Args:
        n2, n3, n4: Number of 2-, 3- and 4-input gates in the configuration.
        use_complements, restrict_functions: Passed through unchanged to
            ``BCDTo7SegmentSolver._build_general_cnf``.
        stats_queue: Optional queue receiving progress dicts with the keys
            ``config``, ``phase``, ``vars``, ``clauses``, ``conflicts`` and
            ``decisions``. ``None`` disables reporting.
        stop_event: Optional event-like object; when set, the attempt is
            abandoned at the next check.

    Returns:
        A tuple ``(result, cost, status, (n2, n3, n4))``. ``result`` is the
        decoded solution when the instance is satisfiable and ``None``
        otherwise; ``status`` is a human-readable string starting with
        ``Skipped``, ``Cancelled``, ``UNSAT``, ``SAT`` or ``Error``.
    """
    cost = n2 * 2 + n3 * 3 + n4 * 4
    n_gates = n2 + n3 + n4
    config = (n2, n3, n4)
    config_str = f"{n2}x2+{n3}x3+{n4}x4"

    if n_gates < 7:
        return None, cost, f"Skipped (only {n_gates} gates)", config

    # Imported lazily so the cheap "Skipped" path needs no SAT library.
    from pysat.formula import CNF
    from pysat.solvers import Solver

    def cancelled():
        return stop_event is not None and stop_event.is_set()

    solver = BCDTo7SegmentSolver()
    solver.generate_prime_implicants()

    start = time.time()
    try:
        # Check early termination before building CNF
        if cancelled():
            return None, cost, "Cancelled", config

        # Build the CNF without solving
        cnf = solver._build_general_cnf(n2, n3, n4, use_complements, restrict_functions)
        if cnf is None:
            return None, cost, "UNSAT (no valid config)", config

        n_vars = cnf['n_vars']
        n_clauses = len(cnf['clauses'])

        def report(conflicts, decisions):
            _report_stats(stats_queue, {
                'config': config_str,
                'phase': 'solving',
                'vars': n_vars,
                'clauses': n_clauses,
                'conflicts': conflicts,
                'decisions': decisions,
            })

        # Report initial stats
        report(0, 0)

        # Solve with periodic stats updates
        with Solver(name='g3', bootstrap_with=CNF(from_clauses=cnf['clauses'])) as sat_solver:
            # Solve in slices of `conflict_budget` conflicts so that progress
            # can be reported and cancellation checked between slices.
            conflict_budget = 10000
            total_conflicts = 0

            while True:
                # Check early termination
                if cancelled():
                    elapsed = time.time() - start
                    return None, cost, f"Cancelled ({elapsed:.1f}s, {total_conflicts} conflicts)", config

                sat_solver.conf_budget(conflict_budget)
                status = sat_solver.solve_limited()

                stats = sat_solver.accum_stats()
                total_conflicts = stats.get('conflicts', 0)
                report(total_conflicts, stats.get('decisions', 0))

                if status is not None:
                    # Solved (True = SAT, False = UNSAT)
                    break
                # status is None means budget exhausted, continue

            elapsed = time.time() - start

            if status:
                model = set(sat_solver.get_model())
                result = solver._decode_general_solution_from_cnf(model, cnf)
                return result, cost, f"SAT ({elapsed:.1f}s, {total_conflicts} conflicts)", config
            return None, cost, f"UNSAT ({elapsed:.1f}s, {total_conflicts} conflicts)", config

    except Exception as e:
        # Report failures through the status string rather than raising.
        elapsed = time.time() - start
        return None, cost, f"Error ({elapsed:.1f}s): {e}", config
334
335
def try_config(args):
    """Process-pool entry point: unpack one argument tuple and run the attempt.

    ``args`` must hold exactly seven items, in order: n2, n3, n4,
    use_complements, restrict_functions, stats_queue, stop_event.
    Returns whatever ``try_config_with_stats`` returns.
    """
    (
        n2,
        n3,
        n4,
        use_complements,
        restrict_functions,
        stats_queue,
        stop_event,
    ) = args
    return try_config_with_stats(
        n2,
        n3,
        n4,
        use_complements=use_complements,
        restrict_functions=restrict_functions,
        stats_queue=stats_queue,
        stop_event=stop_event,
    )
340
341
def worker_init():
    """Pool initializer: make this process ignore SIGINT.

    The parent process installs its own interrupt handler, so workers leave
    Ctrl-C handling to it.
    """
    interrupt = signal.SIGINT
    ignore = signal.SIG_IGN
    signal.signal(interrupt, ignore)
345
346
def verify_result(result):
    """Simulate a synthesized circuit on digits 0-9 and check every segment.

    ``result.gates`` is evaluated in order; each gate reads node indices from
    ``input1`` and ``input2`` (an int for 2-input gates, a tuple of two or
    three ints for 3- and 4-input gates) and looks its output up in the
    truth-table integer ``func``. ``result.output_map`` maps each segment
    name to the node index driving it.

    Returns:
        ``(True, "All correct")`` on success, otherwise ``(False, message)``
        describing the first mismatch found.
    """
    def lookup(table, *bits):
        # Fold the input bits, first input most significant, into the index
        # of the truth-table bit to read.
        index = 0
        for bit in bits:
            index = index * 2 + bit
        return (table >> index) & 1

    for digit in range(10):
        a, b, c, d = ((digit >> shift) & 1 for shift in (3, 2, 1, 0))

        # Nodes 0-3 are the inputs, 4-7 their complements; gate outputs follow.
        values = [a, b, c, d, 1 - a, 1 - b, 1 - c, 1 - d]

        for gate in result.gates:
            rest = gate.input2
            if not isinstance(rest, tuple):
                operands = (gate.input1, rest)
            elif len(rest) == 2:
                second, third = rest
                operands = (gate.input1, second, third)
            else:
                second, third, fourth = rest
                operands = (gate.input1, second, third, fourth)
            values.append(lookup(gate.func, *[values[i] for i in operands]))

        for seg in SEGMENT_NAMES:
            expected = 1 if digit in SEGMENT_MINTERMS[seg] else 0
            actual = values[result.output_map[seg]]
            if actual != expected:
                return False, f"Digit {digit}, {seg}: expected {expected}, got {actual}"

    return True, "All correct"
385
386
def cleanup_terminal():
    """Make the cursor visible again by emitting SHOW_CURSOR on stdout."""
    out = sys.stdout
    out.write(SHOW_CURSOR)
    out.flush()
391
392
def main():
    """Search gate configurations in order of increasing cost and print the result.

    Configurations (n2, n3, n4) with 7-11 gates and 14-22 total gate inputs
    are grouped by cost. Each group is solved in parallel in a process pool;
    the search stops at the first cost for which a verified solution exists.
    Worker errors and results that fail verification are reported and
    counted, so they are never presented as UNSAT.
    """
    # Register cleanup to ensure cursor is always restored
    atexit.register(cleanup_terminal)

    # Store original terminal settings for signal handler
    original_termios = None
    try:
        original_termios = termios.tcgetattr(sys.stdin)
    except (termios.error, AttributeError, ValueError, OSError):
        # stdin is not a terminal (or has no file descriptor): nothing to restore.
        pass

    # Create progress display early so signal handler can access it
    progress = ProgressDisplay(None)  # Queue set later
    executor_ref = [None]  # Mutable container for executor reference
    # Filled in once the manager exists; read by the signal handler.
    shared_ref = {'manager': None, 'stop_event': None}
    # State for signal handler status message
    search_state = {
        'start_time': 0,
        'group_start': 0,
        'configs_tested': 0,
        'current_cost': 0,
        'group_completed': 0,
        'group_size': 0,
    }

    def signal_handler(signum, frame):
        """Handle interrupt signals gracefully."""
        # Stop progress display first to prevent overwrites
        progress.running = False
        if progress.thread:
            progress.thread.join(timeout=0.2)
        # Workers ignore SIGINT, so ask them to stop through the shared event.
        if shared_ref['stop_event'] is not None:
            try:
                shared_ref['stop_event'].set()
            except Exception:
                pass  # Best effort: we are exiting anyway
        # Shutdown executor without waiting
        if executor_ref[0]:
            executor_ref[0].shutdown(wait=False, cancel_futures=True)
        # Restore terminal
        sys.stdout.write(f"\r{CLEAR_LINE}")
        sys.stdout.write(SHOW_CURSOR)
        sys.stdout.flush()
        if original_termios:
            try:
                termios.tcsetattr(sys.stdin, termios.TCSANOW, original_termios)
            except (termios.error, AttributeError, ValueError, OSError):
                pass
        # Print status message
        elapsed = time.time() - search_state['group_start'] if search_state['group_start'] else 0
        conflicts = progress.total_conflicts
        print(f" {YELLOW}● Interrupted at {search_state['group_completed']}/{search_state['group_size']} configurations{RESET} ({elapsed:.1f}s, {conflicts} conflicts)")
        print(f"\n{YELLOW}Search interrupted by user{RESET}")
        # os._exit() skips the normal exit handlers, so stop the manager
        # process explicitly instead of relying on them.
        if shared_ref['manager'] is not None:
            try:
                shared_ref['manager'].shutdown()
            except Exception:
                pass
        os._exit(0)  # Hard exit to avoid cleanup errors from child processes

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print(f"{BOLD}{'=' * 60}{RESET}")
    print(f"{BOLD}BCD to 7-Segment Optimal Circuit Search (with 4-input gates){RESET}")
    print(f"{BOLD}{'=' * 60}{RESET}")
    print()
    print("Gates: AND, OR, XOR, XNOR, NAND, NOR (2, 3, and 4 input variants)")
    print("Primary input complements (A', B', C', D') are free")
    print()

    def config_cost(cfg):
        # Total gate inputs: each n-input gate costs n.
        return cfg[0] * 2 + cfg[1] * 3 + cfg[2] * 4

    # Generate configurations sorted by cost
    configs = []
    for n2 in range(0, 12):
        for n3 in range(0, 8):
            for n4 in range(0, 6):
                cost = n2 * 2 + n3 * 3 + n4 * 4
                n_gates = n2 + n3 + n4
                # Need at least 7 gates for 7 outputs
                if 7 <= n_gates <= 11 and 14 <= cost <= 22:
                    configs.append((n2, n3, n4, True, True))

    # Sort by cost, then by number of gates
    configs.sort(key=lambda x: (config_cost(x), x[0] + x[1] + x[2]))

    # Group configs by cost
    cost_groups = {}
    for cfg in configs:
        cost_groups.setdefault(config_cost(cfg), []).append(cfg)

    min_cost = min(cost_groups.keys())
    max_cost = max(cost_groups.keys())

    print(f"Searching {len(configs)} configurations from {min_cost} to {max_cost} inputs")
    print(f"Using {mp.cpu_count()} CPU cores")
    print()

    best_result = None
    best_cost = float('inf')

    total_start = time.time()
    search_state['start_time'] = total_start
    configs_tested = 0
    # Configurations that errored or produced an unverifiable result; these
    # are inconclusive and must not be counted as UNSAT.
    total_failures = 0

    # Create shared queue for stats and stop event for early termination
    manager = Manager()
    stats_queue = manager.Queue()
    stop_event = manager.Event()
    progress.stats_queue = stats_queue
    shared_ref['manager'] = manager
    shared_ref['stop_event'] = stop_event

    with ProcessPoolExecutor(max_workers=mp.cpu_count(), initializer=worker_init) as executor:
        executor_ref[0] = executor
        for cost in sorted(cost_groups.keys()):
            if cost >= best_cost:
                continue

            group = cost_groups[cost]
            group_size = len(group)
            group_start = time.time()
            completed_in_group = 0
            group_failures = 0

            # Update state for signal handler
            search_state['current_cost'] = cost
            search_state['group_size'] = group_size
            search_state['group_completed'] = 0
            search_state['group_start'] = group_start

            print(f"\n{CYAN}{BOLD}Testing {cost} inputs{RESET} ({group_size} configurations)")
            print("-" * 50)

            # Start async progress display
            progress.start(group_size, cost)

            # Add stats_queue and stop_event to each config
            stop_event.clear()  # Reset for new cost group
            configs_with_queue = [(cfg[0], cfg[1], cfg[2], cfg[3], cfg[4], stats_queue, stop_event) for cfg in group]
            futures = {executor.submit(try_config, cfg): cfg for cfg in configs_with_queue}
            found_solution = False

            for future in as_completed(futures):
                cfg = futures[future]
                n2, n3, n4 = cfg[0], cfg[1], cfg[2]  # First 3 elements are gate counts
                completed_in_group += 1
                configs_tested += 1
                search_state['group_completed'] = completed_in_group
                search_state['configs_tested'] = configs_tested
                config_str = f"{n2}x2+{n3}x3+{n4}x4"

                try:
                    result, result_cost, status, _ = future.result(timeout=300)

                    # Update progress (always, even for UNSAT)
                    progress.update(completed_in_group, config_str)

                    if result is not None:
                        # Found a potential solution - stop progress to print
                        valid, msg = verify_result(result)
                        progress.stop()
                        if valid:
                            print(f"\n {GREEN}✓ {n2}x2 + {n3}x3 + {n4}x4{RESET}: {status} {GREEN}(VERIFIED){RESET}")
                            if result_cost < best_cost:
                                best_result = result
                                best_cost = result_cost
                            found_solution = True
                            # Signal other workers to stop
                            stop_event.set()
                            for f in futures:
                                f.cancel()
                            break
                        else:
                            group_failures += 1
                            print(f"\n {YELLOW}✗ {n2}x2 + {n3}x3 + {n4}x4{RESET}: INVALID - {msg}")
                            # Restart progress
                            progress.start(group_size, cost)
                            progress.update(completed_in_group, config_str)
                    elif status.startswith("Error"):
                        # The worker caught an exception and reported it in
                        # the status string; surface it instead of treating
                        # the configuration as UNSAT.
                        group_failures += 1
                        progress.stop()
                        print(f"\n {n2}x2 + {n3}x3 + {n4}x4: {status}")
                        progress.start(group_size, cost)
                        progress.update(completed_in_group, config_str)
                    # For UNSAT: just continue, progress bar updates automatically

                except Exception as e:
                    group_failures += 1
                    progress.stop()
                    print(f"\n {n2}x2 + {n3}x3 + {n4}x4: Error - {e}")
                    progress.start(group_size, cost)
                    progress.update(completed_in_group)

            # Stop progress and print summary
            progress.stop()
            group_elapsed = time.time() - group_start
            total_failures += group_failures

            if not found_solution:
                if group_failures:
                    print(f" {YELLOW}✗ No verified solution in {group_size} configurations ({group_failures} failed or invalid){RESET} ({group_elapsed:.1f}s, {progress.total_conflicts} conflicts)")
                else:
                    print(f" {YELLOW}✗ All {group_size} configurations UNSAT{RESET} ({group_elapsed:.1f}s, {progress.total_conflicts} conflicts)")

            if best_result is not None and best_cost <= cost:
                print(f"\n{GREEN}{BOLD}Found solution at {best_cost} inputs!{RESET}")
                break

    total_elapsed = time.time() - total_start
    average_speed = configs_tested / total_elapsed if total_elapsed > 0 else 0.0

    print()
    print(f"{BOLD}{'=' * 60}{RESET}")
    print(f"{BOLD}RESULTS{RESET}")
    print(f"{BOLD}{'=' * 60}{RESET}")
    print(f"Search time: {total_elapsed:.1f} seconds ({configs_tested} configurations tested)")
    print(f"Average speed: {average_speed:.2f} configurations/second")
    if total_failures:
        print(f"{YELLOW}Warning: {total_failures} configurations failed or were invalid; lower-cost results are inconclusive.{RESET}")

    if best_result:
        print(f"\n{GREEN}{BOLD}Best solution: {best_cost} gate inputs{RESET}")
        print()
        print("Gates:")

        # Node indices 0-7 are the inputs and their complements; each gate
        # appends its own name so later gates can refer to it.
        node_names = ['A', 'B', 'C', 'D', "A'", "B'", "C'", "D'"]
        for g in best_result.gates:
            i1 = node_names[g.input1]
            if isinstance(g.input2, tuple):
                if len(g.input2) == 2:
                    k, l = g.input2
                    i2, i3 = node_names[k], node_names[l]
                    print(f" g{g.index}: {g.func_name}({i1}, {i2}, {i3})")
                else:
                    k, l, m = g.input2
                    i2, i3, i4 = node_names[k], node_names[l], node_names[m]
                    print(f" g{g.index}: {g.func_name}({i1}, {i2}, {i3}, {i4})")
            else:
                i2 = node_names[g.input2]
                print(f" g{g.index}: {g.func_name}({i1}, {i2})")
            node_names.append(f"g{g.index}")

        print()
        print("Outputs:")
        for seg in SEGMENT_NAMES:
            print(f" {seg} = {node_names[best_result.output_map[seg]]}")
    else:
        print(f"\n{YELLOW}No solution found in the search range (14-22 inputs).{RESET}")
        if not total_failures:
            print("The minimum is likely 23 inputs (7x2 + 3x3 configuration).")
617
618
# Run the search only when the file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()