diff --git a/.aws/petercat-preview.toml b/.aws/petercat-preview.toml index 3819a5da..c1b39016 100644 --- a/.aws/petercat-preview.toml +++ b/.aws/petercat-preview.toml @@ -7,4 +7,4 @@ region = "ap-northeast-1" confirm_changeset = true capabilities = "CAPABILITY_IAM" disable_rollback = true -image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo"] +image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/petercatapipreview49199518/fastapifunctionead79d0drepo", "SQSSubscriptionFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/petercatapipreview49199518/sqssubscriptionfunctiona2fc8b7drepo"] diff --git a/.aws/petercat-prod.toml b/.aws/petercat-prod.toml index 2c1d5d45..08f8429f 100644 --- a/.aws/petercat-prod.toml +++ b/.aws/petercat-prod.toml @@ -7,4 +7,4 @@ region = "ap-northeast-1" confirm_changeset = true capabilities = "CAPABILITY_IAM" disable_rollback = true -image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo"] +image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo", "SQSSubscriptionFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/sqssubscriptionfunctiona2fc8b7drepo"] diff --git a/server/Dockerfile b/server/Dockerfile deleted file mode 100644 index d1094fb0..00000000 --- a/server/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -# Using a slim version for a smaller base image -FROM python:3.11.6-slim-bullseye - -# Copy function code -COPY . ${LAMBDA_TASK_ROOT} -# from your project folder. -COPY requirements.txt . -RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}" -U --no-cache-dir - -EXPOSE 80 - -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80", "--workers", "6"] \ No newline at end of file diff --git a/server/data_class.py b/server/data_class.py index b159a9fe..8ea7fd8f 100644 --- a/server/data_class.py +++ b/server/data_class.py @@ -13,3 +13,8 @@ class Message(BaseModel): class ChatData(BaseModel): messages: list[Message] = [] prompt: str = None + +class ExecuteMessage(BaseModel): + type: str + repo: str + path: str \ No newline at end of file diff --git a/server/main.py b/server/main.py index d59e970a..1433725e 100644 --- a/server/main.py +++ b/server/main.py @@ -1,15 +1,17 @@ import os +import uvicorn + from fastapi import FastAPI from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware -from data_class import DalleData, ChatData -from openai_api import dalle -from langchain_api import chat + from agent import stream from uilts.env import get_env_variable -import uvicorn +from data_class import ChatData, ExecuteMessage +from message_queue.queue_wrapper import delete_messages, get_queue, receive_messages, send_message, unpack_message open_api_key = get_env_variable("OPENAI_API_KEY") +sqs_queue_name = get_env_variable("PETERCAT_EX_SQS") app = FastAPI( title="Bo-meta Server", @@ -30,15 +32,15 @@ def read_root(): return {"Hello": "World"} -@app.post("/api/dall-e") -def run_img_generator(input_data: DalleData): - result = dalle.img_generator(input_data, open_api_key) - return result +@app.post("/api/message") +def send_sqs_message(message: ExecuteMessage): + queue = get_queue(sqs_queue_name) + return send_message(queue=queue, message=message) -@app.post("/api/chat") -def run_langchain_chat(input_data: ChatData): - result = chat.langchain_chat(input_data, open_api_key) - return result +@app.get("/api/message/receive") +def receive_sqs_message(): + queue = get_queue(sqs_queue_name) + return StreamingResponse(receive_messages(queue), media_type="text/event-stream") @app.post("/api/chat/stream", response_class=StreamingResponse) diff --git a/server/message_queue/queue_wrapper.py b/server/message_queue/queue_wrapper.py new file mode 100644 index 00000000..58c37977 --- /dev/null +++ b/server/message_queue/queue_wrapper.py @@ -0,0 +1,102 @@ +import json +import boto3 +from botocore.exceptions import ClientError +import logging + +from data_class import ExecuteMessage + +logger = logging.getLogger(__name__) +sqs = boto3.resource("sqs") + +def get_queue(name): + try: + queue = sqs.get_queue_by_name(QueueName=name) + except ClientError as error: + logger.exception("Couldn't get queue named %s.", name) + raise error + else: + return queue + +def send_message(queue, message: ExecuteMessage, message_attributes=None): + if not message_attributes: + message_attributes = { + "type": { "StringValue": message.type, "DataType": "String" }, + "repo": { "StringValue": message.repo, "DataType": "String" }, + "path": { "StringValue": message.path, "DataType": "String" }, + } + + message_body = encode_message(message=message) + + try: + response = queue.send_message( + MessageBody=message_body, MessageAttributes=message_attributes + ) + + except ClientError as error: + logger.exception("Send message failed: %s", message_body) + raise error + else: + return response + +async def receive_messages(queue, max_number = 10, wait_time = 2): + try: + messages = queue.receive_messages( + MessageAttributeNames=["All"], + MaxNumberOfMessages=max_number, + WaitTimeSeconds=wait_time, + ) + for msg in messages: + logger.info("Received message: %s: %s", msg.message_id, msg.body) + type, repo, path = unpack_message(msg) + yield json.dumps({ "type": type, "repo": repo, "path": path }) + delete_messages(queue, messages) + + except ClientError as error: + logger.exception("Couldn't receive messages from queue: %s", queue) + raise error + + +def delete_messages(queue, messages): + """ + Delete a batch of messages from a queue in a single request. + + :param queue: The queue from which to delete the messages. + :param messages: The list of messages to delete. + :return: The response from SQS that contains the list of successful and failed + message deletions. + """ + try: + entries = [ + {"Id": str(ind), "ReceiptHandle": msg.receipt_handle} + for ind, msg in enumerate(messages) + ] + response = queue.delete_messages(Entries=entries) + if "Successful" in response: + for msg_meta in response["Successful"]: + logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle) + if "Failed" in response: + for msg_meta in response["Failed"]: + logger.warning( + "Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle + ) + except ClientError: + logger.exception("Couldn't delete messages from queue %s", queue) + else: + return response + +def encode_message(message: ExecuteMessage): + return json.dumps({ + "type": message.type, + "repo": message.repo, + "path": message.path, + }) + +def unpack_message(msg): + if (msg is None): + return (f"", f"", f"") + else: + return ( + msg.message_attributes["type"]["StringValue"], + msg.message_attributes["repo"]["StringValue"], + msg.message_attributes["path"]["StringValue"], + ) \ No newline at end of file diff --git a/server/requirements.txt b/server/requirements.txt index c40a37db..df929a91 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -11,3 +11,4 @@ PyGithub python-multipart httpx[socks] load_dotenv +boto3>=1.26.79 diff --git a/subscriber/Dockerfile b/subscriber/Dockerfile new file mode 100644 index 00000000..0df1de9f --- /dev/null +++ b/subscriber/Dockerfile @@ -0,0 +1,13 @@ +FROM public.ecr.aws/lambda/python:3.12 + +# Copy requirements.txt +COPY requirements.txt ${LAMBDA_TASK_ROOT} + +# Install the specified packages +RUN pip install -r requirements.txt + +# Copy function code +COPY sqs_subscriber.py ${LAMBDA_TASK_ROOT} + +# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) +CMD [ "sqs_subscriber.lambda_handler" ] \ No newline at end of file diff --git a/subscriber/requirements.txt b/subscriber/requirements.txt new file mode 100644 index 00000000..e69de29b diff --git a/subscriber/sqs_subscriber.py b/subscriber/sqs_subscriber.py new file mode 100644 index 00000000..b1403e47 --- /dev/null +++ b/subscriber/sqs_subscriber.py @@ -0,0 +1,16 @@ +import json + +def lambda_handler(event, context): + if event: + batch_item_failures = [] + sqs_batch_response = {} + + for record in event["Records"]: + try: + # process message + print(f"receive message here") + except Exception as e: + batch_item_failures.append({"itemIdentifier": record['messageId']}) + + sqs_batch_response["batchItemFailures"] = batch_item_failures + return sqs_batch_response \ No newline at end of file diff --git a/template.yml b/template.yml index 717d7b57..521ce47b 100644 --- a/template.yml +++ b/template.yml @@ -33,10 +33,37 @@ Resources: DockerContext: server DockerTag: v1 + SQSSubscriptionFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + MemorySize: 512 + FunctionUrlConfig: + AuthType: NONE + Policies: + - Statement: + - Sid: BedrockInvokePolicy + Effect: Allow + Action: + - bedrock:InvokeModelWithResponseStream + Resource: '*' + Tracing: Active + Metadata: + Dockerfile: Dockerfile + DockerContext: subscriber + DockerTag: v1 + Outputs: FastAPIFunctionUrl: Description: "Function URL for FastAPI function" Value: !GetAtt FastAPIFunctionUrl.FunctionUrl FastAPIFunction: Description: "FastAPI Lambda Function ARN" - Value: !GetAtt FastAPIFunction.Arn \ No newline at end of file + Value: !GetAtt FastAPIFunction.Arn + + SQSSubscriptionFunctionUrl: + Description: "Function URL for SQS Subscriptio function" + Value: !GetAtt FastAPIFunctionUrl.FunctionUrl + SQSSubscriptionFunction: + Description: "SQS Subscription Function Lambda Function ARN" + Value: !GetAtt SQSSubscriptionFunction.Arn \ No newline at end of file