diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..494fc32 --- /dev/null +++ b/.flake8 @@ -0,0 +1,26 @@ +[flake8] +# it's not a bug that we aren't using all of hacking, ignore: +# F401: A module has been imported but is not used anywhere in the file +# F812: list comprehension redefines ... +# H101: Use TODO(NAME) +# H202: assertRaises Exception too broad +# H233: Python 3.x incompatible use of print operator +# H301: one import per line +# H306: imports not in alphabetical order (time, os) +# H401: docstring should not start with a space +# H403: multi line docstrings should end on a new line +# H404: multi line docstring should start without a leading new line +# H405: multi line docstring summary not separated with an empty line +# H501: Do not use self.__dict__ for string formatting +# +# E501: line too long (82 > 79 characters) +# E129: visually indented line with same indent as next logical line +# E126: continuation line over-indented for hanging indent +# E711: comparison to None should be ‘if cond is None:’ +# E999: SyntaxError: invalid syntax +# W504: line break after binary operator +# W503: line break before binary operator +# W605: invalid escape sequence +max-line-length = 248 +ignore = N813,F401,F812,H101,H202,H233,H301,H306,H401,H403,H404,H405,H501,E501,E999,E129,E126,E711,W503,W504,W605,N801,N806 +# exclude = web-servcie-base,fauths/tests diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..09d488a --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,128 @@ +# This is a basic workflow to help you get started with Actions + +name: CI + +# Controls when the workflow will run +on: + # Triggers the workflow on push or pull request events but only for the "main" branch + push: + branches: + - main + - dev + - 'v**' + - 'feature-**' + - latest + tags: + - '*' + workflow_dispatch: + inputs: + ref: + description: "Why trigger?" 
+ required: true + type: string + +env: + IMAGE: "deepflowio-stella-agent-ce" + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + fetch-depth: 0 + + - name: Set Env + run: | + echo "IMAGE_TAG_PREFIX=${{ github.ref_name }}"|sed 's|main|latest|' >> $GITHUB_ENV + echo "IMAGE_TAG=$(git rev-list --count HEAD)" >> $GITHUB_ENV + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.8' + + - name: Setup Docker Buildx + uses: docker/setup-buildx-action@v2 + with: + install: true + + - name: Log In To GitHub Docker Registry + uses: docker/login-action@v2 + with: + registry: "ghcr.io" + username: "${{ github.repository_owner }}" + password: "${{ secrets.GITHUB_TOKEN }}" + + - name: Log In To Docker Registry + uses: docker/login-action@v2 + with: + username: "deepflowce" + password: "${{ secrets.REGISTRY_PASS }}" + + - name: Build And Push deepflowio-stella-agent-ce Images + uses: docker/build-push-action@v2 + with: + context: . + push: true + file: Dockerfile + platforms: linux/amd64,linux/arm64 + tags: | + "ghcr.io/${{ github.repository_owner }}/deepflow-ce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}-${{ env.IMAGE_TAG }}" + "ghcr.io/${{ github.repository_owner }}/deepflow-ce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}" + "deepflowce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}" + + - name: Log In To ALIYUN HongKong Docker Registry + uses: docker/login-action@v2 + with: + registry: "registry.cn-hongkong.aliyuncs.com" + username: "${{ secrets.REGISTRY_ALIYUN_USER }}" + password: "${{ secrets.REGISTRY_PASS }}" + + - name: Build And Push deepflowio-stella-agent-ce Images to ALIYUN HongKong + uses: docker/build-push-action@v2 + with: + context: . 
+ push: true + file: Dockerfile + platforms: linux/amd64,linux/arm64 + tags: | + "registry.cn-hongkong.aliyuncs.com/deepflow-ce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}-${{ env.IMAGE_TAG }}" + "registry.cn-hongkong.aliyuncs.com/deepflow-ce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}" + + - name: Log In To ALIYUN Docker Registry + uses: docker/login-action@v2 + with: + registry: "registry.cn-beijing.aliyuncs.com" + username: "${{ secrets.REGISTRY_ALIYUN_USER }}" + password: "${{ secrets.REGISTRY_PASS }}" + + - name: Build And Push deepflowio-stella-agent-ce Images To ALIYUN BeiJing + uses: docker/build-push-action@v2 + with: + context: . + push: true + file: Dockerfile + platforms: linux/amd64,linux/arm64 + tags: | + "registry.cn-beijing.aliyuncs.com/deepflow-ce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}-${{ env.IMAGE_TAG }}" + "registry.cn-beijing.aliyuncs.com/deepflow-ce/${{ env.IMAGE }}:${{ env.IMAGE_TAG_PREFIX }}" + + - name: Build tar.gz + run: | + mkdir -p release/stella-agent-ce + mv deploy df-llm-agent etc Dockerfile requirements3.txt stream.html -t release/stella-agent-ce + cd release + tar -zcvf deepflow-gui-grafana.tar.gz ./stella-agent-ce/* + rm -r ./stella-agent-ce + + - name: Release + if: startsWith(github.ref, 'refs/tags/') + uses: softprops/action-gh-release@v1 + with: + files: | + ./release/* + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7b62f12 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +.vscode \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8851942 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,55 @@ +# 构建层 +FROM python:3.8-slim-buster AS builder +RUN apt-get update && \ + apt-get install --no-install-suggests \ + --no-install-recommends --yes \ + python3-venv \ + gcc \ + libpython3-dev \ + libpq-dev \ + default-libmysqlclient-dev \ + make \ + && \ + python3 -m venv 
/root/venv && \ + /root/venv/bin/pip install --upgrade pip --trusted-host mirrors.aliyun.com --index-url https://mirrors.aliyun.com/pypi/simple + +# Base 构建层 +FROM builder AS builder-venv-base +RUN /root/venv/bin/pip install cffi --trusted-host mirrors.aliyun.com --index-url https://mirrors.aliyun.com/pypi/simple/ + +# 自定义构建层 +FROM builder-venv-base AS builder-venv-custom +COPY requirements3.txt /root/requirements3.txt +RUN /root/venv/bin/pip install --disable-pip-version-check \ + --no-cache-dir \ + --trusted-host mirrors.aliyun.com \ + --index-url https://mirrors.aliyun.com/pypi/simple/ \ + -r /root/requirements3.txt + +FROM python:3.8-slim-buster AS runner + +WORKDIR /root/df-llm-agent/ + +RUN apt-get update && apt-get install --no-install-suggests --no-install-recommends --yes gcc libpython3-dev +COPY --from=builder-venv-custom /usr/lib/*-linux-gnu/*.so* /usr/lib/x86_64-linux-gnu/ +RUN mkdir /usr/lib/x86_64-linux-gnu/mariadb19 +COPY --from=builder-venv-custom /usr/lib/*-linux-gnu/mariadb19/ /usr/lib/x86_64-linux-gnu/mariadb19/ +COPY --from=builder-venv-custom /root/venv /root/venv + +# Copy code +## 复制代码 +COPY ./etc/df-llm-agent.yaml /etc/web/ +COPY ./df-llm-agent /root/df-llm-agent/ + +RUN chmod +x /root/df-llm-agent/py2c.sh + +RUN /root/df-llm-agent/py2c.sh + +RUN ls -la /root/df-llm-agent + +## dockerfile里的db_version 和issu里最大版本的x.x.x.x.sql 一致 +ENV DB_VERSION=1.0.0.0 + +## Run +CMD /root/venv/bin/python3 -u /root/df-llm-agent/app.py + diff --git a/README.md b/README.md new file mode 100644 index 0000000..25577ac --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# stella-agent-ce +gpt agent + +# 运行服务 +进入deploy/templates 目录下,执行命令 + +``` +kubectl create -f ./configmap.yaml +kubectl create -f ./deployment.yaml +kubectl create -f ./service.yaml +``` + +# 本地测试 + +python3 -m pip install virtualenv + +/usr/local/bin/virtualenv --clear ./venv + +./venv/bin/pip3 install -r ./requirements3.txt + +./venv/bin/python3 ./app.py diff --git a/commit-template b/commit-template 
new file mode 100644 index 0000000..338ff8d --- /dev/null +++ b/commit-template @@ -0,0 +1,26 @@ +[feature|bugfix] commit message title + +**Phenomenon and reproduction steps** + +(Describe the phenomenon and scenario of the bug) + +**Root cause and solution** + +(Describe the cause of the bug and corresponding solution) + +**Impactions** + +(Visible changes after this fix) + +**Test method** + +(Test method to ensure the bug is fixed) + +**Affected branch(es)** + +* main + +**Checklist** + +- [ ] Dependencies update required +- [ ] Common bug (similar problem in other repo) diff --git a/deploy/.helmignore b/deploy/.helmignore new file mode 100644 index 0000000..0e8a0eb --- /dev/null +++ b/deploy/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/deploy/Chart.yaml b/deploy/Chart.yaml new file mode 100644 index 0000000..faeaa14 --- /dev/null +++ b/deploy/Chart.yaml @@ -0,0 +1,7 @@ +apiVersion: v2 +name: df-llm-agent +description: A Helm chart for Kubernetes + +type: application + +version: 6.1.6 diff --git a/deploy/templates/configmap.yaml b/deploy/templates/configmap.yaml new file mode 100644 index 0000000..a590fed --- /dev/null +++ b/deploy/templates/configmap.yaml @@ -0,0 +1,66 @@ +# df-llm-agent Config +kind: ConfigMap +apiVersion: v1 +metadata: + name: df-llm-agent + namespace: deepflow +data: + df-llm-agent.yaml: |- + daemon: True + api_timeout: 500 + sql_show: False + log_file: /var/log/df-llm-agent.log + log_level: info + instance_path: /root/df-llm-agent + + redis: + host: + - redis + cluster_enabled: False # True,False + port: 6379 + db: 6 + password: "password123" + mysql: + user_name: root + 
user_password: password123 + host: mysql + port: 30130 + database: deepflow_llm + ai: + enable: False # True,False + platforms: + - + enable: False + platform: "azure" + model: "gpt" + api_type: "azure" + api_key: "" + api_base: "" + api_version: "" + engine_name: + - "" + - + enable: False + platform: "aliyun" + model: "dashscope" + api_key: "" + engine_name: + - "qwen-turbo" + - "qwen-plus" + - + enable: False + platform: "baidu" + model: "qianfan" + api_key: "" + api_secre: "" + engine_name: + - "ERNIE-Bot" + - "ERNIE-Bot-turbo" + - + enable: False + platform: "zhipu" + model: "zhipuai" + api_key: "" + engine_name: + - "chatglm_turbo" + diff --git a/deploy/templates/deployment.yaml b/deploy/templates/deployment.yaml new file mode 100644 index 0000000..6574ff3 --- /dev/null +++ b/deploy/templates/deployment.yaml @@ -0,0 +1,41 @@ +--- +kind: Deployment +apiVersion: apps/v1 +metadata: + name: df-llm-agent-deployment + namespace: deepflow + labels: + component: df-llm-agent +spec: + replicas: 1 + selector: + matchLabels: + component: df-llm-agent + template: + metadata: + labels: + component: df-llm-agent + spec: + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: df-llm-agent + image: "hub.deepflow.yunshan.net/dev/df-llm-agent:latest" + imagePullPolicy: Always + volumeMounts: + - name: debug-path + mountPath: /root/debug + - mountPath: /etc/web/df-llm-agent.yaml + name: web-volumes-df-llm-agent + subPath: df-llm-agent.yaml + volumes: + - name: debug-path + hostPath: + type: DirectoryOrCreate + path: /usr/local/deepflow/debug/ + - name: web-volumes-df-llm-agent + configMap: + name: df-llm-agent + items: + - key: df-llm-agent.yaml + path: df-llm-agent.yaml diff --git a/deploy/templates/deployment_mysql.yaml b/deploy/templates/deployment_mysql.yaml new file mode 100644 index 0000000..8feb903 --- /dev/null +++ b/deploy/templates/deployment_mysql.yaml @@ -0,0 +1,30 @@ +--- +kind: Deployment +apiVersion: apps/v1 +metadata: + name: 
df-llm-agent-deployment-mysql + namespace: cx-test + labels: + component: df-llm-agent-mysql +spec: + replicas: 1 + selector: + matchLabels: + component: df-llm-agent-mysql + template: + metadata: + labels: + component: df-llm-agent-mysql + spec: + hostNetwork: false + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: df-llm-agent-mysql + image: "mysql:8.0.26" + imagePullPolicy: Always + env: + - name: MYSQL_ROOT_PASSWORD + value: password123 + ports: + - containerPort: 3306 + name: mysql diff --git a/deploy/templates/service.yaml b/deploy/templates/service.yaml new file mode 100644 index 0000000..8086457 --- /dev/null +++ b/deploy/templates/service.yaml @@ -0,0 +1,17 @@ +--- +# df-llm-agent Service +apiVersion: v1 +kind: Service +metadata: + name: df-llm-agent + namespace: deepflow +spec: + ports: + - port: 20831 + nodePort: 30831 + targetPort: 20831 + protocol: TCP + name: http + selector: + component: df-llm-agent + type: NodePort diff --git a/deploy/templates/service_mysql.yaml b/deploy/templates/service_mysql.yaml new file mode 100644 index 0000000..9c98b36 --- /dev/null +++ b/deploy/templates/service_mysql.yaml @@ -0,0 +1,17 @@ +--- +# df-llm-agent Service +apiVersion: v1 +kind: Service +metadata: + name: df-llm-agent-mysql + namespace: cx-test +spec: + ports: + - port: 30306 + nodePort: 30306 + targetPort: 3306 + protocol: TCP + name: mysql + selector: + component: df-llm-agent-mysql + type: NodePort diff --git a/df-llm-agent/app.py b/df-llm-agent/app.py new file mode 100644 index 0000000..682367e --- /dev/null +++ b/df-llm-agent/app.py @@ -0,0 +1,34 @@ +import asyncio +from server import app +from const import HOST, PORT +import multiprocessing +from config import config +from database.db_init import db_init +from utils import logger +import argparse + +logger_manager = logger.LoggerManager('df-llm-agent', config.log_level, config.log_file) +logger_manager.init_logger() + +log = logger.getLogger(__name__) + +if __name__ == '__main__': + + parser = 
argparse.ArgumentParser() + parser.add_argument("--port", type=int, help="test port") + args = parser.parse_args() + + if args.port: + APP_PORT = args.port + else: + APP_PORT = PORT + + # db 初始化&issu执行 + asyncio.run(db_init.db_init()) + + try: + workers = multiprocessing.cpu_count() + log.info(f'======== Starting df-llm-agent application listen {HOST}:{APP_PORT}, workers={workers} ...========') + app.run(host=HOST, port=APP_PORT, workers=workers, access_log=True) + except KeyboardInterrupt: + log.info("ctrl+c Stopping df-llm-agent application...") diff --git a/df-llm-agent/chat_record_app/__init__.py b/df-llm-agent/chat_record_app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/df-llm-agent/chat_record_app/app.py b/df-llm-agent/chat_record_app/app.py new file mode 100644 index 0000000..8251df2 --- /dev/null +++ b/df-llm-agent/chat_record_app/app.py @@ -0,0 +1,45 @@ +from sanic import Blueprint +from chat_record_app.worker import app_worker +from utils.response_tools import wrap_resp +from const import API_PREFIX + +chat_record_app = Blueprint("chat_record", url_prefix=API_PREFIX) + +# +# 对话必须属于一个主题和用户 +# + +# 对话记录 + + +# 新增对话记录(通过请求gpt时自动写入,不再提供接口) +# @chat_record_app.route("/chat_record", methods=["POST"]) +# @wrap_resp +# async def chat_record_add(request): +# worker = app_worker(request) +# # 参数如果不带主题id,先创建主题 +# res = await worker.chat_record_add() +# return res + + +# 获取所有主题,或一个主题下的所有对话,并带上评分(如果存在) +@chat_record_app.route("/chat_record", name="chat_record_list_all") +@chat_record_app.route("/chat_record/") +@wrap_resp +async def chat_record_list11(request, id=0): + worker = app_worker(request) + res = await worker.chat_record_list(id) + return res + + +# 对当前用户下的单次对话或一个主题评分 + + +# 对单次对话评分或一个主题评分 +@chat_record_app.route("/chat_record/score/", methods=["POST"], name="chat_record_score_to_topic") +@chat_record_app.route("/chat_record/score//", methods=["POST"]) +@wrap_resp +async def chat_record_score_add(request, chat_topic_id=0, chat_id=0): + 
worker = app_worker(request) + res = await worker.chat_record_score_add(chat_topic_id, chat_id) + return res diff --git a/df-llm-agent/chat_record_app/chat_record.py b/df-llm-agent/chat_record_app/chat_record.py new file mode 100644 index 0000000..d03683c --- /dev/null +++ b/df-llm-agent/chat_record_app/chat_record.py @@ -0,0 +1,306 @@ +from tortoise.transactions import atomic, in_transaction +from tortoise.exceptions import BaseORMException, OperationalError +from tortoise.expressions import Q +from tortoise.functions import Coalesce, Count, Length, Lower, Min, Sum, Trim, Upper +from exception import BadRequestException +import const +from database import db_models +from utils import logger +import traceback +import datetime +import json + +log = logger.getLogger(__name__) + + +class chatRecordWorker(object): + + # 校验数据 + @staticmethod + async def verify_data(data): + pass + + # 对话记录 + # 新增 + # 调用agent前提前记录,output 默认为空,待agent返回数据后在更新该会话 + @classmethod + async def chat_record_add(cls, user_info, args, data): + + user_id = user_info.get("ID", 0) + + # 如果不带主题id,先创建主题,主题名称截取input前255个字符 + chat_topic_id = data.get("chat_topic_id", 0) + # 对话主题类型,暂时默认=1 + type = data.get("type", 1) + + input = data.get("input", "") + output = data.get("output", "") + + # 对话用的llm引擎 + engine = data.get("engine", "") + + try: + # 有主题id + if chat_topic_id > 0: + + where_chat_topic_info = {} + where_chat_topic_info["user_id"] = user_id + where_chat_topic_info["id"] = chat_topic_id + + chat_topic_exist = await db_models.ChatTopic.filter(**where_chat_topic_info).count() + + # 主题存在 + if chat_topic_exist > 0: + chat_data_info = {} + chat_data_info["chat_topic_id"] = chat_topic_id + chat_data_info["input"] = input + chat_data_info["output"] = output + chat_data_info["engine"] = engine + + res_chat = await db_models.Chat.create(**chat_data_info) + + res_chat_id = res_chat.id + else: + raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 关联的主题不存在", f"{const.DATA_NOT_FOUND}: 
关联的主题不存在") + + else: + time_now = datetime.datetime.now().strftime(const.DATE_PATTEN) + async with in_transaction() as connection: + + # 创建主题 + chat_topic_info = {} + chat_topic_info["user_id"] = user_id + chat_topic_info["name"] = f"会话-{time_now}" + chat_topic_info["type"] = type + + res_chat_topic = await db_models.ChatTopic.create(using_db=connection, **chat_topic_info) + + # 记录对话 + chat_data_info = {} + chat_data_info["chat_topic_id"] = res_chat_topic.id + chat_data_info["input"] = input + chat_data_info["output"] = output + chat_data_info["engine"] = engine + + res_chat = await db_models.Chat.create(using_db=connection, **chat_data_info) + + # 返回主题id + chat_topic_id = res_chat_topic.id + res_chat_id = res_chat.id + + res = {"res_chat_topic_id": chat_topic_id, "res_chat_id": res_chat_id} + + log.info(f"用户:{user_id}, 添加对话记录, 信息: {res}") + + return res + + except BadRequestException as e: + raise BadRequestException(e.status, e.message, e.err) + + except Exception as e: + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + # 更新 + # agent返回数据后调用更新会话 + @classmethod + async def chat_record_update(cls, user_info, res_chat_id, data): + + user_id = user_info.get("ID", 0) + where_info = {} + where_info["id"] = res_chat_id + + data_info = {} + data_info["output"] = data.get("output", []) + data_info["output_all"] = data.get("output_all", []) + data_info["updated_at"] = datetime.datetime.now().strftime(const.DATE_PATTEN) + + try: + res = await db_models.Chat.get(**where_info) + if res: + await db_models.Chat.filter(**where_info).update(**data_info) + else: + raise Exception(f"{const.DATA_NOT_FOUND}") + + except Exception as e: + log.error(f"用户:{user_id}, 更新会话失败: {e}, 待更新信息: {data}") + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + else: + log.info(f"用户:{user_id}, 更新会话") + return True + + # 获取所有主题或某个主题下的所有对话 + @classmethod + async def chat_record_list(cls, user_info, chat_topic_id=0): + + res_all = [] + try: + # 某个主题下的对话列表 + if 
chat_topic_id > 0: + + where_chat_topic_info = {} + where_chat_topic_info["user_id"] = user_info.get("ID", 0) + where_chat_topic_info["id"] = chat_topic_id + + chat_topic_exist = await db_models.ChatTopic.filter(**where_chat_topic_info).count() + + # 主题存在,获取该主题下所有对话 + if chat_topic_exist > 0: + data_info = {} + data_info["chat_topic_id"] = chat_topic_id + res_chat = await db_models.Chat.filter(**data_info).all() + + if res_chat: + # 获取所有对话id + chat_ids = [] + for v in res_chat: + v_dict = dict(v) + chat_ids.append(v_dict["id"]) + + # 获取所有属于这些对话的评分 + res_score_dict = {} + if chat_ids: + query = Q(obj_id__in=chat_ids) & Q(type=2) + res_score = await db_models.Score.filter(query).all() + + for v in res_score: + v_dict = dict(v) + res_score_dict[v_dict["obj_id"]] = v_dict + + # 所有对话加上获取到的评分 + chat_list = [] + for v in res_chat: + v_dict = dict(v) + chat_id = v_dict["id"] + + if chat_id in res_score_dict: + v_dict["score"] = res_score_dict[chat_id]["score"] + v_dict["feedback"] = res_score_dict[chat_id]["feedback"] + v_dict["user_name"] = res_score_dict[chat_id]["user_name"] + else: + v_dict["score"] = "" + v_dict["feedback"] = "" + v_dict["user_name"] = "" + + chat_list.append(v_dict) + + # 赋值 + res_all = chat_list + + else: + raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 关联的主题不存在", f"{const.DATA_NOT_FOUND}: 关联的主题不存在") + else: + # 所有主题 + data_info = {} + data_info["user_id"] = user_info.get("ID", 0) + res_chat_topic = await db_models.ChatTopic.filter(**data_info).all() + + # 主题存在 + if res_chat_topic: + # 获取所有主题id + chat_topic_ids = [] + for v in res_chat_topic: + v_dict = dict(v) + chat_topic_ids.append(v_dict["id"]) + + # 获取所有属于这些对话的评分 + res_score_dict = {} + if chat_topic_ids: + query = Q(obj_id__in=chat_topic_ids) & Q(type=1) + res_score = await db_models.Score.filter(query).all() + + for v in res_score: + v_dict = dict(v) + res_score_dict[v_dict["obj_id"]] = v_dict + + # 所有主题加上获取到的评分 + chat_topic_list = [] + for v in res_chat_topic: + v_dict 
= dict(v) + chat_id = v_dict["id"] + + if chat_id in res_score_dict: + v_dict["score"] = res_score_dict[chat_id]["score"] + v_dict["feedback"] = res_score_dict[chat_id]["feedback"] + v_dict["user_name"] = res_score_dict[chat_id]["user_name"] + else: + v_dict["score"] = "" + v_dict["feedback"] = "" + v_dict["user_name"] = "" + + chat_topic_list.append(v_dict) + + # 赋值 + res_all = chat_topic_list + + except BadRequestException as e: + raise BadRequestException(e.status, e.message, e.err) + + except Exception as e: + traceback.print_exc() + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + return res_all + + # 评分 + # 新增 + @classmethod + async def chat_record_score_add(cls, user_info, args, data, chat_topic_id, chat_id): + + user_id = user_info.get("ID", 0) + + data_info = {} + data_info["user_id"] = user_id + data_info["score"] = data.get("score", 0) + data_info["feedback"] = data.get("feedback", '') + data_info["user_name"] = data.get("user_name", '') + + try: + + if not isinstance(data_info["score"], int) or data_info["score"] < 0 or data_info["score"] > 100: + raise BadRequestException("INVALID_POST_DATA", f"{const.INVALID_POST_DATA}: 评分值错误") + + if data_info["feedback"] == "" or data_info["user_name"] == "": + raise BadRequestException("INVALID_POST_DATA", f"{const.INVALID_POST_DATA}: 反馈内容和用户不能为空") + + # 主题存在与否 + where_chat_topic_info = {} + where_chat_topic_info["user_id"] = user_id + where_chat_topic_info["id"] = chat_topic_id + chat_topic_exist = await db_models.ChatTopic.filter(**where_chat_topic_info).count() + + # 不存在 + if chat_topic_exist <= 0: + raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 主题不存在") + + # 对话id + if chat_id > 0: + # 对话存在与否 + where_chat_info = {} + where_chat_info["id"] = chat_id + where_chat_info["chat_topic_id"] = chat_topic_id + chat_exist = await db_models.Chat.filter(**where_chat_info).count() + + # 不存在 + if chat_exist <= 0: + raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 
对话不存在") + + data_info["type"] = 2 + data_info["obj_id"] = chat_id + else: + data_info["type"] = 1 + data_info["obj_id"] = chat_topic_id + + log.info(f"用户:{user_id}, 添加评分,数据: {data_info}") + + except BadRequestException as e: + raise BadRequestException(e.status, e.message, e.err) + + try: + await db_models.Score.create(**data_info) + except Exception as e: + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + return True + + +chat_record_worker = chatRecordWorker() diff --git a/df-llm-agent/chat_record_app/worker.py b/df-llm-agent/chat_record_app/worker.py new file mode 100644 index 0000000..be3c085 --- /dev/null +++ b/df-llm-agent/chat_record_app/worker.py @@ -0,0 +1,49 @@ +from exception import BadRequestException +import const +from chat_record_app.chat_record import chat_record_worker +from config import config +from utils.curl_tools import curl_tools +from utils import logger +import json +import time + +log = logger.getLogger(__name__) + + +class app_worker(object): + + args = data = user_info = None + + def __init__(self, request): + app_worker.args = request.args + if app_worker.args: + for k, v in self.args.items(): + app_worker.args[k] = [i for i in v] + app_worker.data = request.json + app_worker.user_info = request.ctx.user + + # 对话记录 + @classmethod + async def chat_record_add(cls): + # 校验todoing + return await chat_record_worker.chat_record_add(cls.user_info, cls.args, cls.data) + + @classmethod + async def chat_record_list(cls, id=0): + # 校验todoing + return await chat_record_worker.chat_record_list(cls.user_info, id) + + # 评分 + @classmethod + async def chat_record_score_add(cls, chat_topic_id=0, chat_id=0): + # 校验todoing + return await chat_record_worker.chat_record_score_add( + cls.user_info, + cls.args, + cls.data, + chat_topic_id, + chat_id, + ) + + +# worker = app_worker() diff --git a/df-llm-agent/config.py b/df-llm-agent/config.py new file mode 100644 index 0000000..23e47d1 --- /dev/null +++ b/df-llm-agent/config.py @@ -0,0 +1,54 
@@ +import sys +import traceback + +from utils.file_read_tools import file_read_tools +import const + + +class Config(object): + + def __init__(self, _yml=None): + try: + if not _yml: + yml = file_read_tools.yaml_read(const.YML_FILE) + else: + yml = _yml + + self.daemon = yml.get('daemon', True) + self.api_timeout = yml.get('api_timeout', 500) + self.sql_show = yml.get('sql_show', False) + self.log_file = yml.get('log_file', "/var/log/web_all.log") + self.log_level = yml.get('log_level', 'info') + + self.app_key = None + + self.instance_path = yml.get('instance_path', None) + + redis = yml.get('redis', {}) + mysql = yml.get('mysql', {}) + + self.redis_cluster = redis.get('cluster_enabled', False) + self.redis_host = redis.get('host', '127.0.0.1') + self.redis_port = redis.get('port', 6379) + self.redis_db = redis.get('db', 7) + self.redis_password = redis.get('password', 'password123') + + self.mysql_user_name = mysql.get('user_name', 'root') + self.mysql_user_password = mysql.get('user_password', 'password123') + self.mysql_host = mysql.get('host', '127.0.0.1') + self.mysql_port = mysql.get('port', 20130) + self.mysql_database = mysql.get('database', 'deepflow_llm') + + ai = yml.get('ai', {}) + if ai: + enable = ai.get('enable', False) + if enable: + self.platforms = ai.get('platforms', []) + + except Exception as e: + traceback.print_exc() + print("配置文件解析错误: %s" % e) + sys.exit(1) + + +config = Config() diff --git a/df-llm-agent/const.py b/df-llm-agent/const.py new file mode 100644 index 0000000..93c01b3 --- /dev/null +++ b/df-llm-agent/const.py @@ -0,0 +1,42 @@ +# df-llm-agent + +HOST = "0.0.0.0" +PORT = 20831 + +WORKER_NUMBER = 10 +YML_FILE = "/etc/web/df-llm-agent.yaml" + +# API +API_VERSION = "v1" +API_PREFIX = "/" + API_VERSION + +# http +JSON_TYPE = "application/json; charset=utf-8" + +# 错误码 +SUCCESS = "成功" +FAIL = "失败" +SERVER_ERROR = "系统错误" + +APP_ERROR = "APP返回错误" +APP_TIMEOUT = "APP请求超时" + +SQL_ERROR = "SQL错误" + +INVALID_PARAMETERS = "参数格式错误" 
+INVALID_POST_DATA = "数据格式错误" +JSON_FORMAT_ERR = "json格式错误" + +DATA_NOT_FOUND = "数据不存在" +DATA_IN_USE = "数据被使用" + +# 状态码 +HTTP_OK = 200 +HTTP_PARTIAL_RESULT = 206 +HTTP_BAD_REQUEST = 400 +HTTP_ACCESSDENIED = 403 +HTTP_NOT_FOUND = 404 +HTTP_NOT_ALLOWED = 405 +HTTP_INTERNAL_SERVER_ERROR = 500 + +DATE_PATTEN = "%Y-%m-%d %H:%M:%S" diff --git a/df-llm-agent/data.py b/df-llm-agent/data.py new file mode 100644 index 0000000..f18dd00 --- /dev/null +++ b/df-llm-agent/data.py @@ -0,0 +1,54 @@ +from tortoise import Tortoise, run_async, connections +from config import config +from tzlocal import get_localzone +import urllib.parse + +mysql_host = config.mysql_host +mysql_port = config.mysql_port +mysql_database = config.mysql_database + +mysql_user_name = config.mysql_user_name +mysql_user_password = urllib.parse.quote_plus(config.mysql_user_password) + + +async def init_db(): + dbconfig = { + 'connections': { + 'default': { + 'engine': 'tortoise.backends.mysql', + 'credentials': { + 'host': mysql_host, + 'port': mysql_port, + 'user': mysql_user_name, + 'password': mysql_user_password, + 'database': mysql_database, + 'connect_timeout': 60, + 'maxsize': 10, + 'echo': True #config.sql_show + } + } + }, + 'apps': { + 'llm_agent': { + 'models': ['database.db_models'] + } + }, + 'timezone': str(get_localzone()) + } + + await Tortoise.init(config=dbconfig) + + # 不要依据表结构来创建表,已经提前创建好 + # await Tortoise.generate_schemas() + + +async def close_db(): + await connections.close_all() + + +def init_db_sync(): + run_async(init_db()) + + +def close_db_sync(): + run_async(close_db()) diff --git a/df-llm-agent/database/__init__.py b/df-llm-agent/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/df-llm-agent/database/cache.py b/df-llm-agent/database/cache.py new file mode 100644 index 0000000..c3e7ee2 --- /dev/null +++ b/df-llm-agent/database/cache.py @@ -0,0 +1,53 @@ +from aredis import StrictRedis +from aredis import StrictRedisCluster +from aredis.exceptions import 
RedisError + +import urllib.parse +import aiofiles +import os +import sys +import importlib +from exception import BadRequestException +from utils import logger +from config import config +import const +import traceback + +log = logger.getLogger(__name__) + + +class cache(object): + + redis_cluster = config.redis_cluster + redis_host = config.redis_host + redis_port = config.redis_port + redis_db = config.redis_db + redis_password = urllib.parse.quote_plus(config.redis_password) + + @classmethod + async def GetCacheServer(cls): + # 集群 + if cls.redis_cluster: + startup_nodes = [] + for host in cls.redis_host: + node = {'host': f"{host}", 'port': cls.redis_port} + startup_nodes.append(node) + client = StrictRedisCluster(startup_nodes=startup_nodes, password=cls.redis_password, decode_responses=True) + else: + if isinstance(cls.redis_host, list): + redis_host = cls.redis_host[0] + else: + redis_host = cls.redis_host + + client = StrictRedis(host=f"{redis_host}", port=cls.redis_port, db=cls.redis_db, password=cls.redis_password, decode_responses=True) + + try: + res = await client.ping() + if not res: + raise BadRequestException("FAIL", f"{const.FAIL}: redis ping 返回 false", f"{const.FAIL}: {traceback.format_exc()}") + except Exception: + raise BadRequestException("FAIL", f"{const.FAIL}: redis ping 失败", f"{const.FAIL}: {traceback.format_exc()}") + return client + + +cache = cache() diff --git a/df-llm-agent/database/database.py b/df-llm-agent/database/database.py new file mode 100644 index 0000000..3932128 --- /dev/null +++ b/df-llm-agent/database/database.py @@ -0,0 +1,231 @@ +from tortoise import Tortoise, run_async +from tzlocal import get_localzone +import urllib.parse +import aiofiles +import os +import sys +import importlib + +from utils import logger +from config import config + +mysql_host = config.mysql_host +mysql_port = config.mysql_port +mysql_database = config.mysql_database + +mysql_user_name = config.mysql_user_name +mysql_user_password = 
class database(object):
    """Bootstrap and schema-migration ("issu") helper for the service MySQL DB.

    Responsibilities: connect with/without a selected database, create the
    database and initial tables on first install (rolling back on failure),
    and apply incremental `issu` SQL/script upgrades ordered by version.
    """

    @classmethod
    async def GetConnectionWithoutDatabase(cls):
        """Connect to the MySQL server without selecting a database.

        Exits the process when the DB_VERSION environment variable is missing,
        because every later step depends on it.
        """
        if not env_db_version:
            log.error("环境变量中缺少DB_VERSION")
            sys.exit(1)

        mysql_conn = f"mysql://{mysql_user_name}:{mysql_user_password}@{mysql_host}:{mysql_port}/"
        await Tortoise.init(db_url=mysql_conn, modules={'models': []}, timezone=str(get_localzone()))
        return Tortoise.get_connection("default")

    @classmethod
    async def CreateDatabaseIfNotExists(cls, client):
        """Create the configured database if absent.

        Returns True when the database was created (first install), False
        when it already existed.
        """
        affected_rows, result = await client.execute_query("show databases")
        # Collect the names of all existing databases.
        databases = []
        for item in result:
            databases.append(item['Database'])

        if mysql_database not in databases:
            affected_rows, result = await client.execute_query(f"create database {mysql_database}")
            log.info(f"初始化创建数据库: {mysql_database} 成功")
            return True
        return False

    @classmethod
    async def GetConnectionWithDatabase(cls):
        """Connect to the MySQL server with the configured database selected."""
        db_conn = f"mysql://{mysql_user_name}:{mysql_user_password}@{mysql_host}:{mysql_port}/{mysql_database}"
        await Tortoise.init(db_url=db_conn, modules={'models': []}, timezone=str(get_localzone()))
        return Tortoise.get_connection("default")

    @classmethod
    async def InitTablesIfNotExists(cls, client):
        """Run database/init.sql and stamp db_version with DB_VERSION.

        Raises Exception when the init.sql file is missing.
        """
        sql_path = f"{config.instance_path}/database/init.sql"
        # sql_path = "/root/df-llm-agent-test/df-llm-agent/database/init.sql"

        if os.path.exists(sql_path):
            async with aiofiles.open(sql_path) as f:
                sql = await f.read()

            affected_rows, result = await client.execute_query(sql)
            affected_rows, result = await client.execute_query(f"INSERT INTO db_version (`version`) VALUES ('{env_db_version}');")
            log.info(f"初始化表成功,sql路径: {sql_path}")
            log.info(f"version版本为:{env_db_version}")
        else:
            raise Exception(f"初始化sql: {sql_path} 不存在,终止运行")

    @classmethod
    async def DropDatabaseIfInitTablesFailed(cls, client):
        """First-install table creation; on any failure drop the whole database
        (rollback) and re-raise with context."""
        try:
            await cls.InitTablesIfNotExists(client)
        except Exception as e:
            await cls.DropDatabase(client)
            msg = f"首次安装服务,初始化表失败,回滚并删除数据库, 错误: {e}"
            # Chain the original exception so the root cause is not lost.
            raise Exception(msg) from e

    # @classmethod
    # async def InitTablesWithoutRollBack(cls, client):
    #     try:
    #         await cls.InitTablesIfNotExists(client)
    #     except Exception as e:
    #         log.info(f"初始化表失败, 错误: {e}")

    @classmethod
    async def InitTables(cls, client):
        """Detect whether db_version exists.

        Returns True when the table exists (caller should run issu upgrades),
        otherwise creates the initial tables and returns False.
        """
        sql = f"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='{mysql_database}' AND TABLE_NAME='db_version'"
        affected_rows, results = await client.execute_query(sql)

        if affected_rows > 0:
            log.info("数据库存在,db_version表存在,执行issu")
            return True
        else:
            log.info("数据库存在,db_version表不存在,初始化表")
            await cls.InitTablesIfNotExists(client)
            return False

    @classmethod
    async def DropDatabase(cls, client):
        """Drop the configured database (used for first-install rollback)."""
        affected_rows, result = await client.execute_query(f"drop database {mysql_database}")

    @classmethod
    async def getAscSortedNextVersions(cls, up_sql_path, db_version):
        """List the upgrade versions to apply after *db_version*, ascending.

        Scans *up_sql_path* for `x.y.z[.w].sql` files, normalizes 3-part
        versions to 4 parts, sorts numerically and returns the versions
        strictly greater than *db_version*.

        Returns:
            list[str]: versions still to apply (possibly empty when there is
                nothing to scan or *db_version* is empty), or
            str: the latest version when the DB is already up to date
                (callers distinguish this with isinstance checks).
        """
        filename_list = []
        for _file in os.listdir(up_sql_path):
            filename = os.path.splitext(_file)[0]
            filename_suffix = os.path.splitext(_file)[1]
            # Skip anything that is not a .sql file.
            if "sql" not in filename_suffix:
                continue
            filename_list.append(filename)

        # Guard: the DB may record a version (e.g. 6.1.1.0) that has no
        # matching issu file; include it so index() below cannot miss it.
        if db_version and f"{db_version}" not in filename_list:
            filename_list.append(f"{db_version}")

        # Nothing to do, or the recorded system version is unknown/empty.
        if not filename_list or not db_version:
            return []

        # Normalize to 4-part [a, b, c, d] integer lists and sort numerically.
        n_filename_list = []
        for v in filename_list:
            try:
                v_l = list(map(int, v.split('.')))
            except ValueError:
                # FIX: a non-numeric filename (e.g. "readme.sql") used to crash
                # the whole upgrade; now it is simply ignored.
                continue
            if len(v_l) < 3 or len(v_l) > 4:
                continue

            if len(v_l) == 3:
                v_l.append(0)

            if v_l not in n_filename_list:
                n_filename_list.append(v_l)

        n_filename_list.sort()

        # Back to "6.1.1.0" style strings.
        n_filename = list(map(lambda xs: '.'.join(str(x) for x in xs), n_filename_list))

        # Everything at or below the recorded version is skipped.
        try:
            v_index = n_filename.index(db_version)
        except ValueError as e:
            log.error(f"系统里使用了不存在的版本:{db_version}, 错误:{e},跳过issu升级")
            sys.exit(1)
        e_n_filename = n_filename[v_index + 1:]
        if len(e_n_filename) <= 0:
            # Already at the newest version: return the last version (a str).
            return n_filename[-1]
        return e_n_filename

    @classmethod
    async def ExecuteIssus(cls, client):
        """Apply all pending issu SQL files (and optional companion scripts)
        newer than the version recorded in db_version."""

        # Normally always present; empty only if someone cleared the table.
        version = ""
        affected_rows, results = await client.execute_query("select version from db_version limit 1")
        if affected_rows > 0:
            version = results[0]["version"]

        db_sql_path = f"{config.instance_path}/database/issu"
        # db_sql_path = "/root/df-llm-agent-test/df-llm-agent/database/issu"

        up_issu_list = await cls.getAscSortedNextVersions(db_sql_path, version)

        # A str result means "already newest"; an empty list means nothing to run.
        if len(up_issu_list) <= 0 or isinstance(up_issu_list, str):
            if version:
                if version != env_db_version:
                    # affected_rows, result = await client.execute_query(f"UPDATE db_version SET version='{env_db_version}', updated_at= now();")
                    log.info(f"没有需要执行的issu且数据库记录中的version版本: {version} 和环境变量中的版本: {env_db_version} 不一致。请检查服务镜像是否回退、手动修改数据库记录中的version版本")
                else:
                    print("当前issu版本已经最新")
            else:
                affected_rows, result = await client.execute_query(f"UPDATE db_version SET version='{env_db_version}', updated_at= now();")
                log.info(f"没有需要执行的issu且数据库记录中的version为空, 和环境变量中的版本: {env_db_version} 不一致, 更新db_version到环境变量版本")

            return True

        # Apply each pending upgrade in ascending order.
        for _version in up_issu_list:

            # A version may ship without SQL (script-only upgrades are allowed).
            sql_path = f"{db_sql_path}/{_version}.sql"
            msg = f"issu文件: {sql_path}"

            if os.path.exists(sql_path):
                async with aiofiles.open(sql_path) as f:
                    sql = await f.read()

                try:
                    affected_rows, result = await client.execute_query(sql)
                    log.info(f"执行成功, {msg}")
                except Exception as e:
                    log.info(f"执行失败, {msg}, 错误:{e}")
                    sys.exit(1)
            else:
                log.info(f"{msg} 不存在, 跳过")

            # Optional companion script, e.g. issu/6110_script.py for 6.1.1.0.
            __version = _version.replace(".", "")

            script_file_path = f"{db_sql_path}/{__version}_script.py"

            if os.path.exists(script_file_path):
                script_module = importlib.import_module(f"database.issu.{__version}_script")
                script_module.script_service.run(client)
                # FIX: success was previously logged *before* the script ran,
                # which made failures look like successes in the log.
                log.info(f"脚本执行成功, 文件路径: {script_file_path}")


database = database()
from tortoise.models import Model
from tortoise.fields import (SmallIntField, IntField, FloatField, CharField, TextField, DatetimeField, JSONField)


class DbVersion(Model):
    """Single-row table recording the currently applied schema version
    (compared against the DB_VERSION env var by the issu upgrader)."""

    class Meta:
        table = "db_version"

    id = IntField(pk=True, autoincrement=True)
    version = CharField(max_length=64, null=True)
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)


class ChatTopic(Model):
    """A conversation topic owned by a user; a topic groups many Chat rows."""

    class Meta:
        table = "chat_topic"

    id = IntField(pk=True, autoincrement=True)
    user_id = IntField()
    name = CharField(max_length=256, null=True)  # topic title
    type = SmallIntField()  # topic type code
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)


class Chat(Model):
    """One question/answer exchange within a topic."""

    class Meta:
        table = "chat"

    id = IntField(pk=True, autoincrement=True)
    chat_topic_id = IntField()  # owning ChatTopic id
    input = TextField()  # the user question
    output = TextField()  # extracted LLM answer used for display
    output_all = JSONField()  # raw LLM response (or error detail)
    engine = CharField(max_length=64, null=True)  # LLM engine used for this chat
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)


class Score(Model):
    """User feedback/score attached to either a topic or a single chat."""

    class Meta:
        table = "score"

    id = IntField(pk=True, autoincrement=True)
    user_id = IntField(null=False)
    type = SmallIntField()  # 1: topic score, 2: chat score
    obj_id = IntField()  # topic id or chat id depending on `type`
    score = SmallIntField()  # 0-100
    feedback = TextField()
    user_name = CharField(max_length=64)
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)


class LlmConfig(Model):
    """Key/value LLM platform configuration rows, per user and platform
    (e.g. key='api_key'/'engine_name' with the matching value)."""

    class Meta:
        table = "llm_config"

    id = IntField(pk=True, autoincrement=True)
    user_id = IntField(null=False)
    platform = CharField(max_length=64, null=False)  # azure / aliyun / baidu / ...
    model = CharField(max_length=64, null=False)  # model family, e.g. gpt, dashscope
    model_info = CharField(max_length=255, null=True)
    key = CharField(max_length=64, null=False)  # config item name
    value = CharField(max_length=256, null=False)  # config item value
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)
NULL DEFAULT 0 COMMENT '1: 主题评分,2: 对话评分',
    obj_id INTEGER NOT NULL DEFAULT 0 COMMENT '主题id或对话id',
    score TINYINT(1) NOT NULL DEFAULT 0 COMMENT '评分0-100',
    feedback TEXT COMMENT '反馈信息',
    user_name VARCHAR(64) COMMENT '反馈者',
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX `user_id` (`user_id`)
)ENGINE=innodb DEFAULT CHARSET=utf8 COMMENT='评分';
TRUNCATE TABLE score;

-- Key/value LLM platform configuration; read by llm_agent's assistant_base.
-- The unique key includes `value`, so multiple rows with key='engine_name'
-- and different values are allowed per platform.
CREATE TABLE IF NOT EXISTS llm_config(
    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    user_id INTEGER NOT NULL DEFAULT 0,
    `platform` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '平台: azure、aliyun、baidu、tencent',
    `model` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '模型统称,例如 azure: openai, 阿里: dashscope, 百度: qianfan, 腾讯: hyllm',
    `model_info` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '模型相关具体细节',
    `key` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '模型需要的配置项',
    `value` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '模型需要的配置项的值',
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX `user_id` (`user_id`),
    UNIQUE KEY `unique_engine` (`user_id`,`platform`,`model`,`key`,`value`)
)ENGINE=innodb DEFAULT CHARSET=utf8 COMMENT='llm 配置';
TRUNCATE TABLE llm_config;

-- Seed rows: one 'enable' flag plus credential/engine keys per platform.
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'azure','gpt','enable',0);
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'azure','gpt','api_key','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'azure','gpt','api_type','azure');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'azure','gpt','api_base','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'azure','gpt','api_version','');
-- INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'azure','gpt','engine_name','');

-- INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'openai','gpt','enable',0);
-- INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'openai','gpt','api_key','');

INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'aliyun','dashscope','enable',0);
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'aliyun','dashscope','api_key','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'aliyun','dashscope','engine_name','qwen-turbo');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'aliyun','dashscope','engine_name','qwen-plus');

-- NOTE(review): key name 'api_secre' (sic) is spelled the same way in the
-- Python reader (assistant_base's baidu branch); renaming it here alone
-- would break lookups — keep them in sync.
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'baidu','qianfan','enable',0);
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'baidu','qianfan','api_key','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'baidu','qianfan','api_secre','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'baidu','qianfan','engine_name','ERNIE-Bot');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'baidu','qianfan','engine_name','ERNIE-Bot-turbo');

INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'zhipu','zhipuai','enable',0);
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'zhipu','zhipuai','api_key','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'zhipu','zhipuai','engine_name','chatglm_turbo');

INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'lixiang','gpt','enable',0);
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'lixiang','gpt','api_key','');
INSERT INTO llm_config (`user_id`,`platform`,`model`,`key`,`value`) VALUES (1,'lixiang','gpt','api_type','azure');
class HttpException(Exception):
    """Base class for HTTP-mapped application errors.

    Instances carry a machine-readable ``status``, a human-readable
    ``message``, an optional detail payload ``err`` and a ``show_type_html``
    flag. The class attribute ``status_code`` is the HTTP status the API
    layer responds with.
    """

    status_code = 500
    status = None
    message = None
    err = None
    show_type_html = None

    def __init__(self, status=None, message=None, err=None, show_type_html=None):
        super().__init__()
        self.status = status
        self.message = message
        self.err = err
        self.show_type_html = show_type_html


class BadRequestException(HttpException):
    """400 Bad Request."""
    status_code = 400


class NotFoundException(HttpException):
    """404 Not Found."""
    status_code = 404


class AccessDeniedException(HttpException):
    """403 Forbidden."""
    status_code = 403


class MethodNotAllowedException(HttpException):
    """405 Method Not Allowed."""
    status_code = 405


class SQLException(HttpException):
    """Database error surfaced as 400 Bad Request."""
    status_code = 400
from sanic import Blueprint
import asyncio
from llm_agent_app.worker import app_worker
from utils.response_tools import wrap_resp, wrap_resp_stream
from const import API_PREFIX

llm_agent_app = Blueprint("llm_agent", url_prefix=API_PREFIX)

# CRUD for the agent's LLM configuration.
# Every config entry belongs to a user.


@llm_agent_app.route("/llm_agent_config", methods=["POST"])
@wrap_resp
async def llm_agent_config_add(request):
    """Create an LLM config entry from the request body."""
    worker = app_worker(request)
    res = await worker.llm_agent_config_add()
    return res


# FIX: the parameterized routes below had lost their `<...>` path-parameter
# placeholders (they read "/llm_agent_config//" with empty segments), so the
# handlers' `platform` / `key_name` / `engine_name` arguments could never be
# bound from the URL. Restored Sanic path parameters.
@llm_agent_app.route("/llm_agent_config/<platform:str>", name="by_platform")
@llm_agent_app.route("/llm_agent_config")
@wrap_resp
async def llm_agent_config_list(request, platform=""):
    """List config entries, optionally filtered by platform."""
    worker = app_worker(request)
    res = await worker.llm_agent_config_list(platform)
    return res


@llm_agent_app.route("/llm_agent_config/<platform:str>/<key_name:str>", methods=["PATCH"])
@wrap_resp
async def llm_agent_config_update(request, platform="", key_name=""):
    """Update one config key of a platform."""
    worker = app_worker(request)
    res = await worker.llm_agent_config_update(platform, key_name)
    return res


@llm_agent_app.route("/llm_agent_config/<platform:str>/<engine_name:str>", methods=["DELETE"])
@wrap_resp
async def llm_agent_config_delete(request, platform="", engine_name=""):
    """Delete a platform's engine config entry."""
    worker = app_worker(request)
    res = await worker.llm_agent_config_delete(platform, engine_name)
    return res


# Streaming chat endpoint.
@llm_agent_app.route("/ai/stream/<platform:str>", methods=["POST"])
@wrap_resp_stream
async def llm_agent_stream_system(request, platform=""):
    """Proxy a chat request to the platform's LLM and stream the answer."""
    worker = app_worker(request)
    # Streamed data (an async generator consumed by wrap_resp_stream).
    res = await worker.llm_agent_stream(platform)
    return res


# DeepFlow components endpoint (azure-backed).
@llm_agent_app.route("/ai/azure/deepflow/modules", methods=["POST"])
@wrap_resp
async def llm_agent_module(request):
    worker = app_worker(request)
    res = await worker.llm_agent_module(platform='azure')
    return res
    # OpenAI token accounting, ported from the OpenAI cookbook's
    # "num_tokens_from_messages" recipe.
    @classmethod
    async def num_tokens_from_messages(cls, messages, model="gpt-3.5-turbo-0613"):
        """Return the number of tokens used by a list of messages."""
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # Unknown model name: fall back to the encoding used by recent chat models.
            print("Warning: model not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")
        # Per-message framing overhead differs between model snapshots.
        if model in {
            "gpt-3.5-turbo-0613",
            "gpt-3.5-turbo-16k-0613",
            "gpt-4-0314",
            "gpt-4-32k-0314",
            "gpt-4-0613",
            "gpt-4-32k-0613",
        }:
            tokens_per_message = 3
            tokens_per_name = 1
        elif model == "gpt-3.5-turbo-0301":
            tokens_per_message = 4  # every message follows <|start|>{role/name}\n{content}<|end|>\n
            tokens_per_name = -1  # if there's a name, the role is omitted
        elif "gpt-3.5-turbo" in model:
            # Unpinned alias: recurse with the snapshot we know the overhead for.
            print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
            return await cls.num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
        elif "gpt-4" in model:
            print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
            return await cls.num_tokens_from_messages(messages, model="gpt-4-0613")
        else:
            raise NotImplementedError(
                f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
            )
        num_tokens = 0
        for message in messages:
            num_tokens += tokens_per_message
            # Every key/value of a message contributes its encoded length.
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":
                    num_tokens += tokens_per_name
        num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
        return num_tokens
    # Shared request validation / engine-configuration setup.
    @classmethod
    async def assistant_base(cls, request, user_info, platform, engine_name, prompt_type, args, data):
        """Validate the chat payload, resolve the engine configuration for
        *platform* (config file first, DB fallback), initialise the matching
        SDK client, assemble ``cls.messages`` and record the conversation.

        Raises BadRequestException on missing parameters, a disabled platform,
        an unknown engine, or incomplete configuration.
        """

        # user_id = user_info.get("ID", 0)
        if not isinstance(data, dict) or "user_content" not in data or "system_content" not in data:
            raise BadRequestException("INVALID_POST_DATA", f"{const.INVALID_PARAMETERS}, 缺失user_content或system_content参数")

        cls.query = [{"role": "user", "content": data["user_content"]}]

        cls.system_content = data["system_content"]

        # baidu / zhipu take no "system" role: fold the system prompt into a
        # scripted user/assistant exchange instead.
        # NOTE(review): this branch reads data["question"], which is NOT part
        # of the validation above — confirm callers always send it for these
        # platforms, otherwise this raises KeyError.
        if platform == "baidu" or platform == "zhipu":
            cls.query = [{"role": "user", "content": cls.system_content}, {"role": "assistant", "content": "好的,后面的回复将按照你给我的角色和要求来解答"}, {"role": "user", "content": data["question"]}]

        # Fetch the platform configuration.
        data_info = {}
        # data_info["user_id"] = user_id
        data_info["platform"] = platform

        engine_config = {}

        if hasattr(config, "platforms"):
            # Config-file driven: pick the enabled entry for this platform and
            # collapse its engine_name list to the requested engine (or '').
            res_config = config.platforms
            for _info in res_config:
                if _info.get('platform', '') == platform and _info.get('enable', False):
                    _engine_name = _info.get('engine_name', [])
                    if engine_name in _engine_name:
                        _info['engine_name'] = f"{engine_name}"
                    else:
                        _info['engine_name'] = ''

                    engine_config = _info

            if not engine_config.get("enable", False):
                raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 平台: {platform} 未启用")

        else:
            # DB driven: rows are key/value pairs. Multiple engine_name rows
            # may exist; keep only the one matching the requested engine.
            try:
                res_config = await db_models.LlmConfig.filter(**data_info).all()
            except Exception as e:
                raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}")

            for v in res_config:
                v_dict = dict(v)
                _key = v_dict["key"]
                _value = v_dict["value"]

                if _key == "engine_name":
                    if _value == engine_name:
                        engine_config[_key] = _value
                else:
                    engine_config[_key] = _value

            if engine_config.get("enable", "") != "1":
                raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 平台: {platform} 未启用")

        # print(engine_config, engine_config.get("engine_name"), engine_name)

        if engine_config.get("engine_name", "") != engine_name:
            raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 引用的引擎错误: {engine_name}")

        # Per-platform: verify the required config keys, then initialise the SDK.
        if platform == "azure":
            for key in ("api_key", "api_type", "api_base", "api_version", "engine_name"):
                if key not in engine_config or engine_config.get(f"{key}", "") == "":
                    raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 请确认{key}已经正确配置")

            # prompt_type "langchain" drives the LangChain client (components
            # endpoint); anything else uses the plain async Azure OpenAI SDK.
            if prompt_type == "langchain":
                cls.langchain_azure_client = AzureChatOpenAI(
                    azure_endpoint=engine_config.get("api_base"),
                    deployment_name=engine_config.get("engine_name"),
                    openai_api_version=engine_config.get("api_version"),
                    openai_api_type=engine_config.get("api_type"),
                    openai_api_key=engine_config.get("api_key")
                )
            else:
                cls.azure_client = AsyncAzureOpenAI(
                    api_key=engine_config.get("api_key"),
                    api_version=engine_config.get("api_version"),
                    azure_endpoint=engine_config.get("api_base")
                )

            cls.engine_name = engine_config.get("engine_name")

        elif platform == "openai":
            for key in ("api_key", "engine_name"):
                if key not in engine_config or engine_config.get(f"{key}", "") == "":
                    raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 请确认{key}已经正确配置")

            openai.api_key = engine_config.get("api_key")
            cls.engine_name = engine_config.get("engine_name")

        elif platform == "aliyun":
            for key in ("api_key", "engine_name"):
                if key not in engine_config or engine_config.get(f"{key}", "") == "":
                    raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 请确认{key}已经正确配置")

            dashscope.api_key = engine_config.get("api_key")
            cls.engine_name = engine_config.get("engine_name")

        elif platform == "baidu":
            # NOTE(review): key "api_secre" (sic) matches the seeded DB rows —
            # keep the spelling in sync with init.sql.
            for key in ("api_key", "api_secre", "engine_name"):
                if key not in engine_config or engine_config.get(f"{key}", "") == "":
                    raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 请确认{key}已经正确配置")

            qianfan.AK(engine_config.get("api_key"))
            qianfan.SK(engine_config.get("api_secre"))
            cls.engine_name = engine_config.get("engine_name")

        elif platform == "zhipu":
            for key in ("api_key", "engine_name"):
                if key not in engine_config or engine_config.get(f"{key}", "") == "":
                    raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 请确认{key}已经正确配置")

            zhipuai.api_key = engine_config.get("api_key")
            cls.engine_name = engine_config.get("engine_name")

        elif platform == "lixiang":
            for key in ("api_key", "api_type", "api_base", "api_version", "engine_name"):
                if key not in engine_config or engine_config.get(f"{key}", "") == "":
                    raise BadRequestException("DATA_NOT_FOUND", f"{const.DATA_NOT_FOUND}: 请确认{key}已经正确配置")

            # lixiang is called through a raw HTTP gateway, so only the
            # connection parameters are kept (no SDK client object).
            lixiang_config = {}
            lixiang_config["api_key"] = engine_config.get("api_key")
            lixiang_config["api_type"] = engine_config.get("api_type")
            lixiang_config["api_base"] = engine_config.get("api_base")
            lixiang_config["api_version"] = engine_config.get("api_version")
            cls.lixiang = lixiang_config
            cls.engine_name = engine_config.get("engine_name")
        else:
            raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 模型所在平台名称错误")

        # Stash request context for chat_add / chat_up bookkeeping.
        cls.user_info = user_info
        cls.request = request
        cls.user_question = json.dumps(data)

        # Final message list.

        # cls.messages = [{'role': 'user', 'content': 'Count to 1000, with a comma between each number and no newlines. E.g., 1, 2, 3, ...'}]

        cls.messages = [{"role": "system", "content": cls.system_content}, *cls.query]

        # baidu / zhipu: the system prompt is already folded into the query.
        if platform == "baidu" or platform == "zhipu":
            cls.messages = [*cls.query]

        # lixiang's gateway expects a wrapping object.
        if platform == "lixiang":
            cls.messages = {
                "messages": cls.messages
            }

        # Pre-compute the prompt token count where the platform supports it.
        conv_history_tokens = 0
        if platform == "azure" or platform == "openai":
            try:
                conv_history_tokens = await cls.num_tokens_from_messages(cls.messages)
            except Exception as e:
                raise BadRequestException("FAIL", f"{const.FAIL}: 计算token数量错误: {e}")

        elif platform == "aliyun":

            response_token = dashscope.Tokenization.call(model=cls.engine_name,
                                                         messages=cls.messages
                                                         )
            if response_token.status_code != HTTPStatus.OK:
                raise BadRequestException("FAIL", f"{const.FAIL}: 计算token数量错误: {response_token.message}")

            usage = response_token.usage

            conv_history_tokens = usage.get('input_tokens', 0)

        print(conv_history_tokens)
        # Persist the question now; the answer is filled in later by chat_up().
        await cls.chat_add()
{cls.user_info.get('ID', 0)} 请求gpt开始时间: {working_start_time}, 结束时间: {working_end_time}, 共耗时: {all_time} 秒,返回信息: {item}" + msg = {} + msg["user_id"] = cls.user_info.get('ID', 0) + msg["start_time"] = f"{working_start_time}" + msg["end_time"] = f"{working_end_time}" + msg["all_time"] = all_time + msg["return"] = item_json + + output_all.append(msg) + + content = "" + if "choices" in item_json: + choices = item_json["choices"] + if choices: + delta = choices[0].get("delta", {}) + if 'content' in delta: + delta_content = delta.get("content", None) + if delta_content is not None: + content = delta_content + + output.append(f"{content}") + + yield content + + # 写入 + cls.output = output + cls.output_all = output_all + await cls.chat_up() + + return generate_data(output, output_all) + + elif platform == "aliyun": + try: + + responses = dashscope.Generation.call( + model=cls.engine_name, # Generation.Models.qwen_turbo, + messages=cls.messages, + result_format='message', + stream=True, + incremental_output=True + ) + except Exception as e: + raise BadRequestException("APP_ERROR", const.APP_ERROR, f"{e}") + + # 定义迭代器 + + output = [] + output_all = [] + + async def generate_data(output, output_all): + for response in responses: + # 结束时间 + working_end_time = datetime.datetime.now() + + all_time = working_end_time.timestamp() - working_start_time.timestamp() + + # msg = f"用户: {cls.user_info.get('ID', 0)} 请求gpt开始时间: {working_start_time}, 结束时间: {working_end_time}, 共耗时: {all_time} 秒,返回信息: {response}" + msg = {} + msg["user_id"] = cls.user_info.get('ID', 0) + msg["start_time"] = f"{working_start_time}" + msg["end_time"] = f"{working_end_time}" + msg["all_time"] = all_time + msg["return"] = response + + output_all.append(msg) + + content = "" + if response.status_code == HTTPStatus.OK: + # print(response) + item = response.output + if item['choices']: + delta = item["choices"][0].get("message", {}) + if 'content' in delta: + content = delta.get("content", "") + else: + content = 
response.message + # content = '流式返回错误: 请求id: %s, 状态码: %s, 错误码: %s, 错误信息: %s' % (response.request_id, response.status_code, response.code, response.message) + + output.append(f"{content}") + yield content + + # 写入 + cls.output = output + cls.output_all = output_all + await cls.chat_up() + + return generate_data(output, output_all) + + elif platform == "baidu": + + try: + chat_comp = qianfan.ChatCompletion() + # 指定特定模型 + response = await chat_comp.ado(model=cls.engine_name, messages=cls.messages, stream=True) + + except Exception as e: + raise BadRequestException("APP_ERROR", const.APP_ERROR, f"{e}") + + # 定义迭代器 + output = [] + output_all = [] + + async def generate_data(output, output_all): + async for item in response: + # 结束时间 + working_end_time = datetime.datetime.now() + + all_time = working_end_time.timestamp() - working_start_time.timestamp() + + # msg = f"用户: {cls.user_info.get('ID', 0)} 请求gpt开始时间: {working_start_time}, 结束时间: {working_end_time}, 共耗时: {all_time} 秒,返回信息: {item}" + + msg = {} + msg["user_id"] = cls.user_info.get('ID', 0) + msg["start_time"] = f"{working_start_time}" + msg["end_time"] = f"{working_end_time}" + msg["all_time"] = all_time + msg["return"] = item + + output_all.append(msg) + + content = "" + + if item.get('code', 0) == HTTPStatus.OK: + + content = item.get('result', '') + else: + # msg = '返回错误: 请求id: %s, 状态码: %s, 错误信息: %s' % (item.get('id', ''), item.get('code', 0), item.get('result', '')) + content = item.get('result', '') + + output.append(f"{content}") + yield content + + # 写入 + cls.output = output + cls.output_all = output_all + await cls.chat_up() + + return generate_data(output, output_all) + + elif platform == "zhipu": + + try: + response = zhipuai.model_api.sse_invoke( + model=cls.engine_name, + prompt=cls.messages + ) + + except Exception as e: + raise BadRequestException("APP_ERROR", const.APP_ERROR, f"{e}") + + # 定义迭代器 + output = [] + output_all = [] + + async def generate_data(output, output_all): + + for event in 
response.events(): + + # 结束时间 + working_end_time = datetime.datetime.now() + + all_time = working_end_time.timestamp() - working_start_time.timestamp() + + # msg = f"用户: {cls.user_info.get('ID', 0)} 请求gpt开始时间: {working_start_time}, 结束时间: {working_end_time}, 共耗时: {all_time} 秒,返回信息: " + msg = {} + msg["user_id"] = cls.user_info.get('ID', 0) + msg["start_time"] = f"{working_start_time}" + msg["end_time"] = f"{working_end_time}" + msg["all_time"] = all_time + + content = "" + if event.event == "add": + content = event.data + # msg += f"{content}" + msg["return"] = content + elif event.event == "error" or event.event == "interrupted": + content = event.data + # msg += f"{content}" + msg["return"] = content + elif event.event == "finish": + content = event.data + # msg += f"{content}, meta:{event.meta}" + msg["return"] = content + msg["return_meta"] = event.meta + else: + content = event.data + # msg += f"{content}" + msg["return"] = content + + output_all.append(msg) + + output.append(f"{content}") + yield content + + # 写入 + cls.output = output + cls.output_all = output_all + await cls.chat_up() + + return generate_data(output, output_all) + + elif platform == "lixiang": + + lcuuid = generate_uuid() + headers = {"BCS-APIHub-RequestId": lcuuid, "X-CHJ-GWToken": cls.lixiang["api_key"]} + url = f"{cls.lixiang['api_base']}/bcs-apihub-ai-proxy-service/apihub/openai/{cls.lixiang['api_version']}/{cls.lixiang['api_type']}/models/{cls.engine_name}?stream=true" + + try: + response = curl_tools.curl_app_stream("post", url, headers, json.dumps(cls.messages)) + except BadRequestException as e: + raise BadRequestException("APP_ERROR", f"{const.APP_ERROR}:{e.message}", f"{e}") + + output = [] + output_all = [] + + async def generate_data(output, output_all): + async for chunked in response: + line = chunked.decode('utf-8').strip() + # 找到包含"data:"的部分并提取"data"字段的值 + event_data = "" + if line.startswith('data:'): + data_start = len('data:') + data_value = line[data_start:].strip() + if 
data_value != "[DONE]": + # 解析JSON格式的数据 + try: + event_data = json.loads(data_value) + except json.JSONDecodeError as e: + print(f"Error decoding JSON:{e}") + event_data = data_value + + # 结束时间 + working_end_time = datetime.datetime.now() + + all_time = working_end_time.timestamp() - working_start_time.timestamp() + + # msg = f"用户: {cls.user_info.get('ID', 0)} 请求gpt开始时间: {working_start_time}, 结束时间: {working_end_time}, 共耗时: {all_time} 秒,返回信息: {item}" + msg = {} + msg["user_id"] = cls.user_info.get('ID', 0) + msg["start_time"] = f"{working_start_time}" + msg["end_time"] = f"{working_end_time}" + msg["all_time"] = all_time + msg["return"] = line + + output_all.append(msg) + + content = "" + if event_data or isinstance(event_data, dict): + + if event_data.get('code', 0) != 0 or 'data' not in event_data: + content = event_data.get('msg', '') + else: + data = event_data['data'] + if data['choices']: + choices = data["choices"][0] + if isinstance(choices, dict): + content = choices.get("content", "") + + output.append(f"{content}") + + yield content + + # 写入 + cls.output = output + cls.output_all = output_all + await cls.chat_up() + + return generate_data(output, output_all) + + # 组件 + @classmethod + async def module(cls, request, user_info, platform, engine_name, args, data): + # 校验 + await cls.assistant_base(request, user_info, platform, engine_name, 'langchain', args, data) + + # 开始时间 + working_start_time = datetime.datetime.now() + + # azure模型 + llm = cls.langchain_azure_client + + # 字符串返回 + output_parser = StrOutputParser() + + # 基础模板:问题分类指令模板 + prompt = PromptTemplate.from_template( + """鉴于下面的用户问题,将其分类为“LangChain”、“LLM”或“其他”。不要用超过一个字来回应. 
+ + <问题> + {question} + + + 分类:""" + ) + + chain = ( + prompt + | llm + | output_parser + ) + + # res = chain.invoke({"question": "如何使用llm?"}) + # res = chain.invoke({"question": "如何使用langchain?"}) + # print(res) + + # 子链 + # langchain专家 + langchain_chain = ( + PromptTemplate.from_template( + """您是 langchain 方面的专家。 \ + 回答问题时始终以“正如官方文档中所诉”开头。 \ + 回答以下问题: + + 问题: {question} + 回答:""" + ) + | llm + | output_parser + ) + + # 大模型专家 + llm_chain = ( + PromptTemplate.from_template( + """您是 llm大模型 方面的专家。 \ + 回答问题时始终以“以我所知道的所有模型”开头。 \ + 回答以下问题: + + 问题: {question} + 回答:""" + ) + | llm + | output_parser + ) + + # 默认链 + general_chain = ( + PromptTemplate.from_template( + """回答以下问题: + + 问题: {question} + 回答:""" + ) + | llm + | output_parser + ) + + branch = RunnableBranch( + # 多个子链依次追加 + (lambda x: "llm" in x["topic"].lower(), llm_chain), + (lambda x: "langchain" in x["topic"].lower(), langchain_chain), + # 默认链 + general_chain, + ) + + full_chain = {"topic": chain, "question": lambda x: x["question"]} | branch + + # 问题 + question = cls.query[0]['content'] + + try: + # 异步一次性返回 + res = await full_chain.ainvoke({"question": question}) + + # 结束时间 + working_end_time = datetime.datetime.now() + all_time = working_end_time.timestamp() - working_start_time.timestamp() + msg = {} + msg["user_id"] = cls.user_info.get('ID', 0) + msg["start_time"] = f"{working_start_time}" + msg["end_time"] = f"{working_end_time}" + msg["all_time"] = all_time + msg["return"] = res + # 记录并返回 + cls.output.append(f"{res}") + cls.output_all.append(msg) + + return res + except Exception as e: + cls.output_all.append(e) + raise BadRequestException("APP_ERROR", const.APP_ERROR, f"{e}") + finally: + # 更新会话记录,包括所有返回可记录数据 + await cls.chat_up() + + +llm_agent_worker = llmAgentWorker() diff --git a/df-llm-agent/llm_agent_app/llm_agent_config.py b/df-llm-agent/llm_agent_app/llm_agent_config.py new file mode 100644 index 0000000..07f4819 --- /dev/null +++ b/df-llm-agent/llm_agent_app/llm_agent_config.py @@ -0,0 +1,181 @@ 
+from exception import BadRequestException +import const +import json +from database import db_models +from utils import logger + +log = logger.getLogger(__name__) + + +# agent配置 +class llmAgentConfigWorker(object): + + # 校验数据 + @staticmethod + async def verify_data(data): + pass + + # 新增 + @classmethod + async def llm_agent_config_add(cls, user_info, args, data): + user_id = user_info.get("ID", 0) + user_type = user_info.get("TYPE", 0) + if user_type != 1: + raise BadRequestException("SERVER_ERROR", f"{const.SERVER_ERROR}, 没有权限,只允许超管") + + data_info = {} + data_info["user_id"] = user_id + data_info["platform"] = data.get("platform", "") + data_info["model"] = data.get("model", "") + data_info["model_info"] = data.get("model_info", "") + data_info["key"] = data.get("key_name", "") + data_info["value"] = data.get("value", "") + + # 其他key必须唯一 + # todoing key 需要给范围 + if data_info["key"] != "engine_name": + + raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 只允许添加模型引擎配置") + # where_info = {} + # where_info = {} + # where_info["user_id"] = user_id + # where_info["platform"] = data_info["platform"] + # where_info["key"] = data_info["key"] + + # res = await db_models.LlmConfig.exists(**where_info) + + # if res: + # raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 该配置项在一个平台下必须唯一") + + try: + await db_models.LlmConfig.create(**data_info) + except Exception as e: + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + log.info(f"用户:{user_id}, 添加配置, 数据: {data_info}") + + return True + + # 获取所有配置 + @classmethod + async def llm_agent_config_list(cls, user_info, platform=""): + # user_id = user_info.get("ID", 0) + data_info = {} + # data_info["user_id"] = user_id + + if platform: + data_info["platform"] = platform + + try: + if data_info: + sql_res = await db_models.LlmConfig.filter(**data_info).all() + else: + sql_res = await db_models.LlmConfig.all() + except Exception as e: + raise 
BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + res = {} + for v in sql_res: + _config = dict(v) + + _platform = _config.get("platform") + _model = _config.get("model") + _model_info = _config.get("model_info") + + _key = _config.get("key") + _value = _config.get("value") + + # 列表过滤敏感数据,详情不过滤 + if platform == "": + if _key not in ["enable", "engine_name"]: + continue + + _merge_config = {} + _merge_config["model"] = _model + _merge_config["model_info"] = _model_info + + if f"{_key}" == "engine_name": + _merge_config[f"{_key}"] = [_value] + else: + _merge_config[f"{_key}"] = _value + + if _platform not in res: + res[f"{_platform}"] = _merge_config + else: + if f"{_key}" == "engine_name": + if f"{_key}" not in res[f"{_platform}"]: + res[f"{_platform}"][f"{_key}"] = [_value] + else: + res[f"{_platform}"][f"{_key}"] += [_value] + else: + res[f"{_platform}"][f"{_key}"] = _value + + return res + + # 更新 + @classmethod + async def llm_agent_config_update(cls, user_info, platform, key_name, args, data): + user_id = user_info.get("ID", 0) + user_type = user_info.get("TYPE", 0) + if user_type != 1: + raise BadRequestException("SERVER_ERROR", f"{const.SERVER_ERROR}, 没有权限,只允许超管") + + if not platform or not key_name: + raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 缺失平台名或key") + + # engine可以删除和新增。修改意义不大 + if key_name == "engine_name": + raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 引擎值不支持修改") + + where_info = {} + where_info["user_id"] = user_id + where_info["platform"] = platform + where_info["key"] = key_name + + data_info = {} + data_info["value"] = data.get("value", "") + + try: + res = await db_models.LlmConfig.get(**where_info) + if res: + await db_models.LlmConfig.filter(**where_info).update(**data_info) + else: + raise BadRequestException("DATA_NOT_FOUND", const.DATA_NOT_FOUND) + + except Exception as e: + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + 
log.info(f"用户:{user_id}, 更新配置, 数据: {data_info}") + + return True + + # 删除 + @classmethod + async def llm_agent_config_delete(cls, user_info, platform, engine_name, args, data): + user_id = user_info.get("ID", 0) + user_type = user_info.get("TYPE", 0) + if user_type != 1: + raise BadRequestException("SERVER_ERROR", f"{const.SERVER_ERROR}, 没有权限,只允许超管") + + where_info = {} + where_info["user_id"] = user_id + where_info["platform"] = platform + where_info["key"] = "engine_name" + where_info["value"] = engine_name + + # 其他配置不允许删除,只有引擎可以 + try: + llm_config_exist = await db_models.LlmConfig.filter(**where_info).count() + except Exception as e: + raise BadRequestException("SQL_ERROR", const.SQL_ERROR, f"{e}") + + if llm_config_exist > 0: + await db_models.LlmConfig.filter(**where_info).delete() + else: + raise BadRequestException("DATA_NOT_FOUND", const.DATA_NOT_FOUND) + + log.info(f"用户:{user_id}, 删除配置, 查询数据: {where_info}") + return True + + +llm_agent_config_worker = llmAgentConfigWorker() diff --git a/df-llm-agent/llm_agent_app/worker.py b/df-llm-agent/llm_agent_app/worker.py new file mode 100644 index 0000000..382fae5 --- /dev/null +++ b/df-llm-agent/llm_agent_app/worker.py @@ -0,0 +1,64 @@ +from exception import BadRequestException +import const +from llm_agent_app.llm_agent import llm_agent_worker +from config import config +from utils.curl_tools import curl_tools +from utils import logger +import json +import time + +from llm_agent_app.llm_agent_config import llm_agent_config_worker + +log = logger.getLogger(__name__) + + +class app_worker(object): + + args = data = user_info = None + + def __init__(self, request): + app_worker.request = request + app_worker.args = request.args + if app_worker.args: + for k, v in self.args.items(): + app_worker.args[k] = [i for i in v] + + app_worker.data = request.json + app_worker.user_info = request.ctx.user + + @classmethod + async def llm_agent_config_add(cls): + # 校验todoing + return await 
llm_agent_config_worker.llm_agent_config_add(cls.user_info, cls.args, cls.data) + + @classmethod + async def llm_agent_config_list(cls, platform=""): + + return await llm_agent_config_worker.llm_agent_config_list(cls.user_info, platform) + + @classmethod + async def llm_agent_config_update(cls, platform="", key_name=""): + # 校验todoing + return await llm_agent_config_worker.llm_agent_config_update(cls.user_info, platform, key_name, cls.args, cls.data) + + @classmethod + async def llm_agent_config_delete(cls, platform="", engine_name=""): + # 校验todoing + return await llm_agent_config_worker.llm_agent_config_delete(cls.user_info, platform, engine_name, cls.args, cls.data) + + # 流处理 + @classmethod + async def llm_agent_stream(cls, platform, prompt_type=''): + # 校验todoing + engine_name = cls.args.get("engine", "") + if not engine_name: + raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 缺失使用的引擎名称") + return await llm_agent_worker.assistant_stream(cls.request, cls.user_info, platform, engine_name, prompt_type, cls.args, cls.data) + + # 组件 + @classmethod + async def llm_agent_module(cls, platform): + engine_name = cls.args.get("engine", "") + if not engine_name: + raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 缺失使用的引擎名称") + return await llm_agent_worker.module(cls.request, cls.user_info, platform, engine_name, cls.args, cls.data) diff --git a/df-llm-agent/logger.py b/df-llm-agent/logger.py new file mode 100644 index 0000000..e69de29 diff --git a/df-llm-agent/models.py b/df-llm-agent/models.py new file mode 100644 index 0000000..2cef57d --- /dev/null +++ b/df-llm-agent/models.py @@ -0,0 +1,25 @@ +from schematics.models import Model +from schematics.types import IntType, StringType, BooleanType, DictType, BaseType +from schematics.types.compound import ListType, ModelType + +from config import config + +OUTPUT_FORMAT_CSV = 'csv' +OUTPUT_FORMAT_JSON = 'json' + +_sort_values = ['DESC', 'ASC'] +_output_format = 
[OUTPUT_FORMAT_JSON, OUTPUT_FORMAT_CSV] +_fills = ['0', 'null', 'none'] + + +class Query(Model): + query_id = StringType(serialized_name="QUERY_ID", required=True) + select = StringType(serialized_name="SELECT", required=True) + where = StringType(serialized_name="WHERE", required=False, default="1=1") + group_by = StringType(serialized_name="GROUP_BY", required=False) + having = StringType(serialized_name="HAVING", required=False) + metrics = ListType(StringType, serialized_name="METRICS", required=False) + tags = ListType(StringType, serialized_name="TAGS", required=False) + ctags = ListType(StringType, serialized_name="CTAGS", required=False) + stags = ListType(StringType, serialized_name="STAGS", required=False) + roles = ListType(StringType, serialized_name="ROLES", required=False) \ No newline at end of file diff --git a/df-llm-agent/py2c.sh b/df-llm-agent/py2c.sh new file mode 100644 index 0000000..7e0aa21 --- /dev/null +++ b/df-llm-agent/py2c.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +set -e + +echo "Starting py2c.sh script" + +cp llm_agent_app/llm_agent.py llm_agent_app/llm_agent.pyx + +/root/venv/bin/python3 setup.py build_ext -i + +rm llm_agent_app/llm_agent.py +rm llm_agent_app/llm_agent.c + +echo "py2c.sh script finished" \ No newline at end of file diff --git a/df-llm-agent/resource_app/__init__.py b/df-llm-agent/resource_app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/df-llm-agent/resource_app/app.py b/df-llm-agent/resource_app/app.py new file mode 100644 index 0000000..d089985 --- /dev/null +++ b/df-llm-agent/resource_app/app.py @@ -0,0 +1,33 @@ +from sanic import Blueprint +from resource_app.worker import app_worker +from utils.response_tools import wrap_resp +from const import API_PREFIX + +resource_app = Blueprint("resource", url_prefix=API_PREFIX) + + +# 获取文件 +@resource_app.route("/img/", name="resource_get") +@wrap_resp +async def img_get(request, hash_name=""): + worker = app_worker(request) + res = await 
worker.img_get(hash_name) + return res + + +# 提交文件 +@resource_app.route("/imgs", methods=["POST"], name="resource_add") +@wrap_resp +async def img_add(request): + worker = app_worker(request) + res = await worker.img_add() + return res + + +# 提交文件编码 +@resource_app.route("/imgs/b64", methods=["POST"], name="resource_add_b64") +@wrap_resp +async def img_add_b64(request): + worker = app_worker(request) + res = await worker.img_add_b64() + return res diff --git a/df-llm-agent/resource_app/resource.py b/df-llm-agent/resource_app/resource.py new file mode 100644 index 0000000..6cb5798 --- /dev/null +++ b/df-llm-agent/resource_app/resource.py @@ -0,0 +1,108 @@ +from tortoise.transactions import atomic, in_transaction +from tortoise.exceptions import BaseORMException, OperationalError +from tortoise.expressions import Q +from tortoise.functions import Coalesce, Count, Length, Lower, Min, Sum, Trim, Upper +from exception import BadRequestException +import const +from database import db_models +from utils import logger +import traceback +import datetime +import json +import os +import base64 +from utils.tools import generate_uuid + +from database.cache import cache + +log = logger.getLogger(__name__) + + +class resourceWorker(object): + + # 校验数据 + @staticmethod + async def verify_data(file): + pass + + # 新增 + @classmethod + async def img_add(cls, user_info, args, files): + + allow_type = ['.jpg', '.png'] + + file = files.get('file') + + file_name = file.name + file_extension = os.path.splitext(file_name)[1] + + if file_extension not in allow_type: + raise BadRequestException("FAIL", f"{const.FAIL}: 文件类型格式错误", f"{const.FAIL}: 文件类型格式错误") + + # 文件大小,byte + filesize = len(file.body) + if filesize > 10 * 1024 * 1024: + # 10M + raise BadRequestException("FAIL", f"{const.FAIL}: 文件大小超过最大值", f"{const.FAIL}: 文件大小超过最大值") + + time_now = datetime.datetime.now() + create_time = time_now.strftime(const.DATE_PATTEN) + expire_time = (time_now + 
datetime.timedelta(days=1)).strftime(const.DATE_PATTEN) + + cache_client = await cache.GetCacheServer() + + # 记录 + lcuuid = generate_uuid() + + file_info = {} + file_info['lcuuid'] = lcuuid + file_info['name'] = file_name + # file_info['size'] = filesize + file_info['create_time'] = create_time + file_info['expire_time'] = expire_time + file_info['img'] = f"data:{file.type};base64,{base64.b64encode(file.body).decode('utf8')}" + try: + await cache_client.hmset(lcuuid, file_info) + await cache_client.expire(lcuuid, 86400) + return await cache_client.hgetall(lcuuid) + except Exception: + raise BadRequestException("FAIL", f"{const.FAIL}: 保存图片失败", f"{const.FAIL}: {traceback.format_exc()}") + + @classmethod + async def img_add_b64(cls, user_info, args, data): + time_now = datetime.datetime.now() + create_time = time_now.strftime(const.DATE_PATTEN) + expire_time = (time_now + datetime.timedelta(days=1)).strftime(const.DATE_PATTEN) + + cache_client = await cache.GetCacheServer() + + # 记录 + lcuuid = generate_uuid() + + file_info = {} + file_info['lcuuid'] = lcuuid + file_info['name'] = data.get('name', lcuuid) + # file_info['size'] = data.get('size') + file_info['create_time'] = create_time + file_info['expire_time'] = expire_time + file_info['img'] = data.get('img') + + try: + await cache_client.hmset(lcuuid, file_info) + await cache_client.expire(lcuuid, 86400) + return await cache_client.hgetall(lcuuid) + except Exception: + raise BadRequestException("FAIL", f"{const.FAIL}: 保存图片失败", f"{const.FAIL}: {traceback.format_exc()}") + + @classmethod + async def img_get(cls, user_info, args, hash_name): + cache_client = await cache.GetCacheServer() + try: + res = await cache_client.hgetall(hash_name) + # image_bytes = base64.b64decode(image_base64) + return res + except Exception: + raise BadRequestException("FAIL", f"{const.FAIL}: 获取图片失败", f"{const.FAIL}: {traceback.format_exc()}") + + +resource_worker = resourceWorker() diff --git a/df-llm-agent/resource_app/worker.py 
b/df-llm-agent/resource_app/worker.py new file mode 100644 index 0000000..c3529f3 --- /dev/null +++ b/df-llm-agent/resource_app/worker.py @@ -0,0 +1,42 @@ +from exception import BadRequestException +import const +from resource_app.resource import resource_worker +from config import config +from utils.curl_tools import curl_tools +from utils import logger +import json +import time + +log = logger.getLogger(__name__) + + +class app_worker(object): + + args = data = user_info = None + + def __init__(self, request): + app_worker.request = request + app_worker.args = request.args + if app_worker.args: + for k, v in self.args.items(): + app_worker.args[k] = [i for i in v] + # app_worker.files = request.files + # app_worker.data = request.json + app_worker.user_info = request.ctx.user + + @classmethod + async def img_add(cls): + # 校验todoing + files = cls.request.files + return await resource_worker.img_add(cls.user_info, cls.args, files) + + @classmethod + async def img_add_b64(cls): + # 校验todoing + data = cls.request.json + return await resource_worker.img_add_b64(cls.user_info, cls.args, data) + + @classmethod + async def img_get(cls, hash_name=""): + # 校验todoing + return await resource_worker.img_get(cls.user_info, cls.args, hash_name) diff --git a/df-llm-agent/server.py b/df-llm-agent/server.py new file mode 100644 index 0000000..b9cc9fe --- /dev/null +++ b/df-llm-agent/server.py @@ -0,0 +1,105 @@ +from sanic import Sanic +from sanic_cors import CORS +from sanic import HTTPResponse +from sanic.worker.manager import WorkerManager +from sanic.exceptions import Unauthorized, NotFound, Forbidden, ServerError +from sanic.response import json as json_response +from utils import common +from data import init_db, close_db +from chat_record_app.app import chat_record_app +from llm_agent_app.app import llm_agent_app +from health_app.app import health_app +from resource_app.app import resource_app +import traceback +from utils import logger + +app = Sanic("df-llm-agent") + 
+app.config.CORS_ORIGINS = "*" +app.config.CORS_AUTOMATIC_OPTIONS = True +app.config.RESPONSE_TIMEOUT = 500 + +app.blueprint(health_app) +app.blueprint(llm_agent_app) +app.blueprint(chat_record_app) +app.blueprint(resource_app) + +CORS(app) + +WorkerManager.THRESHOLD = 600 + +log = logger.getLogger(__name__) + + +@app.middleware("request") +async def run_before_handler(request): + req_headers = dict(request.headers) + req_path = request.path + + if request.method.lower() == 'options': + headers = { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': '*', + 'Access-Control-Allow-Headers': '*', + } + return HTTPResponse('', headers=headers) + + current_dict = {} + current_dict['ID'] = "1" + current_dict['TYPE'] = "1" + + request.ctx.user = current_dict + + request.ctx.chat_topic_id = int(req_headers.get("x-chat-topic-id", 0)) + + +@app.listener("before_server_start") +async def init_orm(app, loop): + await init_db() + + +@app.listener("after_server_stop") +async def close_orm(app, loop): + await close_db() + + +@app.middleware("response") +async def custom_banner(request, response): + + if hasattr(request.ctx, 'chat_topic_id'): + response.headers["X-chat-topic-id"] = int(request.ctx.chat_topic_id) + + +@app.exception(Unauthorized) +async def Not_Unauthorized(request, exception): + traceback.print_exc() + resp = await common.falseReturn("SERVER_ERROR", "系统错误", f"{exception}") + status = 400 + + return json_response(resp, status=status) + + +@app.exception(NotFound) +async def Not_Found(request, exception): + traceback.print_exc() + resp = await common.falseReturn("SERVER_ERROR", "系统错误", f"{exception}") + status = 400 + return json_response(resp, status=status) + + +@app.exception(Forbidden) +async def Not_Forbidden(request, exception): + traceback.print_exc() + resp = await common.falseReturn("SERVER_ERROR", "系统错误", f"{exception}") + status = 400 + + return json_response(resp, status=status) + + +@app.exception(ServerError) +async def 
Not_ServerError(request, exception): + traceback.print_exc() + resp = await common.falseReturn("SERVER_ERROR", "系统错误", f"{exception}") + status = 500 + + return json_response(resp, status=status) diff --git a/df-llm-agent/setup.py b/df-llm-agent/setup.py new file mode 100644 index 0000000..87555d6 --- /dev/null +++ b/df-llm-agent/setup.py @@ -0,0 +1,6 @@ +from setuptools import setup +from Cython.Build import cythonize +import os + +pwd = os.getcwd() +setup(ext_modules=cythonize([f"{pwd}/llm_agent_app/llm_agent.pyx"])) diff --git a/df-llm-agent/utils/__init__.py b/df-llm-agent/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/df-llm-agent/utils/common.py b/df-llm-agent/utils/common.py new file mode 100644 index 0000000..28ffb2c --- /dev/null +++ b/df-llm-agent/utils/common.py @@ -0,0 +1,18 @@ +async def trueReturn(data=None, status='SUCCESS'): + return { + "OPT_STATUS": status, + "DATA": data, + "DESCRIPTION": 'SUCCESS', + } + + +async def falseReturn(status='FAIL', message=None, err=None, data=False): + return { + "OPT_STATUS": status, + "DESCRIPTION": message, + "DATA": data, + "LEVEL": 0, + "ERR": err, + } + + # return LCJSONEncoder().encode(resp) diff --git a/df-llm-agent/utils/curl_tools.py b/df-llm-agent/utils/curl_tools.py new file mode 100644 index 0000000..aaf33ea --- /dev/null +++ b/df-llm-agent/utils/curl_tools.py @@ -0,0 +1,67 @@ +import asyncio +import aiohttp +import json +from exception import BadRequestException +import const +from config import config +import time +from utils import logger + +log = logger.getLogger(__name__) + + +class curlTools(object): + + @classmethod + async def curl_app(cls, method, url, headers, data=None, params=None): + + async with aiohttp.ClientSession() as session: + try: + _headers = headers_base = {} + # 默认header + headers_base['content-type'] = 'application/json' + + if headers: + _headers = {**headers_base, **headers} + + # 如果content-type = application/json,data 必须为json.dumps()后的数据 + # 
如果content-type = application/x-www-form-urlencoded,直接传dict + # params dict格式 + + async with getattr(session, method)(url, timeout=config.api_timeout, headers=_headers, data=data, params=params) as r: + response = await r.read() + response = json.loads(response) + status_code = r.status + + except asyncio.TimeoutError as e: + raise BadRequestException('APP_TIMEOUT', f"{const.APP_TIMEOUT}: {e}") + except Exception as e: + raise BadRequestException('APP_ERROR', f"{const.APP_ERROR}: {e}") + + return response, status_code + + @classmethod + async def curl_app_stream(cls, method, url, headers, data=None, params=None): + + async with aiohttp.ClientSession() as session: + try: + _headers = headers_base = {} + # 默认header + headers_base['content-type'] = 'application/json' + + if headers: + _headers = {**headers_base, **headers} + + async with getattr(session, method)(url, timeout=config.api_timeout, headers=_headers, data=data, params=params) as resp: + while True: + chunk = await resp.content.readline() + if not chunk: + break + yield chunk + except asyncio.TimeoutError as e: + raise BadRequestException('APP_TIMEOUT', f"{const.APP_TIMEOUT}: {e}") + except Exception as e: + raise BadRequestException('APP_ERROR', f"{const.APP_ERROR}: {e}") + + +curl_tools = curlTools() diff --git a/df-llm-agent/utils/exception_tools.py b/df-llm-agent/utils/exception_tools.py new file mode 100644 index 0000000..cd6a78e --- /dev/null +++ b/df-llm-agent/utils/exception_tools.py @@ -0,0 +1,34 @@ +from functools import wraps +from sanic.response import json as json_response +from schematics.exceptions import ModelConversionError, ModelValidationError +import traceback + +import const +from exception import BadRequestException, NotFoundException, AccessDeniedException, MethodNotAllowedException, SQLException +from utils import common + + +def exception_decorate(func): + + @wraps(func) + async def wrapper(*args, **kwargs): + try: + response = await func(*args, **kwargs) + return response + except 
(ModelConversionError, ModelValidationError) as error: + + resp = await common.falseReturn("FAIL", "Model转换或校验失败", f"{error}") + status = const.HTTP_BAD_REQUEST + + except (BadRequestException, NotFoundException, AccessDeniedException, MethodNotAllowedException, SQLException) as error: + traceback.print_exc() + resp = await common.falseReturn(error.status, error.message, f"{error.err}") + status = error.status_code + + except Exception as error: + traceback.print_exc() + resp = await common.falseReturn("SERVER_ERROR", "系统错误", f"{error}") + status = const.HTTP_INTERNAL_SERVER_ERROR + return json_response(resp, status=status) + + return wrapper diff --git a/df-llm-agent/utils/file_read_tools.py b/df-llm-agent/utils/file_read_tools.py new file mode 100644 index 0000000..a4955a5 --- /dev/null +++ b/df-llm-agent/utils/file_read_tools.py @@ -0,0 +1,33 @@ +import yaml +import configparser + + +class fileReadTools(object): + + # yaml文件解析 + @classmethod + def yaml_read(cls, YML_FILE): + try: + with open(YML_FILE, "r", encoding='utf-8') as y: + yml = yaml.load(y, Loader=yaml.FullLoader) + if not yml: + print("yaml文件不存在") + return {} + return yml + except Exception as e: + print("yaml文件解析错误: %s" % e) + return {} + + # conf文件解析 + @classmethod + def config_parser(cls, config_file, selects=None): + config_conf = configparser.ConfigParser() + config_conf.read(config_file, encoding="utf-8") + if selects: + items_list = config_conf.items(selects) + return dict(items_list) + else: + return config_conf + + +file_read_tools = fileReadTools() diff --git a/df-llm-agent/utils/logger.py b/df-llm-agent/utils/logger.py new file mode 100644 index 0000000..ac4be87 --- /dev/null +++ b/df-llm-agent/utils/logger.py @@ -0,0 +1,54 @@ +import sys +import logging +from logging import StreamHandler +from logging.handlers import SysLogHandler +from logging.handlers import TimedRotatingFileHandler + +LOG_LEVEL_MAP = { + "debug": logging.DEBUG, + "info": logging.INFO, + "warn": logging.WARN, + "error": 
class LoggerManager(object):
    """Configures the process-wide 'root' logger with file/stdout handlers."""

    # Single shared logger instance used by the whole process.
    LOGGER = logging.getLogger('root')

    # Common line layout shared by the file and stdout handlers.
    _LINE_FORMAT = ('%(asctime)s T%(thread)d-%(threadName)s '
                    '%(levelname)s %(module)s.'
                    '%(funcName)s.%(lineno)s: %(message)s')

    def __init__(self, model_name, log_level, log_file):
        # LOG_LEVEL_MAP is the module-level name -> logging-level table;
        # an unknown level name raises KeyError, as before.
        self.model_name = model_name
        self.log_level = LOG_LEVEL_MAP[log_level]
        self.log_file = log_file

    @property
    def syslog_handler(self):
        """Handler writing to the local syslog daemon (LOCAL2 facility)."""
        handler = SysLogHandler(address='/dev/log', facility=SysLogHandler.LOG_LOCAL2)
        handler.setFormatter(logging.Formatter(self.model_name + '/%(module)s: %(message)s'))
        return handler

    @property
    def file_handler(self):
        """Handler rotating the log file at midnight, keeping 365 backups."""
        handler = TimedRotatingFileHandler(self.log_file, when='midnight', backupCount=365)
        handler.setFormatter(logging.Formatter(self._LINE_FORMAT))
        return handler

    @property
    def stdout_handler(self):
        """Handler echoing log lines to standard output."""
        handler = StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(self._LINE_FORMAT))
        return handler

    @classmethod
    def get_logger(cls, name='root'):
        # NOTE(review): *name* is accepted but ignored — the shared LOGGER
        # is always returned; confirm whether per-name loggers were intended.
        return cls.LOGGER

    def init_logger(self):
        """Attach handlers and set the level on the shared logger."""
        logger = self.LOGGER
        logger.setLevel(self.log_level)
        logger.addHandler(self.file_handler)
        # self.LOGGER.addHandler(self.syslog_handler)
        logger.addHandler(self.stdout_handler)


def getLogger(name='root'):
    """Module-level convenience wrapper mirroring logging.getLogger's shape."""
    return LoggerManager.get_logger(name)
def generate_uuid():
    """Return a new random UUID (RFC 4122 version 4) as a 36-char string.

    The previous docstring claimed a date+MAC based UUID, which describes
    uuid1(); this function has always used uuid4(), which is purely random.
    uuid4() is also safe to call concurrently, so the module-level lock
    formerly taken here added no protection and has been dropped.
    """
    return str(uuid.uuid4())
result.insert(0, DIGITS[int(time_ms % mask)]) + time_ms = int(time_ms / mask) + return "".join(result) + + a = secrets.randbelow(62 * 62 * 62 - 1) + time_str = to_62_string(int(time.time())) + random_str = to_62_string(a) + + return time_str.zfill(7) + random_str.zfill(3) diff --git a/etc/df-llm-agent.yaml b/etc/df-llm-agent.yaml new file mode 100644 index 0000000..9124d6d --- /dev/null +++ b/etc/df-llm-agent.yaml @@ -0,0 +1,57 @@ +daemon: True +api_timeout: 500 +sql_show: False +log_file: /var/log/df-llm-agent.log +log_level: info +instance_path: /root/df-llm-agent + +redis: + host: + - redis + cluster_enabled: False # True,False + port: 6379 + db: 7 + password: "password123" +mysql: + user_name: root + user_password: password123 + host: mysql + port: 30130 + database: deepflow_llm +ai: + enable: False # True,False + platforms: + - + enable: False + platform: "azure" + model: "gpt" + api_type: "azure" + api_key: "" + api_base: "" + api_version: "" + engine_name: + - "" + - + enable: False + platform: "aliyun" + model: "dashscope" + api_key: "" + engine_name: + - "qwen-turbo" + - "qwen-plus" + - + enable: False + platform: "baidu" + model: "qianfan" + api_key: "" + api_secre: "" + engine_name: + - "ERNIE-Bot" + - "ERNIE-Bot-turbo" + - + enable: False + platform: "zhipu" + model: "zhipuai" + api_key: "" + engine_name: + - "chatglm_turbo" \ No newline at end of file diff --git a/requirements3.txt b/requirements3.txt new file mode 100644 index 0000000..bb172af --- /dev/null +++ b/requirements3.txt @@ -0,0 +1,21 @@ +sanic==22.12.0 +Sanic-Cors==2.2.0 +tortoise-orm==0.20.0 +aredis==1.1.8 +aiofiles==23.2.1 +aiohttp==3.8.4 +schematics==2.1.1 +PyYAML==6.0 +tzlocal==5.0.1 +aiomysql==0.2.0 +Cython==3.0.2 +zhipuai==1.0.7 +dashscope==1.13.5 +qianfan==0.0.5 +openai==1.7.1 +langchain-openai==0.0.2 +tiktoken==0.5.2 +langchain==0.1.0 +python-dotenv==1.0.0 + + diff --git a/stream.html b/stream.html new file mode 100644 index 0000000..36a2802 --- /dev/null +++ b/stream.html @@ -0,0 
+1,71 @@ + + + + POST Request with JSON Parameter + + + +
+ + + +