-
Notifications
You must be signed in to change notification settings - Fork 0
/
de.py
243 lines (179 loc) · 8.33 KB
/
de.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""
Used to manage communication with the CyVerse Discovery Environment (DE).
"""
import json
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
def get_de_api_key(username, password):
    """
    Request an access token from the Discovery Environment (DE) for a CyVerse account.

    Issues a GET request to the DE Keycloak token endpoint using HTTP basic
    authentication and a 10-second timeout. The returned token can be used to
    authorize calls to other DE API endpoints.

    Args:
        username (str): The CyVerse username.
        password (str): The CyVerse password.

    Returns:
        str: The ``access_token`` field of the JSON response when the request
        succeeds with HTTP 200. Otherwise the status code and response text
        are printed and None is returned.
    """
    token_endpoint = 'https://de.cyverse.org/terrain/token/keycloak'
    credentials = HTTPBasicAuth(username, password)
    reply = requests.get(token_endpoint, auth=credentials, timeout=10)

    # Anything other than 200 is reported and signalled with None.
    if reply.status_code != 200:
        print(f"Error obtaining API key: {reply.status_code} - {reply.text}")
        return None

    return reply.json()['access_token']
# Base URL for the Discovery Environment API
BASE_URL = 'https://de.cyverse.org/terrain'

# Fetch an access token at import time using the default login credentials.
# NOTE(review): these credentials are hard-coded in source; consider loading
# them from the environment or a secrets store instead.
_access_token = get_de_api_key('tanmaytest', 'password123')

# get_de_api_key returns None when authentication fails. Concatenating None
# onto 'Bearer ' would raise a TypeError and make this module unimportable,
# so api_key is left as None in that case.
api_key = f'Bearer {_access_token}' if _access_token is not None else None

# Headers for the requests (empty when no token could be obtained, so that
# no invalid Authorization value is ever sent)
default_headers = {'Authorization': api_key} if api_key is not None else {}
def pretty_print(json_data):
    """
    Print JSON-serializable data in an indented, key-sorted layout.

    Intended as a debugging aid for inspecting the structure of API responses.

    Args:
        json_data (dict): JSON data to be pretty-printed.
    """
    rendered = json.dumps(json_data, sort_keys=True, indent=4)
    print(rendered)
def convert_to_date(milliseconds):
    """
    Render a millisecond epoch timestamp as a 'YYYY-MM-DD HH:MM:SS' string.

    Used to turn the date-created and date-modified values (milliseconds
    since epoch) into a more readable format. The timestamp is interpreted
    by ``datetime.fromtimestamp`` without a timezone argument, so the result
    is expressed in the local time of the machine running the code.

    Args:
        milliseconds (int): Milliseconds since epoch.

    Returns:
        str: Human-readable date and time.
    """
    # fromtimestamp expects seconds, so scale the value down first.
    moment = datetime.fromtimestamp(milliseconds / 1000)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
def get_metadata(data_id, headers=default_headers):
    """
    Fetch the metadata record for a single data item.

    Sends a GET request to the ``/filesystem/{data_id}/metadata`` endpoint
    under BASE_URL with a 10-second timeout.

    Args:
        data_id (str): The ID of the data item.
        headers (dict): HTTP headers sent with the request; defaults to the
            module-level default_headers.

    Returns:
        dict: The parsed JSON response when the request succeeds with
        HTTP 200. Otherwise the status code and response text are printed
        and None is returned.
    """
    endpoint = f'{BASE_URL}/filesystem/{data_id}/metadata'
    reply = requests.get(endpoint, headers=headers, timeout=10)

    # Report any non-200 response and signal the failure with None.
    if reply.status_code != 200:
        print(f"Error getting metadata: {reply.status_code} - {reply.text}")
        return None

    return reply.json()
def get_all_metadata_dataset(dataset):
    """
    Get all metadata for a dataset.

    Collects the creation and modification dates and the path from the
    dataset dictionary, then merges in the attribute-value units (AVUs)
    returned by get_metadata for the dataset's ID. Used to migrate the
    datasets and their metadata to CKAN.

    When the same attribute occurs more than once, its values are gathered
    into a list in the order they are encountered. This also applies to an
    AVU whose attribute name matches one of the keys set from the dataset
    dictionary ('date_created', 'date_modified', 'de_path').

    Args:
        dataset (dict): The dataset dictionary. Must contain the keys
            'date-created', 'date-modified', 'path' and 'id'.

    Returns:
        dict: A dictionary containing all metadata for the dataset. If the
        detailed metadata could not be retrieved (get_metadata returned
        None) or contains no AVUs, only the dates and path are included.
    """
    metadata_dict = {
        # Dates arrive as milliseconds since epoch; store them in readable form.
        'date_created': convert_to_date(int(dataset['date-created'])),
        'date_modified': convert_to_date(int(dataset['date-modified'])),
        'de_path': dataset['path'],
    }

    # Get detailed metadata from the API
    metadata_return = get_metadata(dataset['id'])
    if metadata_return is None:
        # get_metadata has already printed the error; indexing None here
        # would raise a TypeError, so return what is known so far.
        return metadata_dict

    # A missing or null 'avus' entry is treated as "no AVUs".
    avus = metadata_return.get('avus') or []

    # Loop through each AVU and add it to the metadata dictionary
    for avu in avus:
        key = avu['attr']
        value = avu['value']
        if key not in metadata_dict:
            metadata_dict[key] = value
        elif isinstance(metadata_dict[key], list):
            # Attribute already seen more than once: extend the existing list.
            metadata_dict[key].append(value)
        else:
            # Second occurrence: promote the single value to a list.
            metadata_dict[key] = [metadata_dict[key], value]

    return metadata_dict
def get_all_metadata_file(file):
    """
    Build the metadata dictionary for a single file.

    Gathers the creation and modification dates, the path, the file name,
    the file type (text after the last '.' in the name, or an empty string
    when the name contains no '.'), and the anonymous WebDAV URL for the
    file. Used for migrating files to CKAN.

    Args:
        file (dict): The file dictionary. Must contain the keys
            'date-created', 'date-modified', 'path' and 'label'.

    Returns:
        dict: A dictionary containing all metadata for the file.
    """
    # Dates arrive as milliseconds since epoch; convert them to readable form.
    created = convert_to_date(int(file['date-created']))
    modified = convert_to_date(int(file['date-modified']))
    de_path = file['path']
    label = file['label']

    # rpartition yields an empty separator when the label has no '.',
    # which is the "no file type" case.
    _, dot, extension = label.rpartition('.')

    return {
        'date_created': created,
        'date_modified': modified,
        'de_path': de_path,
        'file_name': label,
        'file_type': extension if dot else '',
        'web_dav_location': "https://data.cyverse.org/dav-anon" + de_path,
    }
def get_files(path, limit=10, headers=default_headers):
    """
    List the contents of a directory via the paged-directory endpoint.

    Sends a GET request to ``/secured/filesystem/paged-directory`` under
    BASE_URL with the given path and limit as query parameters and a
    10-second timeout. Useful for migrating files from a directory to CKAN.

    Args:
        path (str): The path to the directory.
        limit (int): The maximum number of files to retrieve.
        headers (dict): HTTP headers sent with the request; defaults to the
            module-level default_headers.

    Returns:
        dict: The parsed JSON response when the request succeeds with
        HTTP 200. Otherwise the status code and response text are printed
        and None is returned.
    """
    endpoint = f'{BASE_URL}/secured/filesystem/paged-directory'
    query = {'limit': limit, 'path': path}
    reply = requests.get(endpoint, headers=headers, params=query, timeout=10)

    # Report any non-200 response and signal the failure with None.
    if reply.status_code != 200:
        print(f"Error getting files: {reply.status_code} - {reply.text}")
        return None

    return reply.json()
def get_datasets(path='/iplant/home/shared/commons_repo/curated/', headers=default_headers):
    """
    List the datasets (folders) found directly under a directory.

    Sends a GET request to ``/secured/filesystem/directory`` under BASE_URL
    with the given path as a query parameter and a 10-second timeout, and
    returns the 'folders' entry of the response. Each entry carries only
    some of a dataset's metadata; the rest can be retrieved with
    get_metadata. Used together with get_all_metadata_dataset to migrate
    datasets and their metadata to CKAN.

    Args:
        path (str): The path to the directory containing the datasets.
        headers (dict): HTTP headers sent with the request; defaults to the
            module-level default_headers.

    Returns:
        list: The value of the 'folders' key in the JSON response when the
        request succeeds with HTTP 200. Otherwise the status code and
        response text are printed and None is returned.
    """
    endpoint = f'{BASE_URL}/secured/filesystem/directory'
    reply = requests.get(endpoint, headers=headers, params={'path': path}, timeout=10)

    # Report any non-200 response and signal the failure with None.
    if reply.status_code != 200:
        print(f"Error getting directories: {reply.status_code} - {reply.text}")
        return None

    # Each folder in the listing is treated as one dataset.
    return reply.json()['folders']