Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/integ/audio.py: add test for audio-intput in QubesDB #608

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 143 additions & 37 deletions qubes/tests/integ/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import numpy as np

import qubes.vm
import qubes.devices
from qubes.tests.integ.vm_qrexec_gui import TC_00_AppVMMixin, in_qemu


Expand Down Expand Up @@ -142,7 +143,7 @@ def assert_pacat_running(self, audiovm, testvm, expected=True):
def check_audio_sample(self, sample, sfreq):
rec = np.fromstring(sample, dtype=np.float32)
# determine sample size using silence threshold
threshold = 10**-3
threshold = 10 ** -3
rec_size = np.count_nonzero((rec > threshold) | (rec < -threshold))
if not rec_size:
self.fail('only silence detected, no useful audio data')
Expand All @@ -151,34 +152,34 @@ def check_audio_sample(self, sample, sfreq):
# be less strict on HVM tests in nested virt, the test environment
# has huge overhead already
margin = 0.80
if rec_size < margin*441000:
if rec_size < margin * 441000:
fname = f"/tmp/audio-sample-{self.id()}.raw"
with open(fname, "wb") as f:
f.write(sample)
self.fail(f'too short audio, expected 10s, got {rec_size/44100}, saved to {fname}')
self.fail(f'too short audio, expected 10s, got {rec_size / 44100}, saved to {fname}')
# find zero crossings
crossings = np.nonzero((rec[1:] > threshold) &
(rec[:-1] < -threshold))[0]
(rec[:-1] < -threshold))[0]
np.seterr('raise')
# compare against sine wave frequency
rec_freq = 44100/np.mean(np.diff(crossings))
if not sfreq*0.8 < rec_freq < sfreq*1.2:
rec_freq = 44100 / np.mean(np.diff(crossings))
if not sfreq * 0.8 < rec_freq < sfreq * 1.2:
fname = f"/tmp/audio-sample-{self.id()}.raw"
with open(fname, "wb") as f:
f.write(sample)
self.fail('frequency {} not in specified range, saved to {}'
.format(rec_freq, fname))
.format(rec_freq, fname))

def common_audio_playback(self):
# sine frequency
sfreq = 4400
# generate signal
audio_in = np.sin(2*np.pi*np.arange(441000)*sfreq/44100)
audio_in = np.sin(2 * np.pi * np.arange(441000) * sfreq / 44100)
# Need to use .snd extension so that pw-play (really libsndfile)
# recognizes the file as raw audio.
self.loop.run_until_complete(
self.testvm1.run_for_stdio('cat > audio_in.snd',
input=audio_in.astype(np.float32).tobytes()))
input=audio_in.astype(np.float32).tobytes()))
local_user = grp.getgrnam('qubes').gr_mem[0]
if self.testvm1.features['service.pipewire']:
cmd = 'timeout 20s pw-play --format=f32 --rate=44100 --channels=1 - < audio_in.snd'
Expand All @@ -188,9 +189,10 @@ def common_audio_playback(self):
with tempfile.NamedTemporaryFile() as recorded_audio:
os.chmod(recorded_audio.name, 0o666)
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
'parecord', '-d', '@DEFAULT_MONITOR@', '--raw',
'--format=float32le', '--rate=44100', '--channels=1',
recorded_audio.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
'parecord', '-d', '@DEFAULT_MONITOR@', '--raw',
'--format=float32le', '--rate=44100', '--channels=1',
recorded_audio.name], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
self.loop.run_until_complete(self.testvm1.run_for_stdio(cmd))
except subprocess.CalledProcessError as err:
Expand All @@ -199,17 +201,29 @@ def common_audio_playback(self):
self.loop.run_until_complete(asyncio.sleep(2))
if p.returncode is not None:
self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format(
p.returncode, p.stderr.read()))
p.returncode, p.stderr.read()))
p.send_signal(signal.SIGINT)
p.wait()
self.check_audio_sample(recorded_audio.file.read(), sfreq)

def _configure_audio_recording(self, vm):
"""Connect VM's output-source to sink monitor instead of mic"""
"""Connect VM's source-output to sink monitor instead of mic"""
local_user = grp.getgrnam("qubes").gr_mem[0]
audiovm = vm.audiovm

sudo = ["sudo", "-E", "-u", local_user]
source_outputs = json.loads(subprocess.check_output(
sudo + ["pactl", "-f", "json", "list", "source-outputs"]))

source_outputs_cmd = ["pactl", "-f", "json", "list", "source-outputs"]
if audiovm.name != "dom0":
stdout, _ = self.loop.run_until_complete(
audiovm.run_for_stdio(" ".join(source_outputs_cmd)))
source_outputs = json.loads(stdout)
else:
source_outputs = json.loads(subprocess.check_output(sudo + source_outputs_cmd))

if not source_outputs:
self.fail("no source-output found in {}".format(audiovm.name))
assert False

try:
output_index = [s["index"] for s in source_outputs
Expand All @@ -220,8 +234,17 @@ def _configure_audio_recording(self, vm):
# self.fail never returns
assert False

sources = json.loads(subprocess.check_output(
sudo + ["pactl", "-f", "json", "list", "sources"]))
sources_cmd = ["pactl", "-f", "json", "list", "sources"]
if audiovm.name != "dom0":
res, _ = self.loop.run_until_complete(audiovm.run_for_stdio(" ".join(sources_cmd)))
sources = json.loads(res)
else:
sources = json.loads(subprocess.check_output(sudo + sources_cmd))

if not sources:
self.fail("no sources found in {}".format(audiovm.name))
assert False

try:
source_index = [s["index"] for s in sources
if s["name"].endswith(".monitor")][0]
Expand All @@ -230,8 +253,36 @@ def _configure_audio_recording(self, vm):
# self.fail never returns
assert False

subprocess.check_call(sudo +
["pactl", "move-source-output", str(output_index), str(source_index)])
cmd = ["pactl", "move-source-output", str(output_index), str(source_index)]
if audiovm.name != "dom0":
self.loop.run_until_complete(audiovm.run(" ".join(cmd)))
else:
subprocess.check_call(sudo + cmd)

async def retrieve_audio_input(self, vm, status):
try:
await asyncio.wait_for(self._check_audio_input_status(vm, status), timeout=2)
except asyncio.TimeoutError:
self.fail("Failed to get mic attach/detach status!")

@staticmethod
async def _check_audio_input_status(vm, status):
while vm.audiovm.untrusted_qdb.read("/audio-input/{}".format(vm.name)) != status:
await asyncio.sleep(0.5)

def attach_mic(self):
deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic')
self.loop.run_until_complete(
self.testvm1.devices['mic'].attach(deva)
)
self.loop.run_until_complete(self.retrieve_audio_input(self.testvm1, b"1"))

def detach_mic(self):
deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic')
self.loop.run_until_complete(
self.testvm1.devices['mic'].detach(deva)
)
self.loop.run_until_complete(self.retrieve_audio_input(self.testvm1, b"0"))

def common_audio_record_muted(self):
# connect VM's recording source output monitor (instead of mic)
Expand All @@ -240,6 +291,7 @@ def common_audio_record_muted(self):
# generate some "audio" data
audio_in = b'\x20' * 4 * 44100
local_user = grp.getgrnam('qubes').gr_mem[0]
sudo = ["sudo", "-E", "-u", local_user]
# Need to use .snd extension so that pw-play (really libsndfile)
# recognizes the file as raw audio.
if self.testvm1.features['service.pipewire']:
Expand All @@ -249,18 +301,28 @@ def common_audio_record_muted(self):
cmd = 'parecord --raw audio_rec.snd'
kill_cmd = 'pkill --signal SIGINT parecord'
record = self.loop.run_until_complete(self.testvm1.run(cmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE))
stdout=subprocess.PIPE,
stderr=subprocess.PIPE))
# give it time to start recording
self.loop.run_until_complete(asyncio.sleep(0.5))
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
'paplay', '--raw'],
stdin=subprocess.PIPE)
p.communicate(audio_in)

play_cmd = ['paplay', '--raw']
if self.testvm1.audiovm.name != "dom0":
self.loop.run_until_complete(
self.testvm1.audiovm.run_for_stdio(
" ".join(play_cmd),
input=audio_in
)
)
else:
p = subprocess.Popen(sudo + play_cmd, stdin=subprocess.PIPE)
p.communicate(audio_in)

# wait for possible parecord buffering
self.loop.run_until_complete(asyncio.sleep(2))
if record.returncode is not None:
self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format(
record.returncode, self.loop.run_until_complete(record.stderr.read())))
record.returncode, self.loop.run_until_complete(record.stderr.read())))
try:
self.loop.run_until_complete(
self.testvm1.run_for_stdio(kill_cmd))
Expand All @@ -273,15 +335,20 @@ def common_audio_record_muted(self):
if audio_in[:32] in recorded_audio:
self.fail('VM recorded something, even though mic disabled')

def common_audio_record_unmuted(self):
deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic')
self.loop.run_until_complete(
self.testvm1.devices['mic'].attach(deva))
def common_audio_record_unmuted(self, attach_mic=True, detach_mic=True):
if attach_mic:
try:
self.detach_mic()
except qubes.devices.DeviceNotAssigned:
pass
self.attach_mic()
fepitre marked this conversation as resolved.
Show resolved Hide resolved
# connect VM's recording source output monitor (instead of mic)
self._configure_audio_recording(self.testvm1)
sfreq = 4400
audio_in = np.sin(2*np.pi*np.arange(441000)*sfreq/44100)
audio_in = np.sin(2 * np.pi * np.arange(441000) * sfreq / 44100)
local_user = grp.getgrnam('qubes').gr_mem[0]
sudo = ["sudo", "-E", "-u", local_user]

# Need to use .snd extension so that pw-play (really libsndfile)
# recognizes the file as raw audio.
if self.testvm1.features['service.pipewire']:
Expand All @@ -295,16 +362,28 @@ def common_audio_record_unmuted(self):
record = self.loop.run_until_complete(self.testvm1.run(record_cmd))
# give it time to start recording
self.loop.run_until_complete(asyncio.sleep(0.5))
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
'paplay', '--raw', '--format=float32le',
'--rate=44100', '--channels=1'],
stdin=subprocess.PIPE)
p.communicate(audio_in.astype(np.float32).tobytes())

# play sound that will be used as source-output
play_cmd = ['paplay', '--raw', '--format=float32le', '--rate=44100', '--channels=1']
if self.testvm1.audiovm.name != "dom0":
self.loop.run_until_complete(
self.testvm1.audiovm.run_for_stdio(
" ".join(play_cmd),
input=audio_in.astype(np.float32).tobytes()
)
)
else:
p = subprocess.Popen(
sudo + play_cmd,
stdin=subprocess.PIPE
)
p.communicate(audio_in.astype(np.float32).tobytes())

# wait for possible parecord buffering
self.loop.run_until_complete(asyncio.sleep(2))
if record.returncode is not None:
self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format(
record.returncode, self.loop.run_until_complete(record.stderr.read())))
record.returncode, self.loop.run_until_complete(record.stderr.read())))
try:
self.loop.run_until_complete(self.testvm1.run_for_stdio(kill_cmd))
except subprocess.CalledProcessError:
Expand All @@ -317,6 +396,8 @@ def common_audio_record_unmuted(self):
recorded_audio, _ = self.loop.run_until_complete(
self.testvm1.run_for_stdio('cat audio_rec.snd'))
self.check_audio_sample(recorded_audio, sfreq)
if detach_mic:
self.detach_mic()


class TC_20_AudioVM_Pulse(TC_00_AudioMixin):
Expand Down Expand Up @@ -459,6 +540,31 @@ def test_251_audio_playback_audiovm_pipewire_late_start(self):
self.assert_pacat_running(self.app.domains[0], self.testvm1, False)
self.common_audio_playback()

@unittest.skipUnless(spawn.find_executable('parecord'),
"pulseaudio-utils not installed in dom0")
def test_260_audio_mic_enabled_switch_audiovm(self):
self.create_audio_vm('pipewire', start=False)
self.testvm1.audiovm = self.audiovm
self.prepare_audio_test('pipewire')
self.loop.run_until_complete(self.audiovm.start())

# check mic is enabled in first audiovm
self.assert_pacat_running(self.audiovm, self.testvm1, True)
self.common_audio_record_unmuted(detach_mic=False)

# check mic is enabled in second audiovm, admin ext will
# allow mic during switch as it was previously enabled
self.testvm1.audiovm = self.app.domains[0]
self.assert_pacat_running(self.testvm1.audiovm, self.testvm1, True)
self.common_audio_record_unmuted(attach_mic=False, detach_mic=False)

# detach mic, switch to original audiovm and check there
# is no sound as we disabled mic
self.detach_mic()
self.testvm1.audiovm = self.audiovm
self.assert_pacat_running(self.audiovm, self.testvm1, True)
self.common_audio_record_muted()


def create_testcases_for_templates():
yield from qubes.tests.create_testcases_for_templates(
Expand Down