Skip to content

Commit

Permalink
k8s: replaced requests call with httplib code in k8s_startup_script
Browse files Browse the repository at this point in the history
  • Loading branch information
fbarreir committed Nov 20, 2019
1 parent 3637dad commit 7374893
Showing 1 changed file with 65 additions and 74 deletions.
139 changes: 65 additions & 74 deletions pandaharvester/harvestercloud/k8s_startup_script.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,88 @@
#!/usr/bin/env python

"""
This script will be executed at the VM startup time.
- It will download the proxy and panda queue from Google instance metadata
This script will be executed at container startup
- It will retrieve the proxy and panda queue from the environment
- It will download the pilot wrapper from github and execute it
- It will upload the pilot logs to panda cache
- It will upload the pilot logs to panda cache at the end
post-multipart code was taken from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py
"""

import requests
try:
import subprocess32 as subprocess
except Exception:
import subprocess
import os
import sys
import logging
import time
import traceback
from threading import Thread
import httplib
import mimetypes
import ssl
import urlparse

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s',
filename='/tmp/vm_script.log', filemode='w')

global loop
loop = True

def post_multipart(host, port, selector, files, proxy_cert):
"""
Post files to an http host as multipart/form-data.
files is a sequence of (name, filename, value) elements for data to be uploaded as files
Return the server's response page.
"""
content_type, body = encode_multipart_formdata(files)

context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile=proxy_cert, keyfile=proxy_cert)

h = httplib.HTTPSConnection(host, port, context=context, timeout=180)

def upload_logs(url, log_file_name, destination_name, proxy_path):
try:
# open and compress the content of the file
with open(log_file_name, 'rb') as log_file_object:
files = {'file': (destination_name, log_file_object.read())}
h.putrequest('POST', selector)
h.putheader('content-type', content_type)
h.putheader('content-length', str(len(body)))
h.endheaders()
h.send(body)
response = h.getresponse()
return response.status, response.reason

cert = [proxy_path, proxy_path]
# verify = '/etc/grid-security/certificates' # not supported in CernVM - requests.exceptions.SSLError: [Errno 21] Is a directory

def encode_multipart_formdata(files):
"""
files is a sequence of (name, filename, value) elements for data to be uploaded as files
Return (content_type, body) ready for httplib.HTTP instance
"""
BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$'
CRLF = '\r\n'
L = []
for (key, filename, value) in files:
L.append('--' + BOUNDARY)
L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
L.append('Content-Type: %s' % get_content_type(filename))
L.append('')
L.append(value)
L.append('--' + BOUNDARY + '--')
L.append('')
body = CRLF.join(L)
content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
return content_type, body


def get_content_type(filename):
return mimetypes.guess_type(filename)[0] or 'application/octet-stream'


def upload_logs(url, log_file_name, destination_name, proxy_cert):
try:
full_url = url + '/putFile'
urlparts = urlparse.urlsplit(full_url)

logging.debug('[upload_logs] start')
res = requests.post(url + '/putFile', files=files, timeout=180, verify=False, cert=cert)
logging.debug('[upload_logs] finished with code={0} msg={1}'.format(res.status_code, res.text))
if res.status_code == 200:
files = [('file', destination_name, open(log_file_name).read())]
status, reason = post_multipart(urlparts.hostname, urlparts.port, urlparts.path, files, proxy_cert)
logging.debug('[upload_logs] finished with code={0} msg={1}'.format(status, reason))
if status == 200:
return True
except Exception:
err_type, err_value = sys.exc_info()[:2]
Expand All @@ -49,44 +93,10 @@ def upload_logs(url, log_file_name, destination_name, proxy_path):
return False


def contact_harvester(harvester_frontend, data, auth_token, proxy_path):
try:
headers = {'Content-Type': 'application/json',
'Authorization': 'Bearer {0}'.format(auth_token)}
cert = [proxy_path, proxy_path]
#verify = '/etc/grid-security/certificates' # not supported in CernVM - requests.exceptions.SSLError: [Errno 21] Is a directory
verify = False
resp = requests.post(harvester_frontend, json=data, headers=headers, cert=cert, verify=verify)
logging.debug('[contact_harvester] harvester returned: {0}'.format(resp.text))
except Exception as e:
# message could not be sent
logging.debug('[contact_harvester] failed to send message to harvester: {0}'.format(e))
pass


def heartbeat(harvester_frontend, worker_id, auth_token, proxy_path):
data = {'methodName': 'heartbeat', 'workerID': worker_id, 'data': None}
logging.debug('[heartbeat] sending heartbeat to harvester: {0}'.format(data))
return contact_harvester(harvester_frontend, data, auth_token, proxy_path)


def suicide(harvester_frontend, worker_id, auth_token, proxy_path):
data = {'methodName': 'killWorker', 'workerID': worker_id, 'data': None}
logging.debug('[suicide] sending suicide message to harvester: {0}'.format(data))
return contact_harvester(harvester_frontend, data, auth_token, proxy_path)


def heartbeat_loop(harvester_frontend, worker_id, auth_token, proxy_path):
while loop:
heartbeat(harvester_frontend, worker_id, auth_token, proxy_path)
time.sleep(300)


def get_url(url, headers=None):
"""
get content from specified URL
"""

reply = requests.get(url, headers=headers)
if reply.status_code != 200:
logging.debug('[get_attribute] Failed to open {0}'.format(url))
Expand Down Expand Up @@ -126,20 +136,10 @@ def get_configuration():
resource_type = os.environ.get('resourceType')
logging.debug('[main] got resource type: {0}'.format(resource_type))

# get the harvester frontend URL, where we'll send heartbeats
# harvester_frontend_url = METADATA_URL.format("harvester_frontend")
harvester_frontend = None
# logging.debug('[main] got harvester frontend: {0}'.format(harvester_frontend))

# get the worker id
worker_id = os.environ.get('workerID')
logging.debug('[main] got worker id: {0}'.format(worker_id))

# get the authentication token
# auth_token_url = METADATA_URL.format("auth_token")
auth_token = None
# logging.debug('[main] got authentication token')

# get the URL (e.g. panda cache) to upload logs
logs_frontend_w = os.environ.get('logs_frontend_w')
logging.debug('[main] got url to upload logs')
Expand All @@ -148,17 +148,13 @@ def get_configuration():
logs_frontend_r = os.environ.get('logs_frontend_r')
logging.debug('[main] got url to download logs')

return proxy_path, panda_site, panda_queue, resource_type, harvester_frontend, worker_id, auth_token, logs_frontend_w, logs_frontend_r
return proxy_path, panda_site, panda_queue, resource_type, worker_id, logs_frontend_w, logs_frontend_r


if __name__ == "__main__":

# get all the configuration from the GCE metadata server
proxy_path, panda_site, panda_queue, resource_type, harvester_frontend, worker_id, auth_token, logs_frontend_w, logs_frontend_r = get_configuration()

# start a separate thread that will send a heartbeat to harvester every 5 minutes
# heartbeat_thread = Thread(target=heartbeat_loop, args=(harvester_frontend, worker_id, auth_token, proxy_path))
# heartbeat_thread.start()
proxy_path, panda_site, panda_queue, resource_type, worker_id, logs_frontend_w, logs_frontend_r = get_configuration()

# the pilot should propagate the download link via the pilotId field in the job table
destination_name = '{0}.out'.format(worker_id)
Expand Down Expand Up @@ -190,9 +186,4 @@ def get_configuration():
logging.debug('[main] pilot wrapper done...')

# upload logs to e.g. panda cache or similar
upload_logs(logs_frontend_w, '/tmp/wrapper-wid.log', destination_name, proxy_path)

# ask harvester to kill the VM and stop the heartbeat
# suicide(harvester_frontend, worker_id, auth_token, proxy_path)
loop = False
# heartbeat_thread.join()
upload_logs(logs_frontend_w, '/tmp/wrapper-wid.log', destination_name, proxy_path)

0 comments on commit 7374893

Please sign in to comment.