Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Extract platform dependent code out of Skill #76

Open
wants to merge 5 commits into
base: 21.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 29 additions & 121 deletions __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from alsaaudio import Mixer, mixers as alsa_mixers
from os.path import dirname, join

from adapt.intent import IntentBuilder
Expand All @@ -22,6 +21,8 @@
from mycroft.util import play_wav
from mycroft.util.parse import extract_number

from .hal import HALFactory


ALSA_PLATFORMS = ['mycroft_mark_1', 'picroft', 'unknown']

Expand Down Expand Up @@ -54,99 +55,31 @@ def __init__(self):
else:
self.settings["max_volume"] = 100 # can be 0 to 100
self.volume_sound = join(dirname(__file__), "blop-mark-diangelo.wav")
self.vol_before_mute = None
self._mixer = None

def _clear_mixer(self):
"""For Unknown platforms reinstantiate the mixer.

For mycroft_mark_1 do not reinstantiate the mixer.
"""
platform = self.config_core['enclosure'].get('platform', 'unknown')
if platform != 'mycroft_mark_1':
self._mixer = None

def _get_mixer(self):
self.log.debug('Finding Alsa Mixer for control...')
mixer = None
try:
# If there are only 1 mixer use that one
mixers = alsa_mixers()
if len(mixers) == 1:
mixer = Mixer(mixers[0])
elif 'Master' in mixers:
# Try using the default mixer (Master)
mixer = Mixer('Master')
elif 'PCM' in mixers:
# PCM is another common one
mixer = Mixer('PCM')
elif 'Digital' in mixers:
# My mixer is called 'Digital' (JustBoom DAC)
mixer = Mixer('Digital')
else:
# should be equivalent to 'Master'
mixer = Mixer()
except Exception:
# Retry instanciating the mixer with the built-in default
try:
mixer = Mixer()
except Exception as e:
self.log.error('Couldn\'t allocate mixer, {}'.format(repr(e)))
self._mixer = mixer
return mixer
# Instantiate the HAL emulator
self.platform = self.config_core['enclosure'].get('platform', 'unknown')
if self.platform in ALSA_PLATFORMS:
self.HAL = HALFactory.create('ALSA', self.settings)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon the intrusion, especially for a kind of style comment, it's not very important, I'm kind of wondering why this doesn't read

`self.HAL = HALFactory.create(self.platform, self.settings)`

And let "create" decide what to return rather than needing to switch on platform here (given that it's a HAL).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or even self.HAL = HALFactory.create(self.config_core, self.settings)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No intrusion at all - glad to have more eyes on it.

You're totally right here too - I'm thinking passing in the self.platform should be sufficient for now.

I expect that this will change when it moves to the real HAL service, as each HAL plugin will either select their audio output service or provide their own methods that meet a minimum spec.

else:
self.HAL = None

def initialize(self):
# Register handlers to detect percentages as reported by STT
for i in range(101): # numbers 0 to 100
self.register_vocabulary(str(i) + '%', 'Percent')

# Register handlers for messagebus events
self.add_event('mycroft.volume.increase',
self.handle_increase_volume)
self.add_event('mycroft.volume.decrease',
self.handle_decrease_volume)
self.add_event('mycroft.volume.mute',
self.handle_mute_volume)
self.add_event('mycroft.volume.unmute',
self.handle_unmute_volume)
self.add_event('recognizer_loop:record_begin',
self.duck)
self.add_event('recognizer_loop:record_end',
self.unduck)

self.vol_before_mute = self.__get_system_volume()

@property
def mixer(self):
platform = self.config_core['enclosure'].get('platform', 'unknown')
if platform in ALSA_PLATFORMS:
return self._mixer or self._get_mixer()
else:
return None

def _setvolume(self, vol, emit=True):
# Update ALSA
if self.mixer:
self.log.debug(vol)
self.mixer.setvolume(vol)
# TODO: Remove this and control volume at the Enclosure level in
# response to the mycroft.volume.set message.

if emit:
# Notify non-ALSA systems of volume change
self.bus.emit(Message('mycroft.volume.set',
data={"percent": vol/100.0}))
def _set_volume(self, vol, emit=True):
self.bus.emit(Message('mycroft.volume.set',
data={"percent": vol/100.0}))

# Change Volume to X (Number 0 to) Intent Handlers
@intent_handler(IntentBuilder("SetVolume").require("Volume")
.optionally("Increase").optionally("Decrease")
.optionally("To").require("Level"))
def handle_set_volume(self, message):
self._clear_mixer()
default_vol = self.__get_system_volume(50)

level = self.__get_volume_level(message, default_vol)
self._setvolume(self.__level_to_volume(level))
self._set_volume(self.__level_to_volume(level))
if level == self.MAX_LEVEL:
self.speak_dialog('max.volume')
else:
Expand All @@ -157,17 +90,15 @@ def handle_set_volume(self, message):
.optionally("Increase").optionally("Decrease")
.optionally("To").require("Percent"))
def handle_set_volume_percent(self, message):
self._clear_mixer()
percent = extract_number(message.data['utterance'].replace('%', ''))
percent = int(percent)
self._setvolume(percent)
self._set_volume(percent)
self.speak_dialog('set.volume.percent', data={'level': percent})

# Volume Status Intent Handlers
@intent_handler(IntentBuilder("QueryVolume").optionally("Query")
.require("Volume"))
def handle_query_volume(self, message):
self._clear_mixer()
level = self.__volume_to_level(self.__get_system_volume(0, show=True))
self.speak_dialog('volume.is', data={'volume': round(level)})

Expand Down Expand Up @@ -195,21 +126,24 @@ def handle_increase_volume(self, message):
@intent_handler(IntentBuilder("IncreaseVolumeSet").require("Set")
.optionally("Volume").require("Increase"))
def handle_increase_volume_set(self, message):
self._clear_mixer()
self.handle_increase_volume(message)

@intent_handler(IntentBuilder("IncreaseVolumePhrase")
.require("IncreasePhrase"))
def handle_increase_volume_phrase(self, message):
self._clear_mixer()
self.handle_increase_volume(message)

# Decrease Volume Intent Handlers
@intent_handler(IntentBuilder("DecreaseVolume").require("Volume")
.require("Decrease"))
def handle_decrease_volume(self, message):
self.__communicate_volume_change(message, 'decrease.volume',
*self.__update_volume(-1))
self.log.info("DECREASING VOLUME")
response = self.bus.wait_for_response(Message('mycroft.volume.decrease'))
self.log.error(response)
if response and response.data['success']:
self.speak_dialog('decrease.volume', response.data)
# self.__communicate_volume_change(message, 'decrease.volume',
# *self.__update_volume(-1))

@intent_handler(IntentBuilder("DecreaseVolumeSet").require("Set")
.optionally("Volume").require("Decrease"))
Expand All @@ -226,8 +160,7 @@ def handle_decrease_volume_phrase(self, message):
.require("Volume").optionally("Increase")
.require("MaxVolume"))
def handle_max_volume(self, message):
self._clear_mixer()
self._setvolume(self.settings["max_volume"])
self._set_volume(self.settings["max_volume"])
speak_message = message.data.get('speak_message', True)
if speak_message:
self.speak_dialog('max.volume')
Expand All @@ -241,42 +174,27 @@ def handle_max_volume_increase_to_max(self, message):
self.handle_max_volume(message)

def duck(self, message):
self._clear_mixer()
if self.settings.get('ducking', True):
self._mute_volume()

def unduck(self, message):
self._clear_mixer()
if self.settings.get('ducking', True):
self._unmute_volume()

def _mute_volume(self, message=None, speak=False):
self.log.debug('MUTING!')
self.vol_before_mute = self.__get_system_volume()
self.log.debug(self.vol_before_mute)
if speak:
self.speak_dialog('mute.volume')
wait_while_speaking()
self._setvolume(0, emit=False)
self.bus.emit(Message('mycroft.volume.duck'))
self.bus.emit(Message('mycroft.volume.mute'))

# Mute Volume Intent Handlers
@intent_handler(IntentBuilder("MuteVolume").require(
"Volume").require("Mute"))
def handle_mute_volume(self, message):
self._clear_mixer()
self._mute_volume(speak=message.data.get('speak_message', True))

def _unmute_volume(self, message=None, speak=False):
if self.vol_before_mute is None:
vol = self.__level_to_volume(self.settings["default_level"])
else:
vol = self.vol_before_mute
self.vol_before_mute = None

self._setvolume(vol, emit=False)
self.bus.emit(Message('mycroft.volume.unduck'))

self.bus.emit(Message('mycroft.volume.unmute'))
if speak:
self.speak_dialog('reset.volume',
data={'volume':
Expand All @@ -286,7 +204,6 @@ def _unmute_volume(self, message=None, speak=False):
@intent_handler(IntentBuilder("UnmuteVolume").require("Volume")
.require("Unmute"))
def handle_unmute_volume(self, message):
self._clear_mixer()
self._unmute_volume(speak=message.data.get('speak_message', True))

def __volume_to_level(self, volume):
Expand Down Expand Up @@ -345,25 +262,20 @@ def __update_volume(self, change=0):
old_level = self.__volume_to_level(self.__get_system_volume(0))
new_level = self.__bound_level(old_level + change)
self.enclosure.eyes_volume(new_level)
self._setvolume(self.__level_to_volume(new_level))
self._set_volume(self.__level_to_volume(new_level))
return new_level, new_level != old_level

def __get_system_volume(self, default=50, show=False):
""" Get volume, either from mixer or ask on messagebus.
"""Get volume from message bus.

The show parameter should only be True when a user is requesting
the volume and not the system.
TODO: Remove usage of Mixer and move that stuff to enclosure.
"""
vol = default
if self.mixer:
vol = min(self.mixer.getvolume()[0], 100)
self.log.debug('Volume before mute: {}'.format(vol))
else:
vol_msg = self.bus.wait_for_response(
Message("mycroft.volume.get", {'show': show}))
if vol_msg:
vol = int(vol_msg.data["percent"] * 100)
vol_msg = self.bus.wait_for_response(
Message("mycroft.volume.get", {'show': show}))
if vol_msg:
vol = int(vol_msg.data["percent"] * 100)

return vol

Expand All @@ -390,10 +302,6 @@ def __get_volume_level(self, message, default=None):
level = self.__bound_level(level)
return level

def shutdown(self):
if self.vol_before_mute is not None:
self._unmute_volume()


def create_skill():
return VolumeSkill()
24 changes: 24 additions & 0 deletions hal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2021 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This is a temporary module to emulate the Hardware Abstraction Layer (HAL).

It is intended to make a simpler transition from the current Volume Skill that
directly interfaces with ALSA, to one that communicates changes to the HAL via
the message bus.

This module will be deprecated at the earliest possible moment.
"""
from .alsa import get_alsa_mixer
from .hal import HALFactory
81 changes: 81 additions & 0 deletions hal/alsa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2021 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Provides access to the ALSA audio system.

DEPRECATION NOTICE:
This will shortly be moved out of the Volume Skill to the Hardware Abstraction
Layer (HAL). The Volume Skill will be emitting messages to the bus, and the
HAL will be responsible for managing the system state.
"""

from alsaaudio import Mixer, mixers as alsa_mixers

from mycroft.util import LOG

from .base import HAL

class AlsaHAL(HAL):
"""Emulate the Hardware Abstraction Layer (HAL) for ALSA.

This class will be deprecated at the earliest possible moment.
"""
def __init__(self, settings):
super().__init__(settings)
self._mixer = get_alsa_mixer()

def _get_volume(self, message=None):
"""Get the current volume level."""
volume = min(self._mixer.getvolume()[0], self.max_volume)
LOG.debug('Current volume: {}'.format(volume))
return volume

def _set_volume(self, message):
"""Set the volume level."""
target_volume = int(message.data['percent'] * 100)
settable_volume = self.constrain_volume(target_volume)
LOG.debug("Setting volume to: {}".format(settable_volume))
self._mixer.setvolume(settable_volume)
success_data = {'success': True, 'volume': settable_volume}
self.bus.emit(message.reply('mycroft.volume.updated',
data=success_data))
return success_data

def get_alsa_mixer():
LOG.debug('Finding Alsa Mixer for control...')
mixer = None
try:
# If there are only 1 mixer use that one
mixers = alsa_mixers()
if len(mixers) == 1:
mixer = Mixer(mixers[0])
elif 'Master' in mixers:
# Try using the default mixer (Master)
mixer = Mixer('Master')
elif 'PCM' in mixers:
# PCM is another common one
mixer = Mixer('PCM')
elif 'Digital' in mixers:
# My mixer is called 'Digital' (JustBoom DAC)
mixer = Mixer('Digital')
else:
# should be equivalent to 'Master'
mixer = Mixer()
except Exception:
# Retry instanciating the mixer with the built-in default
try:
mixer = Mixer()
except Exception as e:
LOG.error('Couldn\'t allocate mixer, {}'.format(repr(e)))
return mixer
Loading