diff --git a/ckanext/canada/auth.py b/ckanext/canada/auth.py index 7a598d7f7..f72793428 100644 --- a/ckanext/canada/auth.py +++ b/ckanext/canada/auth.py @@ -1,5 +1,6 @@ from ckan.plugins.toolkit import chained_auth_function, config from ckan.authz import has_user_permission_for_group_or_org, is_sysadmin +from ckan.plugins import plugin_loaded def _is_reporting_user(context): @@ -10,19 +11,19 @@ def _is_reporting_user(context): # block datastore-modifying APIs on the portal @chained_auth_function def datastore_create(up_func, context, data_dict): - if 'canada_internal' not in config.get('ckan.plugins'): + if not plugin_loaded('canada_internal'): return {'success': False} return up_func(context, data_dict) @chained_auth_function def datastore_delete(up_func, context, data_dict): - if 'canada_internal' not in config.get('ckan.plugins'): + if not plugin_loaded('canada_internal'): return {'success': False} return up_func(context, data_dict) @chained_auth_function def datastore_upsert(up_func, context, data_dict): - if 'canada_internal' not in config.get('ckan.plugins'): + if not plugin_loaded('canada_internal'): return {'success': False} return up_func(context, data_dict) @@ -51,3 +52,21 @@ def organization_list(context, data_dict): def organization_show(context, data_dict): return {'success': bool(context.get('user'))} + + +def portal_sync_info(context, data_dict): + """ + Registry users have to be logged in. + + Anyone on public Portal can access. + """ + if plugin_loaded('canada_internal'): + return {'success': bool(context.get('user'))} + return {'success': True} + + +def list_out_of_sync_packages(context, data_dict): + """ + Only sysadmins can list the out of sync packages. + """ + return {'success': False} diff --git a/ckanext/canada/cli.py b/ckanext/canada/cli.py index b219e426f..00eb2d510 100644 --- a/ckanext/canada/cli.py +++ b/ckanext/canada/cli.py @@ -12,10 +12,12 @@ import requests from collections import defaultdict +from typing import Optional, Union, Tuple + from contextlib import contextmanager from urllib.request import URLError from urllib.parse import urlparse -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from ckan.logic import get_action from ckan import model @@ -38,6 +40,7 @@ import ckanext.datastore.backend.postgres as datastore from ckanext.canada import triggers +from ckanext.canada import model as canada_model BOM = "\N{bom}" @@ -143,9 +146,9 @@ def _portal_update(self, activity_date): log = open(self.log, 'a') registry = LocalCKAN() - source_ds = str(datastore.get_write_engine().url) + source_datastore_uri = str(datastore.get_write_engine().url) - def changed_package_id_runs(start_date, verbose=False): + def changed_package_id_runs(start_date, verbose:Optional[bool]=False): # retrieve a list of changed packages from the registry while True: packages, next_date = _changed_packages_since( @@ -163,7 +166,7 @@ def changed_package_id_runs(start_date, verbose=False): 'canada', 'copy-datasets', '-o', - source_ds, + source_datastore_uri, '-u', self.ckan_user ] @@ -183,7 +186,7 @@ def changed_package_id_runs(start_date, verbose=False): # Advance generator so we may call send() below next(pool) - def append_log(finished, package_id, action, reason): + def append_log(finished, package_id, action, reason, error: Optional[Union[str, None]]=None): if not log: return log.write(json.dumps([ @@ -193,6 +196,8 @@ def append_log(finished, package_id, action, reason): action, reason, ]) + '\n') + if error: + log.write(error + '\n') 
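For reference, append_log keeps the existing JSON-lines format and, with this change, simply appends the worker's free-form error text on the line(s) after the JSON entry when a package failed to sync. A minimal sketch (not part of the patch) of reading that log back, skipping the plain-text error lines:

import json

def iter_portal_update_log(path):
    # Each entry is a JSON array written by append_log(); error text appended
    # after an entry is plain text and will not parse as JSON, so skip it.
    with open(path) as f:
        for line in f:
            try:
                yield json.loads(line)
            except ValueError:
                continue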
log.flush() with _quiet_int_pipe(): @@ -209,17 +214,34 @@ def append_log(finished, package_id, action, reason): stats = completion_stats(self.processes) while result is not None: try: - package_id, action, reason = json.loads(result) + package_id, action, reason, error, failure_reason, failure_trace, do_update_sync_success_time = json.loads(result) except Exception as e: if self.verbose: print("Worker proccess failed on:") print(result) raise Exception(e) - print(job_ids, next(stats), finished, package_id, \ - action, reason) - append_log(finished, package_id, action, reason) + _stats = next(stats) + print(job_ids, _stats, finished, package_id, action, reason) + if error: + print(job_ids, _stats, finished, package_id, error) + append_log(finished, package_id, action, reason, error) job_ids, finished, result = next(pool) + #TODO: need to actually output error to stderr!!! + + # save the sync state in the database + last_successful_sync = None + if do_update_sync_success_time: + last_successful_sync = datetime.now(timezone.utc) + else: + sync_obj = canada_model.PackageSync.get(package_id=package_id) + if sync_obj: + last_successful_sync = sync_obj.last_successful_sync + canada_model.PackageSync.upsert(package_id=package_id, + last_successful_sync=last_successful_sync, + error_on=failure_reason or None, + error=failure_trace or None) + print(" --- next batch starting at: " + next_date.isoformat()) append_log( None, @@ -231,8 +253,11 @@ def append_log(finished, package_id, action, reason): self._portal_update_completed = True -def _changed_packages_since(registry, since_time, ids_only=False, verbose=False): +def _changed_packages_since(registry: LocalCKAN, since_time: str, + ids_only: Optional[bool]=False, verbose: Optional[bool]=False): """ + PortalUpdater member: Gathers packages based on activity. + Query source ckan instance for packages changed since_time. returns (packages, next since_time to query) or (None, None) when no more changes are found. @@ -278,13 +303,14 @@ def _changed_packages_since(registry, since_time, ids_only=False, verbose=False) return packages, since_time -def _copy_datasets(source, user, mirror=False, verbose=False): +def _copy_datasets(source_datastore_uri: Optional[Union[str, None]], user: Optional[Union[str, None]]=None, + mirror: Optional[bool]=False, verbose: Optional[bool]=False): """ - A process that accepts packages on stdin which are compared - to the local version of the same package. The local package is - then created, updated, deleted or left unchanged. This process - outputs that action as a string 'created', 'updated', 'deleted' - or 'unchanged' + PortalUpdater member: Syncs package dicts from the stdin (valid JSON). + + A process that accepts packages on stdin which are compared o the local version of the same package. + The local package is hen created, updated, deleted or left unchanged. This process outputs that + action as a string 'created', 'updated', 'deleted' or 'unchanged'. 
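Each copy-datasets worker now reports seven fields per package on stdout, and PortalUpdater uses the last four to decide what to store in PackageSync. A small sketch of that contract; the values below are illustrative only, not taken from a real run:

import json

# One result line in the shape _copy_datasets writes to stdout.
result_line = json.dumps([
    'example-package-id',    # package_id (illustrative)
    'updated',               # action: 'created', 'updated', 'deleted', 'unchanged', ...
    '',                      # reason
    '',                      # error text destined for the log
    'datastore_create[resource_id=example-resource-id]',  # failure_reason ('' on success)
    'Traceback (most recent call last): ...',              # failure_trace ('' on success)
    False,                   # do_update_sync_success_time
])

(package_id, action, reason, error, failure_reason,
 failure_trace, do_update_sync_success_time) = json.loads(result_line)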
""" with _quiet_int_pipe(): portal = LocalCKAN(username = user) @@ -345,38 +371,111 @@ def _copy_datasets(source, user, mirror=False, verbose=False): target_pkg = get_datastore_and_views(target_pkg, portal, verbose=verbose) _trim_package(target_pkg) - target_hash = {} + resource_file_hashes = {} + + failure_reason = '' + failure_trace = '' + do_update_sync_success_time = False + error = '' # will output to stderr, while action gets outputted to stdout if action == 'skip': pass elif target_pkg is None and source_pkg is None: action = 'unchanged' reason = reason or 'deleted on registry' + do_update_sync_success_time = False # do not update sync time if nothing changed elif target_deleted: action = 'updated' reason = 'undeleting on target' - portal.action.package_update(**source_pkg) - for r in source_pkg['resources']: - target_hash[r['id']] = r.get('hash') - action += _add_datastore_and_views(source_pkg, portal, target_hash, source, verbose=verbose) + try: + portal.action.package_update(**source_pkg) + do_update_sync_success_time = True + except Exception as e: + failure_reason = 'package_update' + failure_trace = traceback.format_exc() + do_update_sync_success_time = False + error += 'package-update-failed for %s %s' % (package_id, str(e)) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + pass + else: # no need to attempt to update datastore and views if the base package failed + for r in source_pkg['resources']: + # use Registry file hashes for force undelete + resource_file_hashes[r['id']] = r.get('hash') + _action, _error, failure_reason, failure_trace = _add_datastore_and_views(source_pkg, portal, + resource_file_hashes, + source_datastore_uri, + verbose=verbose) + action += _action + error += _error + if failure_reason: + do_update_sync_success_time = False elif target_pkg is None: action = 'created' - portal.action.package_create(**source_pkg) - action += _add_datastore_and_views(source_pkg, portal, target_hash, source, verbose=verbose) + try: + portal.action.package_create(**source_pkg) + do_update_sync_success_time = True + except Exception as e: + failure_reason = 'package_create' + failure_trace = traceback.format_exc() + do_update_sync_success_time = False + error += 'package-create-failed for %s %s' % (package_id, str(e)) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + pass + else: # no need to attempt to update datastore and views if the base package failed + _action, _error, failure_reason, failure_trace = _add_datastore_and_views(source_pkg, portal, + resource_file_hashes, + source_datastore_uri, + verbose=verbose) + action += _action + error += _error + if failure_reason: + do_update_sync_success_time = False elif source_pkg is None: action = 'deleted' - portal.action.package_delete(id=package_id) + try: + portal.action.package_delete(id=package_id) + do_update_sync_success_time = True + except Exception as e: + failure_reason = 'package_delete' + failure_trace = traceback.format_exc() + do_update_sync_success_time = False + error += 'package-delete-failed for %s %s' % (package_id, str(e)) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + pass elif source_pkg == target_pkg: action = 'unchanged' reason = 'no difference found' + do_update_sync_success_time = False # do not update sync time if nothing changed else: action = 'updated' for r in target_pkg['resources']: - target_hash[r['id']] = r.get('hash') - portal.action.package_update(**source_pkg) - action += _add_datastore_and_views(source_pkg, portal, 
target_hash, source, verbose=verbose) - - sys.stdout.write(json.dumps([package_id, action, reason]) + '\n') + # use Portal file hashes + resource_file_hashes[r['id']] = r.get('hash') + try: + portal.action.package_update(**source_pkg) + do_update_sync_success_time = True + except Exception as e: + failure_reason = 'package_update' + failure_trace = traceback.format_exc() + do_update_sync_success_time = False + error += 'package-update-failed for %s %s' % (package_id, str(e)) + if verbose: + error += '\n Failed with ValidationError: %s' % failure_trace + pass + else: # no need to attempt to update datastore and views if the base package failed + _action, _error, failure_reason, failure_trace = _add_datastore_and_views(source_pkg, portal, + resource_file_hashes, + source_datastore_uri, + verbose=verbose) + error += _error + action += _action + if failure_reason: + do_update_sync_success_time = False + + sys.stdout.write(json.dumps([package_id, action, reason, error, failure_reason, failure_trace, do_update_sync_success_time]) + '\n') sys.stdout.flush() @@ -664,8 +763,10 @@ def _bulk_validate(): log.close() -def _trim_package(pkg): +def _trim_package(pkg: Optional[Union[dict, None]]=None): """ + PortalUpdater member: removes keys from provided package dict. + remove keys from pkg that we don't care about when comparing or updating/creating packages. Also try to convert types and create missing fields that will be present in package_show. @@ -699,81 +800,140 @@ def _trim_package(pkg): pkg[k] = '' -def _add_datastore_and_views(package, portal, res_hash, ds, verbose=False): +def _add_datastore_and_views(package: dict, portal: LocalCKAN, resource_file_hashes: dict, + source_datastore_uri: str, verbose: Optional[bool]=False) -> Tuple[str, str, str, str]: + """ + PortalUpdater member: Syncs DataDictionaries, Resource Views, and DataStore tables. 
+ """ # create datastore table and views for each resource of the package action = '' + error = '' + failure_reason = '' + failure_trace = '' for resource in package['resources']: res_id = resource['id'] if res_id in package.keys(): if 'data_dict' in package[res_id].keys(): - action += _add_to_datastore(portal, resource, package[res_id], res_hash, ds, verbose=verbose) + _action, _error, _failure_reason, _failure_trace = _add_to_datastore(portal, resource, + package[res_id], resource_file_hashes, + source_datastore_uri, verbose=verbose) + action += _action + error += _error + if failure_reason: + failure_reason += ',%s' % _failure_reason # comma separate multiple failure reasons + failure_trace += '\n\n%s' % _failure_trace # separate multiple failure traces with newlines + else: + failure_reason = _failure_reason + failure_trace = _failure_trace if 'views' in package[res_id].keys(): - action += _add_views(portal, resource, package[res_id], verbose=verbose) - return action - - -def _delete_datastore_and_views(package, portal): - # remove datastore table and views for each resource of the package - action = '' - for resource in package['resources']: - res_id = resource['id'] - try: - portal.call_action('datastore_delete', {"id": resource['id'], "force": True}) - action += '\n datastore-deleted for ' + resource['id'] - except NotFound: - action += '\n failed to delete datastore for ' + resource['id'] - try: - portal.call_action('resource_view_clear', {"id": resource['id'], "force": True}) - action += '\n views-deleted for ' + resource['id'] - except NotFound: - action += '\n failed to delete all views for ' + resource['id'] - return action + _action, _error, _failure_reason, _failure_trace = _add_views(portal, resource, package[res_id], verbose=verbose) + action += _action + error += _error + if failure_reason: + failure_reason += ',%s' % _failure_reason # comma separate multiple failure reasons + failure_trace += '\n\n%s' % _failure_trace # separate multiple failure traces with newlines + else: + failure_reason = _failure_reason + failure_trace = _failure_trace + return action, error, failure_reason, failure_trace -def _add_to_datastore(portal, resource, resource_details, t_hash, source_ds_url, verbose=False): +def _add_to_datastore(portal: LocalCKAN, resource: dict, resource_details: dict, + resource_file_hashes: dict, source_datastore_uri: str, verbose: Optional[bool]=False) -> Tuple[str, str, str, str]: + """ + PortalUpdater member: Syncs DataDictionaries and DataStore tables. + """ action = '' + error = '' + failure_reason = '' + failure_trace = '' try: portal.call_action('datastore_search', {'resource_id': resource['id'], 'limit': 0}) - if t_hash.get(resource['id']) \ - and t_hash.get(resource['id']) == resource.get('hash')\ + if resource_file_hashes.get(resource['id']) \ + and resource_file_hashes.get(resource['id']) == resource.get('hash')\ and _datastore_dictionary(portal, resource['id']) == resource_details['data_dict']: if verbose: action += '\n File hash and Data Dictionary has not changed, skipping DataStore for %s...' 
% resource['id'] return action else: - portal.call_action('datastore_delete', {"id": resource['id'], "force": True}) - action += '\n datastore-deleted for ' + resource['id'] + try: + portal.call_action('datastore_delete', {"id": resource['id'], "force": True}) + action += '\n datastore-deleted for ' + resource['id'] + except Exception as e: + failure_reason = 'datastore_delete[resource_id=%s]' % resource['id'] + failure_trace = traceback.format_exc() + error += '\n datastore-create-failed for %s %s' % (resource['id'], str(e)) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + pass except NotFound: # not an issue, resource does not exist in datastore if verbose: action += '\n DataStore does not exist for resource %s...trying to create it...' % resource['id'] pass - portal.call_action('datastore_create', - {"resource_id": resource['id'], - "fields": resource_details['data_dict'], - "force": True}) - - action += '\n datastore-created for ' + resource['id'] + datastore_created = False + try: + portal.call_action('datastore_create', + {"resource_id": resource['id'], + "fields": resource_details['data_dict'], + "force": True}) + + action += '\n datastore-created for ' + resource['id'] + datastore_created = True + except Exception as e: + _failure_reason = 'datastore_create[resource_id=%s]' % resource['id'] + _failure_trace = traceback.format_exc() + if failure_reason: + failure_reason += ',%s' % _failure_reason # comma separate multiple failure reasons + failure_trace += '\n\n%s' % _failure_trace # separate multiple failure traces with newlines + else: + failure_reason = _failure_reason + failure_trace = _failure_trace + error += '\n datastore-create-failed for %s %s' % (resource['id'], str(e)) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + pass - # load data - target_ds_url = str(datastore.get_write_engine().url) - cmd1 = subprocess.Popen(['pg_dump', source_ds_url, '-a', '-t', resource['id']], stdout=subprocess.PIPE) - cmd2 = subprocess.Popen(['psql', target_ds_url], stdin=cmd1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = cmd2.communicate() - action += ' data-loaded' if not err else ' data-load-failed' - if verbose: - if resource_details['data_dict']: - action += '\n Using DataStore fields:' - for field in resource_details['data_dict']: - action += '\n %s' % field['id'] + if datastore_created: + # load data + target_datastore_uri = str(datastore.get_write_engine().url) + cmd1 = subprocess.Popen(['pg_dump', source_datastore_uri, '-a', '-t', resource['id']], stdout=subprocess.PIPE) + cmd2 = subprocess.Popen(['psql', target_datastore_uri], stdin=cmd1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = cmd2.communicate() + if not err: + action += ' data-loaded' + if verbose: + if resource_details['data_dict']: + action += '\n Using DataStore fields:' + for field in resource_details['data_dict']: + action += '\n %s' % field['id'] + else: + action += '\n There are no DataStore fields!!!' else: - action += '\n There are no DataStore fields!!!' 
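The data load itself still shells out to pg_dump and psql; only the failure handling around it changed. A self-contained sketch of that pipeline, with psql's stderr decoded to text so it can be JSON-serialised into failure_trace (the function and argument names here are illustrative, not from the patch):

import subprocess

def copy_datastore_table(source_datastore_uri, target_datastore_uri, resource_id):
    # Dump only the rows (-a) of the one table (-t) and stream them into the
    # target datastore; returns (ok, stderr_text).
    dump = subprocess.Popen(
        ['pg_dump', source_datastore_uri, '-a', '-t', resource_id],
        stdout=subprocess.PIPE)
    load = subprocess.Popen(
        ['psql', target_datastore_uri],
        stdin=dump.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    dump.stdout.close()  # let pg_dump receive SIGPIPE if psql exits early
    _out, err = load.communicate()
    dump.wait()
    return not err, err.decode('utf-8', 'replace') if err else ''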
- return action + _failure_reason = 'datastore_load[resource_id=%s]' % resource['id'] + _failure_trace = err.decode('utf-8', 'replace') + if failure_reason: + failure_reason += ',%s' % _failure_reason # comma separate multiple failure reasons + failure_trace += '\n\n%s' % _failure_trace # separate multiple failure traces with newlines + else: + failure_reason = _failure_reason + failure_trace = _failure_trace + error += '\n datastore-load-failed for %s %s' % (resource['id'], _failure_trace) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + + return action, error, failure_reason, failure_trace -def _add_views(portal, resource, resource_details, verbose=False): +def _add_views(portal: LocalCKAN, resource: dict, resource_details: dict, verbose: Optional[bool]=False) -> Tuple[str, str, str, str]: + """ + PortalUpdater member: Syncs Resource Views. + """ action = '' + error = '' + failure_reason = '' + failure_trace = '' target_views = portal.call_action('resource_view_list', {'id': resource['id']}) for src_view in resource_details['views']: view_action = 'resource_view_create' @@ -784,11 +944,19 @@ def _add_views(portal, resource, resource_details, verbose=False): if view_action: try: portal.call_action(view_action, src_view) - action += '\n ' + view_action + ' ' + src_view['id'] + ' for resource ' + resource['id'] - except ValidationError as e: - action += '\n ' + view_action + ' failed for view ' + src_view['id'] + ' for resource ' + resource['id'] + action += '\n %s %s for resource %s' % (view_action, src_view['id'], resource['id']) + except Exception as e: + _failure_reason = '%s[resource_id=%s][view_id=%s]' % (view_action, resource['id'], src_view['id']) + _failure_trace = traceback.format_exc() + if failure_reason: + failure_reason += ',%s' % _failure_reason # comma separate multiple failure reasons + failure_trace += '\n\n%s' % _failure_trace # separate multiple failure traces with newlines + else: + failure_reason = _failure_reason + failure_trace = _failure_trace + error += '\n %s failed for view %s for resource %s %s' % (view_action, src_view['id'], resource['id'], str(e)) if verbose: - action += '\n Failed with ValidationError: %s' % e.error_dict + error += '\n Failed with Error: %s' % str(e) pass for target_view in target_views: @@ -799,10 +967,24 @@ def _add_views(portal, resource, resource_details, verbose=False): if to_delete: view_action = 'resource_view_delete' - portal.call_action(view_action, {'id':target_view['id']}) - action += '\n ' + view_action + ' ' + src_view['id'] + ' for resource ' + resource['id'] + try: + portal.call_action(view_action, {'id':target_view['id']}) + action += '\n %s %s for resource %s' % (view_action, target_view['id'], resource['id']) + except Exception as e: + _failure_reason = '%s[resource_id=%s][view_id=%s]' % (view_action, resource['id'], target_view['id']) + _failure_trace = traceback.format_exc() + if failure_reason: + failure_reason += ',%s' % _failure_reason # comma separate multiple failure reasons + failure_trace += '\n\n%s' % _failure_trace # separate multiple failure traces with newlines + else: + failure_reason = _failure_reason + failure_trace = _failure_trace + error += '\n %s failed for view %s for resource %s %s' % (view_action, target_view['id'], resource['id'], str(e)) + if verbose: + error += '\n Failed with Error: %s' % failure_trace + pass - return action + return action, error, failure_reason, failure_trace def get_datastore_and_views(package, ckan_instance, verbose=False): @@ -908,7 +1090,7 @@ def _quiet_int_pipe(): raise -def
_get_user(user): +def _get_user(user:Optional[Union[str, None]]=None) -> str: if user is not None: return user return get_action('get_site_user')({'ignore_auth': True}).get('name') @@ -980,6 +1162,8 @@ def portal_update(portal_ini, delay=60, verbose=False): """ + PortalUpdater member: CKAN cli command entrance to run the PortalUpdater stack. + Collect batches of packages modified at local CKAN since activity_date and apply the package updates to the portal instance for all packages with published_date set to any time in the past. @@ -1029,13 +1213,14 @@ def portal_update(portal_ini, is_flag=True, help="Increase verbosity", ) -def copy_datasets(mirror=False, ckan_user=None, source=None, verbose=False): +def copy_datasets(mirror: Optional[bool]=False, ckan_user: Optional[Union[str, None]]=None, + source: Optional[Union[str, None]]=None, verbose: Optional[bool]=False): """ - A process that accepts packages on stdin which are compared - to the local version of the same package. The local package is - then created, updated, deleted or left unchanged. This process - outputs that action as a string 'created', 'updated', 'deleted' - or 'unchanged' + PortalUpdater member: CKAN cli command entrance to sync packages from stdin (valid JSON). + + A process that accepts packages on stdin which are compared to the local version of the same package. + The local package is then created, updated, deleted or left unchanged. This process outputs that + action as a string 'created', 'updated', 'deleted' or 'unchanged'. Full Usage:\n canada copy-datasets [-m] [-o ] diff --git a/ckanext/canada/logic.py b/ckanext/canada/logic.py index f282d8184..2e63774a1 100644 --- a/ckanext/canada/logic.py +++ b/ckanext/canada/logic.py @@ -37,6 +37,9 @@ from ckanext.scheming.helpers import scheming_get_preset from ckanext.datastore.backend import DatastoreBackend +from ckanext.canada import model as canada_model + +from ckanapi import RemoteCKAN MIMETYPES_AS_DOMAINS = [ 'application/x-msdos-program', # .com @@ -529,3 +532,56 @@ def canada_datastore_run_triggers(up_func, context, data_dict): context['connection'] = backend._get_write_engine().connect() with datastore_create_temp_user_table(context, drop_on_commit=False): return up_func(context, data_dict) + + +@side_effect_free +def portal_sync_info(context, data_dict): + """ + Returns PackageSync object for a given package_id if it exists. + """ + package_id = get_or_bust(data_dict, 'id') + + check_access('portal_sync_info', context, data_dict) + + sync_info = canada_model.PackageSync.get(package_id=package_id) + + if not sync_info: + raise ObjectNotFound(_('No Portal Sync information found for package %s') % package_id) + + # NOTE: never show sync_info.error as it contains stack traces and system information + return { + 'package_id': sync_info.package_id, + 'created': sync_info.created, + 'last_successful_sync': sync_info.last_successful_sync, + 'error_on': sync_info.error_on, + } + + +@side_effect_free +def list_out_of_sync_packages(context, data_dict): + """ + Returns a list of out of sync packages on the Portal. + + Based on PackageSync model. 
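Both new actions are registered in plugins.py later in this diff. portal_sync_info is open to anyone on the public Portal and login-gated on the Registry, while list_out_of_sync_packages is effectively sysadmin-only: its auth function always returns {'success': False}, and CKAN's check_access only lets sysadmins through in that case. A hedged usage sketch with ckanapi; the URL and API key are placeholders, not values from this patch:

from ckanapi import RemoteCKAN, NotAuthorized

portal = RemoteCKAN('https://portal.example.ca', apikey='placeholder-api-key')

sync_info = portal.action.portal_sync_info(id='example-package-id')
# returns package_id, created, last_successful_sync and error_on (never the raw error/trace)

try:
    out_of_sync = portal.action.list_out_of_sync_packages(limit=10)
except NotAuthorized:
    out_of_sync = []  # non-sysadmin callers are rejected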
+ """ + check_access('portal_sync_info', context, data_dict) + + limit = data_dict.get('limit', 25) + + sync_infos = model.Session.query(canada_model.PackageSync).filter(canada_model.PackageSync.error_on != None).limit(limit) + + if not sync_infos: + return [] + + out_of_sync_packages = [] + + for sync_info in sync_infos: + try: + pkg_dict = get_action('package_show')({'user': context.get('user')}, {'id': sync_info.package_id}) + except (ObjectNotFound, NotAuthorized): + continue + # NOTE: never show sync_info.error as it contains stack traces and system information + out_of_sync_packages.append({'pkg_dict': pkg_dict, 'last_successful_sync': sync_info.last_successful_sync, + 'error_on': sync_info.error_on}) + + return out_of_sync_packages diff --git a/ckanext/canada/migration/canada_public/README b/ckanext/canada/migration/canada_public/README new file mode 100644 index 000000000..98e4f9c44 --- /dev/null +++ b/ckanext/canada/migration/canada_public/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/ckanext/canada/migration/canada_public/alembic.ini b/ckanext/canada/migration/canada_public/alembic.ini new file mode 100644 index 000000000..f0d665297 --- /dev/null +++ b/ckanext/canada/migration/canada_public/alembic.ini @@ -0,0 +1,74 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = %(here)s + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# timezone to use when rendering the date +# within the migration file as well as the filename. +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +#truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; this defaults +# to /srv/app/ckan/registry/src/ckanext-canada/ckanext/canada/migration/canada_public/versions. 
When using multiple version +# directories, initial revisions must be specified with --version-path +# version_locations = %(here)s/bar %(here)s/bat /srv/app/ckan/registry/src/ckanext-canada/ckanext/canada/migration/canada_public/versions + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/ckanext/canada/migration/canada_public/env.py b/ckanext/canada/migration/canada_public/env.py new file mode 100644 index 000000000..009368218 --- /dev/null +++ b/ckanext/canada/migration/canada_public/env.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +from __future__ import with_statement +from alembic import context +from sqlalchemy import engine_from_config, pool +from logging.config import fileConfig + +import os + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + +name = os.path.basename(os.path.dirname(__file__)) + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + + url = config.get_main_option(u"sqlalchemy.url") + context.configure( + url=url, target_metadata=target_metadata, literal_binds=True, + version_table=u'{}_alembic_version'.format(name) + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
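This migration directory follows CKAN's standard extension layout, so the package_sync table would normally be created through CKAN's plugin migration command (ckan db upgrade -p canada_public). For completeness, a sketch of driving the same revision directly through Alembic's Python API; the ini path and database URL below are placeholders, not values from this patch:

from alembic import command
from alembic.config import Config

cfg = Config('ckanext/canada/migration/canada_public/alembic.ini')  # path is illustrative
cfg.set_main_option('sqlalchemy.url', 'postgresql://ckan:pass@localhost/ckan_portal')  # placeholder URL
command.upgrade(cfg, 'head')  # applies 0ef791477ff0, creating the package_sync table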
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix=u'sqlalchemy.', + poolclass=pool.NullPool) + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + version_table=u'{}_alembic_version'.format(name) + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/ckanext/canada/migration/canada_public/script.py.mako b/ckanext/canada/migration/canada_public/script.py.mako new file mode 100644 index 000000000..2c0156303 --- /dev/null +++ b/ckanext/canada/migration/canada_public/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/ckanext/canada/migration/canada_public/versions/0ef791477ff0_.py b/ckanext/canada/migration/canada_public/versions/0ef791477ff0_.py new file mode 100644 index 000000000..57041b3ba --- /dev/null +++ b/ckanext/canada/migration/canada_public/versions/0ef791477ff0_.py @@ -0,0 +1,36 @@ +"""empty message + +Revision ID: 0ef791477ff0 +Revises: +Create Date: 2024-10-18 18:20:50.040861 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '0ef791477ff0' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + from ckanext.canada.model import PackageSync + if PackageSync.__table__.exists(): + print('%s database table already exists' % PackageSync.__tablename__) + return + PackageSync.__table__.create() + print('%s database table created' % PackageSync.__tablename__) + pass + + +def downgrade(): + from ckanext.canada.model import PackageSync + if not PackageSync.__table__.exists(): + print('%s database table not present' % PackageSync.__tablename__) + return + PackageSync.__table__.drop() + print('%s database table deleted' % PackageSync.__tablename__) + pass diff --git a/ckanext/canada/model.py b/ckanext/canada/model.py new file mode 100644 index 000000000..1c2c9d175 --- /dev/null +++ b/ckanext/canada/model.py @@ -0,0 +1,87 @@ +# encoding: utf-8 + +import datetime +import logging + +from sqlalchemy import Column, Unicode, DateTime, Integer +from sqlalchemy.ext.declarative import declarative_base + +from ckan.model import meta + +log = logging.getLogger(__name__) + + +Base = declarative_base(metadata=meta.metadata) + + +class PackageSync(Base): + __tablename__ = 'package_sync' + + id = Column(Integer, primary_key=True, autoincrement=True) + package_id = Column(Unicode) + created = Column(DateTime, default=datetime.datetime.now(datetime.timezone.utc)) + last_successful_sync = Column(DateTime, nullable=True) + error_on = Column(Unicode, nullable=True) + error = Column(Unicode, nullable=True) + + @classmethod + def get(cls, package_id, for_update=False): + '''Returns a package_sync object referenced by its package_id.''' + if not package_id: + return None + + q = meta.Session.query(cls).autoflush(True).filter_by(package_id=package_id) + if for_update: + q = 
q.with_for_update() + return q.first() + + + @classmethod + def upsert(cls, package_id, last_successful_sync=None, error_on=None, error=None): + '''Sets and returns a package_sync object referenced by its package_id.''' + package_sync = cls.get(package_id, for_update=True) + + if package_sync: + package_sync.error_on = error_on + package_sync.error = error + package_sync.created = datetime.datetime.now(datetime.timezone.utc) + package_sync.last_successful_sync = last_successful_sync + else: + package_sync = cls(package_id=package_id, last_successful_sync=last_successful_sync, + error_on=error_on, error=error) + + meta.Session.add(package_sync) + meta.Session.commit() + + return cls.get(package_id) + + + @classmethod + def delete(cls, package_id): + '''Deletes a pacage_sync object referenced by its package_id.''' + package_sync = cls.get(package_id, for_update=True) + + if not package_sync: + return + + meta.Session.query(cls).filter_by(package_id=package_id).delete() + meta.Session.commit() + + +def _get_models(): + return [PackageSync] + + +def create_tables(models=_get_models()): + for model in models: + model.__table__.create() + + log.info('%s database table created' % model.__tablename__) + + +def tables_no_exist(): + return [model for model in _get_models() if not model.__table__.exists()] + + +def tables_exist(): + return [model for model in _get_models() if model.__table__.exists()] diff --git a/ckanext/canada/plugins.py b/ckanext/canada/plugins.py index 508df9669..4d39d8c6a 100755 --- a/ckanext/canada/plugins.py +++ b/ckanext/canada/plugins.py @@ -501,6 +501,8 @@ def get_actions(self): resource_view_update=resource_view_update_bilingual, resource_view_create=resource_view_create_bilingual, datastore_run_triggers=logic.canada_datastore_run_triggers, + portal_sync_info=logic.portal_sync_info, + list_out_of_sync_packages=logic.list_out_of_sync_packages, ) # IAuthFunctions @@ -511,6 +513,8 @@ def get_auth_functions(self): 'group_show': auth.group_show, 'organization_list': auth.organization_list, 'organization_show': auth.organization_show, + 'portal_sync_info': auth.portal_sync_info, + 'list_out_of_sync_packages': auth.list_out_of_sync_packages, } # IXloader diff --git a/ckanext/canada/templates/admin/base.html b/ckanext/canada/templates/admin/base.html index 63f9bcc63..58dea4952 100644 --- a/ckanext/canada/templates/admin/base.html +++ b/ckanext/canada/templates/admin/base.html @@ -15,6 +15,7 @@ {{ h.build_nav_icon('admin.trash', _('Trash'), icon='trash-o') }} {{ h.build_nav_icon('canada.ckanadmin_publish', _('Publish Records'), icon='cloud-upload') }} {{ h.build_nav_icon('canada.ckanadmin_job_queue', _('Job Queue'), icon='tasks') }} + {{ h.build_nav_icon('canada.ckan_admin_portal_sync', _('Out of Sync Packages'), icon='exclamation-triangle') }} {{ h.build_extra_admin_nav() }} {% else %} {{ super() }} diff --git a/ckanext/canada/templates/admin/portal_sync.html b/ckanext/canada/templates/admin/portal_sync.html new file mode 100644 index 000000000..3f4c73841 --- /dev/null +++ b/ckanext/canada/templates/admin/portal_sync.html @@ -0,0 +1,37 @@ +{% extends 'admin/base.html' %} + +{% block primary_content_inner %} +

{{ _('Out of Sync Packages') }}

+
+ {% if g.page.item_count == 0 %} +
+ {% snippet 'snippets/search_result_text.html', query=g.q, count=g.page.item_count, type='dataset' %} +
+ {% else %} +
+ {% snippet 'snippets/search_result_text.html', query=g.q, count=g.page.item_count, type='dataset' %} +
+ {% endif %} +
+ + {%- block pager -%} + {{ g.page.pager(q=g.q) }} + {%- endblock -%} +{% endblock %} + +{% block secondary_content %} +
+
+

+  {{ _('View Out of Sync Packages') }} +

+
+
+

{{ _('View packages that are out of sync on the Portal and why they have not been synced.') }}

+
+
+{% endblock %} diff --git a/ckanext/canada/templates/admin/snippets/portal_sync_package_item.html b/ckanext/canada/templates/admin/snippets/portal_sync_package_item.html new file mode 100644 index 000000000..b55f41a54 --- /dev/null +++ b/ckanext/canada/templates/admin/snippets/portal_sync_package_item.html @@ -0,0 +1,24 @@ +{% extends 'snippets/package_item.html' %} + +{% block resources_outer %} + {{ super() }} +
+
+
+ +   + {{ _('Last Successful Sync:') }}  + {{ h.render_datetime(sync_info['last_successful_sync'], '%Y-%m-%d %H:%M:%S %Z') }} + +
+
+
+
+ +   + {{ _('Failure Reason:') }}  + {{ sync_info['error_on'] }} + +
+
+{% endblock %} diff --git a/ckanext/canada/view.py b/ckanext/canada/view.py index 1a376f949..7cd0b2af0 100644 --- a/ckanext/canada/view.py +++ b/ckanext/canada/view.py @@ -24,7 +24,8 @@ from ckan.lib.base import model from ckan.lib.helpers import ( date_str_to_datetime, - lang + lang, + Page, ) from ckan.views.dataset import ( @@ -1296,3 +1297,37 @@ def ckan_admin_config(): 404 this page always. """ return abort(404) + + +@canada_views.route('/ckan-admin/portal-sync', methods=['GET']) +def ckan_admin_portal_sync(): + """ + Lists any packages that are out of date with the Portal. + """ + try: + check_access('list_out_of_sync_packages', {'user': g.user}) + except NotAuthorized: + abort(403) + + page = h.get_page_number(request.args) + limit = 25 + extra_vars = {} + + out_of_sync_packages = get_action('list_out_of_sync_packages')({'user': g.user}, {'limit': limit}) + extra_vars['out_of_sync_packages'] = out_of_sync_packages + + #FIXME: figure out pager... + pager_url = h.url_for('canada.ckan_admin_portal_sync', page=page) + extra_vars['page'] = Page( + collection=out_of_sync_packages, + page=page, + url=pager_url, + item_count=len(out_of_sync_packages), + items_per_page=limit + ) + extra_vars['page'].items = out_of_sync_packages + + # TODO: remove in CKAN 2.11?? + setattr(g, 'page', extra_vars['page']) + + return render('admin/portal_sync.html', extra_vars=extra_vars)
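One way the pager FIXME above could be resolved: Page builds its links through the url argument, so passing a callable that rebuilds the admin route (rather than a pre-rendered URL with the page number already baked in) lets ?page=N point at the right place. This is a sketch only; real pagination would also need list_out_of_sync_packages to accept an offset and to report the total row count for item_count:

def _pager_url(**kwargs):
    # called by Page.pager() with page=<n> plus any extra arguments given to pager()
    return h.url_for('canada.ckan_admin_portal_sync', **kwargs)

extra_vars['page'] = Page(
    collection=out_of_sync_packages,
    page=page,
    url=_pager_url,
    item_count=len(out_of_sync_packages),  # ideally the total count, not just this page
    items_per_page=limit,
)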