diff --git a/hypha/VERSION b/hypha/VERSION index d09c0739..46f9bd60 100644 --- a/hypha/VERSION +++ b/hypha/VERSION @@ -1,3 +1,3 @@ { - "version": "0.20.38.post3" + "version": "0.20.38.post4" } diff --git a/hypha/core/queue.py b/hypha/core/queue.py index b5f11a55..fcf3e7be 100644 --- a/hypha/core/queue.py +++ b/hypha/core/queue.py @@ -14,20 +14,6 @@ def create_queue_service(store: RedisStore): """Create a queue service for Hypha.""" redis: aioredis.FakeRedis = store.get_redis() - event_bus = store.get_event_bus() - - async def on_workspace_unloaded(workspace): - # delete all the keys that start with workspace["name"] + ":q:" - keys_pattern = workspace["name"] + ":q:*" - cursor = "0" - - while cursor != 0: - cursor, keys = await redis.scan(cursor=cursor, match=keys_pattern) - if keys: - await redis.delete(*keys) - logger.info("Removed queue keys for workspace: %s", workspace["name"]) - - event_bus.on_local("workspace_unloaded", on_workspace_unloaded) async def push_task(queue_name, task: dict, context: dict = None): workspace = context["ws"] diff --git a/hypha/core/store.py b/hypha/core/store.py index faea1a12..e3dc8a81 100644 --- a/hypha/core/store.py +++ b/hypha/core/store.py @@ -295,7 +295,11 @@ def _upgrade_schema_if_needed(conn): workspace_info["type"] = "workspace" if "applications" in workspace_info: del workspace_info["applications"] - await self._redis.hset("workspaces", k, json.dumps(workspace_info)) + if workspace_info.get("persistent"): + await self._redis.hset("workspaces", k, json.dumps(workspace_info)) + else: + # remove non-persistent workspaces + await self._redis.hdel("workspaces", k) old_keys = await self._redis.keys( f"services:*|*:{self._root_user.get_workspace()}/workspace-client-*:*@*" diff --git a/hypha/core/workspace.py b/hypha/core/workspace.py index 2fdaa04b..982b06da 100644 --- a/hypha/core/workspace.py +++ b/hypha/core/workspace.py @@ -1529,19 +1529,24 @@ async def unload(self, context=None): ) await self._event_bus.emit(f"unload:{ws}", "Unloading workspace: " + ws) - if self._s3_controller: + if winfo.persistent and self._s3_controller: # since the workspace will be persisted, we can remove the workspace info from the redis store await self._redis.hdel("workspaces", ws) - await self._s3_controller.cleanup_workspace(winfo) - else: - if not winfo.persistent and not winfo.read_only: - await self._redis.hdel("workspaces", ws) self._active_ws.dec() await self._event_bus.emit("workspace_unloaded", winfo.model_dump()) logger.info("Workspace %s unloaded.", ws) + if not winfo.persistent: + # delete all the items in redis starting with `workspaces_name:` + keys = await self._redis.keys(f"{ws}:*") + for key in keys: + await self._redis.delete(key) + await self._redis.hdel("workspaces", ws) + if self._s3_controller: + await self._s3_controller.cleanup_workspace(winfo) + @schema_method async def cleanup( self,