Skip to content

Commit

Permalink
feat: add more detail log in chat api
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <[email protected]>
  • Loading branch information
Abirdcfly committed Dec 26, 2023
1 parent 249850e commit d9aae55
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 68 deletions.
11 changes: 6 additions & 5 deletions apiserver/pkg/chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package chat
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -46,17 +47,17 @@ func AppRun(ctx context.Context, req ChatReqBody, respStream chan string) (*Chat
token := auth.ForOIDCToken(ctx)
c, err := client.GetClient(token)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get a dynamic client: %w", err)
}
obj, err := c.Resource(schema.GroupVersionResource{Group: v1alpha1.GroupVersion.Group, Version: v1alpha1.GroupVersion.Version, Resource: "applications"}).
Namespace(req.AppNamespace).Get(ctx, req.APPName, metav1.GetOptions{})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get application: %w", err)
}
app := &v1alpha1.Application{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), app)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to convert application: %w", err)
}
if !app.Status.IsReady() {
return nil, errors.New("application is not ready")
Expand Down Expand Up @@ -102,8 +103,8 @@ func AppRun(ctx context.Context, req ChatReqBody, respStream chan string) (*Chat
if err != nil {
return nil, err
}
klog.Infoln("begin to run application", obj.GetName())
out, err := appRun.Run(ctx, c, respStream, application.Input{Question: req.Query, NeedStream: req.ResponseMode == Streaming, History: conversation.History})
klog.FromContext(ctx).Info("begin to run application", "appName", req.APPName, "appNamespace", req.AppNamespace)
out, err := appRun.Run(ctx, c, respStream, application.Input{Question: req.Query, NeedStream: req.ResponseMode.IsStreaming(), History: conversation.History})
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions apiserver/pkg/chat/chat_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (

type ResponseMode string

func (r ResponseMode) IsStreaming() bool {
return r == Streaming
}

const (
// Blocking means the response is returned in a blocking manner
Blocking ResponseMode = "blocking"
Expand Down
21 changes: 21 additions & 0 deletions apiserver/pkg/requestid/requestid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package requestid

import (
"github.com/gin-contrib/requestid"
"github.com/gin-gonic/gin"
"k8s.io/klog/v2"
)

func RequestIDInterceptor() gin.HandlerFunc {
return requestid.New(requestid.WithHandler(addRequestIDToLog))
}

func addRequestIDToLog(c *gin.Context, id string) {
ctx := c.Request.Context()
logger := klog.FromContext(ctx)
newLogger := logger.WithValues("requestID", id)
newLogger.Info("new request")
c.Request = c.Request.WithContext(klog.NewContext(ctx, newLogger))

c.Next()
}
76 changes: 54 additions & 22 deletions apiserver/service/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package service

import (
"errors"
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/kubeagi/arcadia/apiserver/pkg/auth"
"github.com/kubeagi/arcadia/apiserver/pkg/chat"
"github.com/kubeagi/arcadia/apiserver/pkg/oidc"
"github.com/kubeagi/arcadia/apiserver/pkg/requestid"
)

// @BasePath /chat
Expand All @@ -53,23 +56,40 @@ func chatHandler() gin.HandlerFunc {
return
}
req.Debug = c.Query("debug") == "true"
stream := req.ResponseMode == chat.Streaming
var response *chat.ChatRespBody
var err error

if stream {
if req.ResponseMode.IsStreaming() {
buf := strings.Builder{}
// handle chat streaming mode
respStream := make(chan string, 1)
go func() {
defer func() {
if err := recover(); err != nil {
klog.Errorln("An error occurred when run chat.AppRun: %s", err)
if e := recover(); e != nil {
err, ok := e.(error)
if ok {
klog.FromContext(c.Request.Context()).Error(err, "A panic occurred when run chat.AppRun")
} else {
klog.FromContext(c.Request.Context()).Error(fmt.Errorf("get err:%#v", e), "A panic occurred when run chat.AppRun")
}
}
}()
response, err = chat.AppRun(c, req, respStream)
if response.Message == buf.String() {
response, err = chat.AppRun(c.Request.Context(), req, respStream)
if err != nil {
c.SSEvent("error", chat.ChatRespBody{
ConversationID: req.ConversationID,
Message: err.Error(),
CreatedAt: time.Now(),
})
// c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.FromContext(c.Request.Context()).Error(err, "error resp")
close(respStream)
return
}
if response != nil {
if str := buf.String(); response.Message == str || strings.TrimSpace(str) == strings.TrimSpace(response.Message) {
close(respStream)
}
}
}()

Expand Down Expand Up @@ -98,7 +118,7 @@ func chatHandler() gin.HandlerFunc {
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
klog.Infoln("start to receive messages...")
klog.FromContext(c.Request.Context()).Info("start to receive messages...")
clientDisconnected := c.Stream(func(w io.Writer) bool {
if msg, ok := <-respStream; ok {
c.SSEvent("", chat.ChatRespBody{
Expand All @@ -113,19 +133,20 @@ func chatHandler() gin.HandlerFunc {
return false
})
if clientDisconnected {
klog.Infoln("chatHandler: client is disconnected")
klog.FromContext(c.Request.Context()).Info("chatHandler: the client is disconnected")
}
klog.Infoln("end to receive messages.")
klog.FromContext(c.Request.Context()).Info("end to receive messages")
} else {
// handle chat blocking mode
response, err = chat.AppRun(c, req, nil)
response, err = chat.AppRun(c.Request.Context(), req, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
klog.FromContext(c.Request.Context()).Error(err, "error resp")
return
}
c.JSON(http.StatusOK, response)
}
klog.FromContext(c.Request.Context()).V(3).Info("chat done", "req", req)
}
}

Expand All @@ -144,15 +165,17 @@ func listConversationHandler() gin.HandlerFunc {
return func(c *gin.Context) {
req := chat.APPMetadata{}
if err := c.ShouldBindJSON(&req); err != nil {
klog.FromContext(c.Request.Context()).Error(err, "list conversation: error binding json")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
resp, err := chat.ListConversations(c, req)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error list conversation")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
klog.FromContext(c.Request.Context()).V(3).Info("list conversation done", "req", req)
c.JSON(http.StatusOK, resp)
}
}
Expand All @@ -172,15 +195,18 @@ func deleteConversationHandler() gin.HandlerFunc {
return func(c *gin.Context) {
conversationID := c.Param("conversationID")
if conversationID == "" {
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: "conversationID is required"})
err := errors.New("conversationID is required")
klog.FromContext(c.Request.Context()).Error(err, "conversationID is required")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
err := chat.DeleteConversation(c, conversationID)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error delete conversation")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
klog.FromContext(c.Request.Context()).V(3).Info("delete conversation done", "conversationID", conversationID)
c.JSON(http.StatusOK, chat.SimpleResp{Message: "ok"})
}
}
Expand All @@ -200,16 +226,18 @@ func historyHandler() gin.HandlerFunc {
return func(c *gin.Context) {
req := chat.ConversationReqBody{}
if err := c.ShouldBindJSON(&req); err != nil {
klog.FromContext(c.Request.Context()).Error(err, "historyHandler: error binding json")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
resp, err := chat.ListMessages(c, req)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error list messages")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
c.JSON(http.StatusOK, resp)
klog.FromContext(c.Request.Context()).V(3).Info("get message history done", "req", req)
}
}

Expand All @@ -229,32 +257,36 @@ func referenceHandler() gin.HandlerFunc {
return func(c *gin.Context) {
messageID := c.Param("messageID")
if messageID == "" {
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: "messageID is required"})
err := errors.New("messageID is required")
klog.FromContext(c.Request.Context()).Error(err, "messageID is required")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
req := chat.MessageReqBody{
MessageID: messageID,
}
if err := c.ShouldBindJSON(&req); err != nil {
klog.FromContext(c.Request.Context()).Error(err, "referenceHandler: error binding json")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
resp, err := chat.GetMessageReferences(c, req)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error get message references")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
klog.FromContext(c.Request.Context()).V(3).Info("get message references done", "req", req)
c.JSON(http.StatusOK, resp)
}
}

func registerChat(g *gin.RouterGroup, conf config.ServerConfig) {
g.POST("", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), chatHandler()) // chat with bot
g.POST("", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), chatHandler()) // chat with bot

g.POST("/conversations", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), listConversationHandler()) // list conversations
g.DELETE("/conversations/:conversationID", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), deleteConversationHandler()) // delete conversation
g.POST("/conversations", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), listConversationHandler()) // list conversations
g.DELETE("/conversations/:conversationID", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), deleteConversationHandler()) // delete conversation

g.POST("/messages", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), historyHandler()) // messages history
g.POST("/messages/:messageID/references", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), referenceHandler()) // messages reference
g.POST("/messages", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), historyHandler()) // messages history
g.POST("/messages/:messageID/references", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), referenceHandler()) // messages reference
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/KawashiroNitori/butcher/v2 v2.0.1
github.com/amikos-tech/chroma-go v0.0.0-20230901221218-d0087270239e
github.com/coreos/go-oidc/v3 v3.7.0
github.com/gin-contrib/requestid v0.0.6
github.com/gin-gonic/gin v1.9.1
github.com/go-logr/logr v1.2.0
github.com/gofiber/fiber/v2 v2.49.1
Expand Down
Loading

0 comments on commit d9aae55

Please sign in to comment.