diff --git a/openwpm/storage/storage_controller.py b/openwpm/storage/storage_controller.py index 9175c6388..aa50a2b55 100644 --- a/openwpm/storage/storage_controller.py +++ b/openwpm/storage/storage_controller.py @@ -206,6 +206,11 @@ async def finalize_visit_id( documentation """ + # If the following critical section contains any await statement + # we can run into race conditions as reported by https://github.com/openwpm/OpenWPM/issues/1068 + # By popping the tasks off we won't try to await them again in self.shutdown + + # THIS IS A CRITITCAL SECTION if visit_id not in self.store_record_tasks: self.logger.error( "There are no records to be stored for visit_id %d, skipping...", @@ -213,10 +218,13 @@ async def finalize_visit_id( ) return None + store_record_tasks = self.store_record_tasks.pop(visit_id) + # END OF CRITICAL SECTION + self.logger.info("Awaiting all tasks for visit_id %d", visit_id) - for task in self.store_record_tasks[visit_id]: + for task in store_record_tasks: await task - del self.store_record_tasks[visit_id] + self.logger.debug( "Awaited all tasks for visit_id %d while finalizing", visit_id ) @@ -255,13 +263,17 @@ async def shutdown(self, completion_queue_task: Task[None]) -> None: completion_tokens = {} visit_ids = list(self.store_record_tasks.keys()) for visit_id in visit_ids: - t = await self.finalize_visit_id(visit_id, success=False) - if t is not None: - completion_tokens[visit_id] = t + # Even if the token is None, we still want to put the visit_id + # in the completion queue + completion_tokens[visit_id] = await self.finalize_visit_id( + visit_id, success=False + ) + await self.structured_storage.flush_cache() await completion_queue_task for visit_id, token in completion_tokens.items(): - await token + if token: + await token self.completion_queue.put((visit_id, False)) await self.structured_storage.shutdown()