Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change logging so that the messages appear once on thread status change #36

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ def sleep(t):
if __name__ == '__main__':
print("Starting the deadlocks monitoring")
monitoring_thread = start_monitoring(seconds_frozen=1)
print("Sleep 3 seconds in custom func")
sleep(3)
time.sleep(2)
print("Sleep 3 seconds")
time.sleep(3)
print("Sleep 3 seconds")
time.sleep(3)

print("Stopping the deadlocks monitoring")
# This may be useful when working in shell.
monitoring_thread.stop()
print("Sleep 3 seconds")
time.sleep(3)
print("Sleep 3 seconds")
time.sleep(3)

print("Exiting")
80 changes: 59 additions & 21 deletions hanging_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def is_stopped(self):
return self._stopped


def get_current_frames():
"""Return current threads prepared for further processing"""
return dict(
(thread_id, {'frame': thread2list(frame), 'time': None})
for thread_id, frame in sys._current_frames().items()
)


def frame2string(frame):
"""Return info about frame

Expand All @@ -75,7 +83,7 @@ def frame2string(frame):


def thread2list(frame):
"""Return list with string frame representation of each fame of thread"""
"""Return list with string frame representation of each frame of thread"""
l = []
while frame:
l.insert(0, frame2string(frame))
Expand All @@ -91,35 +99,65 @@ def monitor(seconds_frozen, tests_per_second):
of test_per_second parameter
"""
current_thread = threading.current_thread()
self = get_ident()
old_threads = {}
hanging_threads = set()
old_threads = {} # Threads found on previous iteration.

while not current_thread.is_stopped():
new_threads = get_current_frames()

# Report died threads.
for thread_id in old_threads.keys():
if thread_id not in new_threads and thread_id in hanging_threads:
log_died_thread(thread_id)

# Process live threads.
time.sleep(1. / tests_per_second)
now = time.time()
then = now - seconds_frozen
frames = sys._current_frames()
new_threads = {}
for frame_id, frame in frames.items():
new_threads[frame_id] = thread2list(frame)
for thread_id, frame_list in new_threads.items():
if thread_id == self: continue
if thread_id not in old_threads or \
frame_list != old_threads[thread_id][0]:
new_threads[thread_id] = (frame_list, now)
elif old_threads[thread_id][1] < then:
print_frame_list(frame_list, thread_id)
for thread_id, thread_data in new_threads.items():
# Don't report the monitor thread.
if thread_id == current_thread.ident:
continue
frame = thread_data['frame']
# If thread is new or it's stack is changed then update time.
if (thread_id not in old_threads or
frame != old_threads[thread_id]['frame']):
thread_data['time'] = now
# If the thread was hanging then report awaked thread.
if thread_id in hanging_threads:
hanging_threads.remove(thread_id)
log_awaked_thread(thread_id)
else:
new_threads[thread_id] = old_threads[thread_id]
# If stack is not changed then keep old time.
last_change_time = old_threads[thread_id]['time']
thread_data['time'] = last_change_time
# Check if this is a new hanging thread.
if (thread_id not in hanging_threads and
last_change_time < then):
# Gotcha!
hanging_threads.add(thread_id)
# Report the hanged thread.
log_hanged_thread(thread_id, frame)
old_threads = new_threads
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to say that the monitor() function becomes too large. However I have not found simple elegant way to divide it into small chunks. I think it should be rewritten in FP or OOP style. But this is probably a job for another task. For now I just want to make it usable for me and others to let us all fix the hanging threads problem in our apps :)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can create an issue: refactor monitor function and an issue to write tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #40 and #41.



def print_frame_list(frame_list, frame_id):
def log_hanged_thread(thread_id, frame):
"""Print the stack trace of the deadlock after hanging `seconds_frozen`"""
sys.stderr.write('-' * 20 +
'Thread {0}'.format(frame_id).center(20) +
'-' * 20 +
'\n' +
''.join(frame_list))
write_log('Thread {0} hangs '.format(thread_id), ''.join(frame))


def log_awaked_thread(thread_id):
write_log('Thread {0} awaked'.format(thread_id))


def log_died_thread(thread_id):
write_log('Thread {0} died '.format(thread_id))


def write_log(title, message=''):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this: It is a basis of setting a log

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought somebody may want to replace this function with own implementation to write messages to another destination in another format.

sys.stderr.write(''.join([
title.center(40).center(60, '-'), '\n', message
]))


def start_monitoring(seconds_frozen=SECONDS_FROZEN,
Expand Down