Handling RuntimeError from Storage Controller in OpenWPM #1079

Open
MohammadMahdiJavid opened this issue Dec 28, 2023 · 3 comments

MohammadMahdiJavid commented Dec 28, 2023

Hi,

I am encountering a specific RuntimeError when using OpenWPM and I am seeking advice on the correct approach to handle it. The error occurs as follows:

  File "demo.py", line 470, in <module>
    manager.execute_command_sequence(command_sequence)
  File "openwpm/task_manager.py", line 435, in execute_command_sequence
    agg_queue_size = self.storage_controller_handle.get_most_recent_status()
  File "storage/storage_controller.py", line 625, in get_most_recent_status
    raise RuntimeError(
RuntimeError: No status update from the storage controller process for 4859 seconds.

I attempted to handle this exception with a try-except block, but it doesn't seem to be effective:

for command_sequence in command_sequences:
    try:
        manager.execute_command_sequence(command_sequence)
    except RuntimeError:
        # Skip this command sequence and move on to the next one
        continue

I would appreciate any guidance on the correct way to handle this exception. Is there a specific approach or pattern recommended for handling such timeouts or lack of status updates in OpenWPM? Any insights or suggestions would be greatly appreciated.

Thank you.

From openwpm/task_manager.py:

def execute_command_sequence(
    self, command_sequence: CommandSequence, index: Optional[int] = None
) -> None:
    """
    parses command type and issues command(s) to the proper browser
    <index> specifies the type of command this is:
    None -> first come, first serve
    int -> index of browser to send command to
    """
    # Block if the storage controller has too many unfinished records
    agg_queue_size = self.storage_controller_handle.get_most_recent_status()
    if agg_queue_size >= STORAGE_CONTROLLER_JOB_LIMIT:
        while agg_queue_size >= STORAGE_CONTROLLER_JOB_LIMIT:
            self.logger.info(
                "Blocking command submission until the storage controller "
                "is below the max queue size of %d. Current queue "
                "length %d. " % (STORAGE_CONTROLLER_JOB_LIMIT, agg_queue_size)
            )
            agg_queue_size = self.storage_controller_handle.get_status()
    # ... (remainder of the method not shown)

From storage/storage_controller.py:

def get_most_recent_status(self) -> int:
    """Return the most recent queue size sent from the Storage Controller process"""
    # Block until we receive the first status update
    if self._last_status is None:
        return self.get_status()
    # Drain status queue until we receive most recent update
    while not self.status_queue.empty():
        self._last_status = self.status_queue.get()
        self._last_status_received = time.time()
    # Check last status signal
    if (time.time() - self._last_status_received) > STATUS_TIMEOUT:
        raise RuntimeError(
            "No status update from the storage controller process "
            "for %d seconds." % (time.time() - self._last_status_received)
        )
    return self._last_status

MohammadMahdiJavid (Author)

Since self.status_queue is empty, the drain loop never runs, so the check

    if (time.time() - self._last_status_received) > STATUS_TIMEOUT:

still compares against the old self._last_status_received timestamp and raises the exception again on every call.

MohammadMahdiJavid (Author)

Proposed fix:

if self.status_queue.empty():
    self.status_queue.put(0)

Or maybe I'm not handling the exception correctly?
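A minimal sketch of the behavior described in the two comments above, assuming a simplified stand-in: StatusReader is a hypothetical class (not OpenWPM's handle), it omits the blocking first-update branch, it uses a plain queue.Queue instead of the inter-process queue OpenWPM uses, and it backdates the timestamp instead of waiting out a real timeout. Catching the RuntimeError and moving on does not help, because nothing refreshes _last_status_received until a new update arrives; putting a fake 0 on the queue makes the check pass again, but only by resetting the timer.

```python
import queue
import time
from typing import Optional

# Hypothetical short timeout so the demo runs instantly (not OpenWPM's value).
STATUS_TIMEOUT = 1.0


class StatusReader:
    """Simplified stand-in for get_most_recent_status(); not the OpenWPM class."""

    def __init__(self, status_queue: queue.Queue) -> None:
        self.status_queue = status_queue
        self._last_status: Optional[int] = None
        self._last_status_received = 0.0

    def get_most_recent_status(self) -> Optional[int]:
        # Drain the queue; the timestamp only moves forward when an update arrives.
        while not self.status_queue.empty():
            self._last_status = self.status_queue.get()
            self._last_status_received = time.time()
        if time.time() - self._last_status_received > STATUS_TIMEOUT:
            raise RuntimeError(
                "No status update for %d seconds."
                % (time.time() - self._last_status_received)
            )
        return self._last_status


status_queue: queue.Queue = queue.Queue()
reader = StatusReader(status_queue)
status_queue.put(0)  # one healthy status update
print(reader.get_most_recent_status())  # 0

# Simulate a storage controller that went silent long ago.
reader._last_status_received -= 10 * STATUS_TIMEOUT

failures = 0
for _ in range(3):  # three "command sequences"
    try:
        reader.get_most_recent_status()
    except RuntimeError:
        failures += 1  # skipping ahead does not reset the stale timestamp
print(failures)  # 3

# The proposed workaround: a fake update refreshes the timestamp, so the check passes.
status_queue.put(0)
print(reader.get_most_recent_status())  # 0
```

The fake update only silences the check; the reader has no way of knowing whether the storage controller is actually making progress.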

vringar (Contributor) commented Dec 29, 2023

RuntimeError: No status update from the storage controller process for 4859 seconds.

This implies that the following code has not been running for more than an hour, which is deeply troubling as it should run every 5 seconds.

async def update_status_queue(self) -> NoReturn:
    """Send manager process a status update.
    This coroutine will get cancelled with an exception
    so there is no need for an orderly return
    """
    while True:
        await asyncio.sleep(STATUS_UPDATE_INTERVAL)
        visit_id_count = len(self.store_record_tasks.keys())
        task_count = 0
        for task_list in self.store_record_tasks.values():
            for task in task_list:
                if not task.done():
                    task_count += 1
        self.status_queue.put(task_count)
        self.logger.debug(
            (
                "StorageController status: There are currently %d scheduled tasks "
                "for %d visit_ids"
            ),
            task_count,
            visit_id_count,
        )

This suggests that you are performing long-running, blocking operations in the storage controller, which breaks it.
The storage process is a single-threaded asynchronous execution environment. Any task (including any storage provider) that doesn't yield control can lock up the whole process.
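A minimal, self-contained sketch (plain asyncio, not OpenWPM code) of why a task that never yields starves the status updates. heartbeat is a hypothetical coroutine shaped like update_status_queue, with a shortened interval; blocking_store calls time.sleep, which blocks the whole event loop, while offloaded_store runs the same call in a worker thread via asyncio.to_thread (Python 3.9+). Offloading is one common option for unavoidable blocking I/O; whether it fits depends on what the storage provider actually does.

```python
import asyncio
import time
from typing import Awaitable, Callable

HEARTBEAT_INTERVAL = 0.05  # hypothetical; stands in for STATUS_UPDATE_INTERVAL


async def heartbeat(beats: list[float]) -> None:
    # Like update_status_queue: wake up periodically and report.
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        beats.append(time.monotonic())


async def blocking_store() -> None:
    # A hypothetical "storage" task that never yields: the event loop is frozen.
    time.sleep(0.5)


async def offloaded_store() -> None:
    # The same work pushed to a worker thread; the event loop keeps running.
    await asyncio.to_thread(time.sleep, 0.5)


async def count_beats(store: Callable[[], Awaitable[None]]) -> int:
    beats: list[float] = []
    hb = asyncio.create_task(heartbeat(beats))
    await asyncio.sleep(0)  # let the heartbeat task start
    await store()
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return len(beats)


blocked = asyncio.run(count_beats(blocking_store))
offloaded = asyncio.run(count_beats(offloaded_store))
print("heartbeats while blocking:", blocked)
print("heartbeats while offloaded:", offloaded)
```

The blocking version records no heartbeats during its half-second of work, while the offloaded version keeps ticking roughly every 50 ms.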

I would assume that whatever is blocking the process is also preventing any data from getting saved out.

This cannot be fixed anywhere outside of the storage (controller) process.
