Skip to content

Commit

Permalink
Improve redis event bus (#718)
Browse files Browse the repository at this point in the history
* small fix

* Improve the speed

* Fix tests

* Fix teardown

* Fix file count

* small fix
  • Loading branch information
oeway authored Nov 26, 2024
1 parent 317f815 commit efb038b
Show file tree
Hide file tree
Showing 18 changed files with 148 additions and 99 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Hypha Change Log

### 0.20.40

- Add vector store service to support vector search and retrieval.
- Fix zenodo file upload issue
- Speed up server by removing the `asyncio.sleep(0.01)` throttling and support concurrent handling of events in the redis event bus.

### 0.20.39

- Revise artifact manager to use artifact id as the primary key, remove `prefix` based keys.
Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ svc = await get_remote_service("http://localhost:9527/ws-user-scintillating-lawy
Include the following script in your HTML file to load the `hypha-rpc` client:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
```

Use the following code in JavaScript to connect to the server and access an existing service:
Expand Down
10 changes: 5 additions & 5 deletions docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ To connect to the server, instead of installing the `imjoy-rpc` module, you will
pip install -U hypha-rpc # new install
```

We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.39` is compatible with Hypha server version `0.20.39`.
We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.40` is compatible with Hypha server version `0.20.40`.

#### 2. Change the imports to use `hypha-rpc`

Expand Down Expand Up @@ -128,10 +128,10 @@ loop.run_forever()
To connect to the server, instead of using the `imjoy-rpc` module, you will need to use the `hypha-rpc` module. The `hypha-rpc` module is a standalone module that provides the RPC connection to the Hypha server. You can include it in your HTML using a script tag:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
```

We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.39` is compatible with Hypha server version `0.20.39`.
We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.40` is compatible with Hypha server version `0.20.40`.

#### 2. Change the connection method and use camelCase for service function names

Expand All @@ -149,7 +149,7 @@ Here is a suggested list of search and replace operations to update your code:
Here is an example of how the updated code might look:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
<script>
async function main(){
const server = await hyphaWebsocketClient.connectToServer({"server_url": "https://hypha.amun.ai"});
Expand Down Expand Up @@ -197,7 +197,7 @@ We created a tutorial to introduce this new feature: [service type annotation](.
Here is a quick example in JavaScript:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>

<script>
async function main(){
Expand Down
2 changes: 1 addition & 1 deletion docs/service-type-annotation.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ if __name__ == "__main__":
**JavaScript Client: Service Usage**

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-rpc-websocket.min.js"></script>
<script>
async function main() {
const server = await hyphaWebsocketClient.connectToServer({"server_url": "https://hypha.amun.ai"});
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/aks-hypha.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ replicaCount: 1
image:
repository: ghcr.io/amun-ai/hypha
pullPolicy: IfNotPresent
tag: "0.20.39"
tag: "0.20.40"
serviceAccount:
create: true
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/hypha-server/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.20.39
version: 0.20.40

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/hypha-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ image:
repository: ghcr.io/amun-ai/hypha
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "0.20.39"
tag: "0.20.40"

imagePullSecrets: []
nameOverride: ""
Expand Down
2 changes: 1 addition & 1 deletion hypha/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.20.39.post20"
"version": "0.20.40"
}
79 changes: 40 additions & 39 deletions hypha/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -1798,46 +1798,47 @@ async def commit(

versions = artifact.versions or []
artifact.config = artifact.config or {}
if artifact.staging:
s3_config = self._get_s3_config(artifact, parent_artifact)
async with self._create_client_async(
s3_config,
) as s3_client:
download_weights = {}
for file_info in artifact.staging:
file_key = safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}/{file_info['path']}",
)
try:
await s3_client.head_object(
Bucket=s3_config["bucket"], Key=file_key
)
except ClientError:
raise FileNotFoundError(
f"File '{file_info['path']}' does not exist in the artifact."
)
if (
file_info.get("download_weight") is not None
and file_info["download_weight"] > 0
):
download_weights[file_info["path"]] = file_info[
"download_weight"
]
if download_weights:
artifact.config["download_weights"] = download_weights
flag_modified(artifact, "config")

artifact.file_count = await self._count_files_in_prefix(
s3_client,
s3_config["bucket"],
safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}",
),

s3_config = self._get_s3_config(artifact, parent_artifact)
async with self._create_client_async(
s3_config,
) as s3_client:
download_weights = {}
for file_info in artifact.staging or []:
file_key = safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}/{file_info['path']}",
)
try:
await s3_client.head_object(
Bucket=s3_config["bucket"], Key=file_key
)
except ClientError:
raise FileNotFoundError(
f"File '{file_info['path']}' does not exist in the artifact."
)
if (
file_info.get("download_weight") is not None
and file_info["download_weight"] > 0
):
download_weights[file_info["path"]] = file_info[
"download_weight"
]

if download_weights:
artifact.config["download_weights"] = download_weights
flag_modified(artifact, "config")

artifact.file_count = await self._count_files_in_prefix(
s3_client,
s3_config["bucket"],
safe_join(
s3_config["prefix"],
artifact.workspace,
f"{self._artifacts_dir}/{artifact.id}/v{version_index}",
),
)

parent_artifact_config = (
parent_artifact.config if parent_artifact else {}
Expand Down
74 changes: 45 additions & 29 deletions hypha/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import sys
import os
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pydantic import BaseModel, Field, field_validator
Expand Down Expand Up @@ -547,7 +548,11 @@ async def init(self):
loop = asyncio.get_running_loop()
self._ready = loop.create_future()
self._loop = loop

# Start the Redis subscription task
loop.create_task(self._subscribe_redis())

# Wait for readiness signal
await self._ready

def on(self, event_name, func):
Expand Down Expand Up @@ -652,39 +657,50 @@ def stop(self):
self._stop = True

async def _subscribe_redis(self):
cpu_count = os.cpu_count() or 1
concurrent_tasks = cpu_count * 10
pubsub = self._redis.pubsub()
self._stop = False
try:
await pubsub.psubscribe("event:*")
self._ready.set_result(True)
while self._stop is False:
msg = await pubsub.get_message(ignore_subscribe_messages=True)
semaphore = asyncio.Semaphore(concurrent_tasks) # Limit concurrent tasks

async def process_message(msg):
"""Process a single message while respecting the semaphore."""
async with semaphore: # Acquire semaphore
try:
if msg:
channel = msg["channel"].decode("utf-8")
RedisEventBus._counter.labels(event="*").inc()
if channel.startswith("event:b:"):
event_type = channel[8:]
data = msg["data"]
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:d:"):
event_type = channel[8:]
data = json.loads(msg["data"])
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:s:"):
event_type = channel[8:]
data = msg["data"].decode("utf-8")
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
else:
logger.info("Unknown channel: %s", channel)
channel = msg["channel"].decode("utf-8")
RedisEventBus._counter.labels(event="*").inc()

if channel.startswith("event:b:"):
event_type = channel[8:]
data = msg["data"]
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:d:"):
event_type = channel[8:]
data = json.loads(msg["data"])
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
elif channel.startswith("event:s:"):
event_type = channel[8:]
data = msg["data"].decode("utf-8")
await self._redis_event_bus.emit(event_type, data)
if ":" not in event_type:
RedisEventBus._counter.labels(event=event_type).inc()
else:
logger.info("Unknown channel: %s", channel)
except Exception as exp:
logger.exception(f"Error processing message: {exp}")
await asyncio.sleep(0)

try:
await pubsub.psubscribe("event:*")
self._ready.set_result(True)
while not self._stop:
msg = await pubsub.get_message(
ignore_subscribe_messages=True, timeout=0.05
)
if msg:
asyncio.create_task(process_message(msg)) # Add task to pool
except Exception as exp:
self._ready.set_exception(exp)
7 changes: 7 additions & 0 deletions hypha/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,4 +907,11 @@ async def teardown(self):
await self.remove_client(
client_id, self._root_user.get_workspace(), self._root_user, unload=True
)
if self._websocket_server:
websockets = self._websocket_server.get_websockets()
for ws in websockets.values():
try:
await ws.close()
except GeneratorExit:
pass
logger.info("Teardown complete")
2 changes: 1 addition & 1 deletion hypha/templates/hypha-core-app/hypha-app-webpython.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ loadPyodide().then(async (pyodide) => {
pyodide.setStderr({ batched: (msg) => console.error(msg) });
await pyodide.loadPackage("micropip");
const micropip = pyodide.pyimport("micropip");
await micropip.install('hypha-rpc==0.20.39');
await micropip.install('hypha-rpc==0.20.40');
const isWindow = typeof window !== "undefined";

setTimeout(() => {
Expand Down
4 changes: 2 additions & 2 deletions hypha/templates/ws/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
<body class="bg-black text-white font-poppins">
<div id="app"></div>
<script type="module">
import { HyphaCore } from "https://cdn.jsdelivr.net/npm/[email protected].39/dist/hypha-core.mjs";
import { HyphaCore } from "https://cdn.jsdelivr.net/npm/[email protected].40/dist/hypha-core.mjs";

const defaultService = {
getServerConfig(context){
Expand Down Expand Up @@ -765,7 +765,7 @@ <h2 className="text-2xl font-bold text-white">Artifact Details</h2>
<h3 className="text-lg font-bold">Files:</h3>
<ul className="list-disc list-inside text-gray-300 mt-2">
{files.map((file) => (
<li key={file.name} className="break-all flex items-center justify-between">
<li key={file.name} className="break-all flex mb-1 items-center justify-between">
<span><i className={`fas ${file.type==='directory' ? 'fa-folder' : 'fa-file-alt'} mr-2`}></i>{file.name}</span>
{file.type === "file" && (
<div className="flex space-x-2">
Expand Down
25 changes: 19 additions & 6 deletions hypha/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ async def websocket_endpoint(
return
else:
logger.warning("Rejecting legacy imjoy-rpc client (%s)", client_id)
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION,
reason="Connection rejected, please upgrade to `hypha-rpc` which pass the authentication information in the first message",
)
try:
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION,
reason="Connection rejected, please upgrade to `hypha-rpc` which pass the authentication information in the first message",
)
except GeneratorExit:
pass
return

try:
Expand Down Expand Up @@ -164,11 +167,18 @@ async def websocket_endpoint(
e,
)

def get_websockets(self):
"""Get the active websockets."""
return self._websockets

async def force_disconnect(self, workspace, client_id, code, reason):
"""Force disconnect a client."""
assert f"{workspace}/{client_id}" in self._websockets, "Client not connected"
websocket = self._websockets[f"{workspace}/{client_id}"]
await websocket.close(code=code, reason=reason)
try:
await websocket.close(code=code, reason=reason)
except GeneratorExit:
pass # Suppress GeneratorExit to avoid RuntimeError

async def check_client(self, client_id, workspace, user_info):
"""Check if the client is already connected."""
Expand Down Expand Up @@ -367,7 +377,10 @@ async def disconnect(self, websocket, reason, code=status.WS_1000_NORMAL_CLOSURE
await websocket.send_text(json.dumps({"type": "error", "message": reason}))
try:
if websocket.client_state.name not in ["CLOSED", "CLOSING", "DISCONNECTED"]:
await websocket.close(code=code, reason=reason)
try:
await websocket.close(code=code, reason=reason)
except GeneratorExit:
pass # Suppress GeneratorExit to avoid RuntimeError
except Exception as e:
logger.error(f"Error disconnecting websocket: {str(e)}")

Expand Down
5 changes: 4 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[pytest]
asyncio_mode = strict
asyncio_mode = strict
filterwarnings =
ignore::DeprecationWarning
ignore::UserWarning
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ aiofiles==23.2.1
websockets==13.1
base58==2.1.1
fastapi==0.106.0
hypha-rpc==0.20.39
hypha-rpc==0.20.40
jinja2==3.1.4
lxml==4.9.3
msgpack==1.0.8
Expand Down
Loading

0 comments on commit efb038b

Please sign in to comment.