Skip to content

Commit

Permalink
Merge pull request #85 from ant-xuexiao/feat/support-aws-sqs
Browse files Browse the repository at this point in the history
feat: 增加 AWS SQS 消息队列功能
  • Loading branch information
xingwanying authored Apr 11, 2024
2 parents 224097b + 53a5541 commit 3acc205
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .aws/petercat-preview.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ region = "ap-northeast-1"
confirm_changeset = true
capabilities = "CAPABILITY_IAM"
disable_rollback = true
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo"]
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/petercatapipreview49199518/fastapifunctionead79d0drepo", "SQSSubscriptionFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/petercatapipreview49199518/sqssubscriptionfunctiona2fc8b7drepo"]
2 changes: 1 addition & 1 deletion .aws/petercat-prod.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ region = "ap-northeast-1"
confirm_changeset = true
capabilities = "CAPABILITY_IAM"
disable_rollback = true
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo"]
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo", "SQSSubscriptionFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/sqssubscriptionfunctiona2fc8b7drepo"]
12 changes: 0 additions & 12 deletions server/Dockerfile

This file was deleted.

5 changes: 5 additions & 0 deletions server/data_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ class Message(BaseModel):
class ChatData(BaseModel):
messages: list[Message] = []
prompt: str = None

class ExecuteMessage(BaseModel):
type: str
repo: str
path: str
26 changes: 14 additions & 12 deletions server/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import os
import uvicorn

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from data_class import DalleData, ChatData
from openai_api import dalle
from langchain_api import chat

from agent import stream
from uilts.env import get_env_variable
import uvicorn
from data_class import ChatData, ExecuteMessage
from message_queue.queue_wrapper import delete_messages, get_queue, receive_messages, send_message, unpack_message

open_api_key = get_env_variable("OPENAI_API_KEY")
sqs_queue_name = get_env_variable("PETERCAT_EX_SQS")

app = FastAPI(
title="Bo-meta Server",
Expand All @@ -30,15 +32,15 @@
def read_root():
return {"Hello": "World"}

@app.post("/api/dall-e")
def run_img_generator(input_data: DalleData):
result = dalle.img_generator(input_data, open_api_key)
return result
@app.post("/api/message")
def send_sqs_message(message: ExecuteMessage):
queue = get_queue(sqs_queue_name)
return send_message(queue=queue, message=message)

@app.post("/api/chat")
def run_langchain_chat(input_data: ChatData):
result = chat.langchain_chat(input_data, open_api_key)
return result
@app.get("/api/message/receive")
def receive_sqs_message():
queue = get_queue(sqs_queue_name)
return StreamingResponse(receive_messages(queue), media_type="text/event-stream")


@app.post("/api/chat/stream", response_class=StreamingResponse)
Expand Down
102 changes: 102 additions & 0 deletions server/message_queue/queue_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import json
import boto3
from botocore.exceptions import ClientError
import logging

from data_class import ExecuteMessage

logger = logging.getLogger(__name__)
sqs = boto3.resource("sqs")

def get_queue(name):
try:
queue = sqs.get_queue_by_name(QueueName=name)
except ClientError as error:
logger.exception("Couldn't get queue named %s.", name)
raise error
else:
return queue

def send_message(queue, message: ExecuteMessage, message_attributes=None):
if not message_attributes:
message_attributes = {
"type": { "StringValue": message.type, "DataType": "String" },
"repo": { "StringValue": message.repo, "DataType": "String" },
"path": { "StringValue": message.path, "DataType": "String" },
}

message_body = encode_message(message=message)

try:
response = queue.send_message(
MessageBody=message_body, MessageAttributes=message_attributes
)

except ClientError as error:
logger.exception("Send message failed: %s", message_body)
raise error
else:
return response

async def receive_messages(queue, max_number = 10, wait_time = 2):
try:
messages = queue.receive_messages(
MessageAttributeNames=["All"],
MaxNumberOfMessages=max_number,
WaitTimeSeconds=wait_time,
)
for msg in messages:
logger.info("Received message: %s: %s", msg.message_id, msg.body)
type, repo, path = unpack_message(msg)
yield json.dumps({ "type": type, "repo": repo, "path": path })
delete_messages(queue, messages)

except ClientError as error:
logger.exception("Couldn't receive messages from queue: %s", queue)
raise error


def delete_messages(queue, messages):
"""
Delete a batch of messages from a queue in a single request.
:param queue: The queue from which to delete the messages.
:param messages: The list of messages to delete.
:return: The response from SQS that contains the list of successful and failed
message deletions.
"""
try:
entries = [
{"Id": str(ind), "ReceiptHandle": msg.receipt_handle}
for ind, msg in enumerate(messages)
]
response = queue.delete_messages(Entries=entries)
if "Successful" in response:
for msg_meta in response["Successful"]:
logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle)
if "Failed" in response:
for msg_meta in response["Failed"]:
logger.warning(
"Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle
)
except ClientError:
logger.exception("Couldn't delete messages from queue %s", queue)
else:
return response

def encode_message(message: ExecuteMessage):
return json.dumps({
"type": message.type,
"repo": message.repo,
"path": message.path,
})

def unpack_message(msg):
if (msg is None):
return (f"", f"", f"")
else:
return (
msg.message_attributes["type"]["StringValue"],
msg.message_attributes["repo"]["StringValue"],
msg.message_attributes["path"]["StringValue"],
)
1 change: 1 addition & 0 deletions server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ PyGithub
python-multipart
httpx[socks]
load_dotenv
boto3>=1.26.79
13 changes: 13 additions & 0 deletions subscriber/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM public.ecr.aws/lambda/python:3.12

# Copy requirements.txt
COPY requirements.txt ${LAMBDA_TASK_ROOT}

# Install the specified packages
RUN pip install -r requirements.txt

# Copy function code
COPY sqs_subscriber.py ${LAMBDA_TASK_ROOT}

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "sqs_subscriber.lambda_handler" ]
Empty file added subscriber/requirements.txt
Empty file.
16 changes: 16 additions & 0 deletions subscriber/sqs_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import json

def lambda_handler(event, context):
if event:
batch_item_failures = []
sqs_batch_response = {}

for record in event["Records"]:
try:
# process message
print(f"receive message here")
except Exception as e:
batch_item_failures.append({"itemIdentifier": record['messageId']})

sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
29 changes: 28 additions & 1 deletion template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,37 @@ Resources:
DockerContext: server
DockerTag: v1

SQSSubscriptionFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
MemorySize: 512
FunctionUrlConfig:
AuthType: NONE
Policies:
- Statement:
- Sid: BedrockInvokePolicy
Effect: Allow
Action:
- bedrock:InvokeModelWithResponseStream
Resource: '*'
Tracing: Active
Metadata:
Dockerfile: Dockerfile
DockerContext: subscriber
DockerTag: v1

Outputs:
FastAPIFunctionUrl:
Description: "Function URL for FastAPI function"
Value: !GetAtt FastAPIFunctionUrl.FunctionUrl
FastAPIFunction:
Description: "FastAPI Lambda Function ARN"
Value: !GetAtt FastAPIFunction.Arn
Value: !GetAtt FastAPIFunction.Arn

SQSSubscriptionFunctionUrl:
Description: "Function URL for SQS Subscriptio function"
Value: !GetAtt FastAPIFunctionUrl.FunctionUrl
SQSSubscriptionFunction:
Description: "SQS Subscription Function Lambda Function ARN"
Value: !GetAtt SQSSubscriptionFunction.Arn

0 comments on commit 3acc205

Please sign in to comment.