Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Dataset import lock #20

Open
wants to merge 29 commits into
base: 7.x-1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0c045f8
add support for organizations
ziodave May 3, 2015
ae106a4
add support for organizations in datasets
ziodave May 3, 2015
f102197
add support for HTTP Parallel requests
ziodave May 4, 2015
d30ac37
ignore IDEA project files
ziodave May 7, 2015
92dc66e
remove customizations from this files
ziodave May 7, 2015
b92851d
remove httprl calls
ziodave May 7, 2015
54f3fd8
remove httprl dependency
ziodave May 7, 2015
edadb58
require the httprl library for parallel HTTP reqs
ziodave May 7, 2015
d7387a4
add support for httprl
ziodave May 7, 2015
b0a9ea2
add support for multi-threaded drush (using the *limit* parameter; th…
ziodave May 7, 2015
b62ee29
add support for multi-threaded drush (using the *limit* parameter; th…
ziodave May 7, 2015
f28b7b7
move the list of fields to a separate function (like the other classe…
ziodave May 7, 2015
cb9f8f8
remove httprl dependency to maintain compatibility with environments …
ziodave May 8, 2015
3d97ee8
when uninstalling, deregister the organization migration; use httprl …
ziodave May 8, 2015
a783425
fix comments
ziodave May 8, 2015
a65c5d7
fix line removed setting extra value; only set extra key/value if val…
ziodave May 12, 2015
54ca79a
when a URL fails put it back to the URLs array
ziodave May 21, 2015
3db7f43
add retries to organization and dataset import
ziodave May 21, 2015
39ebc17
initial support for a filter on IDs
ziodave May 27, 2015
c15bec2
add dkan_migrate_base_resources.inc among the files
ziodave May 28, 2015
3365175
add supporting classes to enable migration of resources while importi…
ziodave May 28, 2015
1e06be9
require the new dkan_migrate_base_resources.inc file
ziodave May 28, 2015
d18d31e
removing excessive log messages
ziodave May 28, 2015
9f35728
return if the resources parameter hasn't been provided
ziodave May 28, 2015
eb74391
add support for a *field_migrate_lock* boolean field which tells the …
ziodave May 29, 2015
43591ef
prepareRow shall always return a value
ziodave May 29, 2015
0aa8b9d
add support for custom class names
ziodave May 31, 2015
bd83d15
use the dataset name as part of the new migrate task name
ziodave Jun 3, 2015
f6285ab
use the UUID of the dataset
ziodave Jun 3, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea/
2 changes: 2 additions & 0 deletions dkan_migrate_base.info
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ dependencies[] = list
dependencies[] = number
files[] = dkan_migrate_base.migrate.inc
files[] = dkan_migrate_base_group.inc
files[] = dkan_migrate_base_organization.inc
files[] = dkan_migrate_base_dataset.inc
files[] = dkan_migrate_base_resource.inc
files[] = dkan_migrate_base_resources.inc
192 changes: 139 additions & 53 deletions dkan_migrate_base.migrate.inc
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,42 @@
function dkan_migrate_base_migrate_api() {
$api = array(
// Migrate API, not CKAN's of course.
'api' => 2,
'groups' => array(
'api' => 2,
'groups' => array(
'dkan' => array(
'title' => t('DKAN'),
),
),
'migrations' => array(
'ckan_dataset_base' => array(
'ckan_dataset_base' => array(
'class_name' => 'MigrateCkanDatasetBase',
'group_name' => 'dkan',
'title' => t('CKAN Dataset Base'),
'title' => t('CKAN Dataset Base'),
),
'ckan_group_base' => array(
'ckan_group_base' => array(
'class_name' => 'MigrateCkanGroupBase',
'group_name' => 'dkan',
'title' => t('CKAN Group Base'),
'title' => t('CKAN Group Base'),
),
'ckan_resource_base' => array(
'class_name' => 'MigrateCkanResourceBase',
'group_name' => 'dkan',
'title' => t('CKAN Resource Base'),
'title' => t('CKAN Resource Base'),
),
),
);

if (FALSE === node_type_get_type('organization')) {
drupal_set_message(t("Organization content type not detected. MigrateCkanOrganizationBase migration not registered."));
}
else {
$api['migrations']['ckan_organization_base'] = array(
'class_name' => 'MigrateCkanOrganizationBase',
'group_name' => 'dkan',
'title' => t('CKAN Organization Base'),
);
}

return $api;
}

Expand All @@ -45,27 +57,30 @@ class CKANListJSON extends MigrateListJSON {
public function __construct($list_url, $http_options = array()) {
parent::__construct($list_url);
$this->httpOptions = $http_options;
$this->page = isset($http_options['page']) ? $http_options['page'] : '';
$this->offset = isset($http_options['offset']) ? $http_options['offset'] : '';

// WAS: How many elements to retrieve. This parameter has been commented out, I couldn't find any documentation nor any reference of it in the code.
// $this->page = isset($http_options['page']) ? $http_options['page'] : '';

// The starting offset (by default start from the first element).
$this->offset = isset($http_options['offset']) && is_numeric($http_options['offset']) ? $http_options['offset'] : 0;

// In order to support mtm (multi-threaded migrate) we need to get the *limit* parameter (NULL equals no limit).
$this->limit = isset($http_options['limit']) && is_numeric($http_options['limit']) ? $http_options['limit'] : NULL;

// A regex filter to select the dataset(s) to import.
$this->filter = !empty($http_options['filter']) ? $http_options['filter'] : NULL;
}

/**
* The default implementation assumes the IDs are top-level array elements.
*/
protected function getIDsFromJSON(array $data) {
$ids = array();
$datasets = 0;
$total = $this->page + $this->offset;
foreach ($data['result'] as $item) {
if ($datasets < $this->offset) {
$datasets++;
continue;
}
$ids[] = $item;
$datasets++;
if ($total && $datasets >= $total) {
break;
}
}

// Get the portion of results within the specified boundaries (starting from *offset*).
$ids = array_slice($data['result'], $this->offset, $this->limit);

$this->log("returning " . sizeof($ids) . " item(s) [ offset :: {$this->offset} ][ limit :: " . (isset($this->limit) ? $this->limit : 'not set') . " ]");

return $ids;
}

Expand All @@ -83,20 +98,36 @@ class CKANListJSON extends MigrateListJSON {
}
else {
$response = drupal_http_request($this->listUrl, $this->httpOptions);
$json = $response->data;
$json = $response->data;
}

migrate_instrument_stop("Retrieve $this->listUrl");
if ($json) {
$data = drupal_json_decode($json);
if ($data) {

// If a filter has been specified, then apply it.
if (!empty($this->filter)) {
$data = $this->applyFilter($data);
}

return $this->getIDsFromJSON($data);
}
}
Migration::displayMessage(t('Loading of !listurl failed:',
array('!listurl' => $this->listUrl)));
Migration::displayMessage(t('Loading of !listurl failed:', array('!listurl' => $this->listUrl)));
return NULL;
}

/**
 * Restricts the list results to the items matching the configured filter.
 *
 * The filter ($this->filter) is treated as the body of a regular expression;
 * an item is kept when the expression matches it.
 *
 * @param array $data
 *   The decoded list response; the items are read from its 'result' key.
 *
 * @return array
 *   $data with 'result' replaced by the matching items. Keys of the kept
 *   items are preserved (array_filter does not reindex).
 */
protected function applyFilter($data) {

  // Nothing to filter when the response carries no usable result list.
  if (empty($data['result']) || !is_array($data['result'])) {
    return $data;
  }

  // Delimit the pattern with a control character instead of "|": with "|" as
  // delimiter any filter using alternation (e.g. "foo|bar") produced an
  // invalid pattern and every item was silently dropped.
  $pattern = "\x01" . $this->filter . "\x01";

  // The pattern is handed over with use() rather than read from $this inside
  // the closure, since closures only get $this bound starting with PHP 5.4.
  $data['result'] = array_filter($data['result'], function ($item) use ($pattern) {
    // Non-scalar items cannot be matched against a pattern; leave them out.
    return is_scalar($item) && (1 === preg_match($pattern, (string) $item));
  });

  return $data;
}

/**
* Implements computeCount().
*/
Expand All @@ -107,20 +138,32 @@ class CKANListJSON extends MigrateListJSON {
}
else {
$response = drupal_http_request($this->listUrl, $this->httpOptions);
$json = $response->data;
$json = $response->data;
}
if ($json) {
$data = drupal_json_decode($json);
if ($data) {
$count = count($data['result']);
}
}
// Only return page number if that many actually exist.
if ($count > $this->page) {
$count = $this->page;

// If a limit has been set and the count is larger than that, we return the limit.
if (isset($this->limit) && $count > $this->limit) {
$count = $this->limit;
}
return $count;
}

/**
 * Forwards a message to Migration::displayMessage().
 *
 * @param string $message
 *   The text to display.
 */
private function log($message) {
  Migration::displayMessage($message);
}
}

class CKANItemJSON extends MigrateItemJSON {
Expand All @@ -135,6 +178,7 @@ class CKANItemJSON extends MigrateItemJSON {
}
return $ids;
}

/**
* Parses for 'results' instead of base.
*/
Expand All @@ -145,7 +189,7 @@ class CKANItemJSON extends MigrateItemJSON {
}
else {
$response = drupal_http_request($this->listUrl, $this->httpOptions);
$json = $response->data;
$json = $response->data;
}
if ($json) {
$data = drupal_json_decode($json);
Expand All @@ -169,8 +213,9 @@ class CKANItemJSON extends MigrateItemJSON {
}
if ($json && isset($json->error) && $json->error->message == 'Access denied') {
$migration = Migration::currentMigration();
$message = t('Access denied for !objecturl', array('!objecturl' => $item_url));
$migration->getMap()->saveMessage(array($id), $message, MigrationBase::MESSAGE_ERROR);
$message = t('Access denied for !objecturl', array('!objecturl' => $item_url));
$migration->getMap()
->saveMessage(array($id), $message, MigrationBase::MESSAGE_ERROR);
$result = $this->emptyItem($id);
return $result;
}
Expand All @@ -180,19 +225,44 @@ class CKANItemJSON extends MigrateItemJSON {
return $json->result;
}
$migration = Migration::currentMigration();
$message = t('Loading of !objecturl failed:', array('!objecturl' => $item_url));
$migration->getMap()->saveMessage(array($id), $message, MigrationBase::MESSAGE_ERROR);
$message = t('Loading of !objecturl failed:', array('!objecturl' => $item_url));
$migration->getMap()
->saveMessage(array($id), $message, MigrationBase::MESSAGE_ERROR);
return new stdClass();
}

/**
 * Loads and decodes the JSON document found at the given URL.
 *
 * Overrides the superclass loadJSONUrl in order to use the standard
 * {@link drupal_http_request} and to retry when the request fails in a way
 * that may be temporary. A request is retried every 5 seconds, 5 times at
 * most. Client errors (4xx, except 408 and 429) are not retried: repeating the
 * same request is not expected to change the answer, and the response body is
 * still decoded and returned so the caller can inspect the reported error.
 *
 * @param string $item_url
 *   The item URL.
 *
 * @return mixed
 *   The decoded JSON (as returned by json_decode), or NULL when no response
 *   body is available or it cannot be decoded.
 */
public function loadJSONUrl($item_url) {

  // Resolve the request options once; they are the same for every attempt.
  $options = empty($this->httpOptions) ? array() : $this->httpOptions;

  $response = drupal_http_request($item_url, $options);

  for ($retries = 1; $retries <= 5; $retries++) {
    $code = (int) $response->code;

    // Done: the request succeeded.
    if (200 === $code) {
      break;
    }

    // Give up immediately on client errors, which would only burn 25 seconds
    // of sleeping per item. 408 (timeout) and 429 (rate limit) are the
    // exceptions, as they describe a temporary condition.
    if ($code >= 400 && $code < 500 && 408 !== $code && 429 !== $code) {
      break;
    }

    sleep(5);
    echo("Retrying [ url :: $item_url ][ response code :: $response->code ][ retries :: $retries ]\n");
    $response = drupal_http_request($item_url, $options);
  }

  // A failed request may come back without a body at all.
  if (!isset($response->data)) {
    return NULL;
  }

  return json_decode($response->data);
}

/**
* Creates a stub entry.
*/
public function emptyItem($id) {
$result = new stdClass();
$result->id = substr($id, 0, 35);
$result = new stdClass();
$result->id = substr($id, 0, 35);
$result->title = t('Access denied for %id', array('%id' => $id));
$result->name = $id;
$result->name = $id;
return $result;
}
}
Expand All @@ -210,10 +280,10 @@ abstract class MigrateDKAN extends Migration {
return $term;
}
else {
$new_term = new stdClass();
$new_term = new stdClass();
$new_term->name = $name;
$new_term->vid = $vid;
$term = taxonomy_term_save($new_term);
$new_term->vid = $vid;
$term = taxonomy_term_save($new_term);
return $term;
}
}
Expand All @@ -234,9 +304,12 @@ abstract class MigrateDKAN extends Migration {
* Gets Group Nid by title if exists.
*/
public function getGroupNidByTitle($title) {
$type = 'group';
$result = db_query("SELECT n.nid FROM {node} n WHERE n.title = :title AND n.type = :type", array(":title"=> $title, ":type"=> $type));
$nid = $result->fetchField();
$type = 'group';
$result = db_query("SELECT n.nid FROM {node} n WHERE n.title = :title AND n.type = :type", array(
":title" => $title,
":type" => $type
));
$nid = $result->fetchField();
if ($nid) {
return $nid;
}
Expand All @@ -263,6 +336,15 @@ abstract class MigrateDKAN extends Migration {
}
}

/**
 * Resolves the node id of the organization with the given UUID.
 *
 * @param string $uuid
 *   The UUID to look up among 'node' entities.
 *
 * @return mixed
 *   The id stored under $uuid in the lookup result, or NULL when the lookup
 *   returns nothing.
 */
public function getOrganizationId($uuid) {
  $matches = entity_get_id_by_uuid('node', array($uuid));

  // No match for this UUID.
  if (!$matches) {
    return NULL;
  }

  // The lookup result is indexed by UUID.
  return $matches[$uuid];
}

/**
* Looks up user if they exist, if not creates them.
*
Expand All @@ -282,9 +364,9 @@ abstract class MigrateDKAN extends Migration {
'uuid' => $creator_user_id,
);
// Get User name from CKAN API.
$response = drupal_http_request($this->endpoint . 'user_show?id=' . $creator_user_id);
$json = $response->data;
$data = drupal_json_decode($json);
$response = drupal_http_request($this->endpoint . 'user_show?id=' . $creator_user_id);
$json = $response->data;
$data = drupal_json_decode($json);
if ($name = $data['result']['name']) {
$current_uid = db_query("SELECT uid from {users} WHERE name = :name", array(":name" => $name))->fetchField();
if ($current_uid) {
Expand All @@ -293,7 +375,7 @@ abstract class MigrateDKAN extends Migration {
}
else {
$new_user['name'] = $name;
$account = user_save(NULL, $new_user);
$account = user_save(NULL, $new_user);
return $account->uid;
}
}
Expand All @@ -316,17 +398,17 @@ abstract class MigrateDKAN extends Migration {
*/
public function downloadExternalFile($url, $uri, $save_mode = FILE_EXISTS_RENAME, $manage_file = TRUE) {

$url_info = parse_url($url);
$url_info = parse_url($url);
$url_path_info = pathinfo($url_info['path']);

// This helps with filenames with spaces.
$url = $url_info['scheme'] . '://' . $url_info['host'] . $url_path_info['dirname'] . '/' . rawurlencode($url_path_info['basename']);
$url = $url_info['scheme'] . '://' . $url_info['host'] . $url_path_info['dirname'] . '/' . rawurlencode($url_path_info['basename']);

// Need to remove the filename from the uri.
$uri_target = file_uri_target($uri);
$uri_scheme = file_uri_scheme($uri);
$uri_target = file_uri_target($uri);
$uri_scheme = file_uri_scheme($uri);
$uri_path_info = pathinfo($uri_target);
$directory = file_stream_wrapper_uri_normalize($uri_scheme . "://" . $uri_path_info['dirname']);
$directory = file_stream_wrapper_uri_normalize($uri_scheme . "://" . $uri_path_info['dirname']);

if (file_prepare_directory($directory, FILE_CREATE_DIRECTORY)) {
$drupal_result = drupal_http_request($url);
Expand Down Expand Up @@ -359,6 +441,10 @@ abstract class MigrateCkanBase extends MigrateDKAN {
*/
public function __construct($arguments) {
  // CKAN API endpoint; falls back to the DKAN demo site when not provided.
  if (isset($arguments['endpoint'])) {
    $this->endpoint = $arguments['endpoint'];
  }
  else {
    $this->endpoint = 'http://demo.getdkan.com/api/3/action/';
  }

  // Regex filter, empty when the caller did not provide one.
  if (isset($arguments['filter'])) {
    $this->filter = $arguments['filter'];
  }
  else {
    $this->filter = '';
  }

  // Both properties are set before handing over to the parent constructor.
  parent::__construct($arguments);
}
}
Loading