Skip to content
This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #293 from dougbenjamin/next
Browse files Browse the repository at this point in the history
fix file type check for python 3
  • Loading branch information
PalNilsson authored Sep 29, 2020
2 parents 5999036 + 84adbd6 commit 3f73060
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def is_stop(self):
:returns: True if the stop signal is set, otherwise False
"""
return self.stop_event.isSet()
return self.stop_event.is_set()

def get_jobs(self, njobs=1, post_hook=None, args=None):
"""
Expand Down
6 changes: 3 additions & 3 deletions pilot/eventservice/esprocess/esmessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, message_queue, socket_name=None, context='local', **kwds):
self.setName("MessageThread")
self.__message_queue = message_queue
self._socket_name = socket_name
self._stop = threading.Event()
self.__stop = threading.Event()

logger.info('try to import yampl')
try:
Expand Down Expand Up @@ -82,15 +82,15 @@ def stop(self):
Set stop event.
"""
logger.debug('set stop event')
self._stop.set()
self.__stop.set()

def is_stopped(self):
"""
Get status whether stop event is set.
:returns: True if stop event is set, otherwise False.
"""
return self._stop.isSet()
return self.__stop.is_set()

def terminate(self):
"""
Expand Down
16 changes: 14 additions & 2 deletions pilot/eventservice/esprocess/esprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
# - Wen Guan, [email protected], 2017-2018
# - Paul Nilsson, [email protected], 2018-2019

import io
import json
import logging
import os
import re
import subprocess
import sys
import time
import threading
import traceback
Expand All @@ -29,6 +31,12 @@

logger = logging.getLogger(__name__)

try:
file_type = file
except NameError:
file_type = io.IOBase


"""
Main process to handle event service.
It makes use of two hooks get_event_ranges_hook and handle_out_message_hook to communicate with other processes when
Expand Down Expand Up @@ -161,7 +169,7 @@ def init_payload_process(self): # noqa: C901
executable = 'cd %s; %s' % (workdir, executable)

if 'output_file' in self.__payload:
if type(self.__payload['output_file']) in [file]:
if isinstance(self.__payload['output_file'], file_type):
output_file_fd = self.__payload['output_file']
else:
if '/' in self.__payload['output_file']:
Expand All @@ -174,7 +182,7 @@ def init_payload_process(self): # noqa: C901
output_file_fd = open(output_file, 'w')

if 'error_file' in self.__payload:
if type(self.__payload['error_file']) in [file]:
if isinstance(self.__payload['error_file'], file_type):
error_file_fd = self.__payload['error_file']
else:
if '/' in self.__payload['error_file']:
Expand Down Expand Up @@ -376,6 +384,8 @@ def send_event_ranges_to_payload(self, event_ranges):
msg = None
if "No more events" in event_ranges:
msg = event_ranges
if (sys.version_info > (3, 0)): # needed for Python 3
msg = msg.encode('utf-8')
self.is_no_more_events = True
self.__no_more_event_time = time.time()
else:
Expand Down Expand Up @@ -467,6 +477,8 @@ def handle_messages(self):
except queue.Empty:
pass
else:
if (sys.version_info > (3, 0)): # needed for Python 3
message = message.decode('utf-8')
logger.debug('received message from payload: %s' % message)
if "Ready for events" in message:
event_ranges = self.get_event_range_to_payload()
Expand Down
2 changes: 1 addition & 1 deletion pilot/eventservice/workexecutor/plugins/baseexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def stop(self):
self.__stop.set()

def is_stop(self):
return self.__stop.isSet()
return self.__stop.is_set()

def stop_communicator(self):
logger.info("Stopping communication manager")
Expand Down

0 comments on commit 3f73060

Please sign in to comment.