linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Audio device detection via ultrasonic tone.
5
6Direct copy from solstone's observe/detect.py — no solstone imports.
7Plays an ultrasonic tone and records from all mics to identify
8microphone vs loopback devices.
9"""
10
11import logging
12import threading
13
14import numpy as np
15import soundcard as sc
16
17logger = logging.getLogger(__name__)
18
19
20def input_detect(duration=0.4, sample_rate=44100):
21 t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
22 tone = 0.5 * np.sin(2 * np.pi * 18000 * t) # ultrasonic
23
24 try:
25 devices = sc.all_microphones(include_loopback=True)
26 except Exception:
27 logger.warning("Failed to enumerate audio devices")
28 return None, None
29 if not devices:
30 logger.warning("No audio devices found")
31 return None, None
32
33 results = {}
34 barrier = threading.Barrier(len(devices) + 1)
35
36 def record_mic(mic, results):
37 barrier.wait()
38 try:
39 audio = mic.record(
40 samplerate=sample_rate, numframes=int(sample_rate * duration)
41 )
42 results[mic.name] = audio
43 except Exception:
44 results[mic.name] = None
45
46 def play_tone():
47 barrier.wait()
48 try:
49 sp = sc.default_speaker()
50 sp.play(tone, samplerate=sample_rate)
51 except Exception:
52 logger.warning("No default speaker available for tone detection")
53
54 threads = []
55 for mic in devices:
56 thread = threading.Thread(target=record_mic, args=(mic, results))
57 thread.start()
58 threads.append(thread)
59
60 play_thread = threading.Thread(target=play_tone)
61 play_thread.start()
62 threads.append(play_thread)
63
64 for thread in threads:
65 thread.join()
66
67 # Analyze the recordings with a simple amplitude threshold
68 threshold = 0.001
69 mic_detected = None
70 loopback_detected = None
71 for mic in devices:
72 audio = results.get(mic.name)
73 if audio is not None and np.max(np.abs(audio)) > threshold:
74 # First match for each category
75 if "microphone" in str(mic).lower() and mic_detected is None:
76 mic_detected = mic
77 if "loopback" in str(mic).lower() and loopback_detected is None:
78 loopback_detected = mic
79 return mic_detected, loopback_detected