Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

py upgrade+ bug fix #13

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 102 additions & 81 deletions sync-brcdb/reverse_sync.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#!/usr/bin/python
import logging
import os
import urllib
import urllib2
import urllib.parse
import urllib.request
import urllib.error
import json
import argparse
import subprocess
Expand Down Expand Up @@ -31,6 +32,10 @@
CONFIG_FILE = 'reverse_sync_{}.conf'.format(MODE)
LOG_FILE = ('reverse_sync_{}_debug.log' if DEBUG else 'reverse_sync_{}.log').format(MODE)
BASE_URL = 'https://{}/api/'.format('mybrc.brc.berkeley.edu' if MODE == MODE_MYBRC else 'mylrc.lbl.gov')
# BASE_URL = 'http://localhost:8880/api/'

ALLOCATION_ENDPOINT = BASE_URL + 'allocations/'
PROJECT_ENDPOINT = BASE_URL + 'projects/'

COMPUTE_RESOURCES_TABLE = {
MODE_MYBRC: {
Expand Down Expand Up @@ -68,20 +73,20 @@
print('starting run, using endpoint {0} ...'.format(BASE_URL))
logging.info('starting run, using endpoint {0} ...'.format(BASE_URL))


# Makes paginated requests to a given URL with optional parameters.
def paginate_requests(url, params=None):
request_url = url
params = params or {}

if params:
request_url = url + '?' + urllib.urlencode(params)
request_url = url + '?' + urllib.parse.urlencode(params)

try:
req = urllib2.Request(request_url)
req = urllib.request.Request(request_url)
req.add_header('Authorization', AUTH_TOKEN)
response = json.loads(urllib2.urlopen(req).read())
response = json.loads(urllib.request.urlopen(req).read())

except urllib2.URLError as e:
except urllib.error.URLError as e:
if DEBUG:
print('[paginate_requests({0}, {1})] failed: {2}'.format(url, params, e))
logging.error('[paginate_requests({0}, {1})] failed: {2}'.format(url, params, e))
Expand All @@ -95,10 +100,10 @@ def paginate_requests(url, params=None):
try:
current_page += 1
params['page'] = current_page
request_url = url + '?' + urllib.urlencode(params)
req = urllib2.Request(request_url)
request_url = url + '?' + urllib.parse.urlencode(params)
req = urllib.request.Request(request_url)
req.add_header('Authorization', AUTH_TOKEN)
response = json.loads(urllib2.urlopen(req).read())
response = json.loads(urllib.request.urlopen(req).read())

results.extend(response['results'])
if current_page % 5 == 0:
Expand All @@ -109,7 +114,7 @@ def paginate_requests(url, params=None):
logging.warning('too many pages to sync at once, rerun script after this run completes...')
break

except urllib2.URLError as e:
except urllib.error.URLError as e:
response['next'] = None

if DEBUG:
Expand All @@ -118,18 +123,19 @@ def paginate_requests(url, params=None):

return results


# Makes a single HTTP request to the given URL with optional query
# parameters and returns the results.
def single_request(url, params=None):
request_url = url
params = params or {}

if params:
request_url = url + '?' + urllib.urlencode(params)
request_url += '?' + urllib.parse.urlencode(params)

try:
request = urllib2.Request(request_url)
request = urllib.request.Request(request_url)
request.add_header('Authorization', AUTH_TOKEN)
response = json.loads(urllib2.urlopen(request).read())
response = json.loads(urllib.request.urlopen(request).read())
except Exception as e:
response = {'results': None}

Expand All @@ -139,13 +145,11 @@ def single_request(url, params=None):

return response['results']


# Retrieves allocation information for a given project name.
def get_project_allocation(project_name):
allocation_id_url = BASE_URL + 'allocations/'

header = project.split('_')[0]
header = project_name.split('_')[0]
compute_resources = COMPUTE_RESOURCES_TABLE[MODE].get(header, '{} Compute'.format(header.upper()))
response = single_request(allocation_id_url, {'project': project_name, 'resources': compute_resources})
response = single_request(ALLOCATION_ENDPOINT, {'project': project_name, 'resources': compute_resources})
if not response or len(response) == 0:
if DEBUG:
print('[get_project_allocation({0})] ERR'.format(project_name))
Expand All @@ -154,7 +158,7 @@ def get_project_allocation(project_name):
return None

allocation_id = response[0]['id']
allocation_url = allocation_id_url + '{0}/attributes/'.format(allocation_id)
allocation_url = ALLOCATION_ENDPOINT + '{0}/attributes/'.format(allocation_id)
response = single_request(allocation_url, {'type': 'Service Units'})
if not response:
return None
Expand All @@ -164,13 +168,13 @@ def get_project_allocation(project_name):

return allocation


# Retrieves the start date of a project.
def get_project_start(project_name):
allocations_url = BASE_URL + 'allocations/'

header = project.split('_')[0]
header = project_name.split('_')[0]
compute_resources = COMPUTE_RESOURCES_TABLE[MODE].get(header, '{} Compute'.format(header.upper()))
response = single_request(allocations_url, {'project': project_name, 'resources': compute_resources})
params = {'project': project_name, 'resources': compute_resources}

response = single_request(ALLOCATION_ENDPOINT, params)
if not response or len(response) == 0:
if DEBUG:
print('[get_project_start({0})] ERR'.format(project_name))
Expand All @@ -185,58 +189,75 @@ def get_project_start(project_name):
return creation if '.' not in creation else creation.split('.')[0]
# return '{0}T00:00:00'.format(creation)


print('gathering accounts from {}db...'.format(MODE))
logging.info('gathering data from {}db...'.format(MODE))

# NOTE(vir): ignore abc and vector for now
project_table = paginate_requests(BASE_URL + 'projects/')
project_table = filter(
lambda p: p['name'] != 'abc' and not p['name'].startswith('vector_'),
project_table)
for project in project_table:
project['allocation'] = get_project_allocation(project['name'])
project['start'] = get_project_start(project['name'])

# NOTE(vir): can use this to update fca.conf file
'''
lines = []
for project in project_table:
if ('allocation' not in project) or ('name' not in project) or (project['allocation'] == None):
print('[project: {}] ERR, could not set allocation (value={})'.format(project['name'], project['allocation']))
logging.error('[project: {}] ERR, could not set allocation (value={})'.format(project['name'], project['allocation']))
continue

lines.append('{}|{}|{}|Initial Allocation for {}'.format(
project['name'], project['start'], project['allocation'], project['name']))
'''

print('writing data to file (slurmdb commands)...')
logging.info('writing data to file (slurmdb commands)...')

commands = ''
for project in project_table:
if ('allocation' not in project) or ('name' not in project) or (project['allocation'] == None):
print('[project: {0}] ERR, could not set allocation (value={1})'.format(project['name'], project['allocation']))
logging.error('[project: {0}] ERR, could not set allocation (value={1})'.format(project['name'], project['allocation']))
continue

allocation_in_seconds = 60 * project['allocation']
command = 'yes | sacctmgr modify account {0} set GrpTRESMins="cpu={1}"'.format(project['name'], allocation_in_seconds)
commands += '\n' + command

# NOTE(vir): actually update data in SLURM
# out, _ = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True).communicate()
# print('updated account: {}, allocation set to: {}, with error: {}'.format(project['name'], project['allocation'], out))
# logging.info('updated account: {}, allocation set to: {}, with error: {}'.format(project['name'], project['allocation'], out))

# print('updated allocation limits for {} accounts, run complete, exiting...'.format(len(project_table)))
# logging.info('updated allocation limits for {} accounts, run complete, exiting...'.format(len(project_table)))

with open('reverse_sync_output_{}.sh'.format(MODE), 'w') as f:
f.writelines(commands)

print('run complete, wrote output to reverse_sync_output_{}.sh, exiting...'.format(MODE))
logging.info('run complete, wrote output to reverse_sync_output_{}.sh, exiting...'.format(MODE))

# sacctmgr modify account <account_name> set GrpTRESMins="cpu=xxxx"
def main():
    """Gather projects from the API and write sacctmgr commands to a file.

    Fetches every project from PROJECT_ENDPOINT, drops the 'abc' project and
    any project whose name starts with 'vector_', then looks up the
    allocation and start date of each remaining project.  For every project
    with a usable allocation, one ``sacctmgr modify account`` command is
    written to ``reverse_sync_output_<MODE>.sh``.  Projects whose allocation
    lookup returned None are reported (stdout and log) and skipped.
    """
    print('gathering accounts from {}db...'.format(MODE))
    logging.info('gathering data from {}db...'.format(MODE))

    # NOTE(vir): ignore abc and vector for now
    # Built as a real list: on Python 3 filter() returns a one-shot iterator,
    # so len(project_table) or a second pass over it would not work.
    project_table = [
        p for p in paginate_requests(PROJECT_ENDPOINT)
        if p['name'] != 'abc' and not p['name'].startswith('vector_')
    ]

    commands = []
    for project in project_table:
        project['allocation'] = get_project_allocation(project['name'])
        # 'start' is not used by the sacctmgr commands below; it is kept on
        # the record for the fca.conf line format noted further down.
        project['start'] = get_project_start(project['name'])

        # Both keys were just set/read above, so only the None case can occur.
        if project['allocation'] is None:
            print('[project: {0}] ERR, could not set allocation (value={1})'.format(project['name'], project['allocation']))
            logging.error('[project: {0}] ERR, could not set allocation (value={1})'.format(project['name'], project['allocation']))
            continue

        allocation_in_seconds = 60 * project['allocation']
        commands.append(
            'yes | sacctmgr modify account {0} set GrpTRESMins="cpu={1}"'.format(project['name'], allocation_in_seconds))

    # NOTE(vir): can use this to update fca.conf file -- one line per project
    # with a non-None allocation, formatted as:
    #   '{name}|{start}|{allocation}|Initial Allocation for {name}'

    print('writing data to file (slurmdb commands)...')
    logging.info('writing data to file (slurmdb commands)...')

    # NOTE(vir): the commands are only written to a file here; nothing is
    # executed against SLURM by this function.
    with open('reverse_sync_output_{}.sh'.format(MODE), 'w') as f:
        # Each command is preceded by a newline, matching the previous
        # string-concatenation output byte for byte.
        f.write(''.join('\n' + command for command in commands))

    print('run complete, wrote output to reverse_sync_output_{}.sh, exiting...'.format(MODE))
    logging.info('run complete, wrote output to reverse_sync_output_{}.sh, exiting...'.format(MODE))

# sacctmgr modify account <account_name> set GrpTRESMins="cpu=xxxx"

if __name__ == '__main__':
main()