Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inputs added + recording start without server connection #413

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
14 changes: 12 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
PyCA – Opencast Capture Agent
=============================
fork with following changes:

- capture.py will not start capturing if connection to opencast endpoint is not possible. The original function service() will endless stay in a while-loop with 5sec sleep until endpoint is connected. Events in the database will not start recording. To change this, the already installed flag 'force_update' is used. The while-loop will only wait and loop if force_update=True and return immediately if force_update=False. force_update is passed through the calling functions register_ca(), recording_state(), set_service_status_immediate(), update_agent_state()
- Inputs are now possible. The Definition in pyca.conf is extended with an item inputs
- register_ca() is extended by the registration of the input configuration
- Ingest only uploads the selected tracks from schedule events





.. image:: https://github.com/opencast/pyCA/workflows/Test%20pyCA/badge.svg?branch=master
:target: https://github.com/opencast/pyCA/actions?query=workflow%3A%22Test+pyCA%22+branch%3Amaster
Expand All @@ -11,6 +19,8 @@ PyCA – Opencast Capture Agent
:target: https://github.com/opencast/pyCA/blob/master/license.lgpl
:alt: LGPL-3 license

PyCA – Opencast Capture Agent
=============================
**PyCA** is a fully functional Opencast_ capture agent written in Python.
It is free software licensed under the terms of the `GNU Lesser General Public
License`_.
Expand Down
11 changes: 9 additions & 2 deletions etc/pyca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
# Default: sqlite:///pyca.db
#database = sqlite:///pyca.db

# Name of Inputs
# If inputs='' or selected inputs in event attachment='' all tracks are uploaded
# Type: list of strings (write as '...', '...')
# default: inputs = ''
# inputs = 'HDMI', 'presenter', 'black board'


[capture]

Expand Down Expand Up @@ -78,8 +84,9 @@ directory = './recordings'
# Default: ffmpeg -nostats -re -f lavfi -r 25 -i testsrc -t {{time}} {{dir}}/{{name}}.webm'
command = 'ffmpeg -nostats -re -f lavfi -r 25 -i testsrc -f lavfi -i sine -t {{time}} {{dir}}/{{name}}.webm'

# Flavors of output files produced by the capture command. One flavors should
# be specified for every output file.
# Flavors of output files produced by the capture command. One flavors must
# be specified for every output file an Input. Flavor-names must be unique, number
# of flavors = number of inputs = number of output files
# Type: list of strings (write as '...', '...')
# Default: 'presenter/source'
#flavors = 'presenter/source'
Expand Down
1 change: 1 addition & 0 deletions pyca/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
cal_lookahead = integer(min=0, default=14)
backup_mode = boolean(default=false)
database = string(default='sqlite:///pyca.db')
inputs = force_list(default=list(''))

[capture]
directory = string(default='./recordings')
Expand Down
75 changes: 67 additions & 8 deletions pyca/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,61 @@
notify = sdnotify.SystemdNotifier()


def get_input_params(event):
'''Extract the input configuration parameters from the properties attached
to the schedule-entry
'''

inputs = []
for attachment in event.get_data().get('attach'):
data = attachment.get('data')
if (attachment.get('x-apple-filename') ==
'org.opencastproject.capture.agent.properties'):
for prop in data.split('\n'):
if prop.startswith('capture.device.names'):
param = prop.split('=', 1)
inputs = param[1].split(',')
break
return inputs


def trackinput_selected(event, flavor, track):
''' check if input corresponding to flavor is selected in
schedule-attachment parameter 'capture.device.names'
returns True if input is selected or if capture.device.names=''
'''

# inputs from pyca.conf
inputs_conf = config('agent', 'inputs')

# if no inputs defined, return True -> add all tracks to mediapackage
if (inputs_conf == ['']):
logger.info('No inputs in config defined')
return True

# flavors from pyca.conf
flavors_conf = config('capture', 'flavors')

# inputs in event attachment
inputs_event = get_input_params(event)

# if no inputs in attachment, return True -> add all tracks to mediapackage
if (inputs_event == ['']):
logger.info('No inputs in schedule')
# print('No inputs in event attachment')
return True

# Input corresponding to track-flavor from pyca.conf
input_track = inputs_conf[flavors_conf.index(flavor)]

if input_track in inputs_event:
# Input corresponding to flavor is selected in attachment
return True

# Input corresponding to flavor is not selected in attachment
return False


def get_config_params(properties):
'''Extract the set of configuration parameters from the properties attached
to the schedule
Expand All @@ -45,7 +100,7 @@ def ingest(event):
# Update status
set_service_status(Service.INGEST, ServiceStatus.BUSY)
notify.notify('STATUS=Uploading')
recording_state(event.uid, 'uploading')
recording_state(event.uid, 'uploading', force_update=True)
update_event_status(event, Status.UPLOADING)

# Select ingest service
Expand Down Expand Up @@ -86,11 +141,14 @@ def ingest(event):

# add track
for (flavor, track) in event.get_tracks():
logger.info('Adding track (%s -> %s)', flavor, track)
track = track.encode('ascii', 'ignore')
fields = [('mediaPackage', mediapackage), ('flavor', flavor),
('BODY1', (pycurl.FORM_FILE, track))]
mediapackage = http_request(service_url + '/addTrack', fields)
if trackinput_selected(event, flavor, track):
logger.info('Adding track (%s -> %s)', flavor, track)
track = track.encode('ascii', 'ignore')
fields = [('mediaPackage', mediapackage), ('flavor', flavor),
('BODY1', (pycurl.FORM_FILE, track))]
mediapackage = http_request(service_url + '/addTrack', fields)
else:
logger.info('Ignoring track (%s -> %s)', flavor, track)

# ingest
logger.info('Ingest recording')
Expand Down Expand Up @@ -125,7 +183,7 @@ def safe_start_ingest(event):
except Exception:
logger.exception('Something went wrong during the upload')
# Update state if something went wrong
recording_state(event.uid, 'upload_error')
recording_state(event.uid, 'upload_error', force_update=True)
update_event_status(event, Status.FAILED_UPLOADING)
set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE)

Expand All @@ -134,7 +192,8 @@ def control_loop():
'''Main loop of the capture agent, retrieving and checking the schedule as
well as starting the capture process if necessry.
'''
set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE)
set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE,
force_update=True)
notify.notify('READY=1')
notify.notify('STATUS=Running')
while not terminate():
Expand Down
7 changes: 6 additions & 1 deletion pyca/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ def get_schedule(db):
lookahead = config('agent', 'cal_lookahead') * 24 * 60 * 60
if lookahead:
params['cutoff'] = str((timestamp() + lookahead) * 1000)
uri = '%s/calendars?%s' % (service('scheduler')[0],

service_endpoint = service('scheduler', force_update=True)
if not service_endpoint:
logger.warning('Missing endpoint for updating schedule.')
return
uri = '%s/calendars?%s' % (service_endpoint[0],
urlencode(params))
try:
vcal = http_request(uri)
Expand Down
46 changes: 34 additions & 12 deletions pyca/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,14 @@ def service(service_name, force_update=False):
service_name,
config('services', service_id))
except pycurl.error:
logger.exception('Could not get %s endpoint. Retry in 5s',
service_name)
time.sleep(5.0)
if force_update:
logger.exception('Could not get %s endpoint. Retry in 5s',
service_name)
time.sleep(5.0)
else:
logger.warning('Could not get %s endpoint. Ignoring',
service_name)
break
return config('services', service_id)


Expand All @@ -140,7 +145,7 @@ def ensurelist(x):
return x if type(x) == list else [x]


def register_ca(status='idle'):
def register_ca(status='idle', force_update=False):
'''Register this capture agent at the Matterhorn admin server so that it
shows up in the admin interface.

Expand All @@ -151,7 +156,7 @@ def register_ca(status='idle'):
# here. We will just run silently in the background:
if config('agent', 'backup_mode'):
return
service_endpoint = service('capture.admin')
service_endpoint = service('capture.admin', force_update)
if not service_endpoint:
logger.warning('Missing endpoint for updating agent status.')
return
Expand All @@ -165,8 +170,20 @@ def register_ca(status='idle'):
except pycurl.error as e:
logger.warning('Could not set agent state to %s: %s', status, e)

# register_configuration
url += '/configuration'
inputstring = ",".join(config('agent', 'inputs'))
params = [('configuration',
'{\'capture.device.names\': \'' + inputstring + '\' }')]
try:
response = http_request(url, params).decode('utf-8')
if response:
logger.info(response)
except pycurl.error as e:
logger.warning('Could not set configuration: %s', e)

def recording_state(recording_id, status):

def recording_state(recording_id, status, force_update=False):
'''Send the state of the current recording to the Matterhorn core.

:param recording_id: ID of the current recording
Expand All @@ -177,9 +194,14 @@ def recording_state(recording_id, status):
# in the background:
if config('agent', 'backup_mode'):
return
service_endpoint = service('capture.admin', force_update)
# check if service_endpoint is availible, otherwise service()[0]
# is not defined
if not service_endpoint:
logger.warning('Missing endpoint for updating agent status.')
return
params = [('state', status)]
url = service('capture.admin')[0]
url += f'/recordings/{recording_id}'
url = f'{service_endpoint[0]}/recordings/{recording_id}'
try:
result = http_request(url, params).decode('utf-8')
logger.info(result)
Expand Down Expand Up @@ -209,12 +231,12 @@ def set_service_status(dbs, service, status):
dbs.commit()


def set_service_status_immediate(service, status):
def set_service_status_immediate(service, status, force_update=False):
'''Update the status of a particular service in the database and send an
immediate signal to Opencast.
'''
set_service_status(service, status)
update_agent_state()
update_agent_state(force_update)


@db.with_session
Expand All @@ -229,7 +251,7 @@ def get_service_status(dbs, service):
return db.ServiceStatus.STOPPED


def update_agent_state():
def update_agent_state(force_update=False):
'''Update the current agent state in opencast.
'''
status = 'idle'
Expand All @@ -242,7 +264,7 @@ def update_agent_state():
elif get_service_status(db.Service.INGEST) == db.ServiceStatus.BUSY:
status = 'uploading'

register_ca(status=status)
register_ca(status=status, force_update=force_update)


def terminate(shutdown=None):
Expand Down