linux observer
at main 79 lines 2.4 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Audio device detection via ultrasonic tone.

Direct copy from solstone's observe/detect.py — no solstone imports.
Plays an ultrasonic tone and records from all mics to identify
microphone vs loopback devices.
"""

import logging
import threading

import numpy as np
import soundcard as sc

logger = logging.getLogger(__name__)


def input_detect(
    duration=0.4,
    sample_rate=44100,
    *,
    tone_hz=18000,
    threshold=0.001,
):
    """Play a short tone and report which inputs picked up a signal.

    Every device returned by ``sc.all_microphones(include_loopback=True)`` is
    recorded on its own thread while the default speaker plays a sine tone.
    A device counts as "active" when the peak absolute amplitude of its
    recording exceeds ``threshold``. The check is a plain amplitude test, not
    a frequency-selective one, so any sufficiently loud signal qualifies.

    Args:
        duration: Length of the tone and of each recording, in seconds.
        sample_rate: Sample rate used for both playback and recording.
        tone_hz: Frequency of the sine tone in Hz (default 18000).
        threshold: Peak-amplitude level above which a device is active.

    Returns:
        A ``(mic_detected, loopback_detected)`` tuple holding the first active
        device of each category, in enumeration order. Either entry is
        ``None`` when no matching device was found. ``(None, None)`` is
        returned when device enumeration fails or yields nothing.
    """
    numframes = int(sample_rate * duration)
    t = np.linspace(0, duration, numframes, endpoint=False)
    tone = 0.5 * np.sin(2 * np.pi * tone_hz * t)

    try:
        devices = sc.all_microphones(include_loopback=True)
    except Exception:
        logger.warning("Failed to enumerate audio devices", exc_info=True)
        return None, None
    if not devices:
        logger.warning("No audio devices found")
        return None, None

    # One slot per device, addressed by position: device names are not
    # guaranteed to be unique, so a name-keyed mapping could lose recordings.
    recordings = [None] * len(devices)
    # All recorders plus the player are released together so that the
    # recordings overlap the tone.
    barrier = threading.Barrier(len(devices) + 1)

    def record_mic(index, mic):
        barrier.wait()
        try:
            recordings[index] = mic.record(
                samplerate=sample_rate, numframes=numframes
            )
        except Exception:
            # Best effort: a device that cannot be recorded is simply skipped.
            logger.debug("Recording from %s failed", mic.name, exc_info=True)

    def play_tone():
        barrier.wait()
        try:
            sc.default_speaker().play(tone, samplerate=sample_rate)
        except Exception:
            logger.warning(
                "No default speaker available for tone detection",
                exc_info=True,
            )

    threads = [
        threading.Thread(target=record_mic, args=(index, mic))
        for index, mic in enumerate(devices)
    ]
    threads.append(threading.Thread(target=play_tone))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    mic_detected = None
    loopback_detected = None
    for mic, audio in zip(devices, recordings):
        # An empty recording has no peak; np.max would raise on it.
        if audio is None or np.size(audio) == 0:
            continue
        if not np.max(np.abs(audio)) > threshold:
            continue

        # Prefer the device's own loopback flag. The textual form embeds the
        # device name, so a substring test can misfire on names such as
        # "Monitor of USB Microphone".
        # NOTE(review): assumes soundcard devices expose `isloopback`; the
        # substring check is kept as a fallback when they do not.
        loopback_flag = getattr(mic, "isloopback", None)
        if loopback_flag is None:
            label = str(mic).lower()
            is_mic = "microphone" in label
            is_loopback = "loopback" in label
        else:
            is_loopback = bool(loopback_flag)
            is_mic = not is_loopback

        # Keep only the first match for each category.
        if is_mic and mic_detected is None:
            mic_detected = mic
        if is_loopback and loopback_detected is None:
            loopback_detected = mic
        if mic_detected is not None and loopback_detected is not None:
            break
    return mic_detected, loopback_detected