Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for VTN-to-VEN Reports #174

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 58 additions & 20 deletions openleadr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ def add_handler(self, handler, callback):
"""
Add a callback for the given situation
"""
if handler not in ('on_event', 'on_update_event'):
logger.error("'handler' must be either on_event or on_update_event")
valid = ('on_event', 'on_update_event', 'on_register_report', 'on_report_update')
if handler not in valid:
logger.error("'handler' must be in %s" % (valid,))
return

setattr(self, handler, callback)
Expand Down Expand Up @@ -713,7 +714,7 @@ async def register_reports(self, reports):
# Handle the subscriptions that the VTN is interested in.
if 'report_requests' in response_payload:
await self.create_report(response_payload)

async def create_report(self, response_payload):
"""
Add the requested reports to the reporting mechanism.
Expand Down Expand Up @@ -799,7 +800,7 @@ async def create_report(self, response_payload):

if not single and report_back_duration.total_seconds() > 0:
callback = partial(self.update_report, report_request_id=report_request_id)

reporting_interval = granularity or report_back_duration
job = self.scheduler.add_job(func=callback,
trigger='cron',
Expand All @@ -817,15 +818,15 @@ async def create_report(self, response_payload):
'r_ids': requested_r_ids,
'granularity': granularity,
'job': None})

async def report_callback():
await self.update_report(report_request_id)

if 'report_interval' in report_request['report_specifier']:
self.scheduler.add_job(report_callback, 'date', run_date=report_request['report_specifier']['report_interval']['dtstart'])
else:
await self.update_report(report_request_id)

# Send the oadrCreatedReport message
message_type = 'oadrCreatedReport'
message_payload = {'pending_reports':
Expand Down Expand Up @@ -1021,6 +1022,24 @@ async def on_update_event(self, event):
if event['event_descriptor']['event_id'] in self.responded_events:
return self.responded_events.get(event['event_descriptor']['event_id'])

async def on_report_update(self, report_update):
"""
Placeholder for the on_report_update handler.
"""
logger.warning("A report update was sent but you don't have an on_report_update handler configured. "
"You should implement your own on_report_update handler. This handler receives "
"an oadrReport dict and should not return anything in response.")
return

async def on_register_report(self, report_metadata):
"""
Placeholder for the on_register_report handler.
"""
logger.warning("A report update was sent but you don't have an on_register_report handler configured. "
"You should implement your own on_report_update handler. This handler receives "
"an oadrReport dict and should not return anything in response.")
return None

async def on_cancel_party_registration(self, message):
if self.registration_id is None:
logger.info('VEN is not registered, doing nothing')
Expand Down Expand Up @@ -1141,6 +1160,35 @@ async def _execute_hooks(self, hook_name, *args, **kwargs):
logger.error(f"An error occurred while executing your '{hook_name}': {hook}:"
f"{err.__class__.__name__}: {err}")

async def _on_register_report(self, response_payload):
report_requests = []
for report_metadata in response_payload['reports']:
request = await self.on_register_report(response_payload)
if request:
report_requests.append(request)

message = self._create_message('oadrRegisteredReport',
report_requests=report_requests,
ven_id=self.ven_id,
response={'response_code': 200,
'response_description': 'OK',
'request_id': response_payload['request_id']})
service = 'EiReport'
await self._perform_request(service, message)

async def _on_report_update(self, response_payload):
for report_update in response_payload['reports']:
await self.on_report(response_payload)
message = self._create_message('oadrUpdatedReport',
ven_id=self.ven_id,
response= {
'request_id': response_payload['request_id'],
'response_code': 200,
'response_description': 'OK'
})
service = 'EiReport'
await self._perform_request(service, message)

async def _on_event(self, message):
events = message['events']
invalid_vtn_id = False
Expand Down Expand Up @@ -1242,7 +1290,7 @@ async def _event_status_log(self):
# ignoring the cancelled case
if event['event_descriptor']['event_status'] == 'cancelled':
continue

event_status = utils.determine_event_status(event['active_period'])
if event_status != event['event_descriptor']['event_status']:
event['event_descriptor']['event_status'] = event_status
Expand Down Expand Up @@ -1279,24 +1327,14 @@ async def _poll(self):
await self._on_event(response_payload)

elif response_type == 'oadrUpdateReport':
await self._on_report(response_payload)
await self._on_report_update(response_payload)

elif response_type == 'oadrCreateReport':
if 'report_requests' in response_payload:
await self.create_report(response_payload)

elif response_type == 'oadrRegisterReport':
# We don't support receiving reports from the VTN at this moment
logger.warning("The VTN offered reports, but OpenLEADR "
"does not support reports in this direction.")
message = self._create_message('oadrRegisteredReport',
report_requests=[],
ven_id=self.ven_id,
response={'response_code': 200,
'response_description': 'OK',
'request_id': response_payload['request_id']})
service = 'EiReport'
reponse_type, response_payload = await self._perform_request(service, message)
await self._on_register_report(response_payload)

elif response_type == 'oadrCancelPartyRegistration':
logger.info("The VTN required us to cancel the registration. Calling the cancel party registration procedure.")
Expand All @@ -1305,7 +1343,7 @@ async def _poll(self):
elif response_type == 'oadrCancelReport':
logger.info("The VTN required us to cancel a report. Calling the cancel report procedure.")
await self.cancel_report(response_payload)

else:
logger.warning(f"No handler implemented for incoming message "
f"of type {response_type}, ignoring.")
Expand Down