Skip to content

Commit

Permalink
Merge pull request #23 from sireto/features_agent_websocket
Browse files Browse the repository at this point in the history
Features / Agent Websocket
  • Loading branch information
JosephRana11 authored Apr 9, 2024
2 parents 8085390 + 5caf8b9 commit 74532b7
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 1 deletion.
1 change: 1 addition & 0 deletions autonomous_agent/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
app.log
3 changes: 3 additions & 0 deletions autonomous_agent/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Changelog

- Setup websocket client for autonomous agent service
48 changes: 48 additions & 0 deletions autonomous_agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

# Autonomous Agent

This is a python service for creating websocket connection with the central server using agent id. The agent pings the server periodically via the websocket connection.

Requirments:

- Python 3.12.2

- Poetry
## Setup Guide

Clone the project

```bash
git clone https://github.com/sireto/cardano-autonomous-agent
```

Go to the Autonomous Agent Directory

```bash
cd autonomous_agent
```

Install dependencies via poetry.

```bash
poetry install
```
## Creating a new Agent.

To create a new agent you need to send a post request to **api/create_agent** endpoint. Make sure that **autonomous_agent_api** backend service and **database** are running correctly.

Copy the id from the post response . The id looks something similar to **c2d4c358-5171-4be8-b273-0147cc57c204.**

## Running the service

Activate the poetry virtual env inside **autonomous_agent** folder by running the following command.

```
poetry shell
```
Now run the following command along with your agent id.

```
python connect-agent.py --agent_id < Your id here >
```
After a successfull connection , you should see the periodic ping request and response in your terminal.
Empty file added autonomous_agent/__init__.py
Empty file.
39 changes: 39 additions & 0 deletions autonomous_agent/connect-agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import websockets
import argparse
from websockets.exceptions import ConnectionClosed

default_ping_timeout = 10 # Sends a ping every 10 seconds


async def connect_to_server(agent_id):
uri = "ws://127.0.0.1:8000/api/agent/ws"
headers = {"agent_id": agent_id}

try:
async with websockets.connect(uri, extra_headers=headers) as websocket:
while True:
try:
await websocket.send("PING")
response = await websocket.recv()
print("Received:", response)
await asyncio.sleep(default_ping_timeout)
except ConnectionClosed:
print("Connection closed by server")
break
except ConnectionError:
print("Failed to connect to the server")


async def main(agent_id):
await connect_to_server(agent_id)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Connect to Central WebSocket server with agent ID"
)
parser.add_argument("--agent_id", help="Agent ID to connect with", required=True)
args = parser.parse_args()

asyncio.run(main(args.agent_id))
87 changes: 87 additions & 0 deletions autonomous_agent/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions autonomous_agent/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.poetry]
name = "autonomous-agent"
version = "0.1.0"
description = ""
authors = ["Joseph Rana <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
websockets = "^12.0"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
3 changes: 3 additions & 0 deletions autonomous_agent_api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@
- Made table for Triggers
- Created router,service,repository for trigger CRUD Action
- Created the validation function for cron expression and kafka topic while creating the trigger by agent

# TITLE - AGENT Websocket , DATE- 2024-04-09
- Added websocket endpoint for agent websocket connection
81 changes: 81 additions & 0 deletions autonomous_agent_api/backend/app/controllers/agent_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from fastapi import WebSocket, APIRouter, WebSocketDisconnect, WebSocketException
from backend.config.database import prisma_connection
from fastapi import status
from datetime import datetime
from backend.config.logger import logger

router = APIRouter()


class WebSocket_Connection_Manager:
def __init__(self) -> None:
self.active_connections: dict[str, WebSocket] = {}

async def connect_websocket(self, websocket_agent_id: str, websocket: WebSocket):
# Stores Websocket along with agent id in Key Value Pair : websocket_agent_id : Key , websocket : value
await websocket.accept()
self.active_connections[websocket_agent_id] = websocket

async def disconnect_websocket(self, websocket_agent_id):
"""Critical : Dont use .close() here as this will close the new connection when dealing with multiple web socket request for the same bot.
This is due to the nature of try/except code that gets called by the previous connection."""
self.active_connections.pop(websocket_agent_id)

async def send_message_to_websocket(self, websocket_agent_id: str, message: dict):
# Checks if agent is active , first then sends message
agent_active = await self.check_if_agent_active(websocket_agent_id)
if agent_active:
await self.active_connections[websocket_agent_id].send_json(message)
else:
logger.critical(
"Agent with the id {websocket_agent_id} does not exist in the active Connection list. Sending Message Failed!"
)

async def check_if_agent_active(self, websocket_agent_id: str):
# Checks if agent is present in active connection list.
return websocket_agent_id in self.active_connections

async def remove_previous_agent_connection_if_exists(self, websocket_agent_id: str):
"""
If client requests websocket connection for an already active bot.
Removes the old connection and establishes a new one.
"""
if await self.check_if_agent_active(websocket_agent_id):
existing_websocket = self.active_connections.pop(websocket_agent_id)
await existing_websocket.close(code=1000, reason="establishing a new connection")


manager = WebSocket_Connection_Manager()


@router.websocket("/agent/ws")
async def agent_websocket_endpoint(websocket: WebSocket):
# todo: Cookies Authentication

# Get agent id from the websocket header.
websocket_agent_id = websocket.headers.get("agent_id")
if websocket_agent_id == None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

# Check if agent with the id exists.
agent_exists = await check_if_agent_exists_in_db(websocket_agent_id)
if agent_exists:
await manager.remove_previous_agent_connection_if_exists(websocket_agent_id)
await manager.connect_websocket(websocket_agent_id, websocket)
try:
while True:
data = await websocket.receive_text()
print(f"Received Data: {data} from {websocket_agent_id}")
await websocket.send_text(f"Ping recieved from {websocket_agent_id} at {datetime.now()}")
except WebSocketDisconnect:
await manager.disconnect_websocket(websocket_agent_id)
pass
else:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)


async def check_if_agent_exists_in_db(agent_id: str):
# Query agent with the agent id from the database -> reurns a boolean
async with prisma_connection:
agent_exists = await prisma_connection.prisma.agent.find_first(where={"id": agent_id, "deleted_at": None})
return bool(agent_exists)
10 changes: 9 additions & 1 deletion autonomous_agent_api/backend/app/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@
"""

from fastapi import APIRouter
from backend.app.controllers import ready, demo, agent_router, trigger_router

from backend.app.controllers import ready, demo, agent_router, trigger_router, agent_websocket

root_api_router = APIRouter(prefix="/api")

# For ready status Api
root_api_router.include_router(ready.router, tags=["ready"])

# For Demo Ping APi
root_api_router.include_router(demo.router, tags=["test"])

# For Agent CRUD operations
root_api_router.include_router(agent_router.AgentRouter().router, tags=["agent"])

# For Agent Websocket connection
root_api_router.include_router(agent_websocket.router)

# For Agent Trigger
root_api_router.include_router(trigger_router.TriggerRouter().router, tags=["trigger"])

0 comments on commit 74532b7

Please sign in to comment.