Skip to content

Commit

Permalink
updates to checkrepo plugin. addressing issues teltek#487 and teltek#524
Browse files Browse the repository at this point in the history


If galicaster restarts during a recording, on the next long heartbeat with an iCal update if the recording should still be happening it is resumed. once the recording is finished any track files belonging to the recording in rectemp are concatenated with ffmpeg and put into the correct mediapackage to then be ingested.
there is also an option to delay the file concatenation and ingest until nightly time (this is to allow a recording in quick successtion to not wait for this job to complete)

[repository]
recoverytype = save

[plugins]
checkrepo = True

[checkrepo]
to_merge = True
delay_merge = False

its important to note some changes were made to galicaster behaviour around file recovery to accomadate this.
on galicaster restart autorecover 'full' recoverytype was always active, this has now been made configurable via the 'autorecover' option as with error handling. and for this plugin should be set to 'save'
only manual recordings should be given a prefix of "Recovered - " when autorecovered as this changes metadata for opencast scheduled recordings. maybe this information is better in the galicaster.xml if UI notification of a recovered MP is needed. this is not addressed here
  • Loading branch information
Andrew Wilson committed Nov 30, 2017
1 parent 60146c3 commit ce196cb
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 34 deletions.
3 changes: 2 additions & 1 deletion galicaster/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ def get_repository():
conf.get('basic', 'repository'),
conf.get_hostname(),
template,
get_logger())
get_logger(),
conf.get_choice('repository', 'recoverytype', ['full', 'save'], 'full'))

return __galicaster_context['repository']

Expand Down
17 changes: 11 additions & 6 deletions galicaster/mediapackage/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Repository(object):

def __init__(self, root=None, hostname='',
folder_template='gc_{hostname}_{year}-{month}-{day}T{hour}h{minute}m{second}',
logger=None):
logger=None, recoverytype=None):
"""Initializes a repository that will contain a set of mediapackage.
Args:
root (str): absolute path to the working folder. ~/Repository used if it is None
Expand All @@ -45,6 +45,7 @@ def __init__(self, root=None, hostname='',
__list (Dict{str,Mediapackage}): the mediapackages in the repository and its identifiers as keys
"""
self.logger = logger
self.recoverytype = recoverytype

if not root:
self.root = os.path.expanduser('~/Repository')
Expand Down Expand Up @@ -95,10 +96,13 @@ def check_for_recover_recordings(self):
"""If a manifest.xml file exists, calls the recover_recoding method.
If else, calls the save_crash_recordings method.
"""
if os.path.exists(os.path.join(self.get_rectemp_path(), "manifest.xml")):
self.logger and self.logger.info("Found a recording that has crashed")
self.crash_file_creator()
self.recover_recording()
if self.recoverytype == 'full':
if os.path.exists(os.path.join(self.get_rectemp_path(), "manifest.xml")):
self.logger and self.logger.info("Found a recording that has crashed")
self.crash_file_creator()
self.recover_recording()
else:
self.save_crash_recordings()
else:
self.save_crash_recordings()

Expand Down Expand Up @@ -135,7 +139,8 @@ def recover_recording(self):
mp.setFromDict(info)
# Overwrite some data
mp.status = 4
mp.setTitle("Recovered - " + mp.getTitle())
if mp.manual:
mp.setTitle("Recovered - " + mp.getTitle())
if not mp.getIdentifier():
mp.setNewIdentifier()

Expand Down
197 changes: 171 additions & 26 deletions galicaster/plugins/checkrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,181 @@
"""

import datetime
import os
import subprocess
import shutil
import uuid


from galicaster.core import context
from galicaster.mediapackage import mediapackage
from galicaster.plugins import failovermic

logger = context.get_logger()
worker = context.get_worker()
conf = context.get_conf()
recorder = context.get_recorder()


def init():
dispatcher = context.get_dispatcher()
dispatcher.connect('ical-processed', check_repository)

def check_repository(self):
global logger
#mp_list is collection of mediapackages ID's
mp_list = context.get_repository()

for uid,mp in mp_list.iteritems():
if mp.status == mediapackage.SCHEDULED and mp.getDate() < datetime.datetime.utcnow() and mp.getDate()+datetime.timedelta(seconds=(mp.getDuration()/1000)) > datetime.datetime.utcnow():
#duration update
x = datetime.datetime.utcnow() - mp.getDate()
x = x.seconds-2
mp.setDuration(mp.getDuration() - x*1000)
#start-datetime update
mp.setDate(datetime.datetime.utcnow()+datetime.timedelta(seconds=2))
#repository update
mp_list.update(mp)

scheduler = context.get_scheduler()
try:
scheduler.create_timer(mp)
logger.info("Mediapackage with UID:%s have been reprogrammed", uid)
except Exception as exc:
logger.error("Error trying to create a new timer for MP {}: {}".format(uid, exc))

try:
dispatcher = context.get_dispatcher()
findrecs = FindRecordings()
dispatcher.connect('ical-processed', findrecs.check_repository)
dispatcher.connect('recorder-stopped', findrecs.find_recordings)
dispatcher.connect('timer-nightly', findrecs.merge_delayed)

except ValueError:
pass

class FindRecordings(object):

def __init__(self):
self.rectemp_exists = False
self.check_attachment = 'check.attach'
self.rectemp_uris_attachment = 'rectempURIs.attach'
self.delay = conf.get_boolean('checkrepo', 'delay_merge') or False
self.to_merge = conf.get_boolean('checkrepo', 'to_merge') or False
self.pause_state_file = os.path.join(context.get_repository().get_rectemp_path(), "paused")
if os.path.exists(self.pause_state_file):
os.remove(self.pause_state_file)

def find_recordings(self, signal, mpid):
mp = recorder.current_mediapackage
mpUri = mp.getURI()
dest = os.path.join(mpUri, self.check_attachment)
repofile = os.path.join(mpUri, self.rectemp_uris_attachment)

if os.path.isfile(dest):
mp_list = context.get_repository()
rectemp = mp_list.get_rectemp_path()
timesfile = open(dest, "r")
timespan = timesfile.readline()
times = timespan.split(',')
start = datetime.datetime.strptime(times[0], "%Y-%m-%d %H:%M:%S")
end = datetime.datetime.strptime(times[1], "%Y-%m-%d %H:%M:%S")
timesfile.close()
repocheck = open(repofile, "a")
for fname in os.listdir(rectemp):
filepath = os.path.join(rectemp, fname)
if os.path.isdir(filepath):
for item in (os.listdir(filepath)):
fileitem = os.path.join(filepath, item)
timestamp = os.path.getmtime(fileitem)
time = datetime.datetime.utcfromtimestamp(timestamp)
if start < time and end > time:
self.rectemp_exists = True
repocheck.write(filepath+"\n")
# FIXME could write this better
break
repocheck.close()
# check for real rectemp files, stops merge happening on mp's that started after start time
if self.rectemp_exists:
if self.to_merge:
if self.delay:
# stop ingest for now, set to delayed
logger.info('delaying merge of mp parts and ingest')
mp.setOpStatus('ingest', mediapackage.OP_NIGHTLY)
mp_list.update(mp)
else:
self.merge(mp, repofile, dest, mp_list)
else:
self.rectemp_exists = False
# clean up temp files
os.remove(dest)
os.remove(repofile)


def merge(self, mp, repofile, dest, mp_list):
mpUri = mp.getURI()
logger.info("merging recovered files into mediapackage:" + mpUri)
# while merging, create a paused file, this can be used by other systems as a signal
# to know galicaster is currently processing a mediapackage.
wait = False
if os.path.exists(self.pause_state_file):
os.utime(self.pause_state_file, None)
else:
wait = True
open(self.pause_state_file, 'a').close()
with open(repofile) as f:
rectemps = f.read().splitlines()
# get list of rectemp files
rectemps_list = sorted(rectemps)
rectemps_list.append(mpUri)
# get the track file names + mimetype
tracks = context.get_conf().get_current_profile().tracks
for track in tracks:
# get all the rectemp and final mp files into a list to be concatenated by ffmpeg
track_file = track.file
add_track = [s + '/{}'.format(track_file) for s in rectemps_list]
# check if its a real file, if not remove from the list
real_tracks = []
for fullpath_t in add_track:
if os.path.isfile(fullpath_t):
real_tracks.append(fullpath_t)
rectemps_fmted = ('|').join(real_tracks)
temp_track_file = 'temp_{}.{}'.format(str(uuid.uuid4())[:8], track_file.split(".")[-1])
# do a file concat per track into the mp
full_cmd = 'ffmpeg -i "concat:{}" -c copy {}/{}'.format(rectemps_fmted, mpUri, temp_track_file)
subprocess.call(full_cmd, shell=True)
# remove existing track files
os.remove(mpUri + '/' + track_file)
# replace with new, merged files
shutil.move(mpUri + '/' + temp_track_file, mpUri + '/' + track_file)

# clean up temp files
os.remove(dest)
os.remove(repofile)
# update mp with correct duration
mp.discoverDuration()
mp_list.update(mp)
logger.info("merging complete for UID: {} - URI: {}".format(mp.getIdentifier(), mpUri))
if wait:
os.remove(self.pause_state_file)

def merge_delayed(self, signal):
# merge and ingest the delayed mp's
if self.delay:
repo = context.get_repository()
for mp_id, mp in repo.iteritems():
if not (mp.status == mediapackage.SCHEDULED or mp.status == mediapackage.RECORDING):
mpUri = mp.getURI()
dest = os.path.join(mpUri, self.check_attachment)
repofile = os.path.join(mpUri, self.rectemp_uris_attachment)
if os.path.exists(repofile):
self.merge(mp, repofile, dest, repo)
logger.info('Starting Ingest of merge delayed mediapackage: {}'.format(mp_id))
worker.enqueue_job_by_name('ingest', mp)

def check_repository(self, signal):
# mp_list is collection of mediapackages ID's
# don't check when recording already
if recorder.is_recording():
return

mp_list = context.get_repository()
for uid,mp in mp_list.iteritems():
start = mp.getDate()
end = start + datetime.timedelta(seconds=(mp.getDuration()/1000))
if mp.status == mediapackage.SCHEDULED and start < datetime.datetime.utcnow() and end > datetime.datetime.utcnow():
# make a check attachment in the mp to mark the mp as having restarted recording
dest = os.path.join(mp.getURI(),self.check_attachment)
if not os.path.isfile(dest):
repocheck = open(dest, "w")
repocheck.write(str(start) + "," + str(end) + ",\n")
repocheck.close()
# duration update
x = datetime.datetime.utcnow() - start
x = x.seconds-2
mp.setDuration(mp.getDuration() - x*1000)
# start-datetime update
mp.setDate(datetime.datetime.utcnow()+datetime.timedelta(seconds=2))
# repository update
mp_list.update(mp)

scheduler = context.get_scheduler()
try:
scheduler.create_timer(mp)
logger.info("Mediapackage with UID: {} have been reprogrammed".format(uid))
except Exception as exc:
logger.error("Error trying to create a new timer for MP {}: {}".format(uid, exc))
4 changes: 3 additions & 1 deletion galicaster/recorder/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def __close_mp(self):
close_duration, self.current_mediapackage.manual, True, self.conf.get_boolean('ingest', 'ignore_capture_devices'))

self.dispatcher.emit("recorder-stopped", self.current_mediapackage.getIdentifier())
self.enqueue_ingest(self.current_mediapackage)
# Do not ingest if status set to nightly at the end of recording
if self.current_mediapackage.getOpStatus('ingest') != mediapackage.OP_NIGHTLY:
self.enqueue_ingest(self.current_mediapackage)
self.last_mediapackage = self.current_mediapackage
self.current_mediapackage = None

Expand Down

0 comments on commit ce196cb

Please sign in to comment.