From 443c94b2d840db52c2d77f9b1f2636bcf4ae3d0f Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sat, 26 Sep 2020 19:12:15 -0500 Subject: [PATCH 1/8] fix file type check for python 3 --- pilot/eventservice/esprocess/esprocess.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index 78e4a7607..765356094 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -7,6 +7,7 @@ # - Wen Guan, wen.guan@cern.ch, 2017-2018 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019 +import io import json import logging import os @@ -29,6 +30,12 @@ logger = logging.getLogger(__name__) +try: + file_type = file +except NameError: + file_type = io.IOBase + + """ Main process to handle event service. It makes use of two hooks get_event_ranges_hook and handle_out_message_hook to communicate with other processes when @@ -161,7 +168,7 @@ def init_payload_process(self): # noqa: C901 executable = 'cd %s; %s' % (workdir, executable) if 'output_file' in self.__payload: - if type(self.__payload['output_file']) in [file]: + if isinstance(self.__payload['output_file'], file_type): output_file_fd = self.__payload['output_file'] else: if '/' in self.__payload['output_file']: @@ -174,7 +181,7 @@ def init_payload_process(self): # noqa: C901 output_file_fd = open(output_file, 'w') if 'error_file' in self.__payload: - if type(self.__payload['error_file']) in [file]: + if isinstance(self.__payload['error_file'], file_type): error_file_fd = self.__payload['error_file'] else: if '/' in self.__payload['error_file']: From b163a9b6bf1125212dda3e6bb57188022413cbf0 Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 06:53:47 -0500 Subject: [PATCH 2/8] fix python3 threading _stop and change isSet() to is_set() --- .../communicationmanager/communicationmanager.py | 2 +- pilot/eventservice/esprocess/esmessage.py | 6 +++--- pilot/eventservice/workexecutor/plugins/baseexecutor.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pilot/eventservice/communicationmanager/communicationmanager.py b/pilot/eventservice/communicationmanager/communicationmanager.py index 9f3ba599b..2d1053cff 100644 --- a/pilot/eventservice/communicationmanager/communicationmanager.py +++ b/pilot/eventservice/communicationmanager/communicationmanager.py @@ -159,7 +159,7 @@ def is_stop(self): :returns: True if the stop signal is set, otherwise False """ - return self.stop_event.isSet() + return self.stop_event.is_set() def get_jobs(self, njobs=1, post_hook=None, args=None): """ diff --git a/pilot/eventservice/esprocess/esmessage.py b/pilot/eventservice/esprocess/esmessage.py index 7965a89b8..e2d65e37d 100644 --- a/pilot/eventservice/esprocess/esmessage.py +++ b/pilot/eventservice/esprocess/esmessage.py @@ -39,7 +39,7 @@ def __init__(self, message_queue, socket_name=None, context='local', **kwds): self.setName("MessageThread") self.__message_queue = message_queue self._socket_name = socket_name - self._stop = threading.Event() + self.__stop = threading.Event() logger.info('try to import yampl') try: @@ -82,7 +82,7 @@ def stop(self): Set stop event. """ logger.debug('set stop event') - self._stop.set() + self.__stop.set() def is_stopped(self): """ @@ -90,7 +90,7 @@ def is_stopped(self): :returns: True if stop event is set, otherwise False. """ - return self._stop.isSet() + return self.__stop.is_set() def terminate(self): """ diff --git a/pilot/eventservice/workexecutor/plugins/baseexecutor.py b/pilot/eventservice/workexecutor/plugins/baseexecutor.py index 613ea9a0b..ae565399e 100644 --- a/pilot/eventservice/workexecutor/plugins/baseexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/baseexecutor.py @@ -66,7 +66,7 @@ def stop(self): self.__stop.set() def is_stop(self): - return self.__stop.isSet() + return self.__stop.is_set() def stop_communicator(self): logger.info("Stopping communication manager") From 2203afa855bd72fe233ac43c93e769fca98dfbfb Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 08:53:24 -0500 Subject: [PATCH 3/8] fix python3 type error --- pilot/eventservice/esprocess/esprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index 765356094..443e7463c 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -475,7 +475,7 @@ def handle_messages(self): pass else: logger.debug('received message from payload: %s' % message) - if "Ready for events" in message: + if "Ready for events" in str(message): event_ranges = self.get_event_range_to_payload() if not event_ranges: event_ranges = "No more events" From 100db31bb8817aa4d95283073b6eec05dbf6ecdd Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 10:53:52 -0500 Subject: [PATCH 4/8] fix parse_out_message function --- pilot/eventservice/esprocess/esprocess.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index 443e7463c..d46432d6b 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -403,6 +403,7 @@ def parse_out_message(self, message): UnknownException: when other unknown exception is caught. """ + message = str(message) # needed for Python 3 logger.debug('parsing message: %s' % message) try: if message.startswith("/"): From 04662ec3b153cdd99c8988b27ec659150d42d14d Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 12:37:19 -0500 Subject: [PATCH 5/8] convert out message to string --- pilot/eventservice/esprocess/esprocess.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index d46432d6b..a4c6c0c4a 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -475,8 +475,9 @@ def handle_messages(self): except queue.Empty: pass else: + message = str(message) # convert to string - works Python 2 or Python 3 logger.debug('received message from payload: %s' % message) - if "Ready for events" in str(message): + if "Ready for events" in message: event_ranges = self.get_event_range_to_payload() if not event_ranges: event_ranges = "No more events" From 9bf8ba11cede2efbb7a3f56f98c049cf5f1f58d2 Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 17:00:16 -0500 Subject: [PATCH 6/8] convert out message to str - Python3 --- pilot/eventservice/esprocess/esprocess.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index a4c6c0c4a..cb004d4bc 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -13,6 +13,7 @@ import os import re import subprocess +import sys import time import threading import traceback @@ -403,7 +404,6 @@ def parse_out_message(self, message): UnknownException: when other unknown exception is caught. """ - message = str(message) # needed for Python 3 logger.debug('parsing message: %s' % message) try: if message.startswith("/"): @@ -475,8 +475,11 @@ def handle_messages(self): except queue.Empty: pass else: - message = str(message) # convert to string - works Python 2 or Python 3 logger.debug('received message from payload: %s' % message) + logger.debug('type of received message from payload: %s' % type(message)) + if (sys.version_info > (3, 0)): # needed for Python 3 + message = message.decode('utf-8') + logger.debug('type of converted received message : %s' % type(message)) if "Ready for events" in message: event_ranges = self.get_event_range_to_payload() if not event_ranges: From 99c391c8c36e583d5373ad60ab56f1e791848b66 Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 17:39:24 -0500 Subject: [PATCH 7/8] remove extra debug print messages --- pilot/eventservice/esprocess/esprocess.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index cb004d4bc..7079d5534 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -475,11 +475,9 @@ def handle_messages(self): except queue.Empty: pass else: - logger.debug('received message from payload: %s' % message) - logger.debug('type of received message from payload: %s' % type(message)) if (sys.version_info > (3, 0)): # needed for Python 3 message = message.decode('utf-8') - logger.debug('type of converted received message : %s' % type(message)) + logger.debug('received message from payload: %s' % message) if "Ready for events" in message: event_ranges = self.get_event_range_to_payload() if not event_ranges: From 84adbd63e8ef7210235ee1016fd47e90043469a4 Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 27 Sep 2020 21:46:35 -0500 Subject: [PATCH 8/8] add python3 message encoding --- pilot/eventservice/esprocess/esprocess.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index 7079d5534..335f5cc76 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -384,6 +384,8 @@ def send_event_ranges_to_payload(self, event_ranges): msg = None if "No more events" in event_ranges: msg = event_ranges + if (sys.version_info > (3, 0)): # needed for Python 3 + msg = msg.encode('utf-8') self.is_no_more_events = True self.__no_more_event_time = time.time() else: