Skip to content

Commit

Permalink
Update gh_manager.py
Browse files Browse the repository at this point in the history
improved restore music
  • Loading branch information
jumping2000 authored May 14, 2023
1 parent 6227cf2 commit 53bca0c
Showing 1 changed file with 110 additions and 85 deletions.
195 changes: 110 additions & 85 deletions apps/notifier/gh_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import hassapi as hass
import time
#import datetime
import sys
from queue import Queue
from threading import Thread
Expand All @@ -18,7 +19,6 @@
__TTS__ = "tts/"
SUB_TTS = [("[\*\-\[\]_\(\)\{\~\|\}\s]+"," ")]
SUB_VOICE = [
# ("[.]{2,}", "."),
("[\U00010000-\U0010ffff]", ""), # strip emoji
("[\?\.\!,]+(?=[\?\.\!,])", ""), # Exclude duplicate
("(\s+\.|\s+\.\s+|[\.])(?! )(?![^{]*})(?![^\d.]*\d)", ". "),
Expand All @@ -28,8 +28,19 @@
(" +", " "),
]

class GH_Manager(hass.Hass):
CONF_MEDIA_PLAYER = "media_player"
CONF_ATTRIBUTES = "attributes"
CONF_FRIENDLY_NAME = "friendly_name"
CONF_VOLUME_LEVEL = "volume_level"
CONF_MEDIA_CONTENT_ID = "media_content_id"
CONF_MEDIA_CONTENT_TYPE = "media_content_type"
CONF_MEDIA_DURATION = "media_duration"
CONF_MEDIA_POSITION = "media_position"
CONF_APP_NAME = "app_name"
CONF_AUTHSIG = "authSig"
CONF_DEBUG = False

class GH_Manager(hass.Hass):
def initialize(self)->None:
self.gh_service = h.get_arg(self.args, "gh_service")
self.gh_wait_time = h.get_arg(self.args, "gh_wait_time")
Expand All @@ -42,54 +53,62 @@ def initialize(self)->None:
self.debug_sensor = h.get_arg(self.args, "debug_sensor")
self.set_state(self.debug_sensor, state="on")
self.check_gh_service = self.check_gh(self.gh_service)
#
self._player = {}
##
for k in list(self.get_state(CONF_MEDIA_PLAYER).keys()):
if CONF_FRIENDLY_NAME in self.get_state(CONF_MEDIA_PLAYER)[k][CONF_ATTRIBUTES]:
self._player.update({str(self.get_state(CONF_MEDIA_PLAYER)[k][CONF_ATTRIBUTES][CONF_FRIENDLY_NAME]).lower():k})

self.queue = Queue(maxsize=0)
self._when_tts_done_callback_queue = Queue()
t = Thread(target=self.worker)
t.daemon = True
t.start()

def check_mplayer(self, gh_player: list):
media_p = list(self.get_state("media_player").keys())
def check_mplayer(self, _player: dict, gh_player: list) -> list:
""" check and save the media player entity or friendly name and return only entities """
media_p = list(self.get_state(CONF_MEDIA_PLAYER).keys())
gh = []
for item in [x.strip(" ") for x in gh_player] :
if item in media_p or item == "all":
gh.append(item)
return gh
elif _player[item] in media_p:
gh.append(_player[item])
return list(set(gh))

def check_volume(self, gh_volume):
def check_volume(self, gh_volume) -> list:
""" check and save the media player volumes """
media_state = self.get_state("media_player")
gh = []
for entity, state in media_state.items():
friendly_name = state["attributes"].get("friendly_name") if state["attributes"].get("friendly_name") is not None else ""
friendly_name = state[CONF_ATTRIBUTES].get(CONF_FRIENDLY_NAME) if state[CONF_ATTRIBUTES].get(CONF_FRIENDLY_NAME) is not None else ""
for item in gh_volume:
if "gruppo" not in str(item).lower() and str(friendly_name).lower() == str(item).lower():
gh.append(entity)
return gh

def volume_set(self, gh_player: list, volume: float):
def volume_set(self, gh_player: list, volume: float) -> None:
""" set the media player volumes """
if gh_player != ["all"]:
for item in gh_player:
self.call_service("media_player/volume_set", entity_id = item, volume_level = volume)

def volume_get(self, media_player:list, volume: float):
self.dict_volumes = {}
for i in media_player:
self.dict_volumes[i] = self.get_state(i, attribute="volume_level", default=volume)
return self.dict_volumes

def mediastate_get(self, media_player:list, volume: float):
self.dict_info_mplayer = {}
def mediastate_get(self, media_player:list, volume: float) -> dict:
""" check and save the media player attributes """
dict_info_mplayer = {}
for i in media_player:
self.dict_info_mplayer[i] = {}
dict_info_mplayer[i] = {}
for i in media_player:
self.dict_info_mplayer[i]['volume'] = self.get_state(i, attribute="volume_level", default=volume)
self.dict_info_mplayer[i]['state'] = self.get_state(i, default='idle')
self.dict_info_mplayer[i]['media_id'] = self.get_state(i, attribute="media_content_id", default='')
self.dict_info_mplayer[i]['media_type'] = self.get_state(i, attribute="media_content_type", default='')
self.dict_info_mplayer[i]['app_name'] = self.get_state(i, attribute="app_name", default='')
self.dict_info_mplayer[i]['authSig'] = self.get_state(i, attribute="authSig", default='')
return self.dict_info_mplayer
dict_info_mplayer[i][CONF_VOLUME_LEVEL] = self.get_state(i, attribute=CONF_VOLUME_LEVEL, default=volume)
dict_info_mplayer[i]["state"] = self.get_state(i, default="idle")
dict_info_mplayer[i][CONF_MEDIA_CONTENT_ID] = self.get_state(i, attribute=CONF_MEDIA_CONTENT_ID, default='')
dict_info_mplayer[i][CONF_MEDIA_CONTENT_TYPE] = self.get_state(i, attribute=CONF_MEDIA_CONTENT_TYPE, default='')
dict_info_mplayer[i][CONF_MEDIA_POSITION] = self.get_state(i, attribute=CONF_MEDIA_POSITION, default='')
dict_info_mplayer[i][CONF_APP_NAME] = self.get_state(i, attribute=CONF_APP_NAME, default='')
dict_info_mplayer[i][CONF_AUTHSIG] = self.get_state(i, attribute=CONF_AUTHSIG, default='')
##
##self.log("dict mplayer: {}".format(dict_info_mplayer))
return dict_info_mplayer

def set_debug_sensor(self, state, error):
attributes = {}
Expand All @@ -103,8 +122,38 @@ def check_gh(self, service):
components = self.hass_config["components"]
return next((True for comp in components if service in comp), False)

def restore_mplayer_states(self, gh_players:list, dict_info_mplayers:dict)->None:
""" Restore volumes and media-player states """
playing = False
if dict_info_mplayers and gh_players:
for item in gh_players:
for key,value in dict_info_mplayers.items():
if item == key:
self.call_service("media_player/volume_set", entity_id = key, volume_level = value[CONF_VOLUME_LEVEL])
# Force Set state
#self.set_state(i, state="", attributes = {CONF_VOLUME_LEVEL: j})
if value["state"] == "playing": playing = True
else: playing = False
if self.ytube_called:
self.call_service("ytube_music_player/call_method", entity_id = self.ytube_player, command = "interrupt_resume")
#self.call_service("media_player/volume_set", entity_id = k, volume_level = float(self.get_state(self.tts_period_of_day_volume))/100 )
elif playing and (value[CONF_AUTHSIG] !=""):
self.call_service("media_player/play_media", entity_id = key, media_content_id = value[CONF_MEDIA_CONTENT_ID], media_content_type = value[CONF_MEDIA_CONTENT_TYPE], authSig = value[CONF_AUTHSIG])
self.call_service("media_player/media_seek", entity_id = key, seek_position = value[CONF_MEDIA_POSITION])
elif playing and value[CONF_APP_NAME] =="Spotify":
self.call_service("spotcast/start", entity_id = key, force_playback = True)
elif playing:
self.call_service("media_player/play_media", entity_id = key, media_content_id = value[CONF_MEDIA_CONTENT_ID], media_content_type = value[CONF_MEDIA_CONTENT_TYPE])
self.call_service("media_player/media_seek", entity_id = key, seek_position = value[CONF_MEDIA_POSITION])
### DEBUG #############################################################
if CONF_DEBUG:
self.log("Restore volumes, mplayer-volume precedente: {} - {}".format(item,value[CONF_VOLUME_LEVEL]))
self.log("Restore Music: {} - {} - {} - {} - {} - {} - {}".format(playing, key, value[CONF_MEDIA_CONTENT_ID], value[CONF_MEDIA_CONTENT_TYPE], value[CONF_MEDIA_POSITION], value[CONF_APP_NAME],value[CONF_AUTHSIG] ))
### DEBUG #############################################################


def speak(self, google, gh_mode: bool, gh_notifier: str):
"""Speak the provided text through the media player."""
""" Speak the provided text through the media player. """
if not self.check_gh_service:
self.set_debug_sensor(
"I can't find the TTS Google component", "https://www.home-assistant.io/integrations/tts"
Expand All @@ -113,23 +162,25 @@ def speak(self, google, gh_mode: bool, gh_notifier: str):
if "media_player" not in google:
google["media_player"] = self.get_state(self.gh_sensor_media_player)
if "volume" not in google:
google["volume"] = float(self.get_state(self.tts_period_of_day_volume))/100
google["volume"] = float(self.get_state(self.tts_period_of_day_volume))/100
if "language" not in google:
google["language"] = self.get_state(self.tts_language).lower()
###
gh_player = self.check_mplayer(self.split_device_list(google["media_player"]))
gh_volume = self.check_volume(self.get_state(self.gh_select_media_player, attribute="options"))
self.volume_get(gh_volume,float(self.get_state(self.args["tts_period_of_day_volume"]))/100)
self.mediastate_get(gh_volume,float(self.get_state(self.args["tts_period_of_day_volume"]))/100)
wait_time = float(self.get_state(self.gh_wait_time))
message = h.replace_regular(google["message"], SUB_VOICE)
### set volume
self.volume_set(gh_player,google["volume"])
### DEBUG #############################################################
if CONF_DEBUG:
self.log("media player ricevuti: {}".format(google["media_player"]))
self.log("volume: {}".format(google["volume"]))
self.log("language: {}".format(google["language"]))
self.log("wait: {}".format(wait_time))
self.log("gh_mode: {}".format(gh_mode))
self.log("gh_notifier: {}".format(gh_notifier))
#######################################################################
# queues the message to be handled async, use when_tts_done_do method to supply callback when tts is done
if google["media_content_id"] != "":
if google[CONF_MEDIA_CONTENT_ID] != "":
try:
self.call_service("media_extractor/play_media", entity_id = gh_player, media_content_id= google["media_content_id"],
media_content_type = google["media_content_type"])
self.call_service("media_extractor/play_media", entity_id = gh_player, media_content_id= google[CONF_MEDIA_CONTENT_ID],
media_content_type = google[CONF_MEDIA_CONTENT_TYPE])
except Exception as ex:
self.log("An error occurred in GH Manager - Errore in media_content: {}".format(ex),level="ERROR")
self.set_debug_sensor("GH Manager - media_content Error ", ex)
Expand All @@ -139,29 +190,39 @@ def speak(self, google, gh_mode: bool, gh_notifier: str):
"gh_player": google["media_player"], "wait_time": wait_time, "gh_mode": gh_mode, "gh_notifier": gh_notifier})

def when_tts_done_do(self, callback:callable)->None:
"""Callback when the queue of tts messages are done"""
""" Callback when the queue of tts messages are done """
self._when_tts_done_callback_queue.put(callback)

def worker(self):
while True:
try:
data = self.queue.get()
duration = 0
gh_player = self.check_mplayer(self.split_device_list(data["gh_player"]))
### SAVE DATA
_gh_players = self.check_mplayer(self._player, self.split_device_list(data["gh_player"]))
_gh_volumes = self.check_volume(self.get_state(self.gh_select_media_player, attribute="options"))
_dict_info_mplayers = self.mediastate_get(_gh_volumes,float(self.get_state(self.args["tts_period_of_day_volume"]))/100)
### set volume
self.volume_set(_gh_players,data["volume"])
### DEBUG #############################################################
if CONF_DEBUG:
self.log("media player elaborati: {}".format(_gh_players))
self.log("list_volumes: {}".format(_gh_volumes))
self.log("dict_info_mplayer: {}".format(_dict_info_mplayers))
#######################################################################
### SPEAK
if data["gh_mode"].lower() == 'google assistant':
self.call_service(__NOTIFY__ + data["gh_notifier"], message = data["text"])
else:
if len(gh_player) == 1:
entity = gh_player[0]
if len(_gh_players) == 1:
entity = _gh_players[0]
else:
entity = gh_player
entity = _gh_players
############ YTUBE ###############
if self.get_state(self.ytube_player) == "playing" and self.get_state(entity) == "playing":
self.call_service("ytube_music_player/call_method", entity_id = self.ytube_player, command = "interrupt_start")
self.ytube_called = True
time.sleep(1)
#self.volume_set(entity,data["volume"])
##### Speech time calculator #####
message_clean = data["text"]
words = len(h.remove_tags(message_clean).split())
Expand All @@ -174,65 +235,29 @@ def worker(self):
##################################
self.call_service(__TTS__ + data["gh_notifier"], entity_id = entity, message = data["text"])#, language = data["language"])
if (type(entity) is list) or entity == "all" or \
(self.get_state(entity, attribute='media_duration') is None) or \
float(self.get_state(entity, attribute='media_duration')) > 60 or \
float(self.get_state(entity, attribute='media_duration')) == -1:
#duration = float(len(data["text"].split())) / 2 + data["wait_time"]
(self.get_state(entity, attribute=CONF_MEDIA_DURATION) is None) or \
float(self.get_state(entity, attribute=CONF_MEDIA_DURATION)) > 60 or \
float(self.get_state(entity, attribute=CONF_MEDIA_DURATION)) == -1:
duration += data["wait_time"]
else:
duration = float(self.get_state(entity, attribute='media_duration')) + data["wait_time"]
duration = float(self.get_state(entity, attribute=CONF_MEDIA_DURATION)) + data["wait_time"]
self.log("DURATION {}: ".format(duration))
#Sleep and wait for the tts to finish
time.sleep(duration)
##################################
if self.ytube_called:
self.call_service("media_player/volume_set", entity_id = entity, volume_level = 0)
##################################
self.restore_mplayer_states(_gh_players,_dict_info_mplayers)
##################################
self.set_state(self.debug_sensor, state="OK")
except Exception as ex:
self.log("An error occurred in GH Manager - Errore nel Worker: {}".format(ex),level="ERROR")
self.log(sys.exc_info())
self.set_debug_sensor("GH Manager - Worker Error ", ex)

self.queue.task_done()

if self.queue.qsize() == 0:
#self.log("QSIZE = 0 - Worker thread exiting")
## RESTORE VOLUME
if self.dict_volumes:
for i,j in self.dict_volumes.items():
self.call_service("media_player/volume_set", entity_id = i, volume_level = j)
# Force Set state
self.set_state(i, state="", attributes = {"volume_level": j})
## RESTORE MUSIC
if self.dict_info_mplayer:
for k,v in self.dict_info_mplayer.items():
temp_media_id = ''
temp_media_type = ''
temp_app_name = ''
temp_auth_sig = ''
playing = False
for k1,v1 in v.items():
if v1 == 'playing':
playing = True
if k1 == 'media_id':
temp_media_id = v1
if k1 == 'media_type':
temp_media_type = v1
if k1 == 'app_name':
temp_app_name = v1
if k1 == 'authSig':
temp_auth_sig = v1
#self.log("Costruzione del servizio: {} - {} - {} - {} - {}".format(k, temp_media_id, temp_media_type, temp_app_name,temp_auth_sig ))
if self.ytube_called:
self.call_service("ytube_music_player/call_method", entity_id = self.ytube_player, command = "interrupt_resume")
self.call_service("media_player/volume_set", entity_id = k, volume_level = float(self.get_state(self.tts_period_of_day_volume))/100 )
elif playing and (temp_auth_sig !=''):
self.call_service("media_player/play_media", entity_id = k, media_content_id = temp_media_id, media_content_type = temp_media_type, authSig = temp_auth_sig)
elif playing and temp_app_name =="Spotify":
self.call_service("spotcast/start", entity_id = k, force_playback = True)
elif playing:
self.call_service("media_player/play_media", entity_id = k, media_content_id = temp_media_id, media_content_type = temp_media_type)
# It is empty, make callbacks
try:
while(self._when_tts_done_callback_queue.qsize() > 0):
Expand Down

0 comments on commit 53bca0c

Please sign in to comment.