diff --git a/pandaharvester/harvestercloud/k8s_startup_script.py b/pandaharvester/harvestercloud/k8s_startup_script.py index c9c14487..52fbeae3 100644 --- a/pandaharvester/harvestercloud/k8s_startup_script.py +++ b/pandaharvester/harvestercloud/k8s_startup_script.py @@ -1,13 +1,14 @@ #!/usr/bin/env python """ -This script will be executed at the VM startup time. -- It will download the proxy and panda queue from Google instance metadata +This script will be executed at container startup +- It will retrieve the proxy and panda queue from the environment - It will download the pilot wrapper from github and execute it -- It will upload the pilot logs to panda cache +- It will upload the pilot logs to panda cache at the end + +post-multipart code was taken from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py """ -import requests try: import subprocess32 as subprocess except Exception: @@ -15,30 +16,73 @@ import os import sys import logging -import time import traceback -from threading import Thread +import httplib +import mimetypes +import ssl +import urlparse logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', filename='/tmp/vm_script.log', filemode='w') -global loop -loop = True +def post_multipart(host, port, selector, files, proxy_cert): + """ + Post files to an http host as multipart/form-data. + files is a sequence of (name, filename, value) elements for data to be uploaded as files + Return the server's response page. + """ + content_type, body = encode_multipart_formdata(files) + + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + context.load_cert_chain(certfile=proxy_cert, keyfile=proxy_cert) + + h = httplib.HTTPSConnection(host, port, context=context, timeout=180) -def upload_logs(url, log_file_name, destination_name, proxy_path): - try: - # open and compress the content of the file - with open(log_file_name, 'rb') as log_file_object: - files = {'file': (destination_name, log_file_object.read())} + h.putrequest('POST', selector) + h.putheader('content-type', content_type) + h.putheader('content-length', str(len(body))) + h.endheaders() + h.send(body) + response = h.getresponse() + return response.status, response.reason - cert = [proxy_path, proxy_path] - # verify = '/etc/grid-security/certificates' # not supported in CernVM - requests.exceptions.SSLError: [Errno 21] Is a directory + +def encode_multipart_formdata(files): + """ + files is a sequence of (name, filename, value) elements for data to be uploaded as files + Return (content_type, body) ready for httplib.HTTP instance + """ + BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$' + CRLF = '\r\n' + L = [] + for (key, filename, value) in files: + L.append('--' + BOUNDARY) + L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)) + L.append('Content-Type: %s' % get_content_type(filename)) + L.append('') + L.append(value) + L.append('--' + BOUNDARY + '--') + L.append('') + body = CRLF.join(L) + content_type = 'multipart/form-data; boundary=%s' % BOUNDARY + return content_type, body + + +def get_content_type(filename): + return mimetypes.guess_type(filename)[0] or 'application/octet-stream' + + +def upload_logs(url, log_file_name, destination_name, proxy_cert): + try: + full_url = url + '/putFile' + urlparts = urlparse.urlsplit(full_url) logging.debug('[upload_logs] start') - res = requests.post(url + '/putFile', files=files, timeout=180, verify=False, cert=cert) - logging.debug('[upload_logs] finished with code={0} msg={1}'.format(res.status_code, res.text)) - if res.status_code == 200: + files = [('file', destination_name, open(log_file_name).read())] + status, reason = post_multipart(urlparts.hostname, urlparts.port, urlparts.path, files, proxy_cert) + logging.debug('[upload_logs] finished with code={0} msg={1}'.format(status, reason)) + if status == 200: return True except Exception: err_type, err_value = sys.exc_info()[:2] @@ -49,44 +93,10 @@ def upload_logs(url, log_file_name, destination_name, proxy_path): return False -def contact_harvester(harvester_frontend, data, auth_token, proxy_path): - try: - headers = {'Content-Type': 'application/json', - 'Authorization': 'Bearer {0}'.format(auth_token)} - cert = [proxy_path, proxy_path] - #verify = '/etc/grid-security/certificates' # not supported in CernVM - requests.exceptions.SSLError: [Errno 21] Is a directory - verify = False - resp = requests.post(harvester_frontend, json=data, headers=headers, cert=cert, verify=verify) - logging.debug('[contact_harvester] harvester returned: {0}'.format(resp.text)) - except Exception as e: - # message could not be sent - logging.debug('[contact_harvester] failed to send message to harvester: {0}'.format(e)) - pass - - -def heartbeat(harvester_frontend, worker_id, auth_token, proxy_path): - data = {'methodName': 'heartbeat', 'workerID': worker_id, 'data': None} - logging.debug('[heartbeat] sending heartbeat to harvester: {0}'.format(data)) - return contact_harvester(harvester_frontend, data, auth_token, proxy_path) - - -def suicide(harvester_frontend, worker_id, auth_token, proxy_path): - data = {'methodName': 'killWorker', 'workerID': worker_id, 'data': None} - logging.debug('[suicide] sending suicide message to harvester: {0}'.format(data)) - return contact_harvester(harvester_frontend, data, auth_token, proxy_path) - - -def heartbeat_loop(harvester_frontend, worker_id, auth_token, proxy_path): - while loop: - heartbeat(harvester_frontend, worker_id, auth_token, proxy_path) - time.sleep(300) - - def get_url(url, headers=None): """ get content from specified URL """ - reply = requests.get(url, headers=headers) if reply.status_code != 200: logging.debug('[get_attribute] Failed to open {0}'.format(url)) @@ -126,20 +136,10 @@ def get_configuration(): resource_type = os.environ.get('resourceType') logging.debug('[main] got resource type: {0}'.format(resource_type)) - # get the harvester frontend URL, where we'll send heartbeats - # harvester_frontend_url = METADATA_URL.format("harvester_frontend") - harvester_frontend = None - # logging.debug('[main] got harvester frontend: {0}'.format(harvester_frontend)) - # get the worker id worker_id = os.environ.get('workerID') logging.debug('[main] got worker id: {0}'.format(worker_id)) - # get the authentication token - # auth_token_url = METADATA_URL.format("auth_token") - auth_token = None - # logging.debug('[main] got authentication token') - # get the URL (e.g. panda cache) to upload logs logs_frontend_w = os.environ.get('logs_frontend_w') logging.debug('[main] got url to upload logs') @@ -148,17 +148,13 @@ def get_configuration(): logs_frontend_r = os.environ.get('logs_frontend_r') logging.debug('[main] got url to download logs') - return proxy_path, panda_site, panda_queue, resource_type, harvester_frontend, worker_id, auth_token, logs_frontend_w, logs_frontend_r + return proxy_path, panda_site, panda_queue, resource_type, worker_id, logs_frontend_w, logs_frontend_r if __name__ == "__main__": # get all the configuration from the GCE metadata server - proxy_path, panda_site, panda_queue, resource_type, harvester_frontend, worker_id, auth_token, logs_frontend_w, logs_frontend_r = get_configuration() - - # start a separate thread that will send a heartbeat to harvester every 5 minutes - # heartbeat_thread = Thread(target=heartbeat_loop, args=(harvester_frontend, worker_id, auth_token, proxy_path)) - # heartbeat_thread.start() + proxy_path, panda_site, panda_queue, resource_type, worker_id, logs_frontend_w, logs_frontend_r = get_configuration() # the pilot should propagate the download link via the pilotId field in the job table destination_name = '{0}.out'.format(worker_id) @@ -190,9 +186,4 @@ def get_configuration(): logging.debug('[main] pilot wrapper done...') # upload logs to e.g. panda cache or similar - upload_logs(logs_frontend_w, '/tmp/wrapper-wid.log', destination_name, proxy_path) - - # ask harvester to kill the VM and stop the heartbeat - # suicide(harvester_frontend, worker_id, auth_token, proxy_path) - loop = False - # heartbeat_thread.join() + upload_logs(logs_frontend_w, '/tmp/wrapper-wid.log', destination_name, proxy_path) \ No newline at end of file