From d59995aef2b957e71060755f1280f9a7e8e3a8a9 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Tue, 27 Aug 2024 20:04:07 +0800 Subject: [PATCH 01/18] update --- .../wasm-go/extensions/ai-statistics/go.mod | 3 - .../wasm-go/extensions/ai-statistics/go.sum | 9 +- .../wasm-go/extensions/ai-statistics/main.go | 222 ++++++++++-------- 3 files changed, 124 insertions(+), 110 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/go.mod b/plugins/wasm-go/extensions/ai-statistics/go.mod index 8d0f87c062..a5c87ef617 100644 --- a/plugins/wasm-go/extensions/ai-statistics/go.mod +++ b/plugins/wasm-go/extensions/ai-statistics/go.mod @@ -10,8 +10,6 @@ require ( github.com/tidwall/gjson v1.14.3 ) -require github.com/tetratelabs/wazero v1.7.1 // indirect - require ( github.com/google/uuid v1.3.0 // indirect github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect @@ -19,5 +17,4 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/resp v0.1.1 // indirect - github.com/wasilibs/go-re2 v1.5.3 ) diff --git a/plugins/wasm-go/extensions/ai-statistics/go.sum b/plugins/wasm-go/extensions/ai-statistics/go.sum index b0732f4e65..f473e12b2d 100644 --- a/plugins/wasm-go/extensions/ai-statistics/go.sum +++ b/plugins/wasm-go/extensions/ai-statistics/go.sum @@ -1,19 +1,14 @@ -github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906 h1:RhEmB+ApLKsClZD7joTC4ifmsVgOVz4pFLdPR3xhNaE= -github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906/go.mod h1:10jQXKsYFUF7djs+Oy7t82f4dbie9pISfP9FJwpPLuk= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 
h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew= -github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ= -github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f h1:ZIiIBRvIw62gA5MJhuwp1+2wWbqL9IGElQ499rUsYYg= github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= -github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -22,6 +17,4 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= -github.com/wasilibs/go-re2 v1.5.3 h1:wiuTcgDZdLhu8NG8oqF5sF5Q3yIU14lPAvXqeYzDK3g= -github.com/wasilibs/go-re2 v1.5.3/go.mod h1:PzpVPsBdFC7vM8QJbbEnOeTmwA0DGE783d/Gex8eCV8= 
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index e7396160f4..1af81e8b92 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -2,20 +2,15 @@ package main import ( "bytes" - "encoding/json" "fmt" - "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" - "github.com/tidwall/gjson" "strconv" "strings" "time" -) -const ( - StatisticsRequestStartTime = "ai-statistics-request-start-time" - StatisticsFirstTokenTime = "ai-statistics-first-token-time" + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/tidwall/gjson" ) func main() { @@ -30,6 +25,12 @@ func main() { ) } +const ( + StatisticsRequestStartTime = "ai-statistics-request-start-time" + StatisticsFirstTokenTime = "ai-statistics-first-token-time" + TracePrefix = "trace_span_tag." +) + // TracingSpan is the tracing span configuration. type TracingSpan struct { Key string `required:"true" yaml:"key" json:"key"` @@ -38,24 +39,62 @@ type TracingSpan struct { } type AIStatisticsConfig struct { - Enable bool `required:"true" yaml:"enable" json:"enable"` + // Metrics + counterMetrics map[string]proxywasm.MetricCounter + gaugeMetrics map[string]proxywasm.MetricGauge + histogramMetrics map[string]proxywasm.MetricHistogram // TracingSpan array define the tracing span. 
- TracingSpan []TracingSpan `required:"true" yaml:"tracingSpan" json:"tracingSpan"` - Metrics map[string]proxywasm.MetricCounter `required:"true" yaml:"metrics" json:"metrics"` + TracingSpan []TracingSpan +} + +func generateMetricName(route, cluster, model, metricName string) string { + return fmt.Sprintf("route.%s.upstream.%s.model.%s.%s", route, cluster, model, metricName) } -func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64, log wrapper.Log) { - counter, ok := config.Metrics[metricName] +func getRouteName() string { + var route string + if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil { + route = string(raw) + } + return route +} + +func getClusterName() string { + var cluster string + if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil { + cluster = string(raw) + } + return cluster +} + +func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64) { + counter, ok := config.counterMetrics[metricName] if !ok { counter = proxywasm.DefineCounterMetric(metricName) - config.Metrics[metricName] = counter + config.counterMetrics[metricName] = counter } counter.Increment(inc) } -func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { - config.Enable = configJson.Get("enable").Bool() +func (config *AIStatisticsConfig) addGauge(metricName string, inc int64) { + gauge, ok := config.gaugeMetrics[metricName] + if !ok { + gauge = proxywasm.DefineGaugeMetric(metricName) + config.gaugeMetrics[metricName] = gauge + } + gauge.Add(inc) +} + +func (config *AIStatisticsConfig) recodeHistogram(metricName string, value uint64) { + histogram, ok := config.histogramMetrics[metricName] + if !ok { + histogram = proxywasm.DefineHistogramMetric(metricName) + config.histogramMetrics[metricName] = histogram + } + histogram.Record(value) +} +func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { // Parse tracing span. 
tracingSpanConfigArray := configJson.Get("tracing_span").Array() config.TracingSpan = make([]TracingSpan, len(tracingSpanConfigArray)) @@ -68,20 +107,13 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe config.TracingSpan[i] = tracingSpan } - config.Metrics = make(map[string]proxywasm.MetricCounter) - - configStr, _ := json.Marshal(config) - log.Infof("Init ai-statistics config success, config: %s.", configStr) + config.counterMetrics = make(map[string]proxywasm.MetricCounter) + config.gaugeMetrics = make(map[string]proxywasm.MetricGauge) + config.histogramMetrics = make(map[string]proxywasm.MetricHistogram) return nil } func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - - if !config.Enable { - ctx.DontReadRequestBody() - return types.ActionContinue - } - // Fetch request header tracing span value. setTracingSpanValueBySource(config, "request_header", nil, log) // Fetch request process proxy wasm property. @@ -89,7 +121,7 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo setTracingSpanValueBySource(config, "property", nil, log) // Set request start time. - ctx.SetContext(StatisticsRequestStartTime, strconv.FormatUint(uint64(time.Now().UnixMilli()), 10)) + ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) // The request has a body and requires delaying the header transmission until a cache miss occurs, // at which point the header should be sent. 
@@ -103,10 +135,6 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body } func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - if !config.Enable { - ctx.DontReadResponseBody() - return types.ActionContinue - } contentType, _ := proxywasm.GetHttpResponseHeader("content-type") if !strings.Contains(contentType, "text/event-stream") { ctx.BufferResponseBody() @@ -119,32 +147,55 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, l } func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte { + requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64) + if !ok { + return data + } - // If the end of the stream is reached, calculate the total time and set tracing span tag total_time. - // Otherwise, set tracing span tag first_token_time. + // If the end of the stream is reached, calculate the total time and set metric and span attribute. 
if endOfStream { - requestStartTimeStr := ctx.GetContext(StatisticsRequestStartTime).(string) - requestStartTime, _ := strconv.ParseInt(requestStartTimeStr, 10, 64) - responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("total_time", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) - } else { - firstTokenTime := ctx.GetContext(StatisticsFirstTokenTime) - if firstTokenTime == nil { - firstTokenTimeStr := strconv.FormatInt(time.Now().UnixMilli(), 10) - ctx.SetContext(StatisticsFirstTokenTime, firstTokenTimeStr) - setTracingSpanValue("first_token_time", firstTokenTimeStr, log) + if model, ok := ctx.GetContext("model").(string); ok { + route := getRouteName() + cluster := getClusterName() + responseEndTime := time.Now().UnixMilli() + setTracingSpanValue("llm_service_duration", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) + config.recodeHistogram(generateMetricName(route, cluster, model, "llm_service_duration"), + uint64(responseEndTime-requestStartTime)) } } + // Get infomations about this request model, inputToken, outputToken, ok := getUsage(data) if !ok { return data } - setFilterStateData(model, inputToken, outputToken, log) - incrementCounter(config, model, inputToken, outputToken, log) - // Set tracing span tag input_tokens and output_tokens. 
- setTracingSpanValue("input_tokens", strconv.FormatInt(inputToken, 10), log) - setTracingSpanValue("output_tokens", strconv.FormatInt(outputToken, 10), log) + route := getRouteName() + cluster := getClusterName() + // Set model context used in the last chunk which can be empty + if ctx.GetContext("model") == nil { + ctx.SetContext("model", model) + } + + // If this is the first chunk, record first token duration metric and span attribute + firstTokenTime, ok := ctx.GetContext(StatisticsFirstTokenTime).(int64) + if !ok { + firstTokenTime = time.Now().UnixMilli() + ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime) + setTracingSpanValue("llm_first_token_duration", fmt.Sprint(firstTokenTime-requestStartTime), log) + config.recodeHistogram(generateMetricName(route, cluster, model, "llm_first_token_duration"), + uint64(firstTokenTime-requestStartTime)) + } + + // Set token usage metrics + config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) + // Set filter states which can be used by other plugins. + setFilterState("model", model, log) + setFilterState("input_token", inputToken, log) + setFilterState("output_token", outputToken, log) + // Set tracing span tag input_token and output_token. + setTracingSpanValue("input_token", strconv.FormatInt(inputToken, 10), log) + setTracingSpanValue("output_token", strconv.FormatInt(outputToken, 10), log) // Set response process proxy wasm property. setTracingSpanValueBySource(config, "property", nil, log) @@ -152,22 +203,31 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat } func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { - // Calculate the total time and set tracing span tag total_time. 
- requestStartTimeStr := ctx.GetContext(StatisticsRequestStartTime).(string) - requestStartTime, _ := strconv.ParseInt(requestStartTimeStr, 10, 64) + requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64) + if !ok { + return types.ActionContinue + } responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("total_time", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) - + setTracingSpanValue("llm_service_duration", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) + // Get infomations about this request model, inputToken, outputToken, ok := getUsage(body) if !ok { return types.ActionContinue } - setFilterStateData(model, inputToken, outputToken, log) - incrementCounter(config, model, inputToken, outputToken, log) + route := getRouteName() + cluster := getClusterName() + // Set metrics + config.recodeHistogram(generateMetricName(route, cluster, model, "llm_service_duration"), uint64(responseEndTime-requestStartTime)) + config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) + // Set filter states which can be used by other plugins. + setFilterState("model", model, log) + setFilterState("input_token", inputToken, log) + setFilterState("output_token", outputToken, log) // Set tracing span tag input_tokens and output_tokens. - setTracingSpanValue("input_tokens", strconv.FormatInt(inputToken, 10), log) - setTracingSpanValue("output_tokens", strconv.FormatInt(outputToken, 10), log) + setTracingSpanValue("input_token", strconv.FormatInt(inputToken, 10), log) + setTracingSpanValue("output_token", strconv.FormatInt(outputToken, 10), log) // Set response process proxy wasm property. 
setTracingSpanValueBySource(config, "property", nil, log) return types.ActionContinue @@ -198,30 +258,10 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag return } -// setFilterData sets the input_token and output_token in the filter state. -// ai-token-ratelimit will use these values to calculate the total token usage. -func setFilterStateData(model string, inputToken int64, outputToken int64, log wrapper.Log) { - if e := proxywasm.SetProperty([]string{"model"}, []byte(model)); e != nil { - log.Errorf("failed to set model in filter state: %v", e) - } - if e := proxywasm.SetProperty([]string{"input_token"}, []byte(fmt.Sprintf("%d", inputToken))); e != nil { - log.Errorf("failed to set input_token in filter state: %v", e) - } - if e := proxywasm.SetProperty([]string{"output_token"}, []byte(fmt.Sprintf("%d", outputToken))); e != nil { - log.Errorf("failed to set output_token in filter state: %v", e) - } -} - -func incrementCounter(config AIStatisticsConfig, model string, inputToken int64, outputToken int64, log wrapper.Log) { - var route, cluster string - if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil { - route = string(raw) +func setFilterState(key string, value interface{}, log wrapper.Log) { + if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil { + log.Errorf("failed to set %s in filter state: %v", key, e) } - if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil { - cluster = string(raw) - } - config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".input_token", uint64(inputToken), log) - config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".output_token", uint64(outputToken), log) } // fetches the tracing span value from the specified source. 
@@ -235,7 +275,7 @@ func setTracingSpanValueBySource(config AIStatisticsConfig, tracingSource string } case "request_body": bodyJson := gjson.ParseBytes(body) - value := trimQuote(bodyJson.Get(tracingSpanEle.Value).String()) + value := bodyJson.Get(tracingSpanEle.Value).String() setTracingSpanValue(tracingSpanEle.Key, value, log) case "request_header": if value, err := proxywasm.GetHttpRequestHeader(tracingSpanEle.Value); err == nil { @@ -257,7 +297,7 @@ func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { log.Debugf("try to set trace span [%s] with value [%s].", tracingKey, tracingValue) if tracingValue != "" { - traceSpanTag := "trace_span_tag." + tracingKey + traceSpanTag := TracePrefix + tracingKey if raw, err := proxywasm.GetProperty([]string{traceSpanTag}); err == nil { if raw != nil { @@ -271,19 +311,3 @@ func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { log.Debugf("successed to set trace span [%s] with value [%s].", traceSpanTag, tracingValue) } } - -// trims the quote from the source string. -func trimQuote(source string) string { - TempKey := strings.Trim(source, `"`) - Key, _ := zhToUnicode([]byte(TempKey)) - return string(Key) -} - -// converts the zh string to Unicode. 
-func zhToUnicode(raw []byte) ([]byte, error) { - str, err := strconv.Unquote(strings.Replace(strconv.Quote(string(raw)), `\\u`, `\u`, -1)) - if err != nil { - return nil, err - } - return []byte(str), nil -} From 5fb856af3321c6e2c0a05c231528e32c4aa283ac Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Wed, 28 Aug 2024 14:39:36 +0800 Subject: [PATCH 02/18] add rt metrics --- .../wasm-go/extensions/ai-statistics/main.go | 60 +++++++------------ 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 1af81e8b92..bdc89d5518 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -40,10 +40,8 @@ type TracingSpan struct { type AIStatisticsConfig struct { // Metrics - counterMetrics map[string]proxywasm.MetricCounter - gaugeMetrics map[string]proxywasm.MetricGauge - histogramMetrics map[string]proxywasm.MetricHistogram - // TracingSpan array define the tracing span. + counterMetrics map[string]proxywasm.MetricCounter + // TODO: add more metrics in Gauge and Histogram format TracingSpan []TracingSpan } @@ -76,24 +74,6 @@ func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64 counter.Increment(inc) } -func (config *AIStatisticsConfig) addGauge(metricName string, inc int64) { - gauge, ok := config.gaugeMetrics[metricName] - if !ok { - gauge = proxywasm.DefineGaugeMetric(metricName) - config.gaugeMetrics[metricName] = gauge - } - gauge.Add(inc) -} - -func (config *AIStatisticsConfig) recodeHistogram(metricName string, value uint64) { - histogram, ok := config.histogramMetrics[metricName] - if !ok { - histogram = proxywasm.DefineHistogramMetric(metricName) - config.histogramMetrics[metricName] = histogram - } - histogram.Record(value) -} - func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { // Parse tracing span. 
tracingSpanConfigArray := configJson.Get("tracing_span").Array() @@ -108,8 +88,6 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe } config.counterMetrics = make(map[string]proxywasm.MetricCounter) - config.gaugeMetrics = make(map[string]proxywasm.MetricGauge) - config.histogramMetrics = make(map[string]proxywasm.MetricHistogram) return nil } @@ -152,6 +130,14 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat return data } + // If this is the first chunk, record first token duration metric and span attribute + firstTokenTime, ok := ctx.GetContext(StatisticsFirstTokenTime).(int64) + if !ok { + firstTokenTime = time.Now().UnixMilli() + ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime) + setTracingSpanValue("llm_first_token_duration", fmt.Sprint(firstTokenTime-requestStartTime), log) + } + // If the end of the stream is reached, calculate the total time and set metric and span attribute. if endOfStream { if model, ok := ctx.GetContext("model").(string); ok { @@ -159,7 +145,10 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat cluster := getClusterName() responseEndTime := time.Now().UnixMilli() setTracingSpanValue("llm_service_duration", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) - config.recodeHistogram(generateMetricName(route, cluster, model, "llm_service_duration"), + config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_duration_count"), 1) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_first_token_duration"), + uint64(firstTokenTime-requestStartTime)) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_service_duration"), uint64(responseEndTime-requestStartTime)) } } @@ -176,19 +165,9 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat ctx.SetContext("model", model) } - // If this is the first chunk, record first token 
duration metric and span attribute - firstTokenTime, ok := ctx.GetContext(StatisticsFirstTokenTime).(int64) - if !ok { - firstTokenTime = time.Now().UnixMilli() - ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime) - setTracingSpanValue("llm_first_token_duration", fmt.Sprint(firstTokenTime-requestStartTime), log) - config.recodeHistogram(generateMetricName(route, cluster, model, "llm_first_token_duration"), - uint64(firstTokenTime-requestStartTime)) - } - // Set token usage metrics - config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.output_token"), uint64(outputToken)) // Set filter states which can be used by other plugins. setFilterState("model", model, log) setFilterState("input_token", inputToken, log) @@ -218,9 +197,10 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body route := getRouteName() cluster := getClusterName() // Set metrics - config.recodeHistogram(generateMetricName(route, cluster, model, "llm_service_duration"), uint64(responseEndTime-requestStartTime)) - config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_duration_count"), 1) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_service_duration"), uint64(responseEndTime-requestStartTime)) + config.incrementCounter(generateMetricName(route, cluster, model, "metric.input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, 
model, "metric.output_token"), uint64(outputToken)) // Set filter states which can be used by other plugins. setFilterState("model", model, log) setFilterState("input_token", inputToken, log) From 2574938af2ecbe0be9b6f3e086f03d7eb204f240 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Thu, 29 Aug 2024 15:51:16 +0800 Subject: [PATCH 03/18] update --- .../wasm-go/extensions/ai-statistics/main.go | 79 ++++++++++++------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index bdc89d5518..c310c9f412 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -46,23 +46,23 @@ type AIStatisticsConfig struct { } func generateMetricName(route, cluster, model, metricName string) string { - return fmt.Sprintf("route.%s.upstream.%s.model.%s.%s", route, cluster, model, metricName) + return fmt.Sprintf("route.%s.upstream.%s.model.%s.metric.%s", route, cluster, model, metricName) } -func getRouteName() string { - var route string - if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil { - route = string(raw) +func getRouteName() (string, error) { + if raw, err := proxywasm.GetProperty([]string{"route_name"}); err != nil { + return "", err + } else { + return string(raw), nil } - return route } -func getClusterName() string { - var cluster string - if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil { - cluster = string(raw) +func getClusterName() (string, error) { + if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err != nil { + return "", err + } else { + return string(raw), nil } - return cluster } func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64) { @@ -141,15 +141,21 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat // If the end of the stream is reached, calculate the 
total time and set metric and span attribute. if endOfStream { if model, ok := ctx.GetContext("model").(string); ok { - route := getRouteName() - cluster := getClusterName() + route, err := getRouteName() + if err != nil { + return data + } + cluster, err := getClusterName() + if err != nil { + return data + } responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("llm_service_duration", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_duration_count"), 1) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_first_token_duration"), - uint64(firstTokenTime-requestStartTime)) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_service_duration"), - uint64(responseEndTime-requestStartTime)) + setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_duration_count"), 1) + llm_first_token_duration := uint64(firstTokenTime - requestStartTime) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_first_token_duration"), llm_first_token_duration) + llm_service_duration := uint64(responseEndTime - requestStartTime) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_service_duration"), llm_service_duration) } } @@ -158,16 +164,22 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat if !ok { return data } - route := getRouteName() - cluster := getClusterName() + route, err := getRouteName() + if err != nil { + return data + } + cluster, err := getClusterName() + if err != nil { + return data + } // Set model context used in the last chunk which can be empty if ctx.GetContext("model") == nil { ctx.SetContext("model", model) } // Set token usage metrics - config.incrementCounter(generateMetricName(route, cluster, model, "metric.input_token"), 
uint64(inputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.output_token"), uint64(outputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) // Set filter states which can be used by other plugins. setFilterState("model", model, log) setFilterState("input_token", inputToken, log) @@ -188,19 +200,26 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body return types.ActionContinue } responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("llm_service_duration", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) + setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) // Get infomations about this request model, inputToken, outputToken, ok := getUsage(body) if !ok { return types.ActionContinue } - route := getRouteName() - cluster := getClusterName() + route, err := getRouteName() + if err != nil { + return types.ActionContinue + } + cluster, err := getClusterName() + if err != nil { + return types.ActionContinue + } // Set metrics - config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_duration_count"), 1) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.llm_service_duration"), uint64(responseEndTime-requestStartTime)) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.input_token"), uint64(inputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "metric.output_token"), uint64(outputToken)) + llm_service_duration := uint64(responseEndTime - requestStartTime) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_duration_count"), 1) + 
config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) // Set filter states which can be used by other plugins. setFilterState("model", model, log) setFilterState("input_token", inputToken, log) From 6c081d7d2247655031903e7e353bddfb9531c7de Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Wed, 11 Sep 2024 16:02:06 +0800 Subject: [PATCH 04/18] support log & metric & trace --- .../wasm-go/extensions/ai-statistics/main.go | 348 ++++++++++++------ 1 file changed, 240 insertions(+), 108 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index c310c9f412..9b9bf41bca 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -32,7 +32,7 @@ const ( ) // TracingSpan is the tracing span configuration. 
-type TracingSpan struct { +type Attribute struct { Key string `required:"true" yaml:"key" json:"key"` ValueSource string `required:"true" yaml:"valueSource" json:"valueSource"` Value string `required:"true" yaml:"value" json:"value"` @@ -42,7 +42,9 @@ type AIStatisticsConfig struct { // Metrics counterMetrics map[string]proxywasm.MetricCounter // TODO: add more metrics in Gauge and Histogram format - TracingSpan []TracingSpan + logAttributes []Attribute + spanAttributes []Attribute + shouldBufferStreamingBody bool } func generateMetricName(route, cluster, model, metricName string) string { @@ -51,7 +53,7 @@ func generateMetricName(route, cluster, model, metricName string) string { func getRouteName() (string, error) { if raw, err := proxywasm.GetProperty([]string{"route_name"}); err != nil { - return "", err + return "-", err } else { return string(raw), nil } @@ -59,7 +61,7 @@ func getRouteName() (string, error) { func getClusterName() (string, error) { if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err != nil { - return "", err + return "-", err } else { return string(raw), nil } @@ -75,28 +77,49 @@ func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64 } func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { - // Parse tracing span. - tracingSpanConfigArray := configJson.Get("tracing_span").Array() - config.TracingSpan = make([]TracingSpan, len(tracingSpanConfigArray)) - for i, tracingSpanConfig := range tracingSpanConfigArray { - tracingSpan := TracingSpan{ - Key: tracingSpanConfig.Get("key").String(), - ValueSource: tracingSpanConfig.Get("value_source").String(), - Value: tracingSpanConfig.Get("value").String(), + // Parse tracing span attributes setting. 
+ tracingSpanConfigArray := configJson.Get("spanAttributes").Array() + config.spanAttributes = make([]Attribute, len(tracingSpanConfigArray)) + for i, traceAttributeConfig := range tracingSpanConfigArray { + spanAttribute := Attribute{ + Key: traceAttributeConfig.Get("key").String(), + ValueSource: traceAttributeConfig.Get("value_source").String(), + Value: traceAttributeConfig.Get("value").String(), } - config.TracingSpan[i] = tracingSpan + if spanAttribute.ValueSource == "response_streaming_body" { + config.shouldBufferStreamingBody = true + } + config.spanAttributes[i] = spanAttribute } - + // Parse log attributes setting. + logConfigArray := configJson.Get("logAttributes").Array() + config.logAttributes = make([]Attribute, len(logConfigArray)) + for i, logAttributeConfig := range logConfigArray { + logAttribute := Attribute{ + Key: logAttributeConfig.Get("key").String(), + ValueSource: logAttributeConfig.Get("value_source").String(), + Value: logAttributeConfig.Get("value").String(), + } + if logAttribute.ValueSource == "response_streaming_body" { + config.shouldBufferStreamingBody = true + } + config.logAttributes[i] = logAttribute + } + // Metric settings config.counterMetrics = make(map[string]proxywasm.MetricCounter) return nil } func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - // Fetch request header tracing span value. - setTracingSpanValueBySource(config, "request_header", nil, log) - // Fetch request process proxy wasm property. + logAttributes := make(map[string]string) + ctx.SetContext("logAttributes", logAttributes) + // Set user defined log & span attributes. + setTraceAttributeValueBySource(config, "request_header", nil, log) + setLogAttributeValueBySource(ctx, config, "request_header", nil, log) + // Fetch log & trace attributes from wasm filter property. // Warn: The property may be modified by response process , so the value of the property may be overwritten. 
- setTracingSpanValueBySource(config, "property", nil, log) + setTraceAttributeValueBySource(config, "property", nil, log) + setLogAttributeValueBySource(ctx, config, "property", nil, log) // Set request start time. ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) @@ -107,8 +130,9 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo } func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { - // Set request body tracing span value. - setTracingSpanValueBySource(config, "request_body", body, log) + // Set user defined log & span attributes. + setTraceAttributeValueBySource(config, "request_body", body, log) + setLogAttributeValueBySource(ctx, config, "request_body", body, log) return types.ActionContinue } @@ -118,15 +142,30 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, l ctx.BufferResponseBody() } - // Set response header tracing span value. - setTracingSpanValueBySource(config, "response_header", nil, log) + // Set user defined log & span attributes. + setTraceAttributeValueBySource(config, "response_header", nil, log) + setLogAttributeValueBySource(ctx, config, "response_header", nil, log) return types.ActionContinue } func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte { + // Buffer stream body for record log & span attributes + if config.shouldBufferStreamingBody { + var streamingBodyBuffer []byte + streamingBodyBuffer, ok := ctx.GetContext("streamingBodyBuffer").([]byte) + if !ok { + streamingBodyBuffer = data + } else { + streamingBodyBuffer = append(streamingBodyBuffer, data...) 
+ } + ctx.SetContext("streamingBodyBuffer", streamingBodyBuffer) + } + + // Get requestStartTime from http context requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64) if !ok { + log.Error("failed to get requestStartTime from http context") return data } @@ -138,97 +177,126 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat setTracingSpanValue("llm_first_token_duration", fmt.Sprint(firstTokenTime-requestStartTime), log) } - // If the end of the stream is reached, calculate the total time and set metric and span attribute. + // Set infomations about this request + route, _ := getRouteName() + cluster, _ := getClusterName() + if model, inputToken, outputToken, ok := getUsage(data); ok { + // Get logAttributes from http context + logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + if !ok { + log.Error("failed to get logAttributes from http context") + return data + } + // Record Log Attributes + logAttributes["route"] = route + logAttributes["cluster"] = cluster + logAttributes["model"] = model + logAttributes["input_token"] = fmt.Sprint(inputToken) + logAttributes["output_token"] = fmt.Sprint(outputToken) + // Set logAttributes to http context + ctx.SetContext("logAttributes", logAttributes) + } + // If the end of the stream is reached, record metrics/logs/spans. 
if endOfStream { - if model, ok := ctx.GetContext("model").(string); ok { - route, err := getRouteName() - if err != nil { - return data - } - cluster, err := getClusterName() - if err != nil { + // Get logAttributes from http context + logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + if !ok { + log.Error("failed to get logAttributes from http context") + return data + } + responseEndTime := time.Now().UnixMilli() + llm_first_token_duration := uint64(firstTokenTime - requestStartTime) + llm_service_duration := uint64(responseEndTime - requestStartTime) + logAttributes["llm_first_token_duration"] = fmt.Sprint(llm_first_token_duration) + logAttributes["llm_service_duration"] = fmt.Sprint(llm_service_duration) + inputTokenUint64, err := strconv.ParseUint(logAttributes["input_token"], 10, 0) + if err != nil || inputTokenUint64 == 0 { + log.Errorf("input_token convert failed, value is %d, err msg is [%v]", inputTokenUint64, err) + return data + } + outputTokenUint64, err := strconv.ParseUint(logAttributes["output_token"], 10, 0) + if err != nil || outputTokenUint64 == 0 { + log.Errorf("output_token convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) + return data + } + // Set filter states which can be used by other plugins such as ai-token-ratelimit + setFilterState("model", logAttributes["model"], log) + setFilterState("input_token", logAttributes["input_token"], log) + setFilterState("output_token", logAttributes["output_token"], log) + // Set metrics + config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "input_token"), inputTokenUint64) + config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "output_token"), outputTokenUint64) + config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_first_token_duration"), llm_first_token_duration) + config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], 
"llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_duration_count"), 1) + // Set tracing span attributes. + setTracingSpanValue("input_token", logAttributes["input_token"], log) + setTracingSpanValue("output_token", logAttributes["output_token"], log) + setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) + // Set user defined log & span attributes. + if config.shouldBufferStreamingBody { + streamingBodyBuffer, ok := ctx.GetContext("streamingBodyBuffer").([]byte) + if !ok { return data } - responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) - config.incrementCounter(generateMetricName(route, cluster, model, "llm_duration_count"), 1) - llm_first_token_duration := uint64(firstTokenTime - requestStartTime) - config.incrementCounter(generateMetricName(route, cluster, model, "llm_first_token_duration"), llm_first_token_duration) - llm_service_duration := uint64(responseEndTime - requestStartTime) - config.incrementCounter(generateMetricName(route, cluster, model, "llm_service_duration"), llm_service_duration) + setTraceAttributeValueBySource(config, "response_streaming_body", streamingBodyBuffer, log) + setLogAttributeValueBySource(ctx, config, "response_streaming_body", streamingBodyBuffer, log) } + writeLog(logAttributes, log) } - - // Get infomations about this request - model, inputToken, outputToken, ok := getUsage(data) - if !ok { - return data - } - route, err := getRouteName() - if err != nil { - return data - } - cluster, err := getClusterName() - if err != nil { - return data - } - // Set model context used in the last chunk which can be empty - if ctx.GetContext("model") == nil { - ctx.SetContext("model", model) - } - - // Set token usage metrics - config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) 
- config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) - // Set filter states which can be used by other plugins. - setFilterState("model", model, log) - setFilterState("input_token", inputToken, log) - setFilterState("output_token", outputToken, log) - // Set tracing span tag input_token and output_token. - setTracingSpanValue("input_token", strconv.FormatInt(inputToken, 10), log) - setTracingSpanValue("output_token", strconv.FormatInt(outputToken, 10), log) - // Set response process proxy wasm property. - setTracingSpanValueBySource(config, "property", nil, log) - return data } func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { - // Calculate the total time and set tracing span tag total_time. + // Get logAttributes from http context + logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + if !ok { + log.Error("failed to get logAttributes from http context") + return types.ActionContinue + } + + // Get requestStartTime from http context requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64) if !ok { + log.Error("failed to get requestStartTime from http context") return types.ActionContinue } + responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) + llm_service_duration := uint64(responseEndTime - requestStartTime) + // Get infomations about this request + route, _ := getRouteName() + cluster, _ := getClusterName() model, inputToken, outputToken, ok := getUsage(body) if !ok { return types.ActionContinue } - route, err := getRouteName() - if err != nil { - return types.ActionContinue - } - cluster, err := getClusterName() - if err != nil { - return types.ActionContinue - } - // Set metrics - llm_service_duration := uint64(responseEndTime - requestStartTime) - config.incrementCounter(generateMetricName(route, cluster, model, 
"llm_service_duration"), llm_service_duration) - config.incrementCounter(generateMetricName(route, cluster, model, "llm_duration_count"), 1) - config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) - // Set filter states which can be used by other plugins. + // Set filter states which can be used by other plugins such as ai-token-ratelimit setFilterState("model", model, log) setFilterState("input_token", inputToken, log) setFilterState("output_token", outputToken, log) + // Set metrics + config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, model, "llm_duration_count"), 1) // Set tracing span tag input_tokens and output_tokens. setTracingSpanValue("input_token", strconv.FormatInt(inputToken, 10), log) setTracingSpanValue("output_token", strconv.FormatInt(outputToken, 10), log) - // Set response process proxy wasm property. - setTracingSpanValueBySource(config, "property", nil, log) + setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) + // Set Log Attributes + logAttributes["route"] = route + logAttributes["cluster"] = cluster + logAttributes["model"] = model + logAttributes["input_token"] = fmt.Sprint(inputToken) + logAttributes["output_token"] = fmt.Sprint(outputToken) + logAttributes["llm_service_duration"] = fmt.Sprint(llm_service_duration) + // Write log + writeLog(logAttributes, log) + // Set user defined log & span attributes. 
+ setTraceAttributeValueBySource(config, "response_body", body, log) + setLogAttributeValueBySource(ctx, config, "response_body", body, log) return types.ActionContinue } @@ -257,6 +325,18 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag return } +func extractDataByJsonPath(data []byte, jsonPath string) string { + chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n")) + value := "" + for _, chunk := range chunks { + raw := gjson.GetBytes(chunk, jsonPath).Raw + if len(raw) > 2 { + value += raw[1 : len(raw)-1] + } + } + return value +} + func setFilterState(key string, value interface{}, log wrapper.Log) { if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil { log.Errorf("failed to set %s in filter state: %v", key, e) @@ -264,28 +344,34 @@ func setFilterState(key string, value interface{}, log wrapper.Log) { } // fetches the tracing span value from the specified source. -func setTracingSpanValueBySource(config AIStatisticsConfig, tracingSource string, body []byte, log wrapper.Log) { - for _, tracingSpanEle := range config.TracingSpan { - if tracingSource == tracingSpanEle.ValueSource { - switch tracingSource { - case "response_header": - if value, err := proxywasm.GetHttpResponseHeader(tracingSpanEle.Value); err == nil { - setTracingSpanValue(tracingSpanEle.Key, value, log) - } - case "request_body": - bodyJson := gjson.ParseBytes(body) - value := bodyJson.Get(tracingSpanEle.Value).String() - setTracingSpanValue(tracingSpanEle.Key, value, log) +func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, body []byte, log wrapper.Log) { + for _, spanAttribute := range config.spanAttributes { + if source == spanAttribute.ValueSource { + switch source { case "request_header": - if value, err := proxywasm.GetHttpRequestHeader(tracingSpanEle.Value); err == nil { - setTracingSpanValue(tracingSpanEle.Key, value, log) + if value, err := proxywasm.GetHttpRequestHeader(spanAttribute.Value); 
err == nil { + log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) + setTracingSpanValue(spanAttribute.Key, value, log) } - case "property": - if raw, err := proxywasm.GetProperty([]string{tracingSpanEle.Value}); err == nil { - setTracingSpanValue(tracingSpanEle.Key, string(raw), log) + case "request_body": + value := gjson.GetBytes(body, spanAttribute.Value).String() + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) + setTracingSpanValue(spanAttribute.Key, value, log) + case "response_header": + if value, err := proxywasm.GetHttpResponseHeader(spanAttribute.Value); err == nil { + log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) + setTracingSpanValue(spanAttribute.Key, value, log) } + case "response_streaming_body": + value := extractDataByJsonPath(body, spanAttribute.Value) + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) + setTracingSpanValue(spanAttribute.Key, value, log) + case "response_body": + value := gjson.GetBytes(body, spanAttribute.Value).String() + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) + setTracingSpanValue(spanAttribute.Key, value, log) default: - + log.Errorf("source type %s is error", source) } } } @@ -293,8 +379,6 @@ func setTracingSpanValueBySource(config AIStatisticsConfig, tracingSource string // Set the tracing span with value. 
func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { - log.Debugf("try to set trace span [%s] with value [%s].", tracingKey, tracingValue) - if tracingValue != "" { traceSpanTag := TracePrefix + tracingKey @@ -310,3 +394,51 @@ func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { log.Debugf("successed to set trace span [%s] with value [%s].", traceSpanTag, tracingValue) } } + +// fetches the tracing span value from the specified source. +func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) { + logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + if !ok { + log.Error("failed to get logAttributes from http context") + return + } + for _, logAttribute := range config.logAttributes { + if source == logAttribute.ValueSource { + switch source { + case "request_header": + if value, err := proxywasm.GetHttpRequestHeader(logAttribute.Value); err == nil { + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) + logAttributes[logAttribute.Key] = value + } + case "request_body": + value := gjson.GetBytes(body, logAttribute.Value).String() + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) + logAttributes[logAttribute.Key] = value + case "response_header": + if value, err := proxywasm.GetHttpResponseHeader(logAttribute.Value); err == nil { + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) + logAttributes[logAttribute.Key] = value + } + case "response_streaming_body": + value := extractDataByJsonPath(body, logAttribute.Value) + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) + logAttributes[logAttribute.Key] = value + case "response_body": + value := gjson.GetBytes(body, logAttribute.Value).String() + log.Debugf("[log attribute] source 
type: %s, key: %s, value: %s", source, logAttribute.Key, value) + logAttributes[logAttribute.Key] = value + default: + log.Errorf("source type %s is error", source) + } + } + } + ctx.SetContext("logAttributes", logAttributes) +} + +func writeLog(logAttributes map[string]string, log wrapper.Log) { + items := []string{} + for k, v := range logAttributes { + items = append(items, fmt.Sprintf(`"%s":"%s"`, k, v)) + } + log.Infof("ai request json log: {%s}", strings.Join(items, ",")) +} From abdc2af12183ab424c574e62d8b477d5f6a82318 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Wed, 11 Sep 2024 18:00:48 +0800 Subject: [PATCH 05/18] update --- .../wasm-go/extensions/ai-statistics/main.go | 98 +++++++++++++------ 1 file changed, 66 insertions(+), 32 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 9b9bf41bca..6fdeb7003b 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -33,9 +33,10 @@ const ( // TracingSpan is the tracing span configuration. 
type Attribute struct { - Key string `required:"true" yaml:"key" json:"key"` - ValueSource string `required:"true" yaml:"valueSource" json:"valueSource"` - Value string `required:"true" yaml:"value" json:"value"` + Key string + ValueSource string + Value string + Rule string } type AIStatisticsConfig struct { @@ -85,6 +86,7 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe Key: traceAttributeConfig.Get("key").String(), ValueSource: traceAttributeConfig.Get("value_source").String(), Value: traceAttributeConfig.Get("value").String(), + Rule: traceAttributeConfig.Get("rule").String(), } if spanAttribute.ValueSource == "response_streaming_body" { config.shouldBufferStreamingBody = true @@ -99,6 +101,7 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe Key: logAttributeConfig.Get("key").String(), ValueSource: logAttributeConfig.Get("value_source").String(), Value: logAttributeConfig.Get("value").String(), + Rule: logAttributeConfig.Get("rule").String(), } if logAttribute.ValueSource == "response_streaming_body" { config.shouldBufferStreamingBody = true @@ -113,19 +116,17 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { logAttributes := make(map[string]string) ctx.SetContext("logAttributes", logAttributes) - // Set user defined log & span attributes. + // Set base span attributes + setTracingSpanValue("gen_ai.span.kind", "LLM", log) + // Set user defined log & span attributes which type is request_header setTraceAttributeValueBySource(config, "request_header", nil, log) setLogAttributeValueBySource(ctx, config, "request_header", nil, log) - // Fetch log & trace attributes from wasm filter property. - // Warn: The property may be modified by response process , so the value of the property may be overwritten. 
- setTraceAttributeValueBySource(config, "property", nil, log) - setLogAttributeValueBySource(ctx, config, "property", nil, log) - + // Set user defined log & span attributes which type is fixed_value + setTraceAttributeValueBySource(config, "fixed_value", nil, log) + setLogAttributeValueBySource(ctx, config, "fixed_value", nil, log) // Set request start time. ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) - // The request has a body and requires delaying the header transmission until a cache miss occurs, - // at which point the header should be sent. return types.ActionContinue } @@ -133,6 +134,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body // Set user defined log & span attributes. setTraceAttributeValueBySource(config, "request_body", body, log) setLogAttributeValueBySource(ctx, config, "request_body", body, log) + return types.ActionContinue } @@ -230,8 +232,10 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_service_duration"), llm_service_duration) config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_duration_count"), 1) // Set tracing span attributes. - setTracingSpanValue("input_token", logAttributes["input_token"], log) - setTracingSpanValue("output_token", logAttributes["output_token"], log) + setTracingSpanValue("gen_ai.model_name", logAttributes["model"], log) + setTracingSpanValue("gen_ai.usage.input_tokens", logAttributes["input_token"], log) + setTracingSpanValue("gen_ai.usage.output_tokens", logAttributes["output_token"], log) + setTracingSpanValue("gen_ai.usage.total_tokens", fmt.Sprint(inputTokenUint64+outputTokenUint64), log) setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) // Set user defined log & span attributes. 
if config.shouldBufferStreamingBody { @@ -282,8 +286,10 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body config.incrementCounter(generateMetricName(route, cluster, model, "llm_service_duration"), llm_service_duration) config.incrementCounter(generateMetricName(route, cluster, model, "llm_duration_count"), 1) // Set tracing span tag input_tokens and output_tokens. - setTracingSpanValue("input_token", strconv.FormatInt(inputToken, 10), log) - setTracingSpanValue("output_token", strconv.FormatInt(outputToken, 10), log) + setTracingSpanValue("gen_ai.model_name", model, log) + setTracingSpanValue("gen_ai.usage.input_tokens", fmt.Sprint(inputToken), log) + setTracingSpanValue("gen_ai.usage.output_tokens", fmt.Sprint(outputToken), log) + setTracingSpanValue("gen_ai.usage.total_tokens", fmt.Sprint(inputToken+outputToken), log) setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) // Set Log Attributes logAttributes["route"] = route @@ -325,14 +331,34 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag return } -func extractDataByJsonPath(data []byte, jsonPath string) string { +func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) string { chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n")) value := "" - for _, chunk := range chunks { - raw := gjson.GetBytes(chunk, jsonPath).Raw - if len(raw) > 2 { - value += raw[1 : len(raw)-1] + if rule == "first" { + for _, chunk := range chunks { + jsonObj := gjson.GetBytes(chunk, jsonPath) + if jsonObj.Exists() { + value = jsonObj.String() + break + } + } + } else if rule == "replace" { + for _, chunk := range chunks { + jsonObj := gjson.GetBytes(chunk, jsonPath) + if jsonObj.Exists() { + value = jsonObj.String() + } } + } else if rule == "append" { + // extract llm response + for _, chunk := range chunks { + raw := gjson.GetBytes(chunk, jsonPath).Raw + if len(raw) > 2 { + value 
+= raw[1 : len(raw)-1] + } + } + } else { + log.Errorf("unsupported rule type: %s", rule) } return value } @@ -348,6 +374,9 @@ func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, bo for _, spanAttribute := range config.spanAttributes { if source == spanAttribute.ValueSource { switch source { + case "fixed_value": + log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, spanAttribute.Value) + setTracingSpanValue(spanAttribute.Key, spanAttribute.Value, log) case "request_header": if value, err := proxywasm.GetHttpRequestHeader(spanAttribute.Value); err == nil { log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) @@ -363,7 +392,7 @@ func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, bo setTracingSpanValue(spanAttribute.Key, value, log) } case "response_streaming_body": - value := extractDataByJsonPath(body, spanAttribute.Value) + value := extractStreamingBodyByJsonPath(body, spanAttribute.Value, spanAttribute.Rule, log) log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) setTracingSpanValue(spanAttribute.Key, value, log) case "response_body": @@ -379,20 +408,22 @@ func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, bo // Set the tracing span with value. 
func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { - if tracingValue != "" { - traceSpanTag := TracePrefix + tracingKey + if tracingValue == "" { + tracingValue = "-" + } - if raw, err := proxywasm.GetProperty([]string{traceSpanTag}); err == nil { - if raw != nil { - log.Warnf("trace span [%s] already exists, value will be overwrite, orign value: %s.", traceSpanTag, string(raw)) - } - } + traceSpanTag := TracePrefix + tracingKey - if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(tracingValue)); e != nil { - log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e) + if raw, err := proxywasm.GetProperty([]string{traceSpanTag}); err == nil { + if raw != nil { + log.Warnf("trace span [%s] already exists, value will be overwrite, orign value: %s.", traceSpanTag, string(raw)) } - log.Debugf("successed to set trace span [%s] with value [%s].", traceSpanTag, tracingValue) } + + if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(tracingValue)); e != nil { + log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e) + } + log.Debugf("successed to set trace span [%s] with value [%s].", traceSpanTag, tracingValue) } // fetches the tracing span value from the specified source. 
@@ -405,6 +436,9 @@ func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsCo for _, logAttribute := range config.logAttributes { if source == logAttribute.ValueSource { switch source { + case "fixed_value": + log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, logAttribute.Value) + logAttributes[logAttribute.Key] = logAttribute.Value case "request_header": if value, err := proxywasm.GetHttpRequestHeader(logAttribute.Value); err == nil { log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) @@ -420,7 +454,7 @@ func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsCo logAttributes[logAttribute.Key] = value } case "response_streaming_body": - value := extractDataByJsonPath(body, logAttribute.Value) + value := extractStreamingBodyByJsonPath(body, logAttribute.Value, logAttribute.Rule, log) log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) logAttributes[logAttribute.Key] = value case "response_body": From c472f3dd8d6ee84ac5b7c331bac1c10e4455c49f Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Wed, 11 Sep 2024 18:13:56 +0800 Subject: [PATCH 06/18] update --- plugins/wasm-go/extensions/ai-statistics/main.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 6fdeb7003b..2885320fa4 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -115,15 +115,20 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { logAttributes := make(map[string]string) + request_id, err := proxywasm.GetHttpRequestHeader("x-request-id") + if err != nil || request_id == "" 
{ + log.Errorf("failed to get request_id, err: %v", err) + } + logAttributes["request_id"] = request_id ctx.SetContext("logAttributes", logAttributes) - // Set base span attributes + // Set base log & span attributes setTracingSpanValue("gen_ai.span.kind", "LLM", log) - // Set user defined log & span attributes which type is request_header - setTraceAttributeValueBySource(config, "request_header", nil, log) - setLogAttributeValueBySource(ctx, config, "request_header", nil, log) // Set user defined log & span attributes which type is fixed_value setTraceAttributeValueBySource(config, "fixed_value", nil, log) setLogAttributeValueBySource(ctx, config, "fixed_value", nil, log) + // Set user defined log & span attributes which type is request_header + setTraceAttributeValueBySource(config, "request_header", nil, log) + setLogAttributeValueBySource(ctx, config, "request_header", nil, log) // Set request start time. ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) From 5e14d55bd9bf8eef16805008ece0e2f75785d06d Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Thu, 12 Sep 2024 15:04:18 +0800 Subject: [PATCH 07/18] update --- .../wasm-go/extensions/ai-statistics/main.go | 50 +++++++++++++++---- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 2885320fa4..3d0fa9cb9b 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "encoding/json" "fmt" "strconv" "strings" @@ -251,7 +252,7 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat setTraceAttributeValueBySource(config, "response_streaming_body", streamingBodyBuffer, log) setLogAttributeValueBySource(ctx, config, "response_streaming_body", streamingBodyBuffer, log) } - writeLog(logAttributes, log) + writeLog(ctx, log) } return data } @@ 
-303,11 +304,12 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body logAttributes["input_token"] = fmt.Sprint(inputToken) logAttributes["output_token"] = fmt.Sprint(outputToken) logAttributes["llm_service_duration"] = fmt.Sprint(llm_service_duration) - // Write log - writeLog(logAttributes, log) + ctx.SetContext("logAttributes", logAttributes) // Set user defined log & span attributes. setTraceAttributeValueBySource(config, "response_body", body, log) setLogAttributeValueBySource(ctx, config, "response_body", body, log) + // Write log + writeLog(ctx, log) return types.ActionContinue } @@ -388,7 +390,11 @@ func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, bo setTracingSpanValue(spanAttribute.Key, value, log) } case "request_body": - value := gjson.GetBytes(body, spanAttribute.Value).String() + raw := gjson.GetBytes(body, spanAttribute.Value).Raw + var value string + if len(raw) > 2 { + value = raw[1 : len(raw)-1] + } log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) setTracingSpanValue(spanAttribute.Key, value, log) case "response_header": @@ -401,7 +407,11 @@ func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, bo log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) setTracingSpanValue(spanAttribute.Key, value, log) case "response_body": - value := gjson.GetBytes(body, spanAttribute.Value).String() + raw := gjson.GetBytes(body, spanAttribute.Value).Raw + var value string + if len(raw) > 2 { + value = raw[1 : len(raw)-1] + } log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) setTracingSpanValue(spanAttribute.Key, value, log) default: @@ -450,7 +460,11 @@ func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsCo logAttributes[logAttribute.Key] = value } case "request_body": - value := gjson.GetBytes(body, 
logAttribute.Value).String() + raw := gjson.GetBytes(body, logAttribute.Value).Raw + var value string + if len(raw) > 2 { + value = raw[1 : len(raw)-1] + } log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) logAttributes[logAttribute.Key] = value case "response_header": @@ -463,7 +477,11 @@ func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsCo log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) logAttributes[logAttribute.Key] = value case "response_body": - value := gjson.GetBytes(body, logAttribute.Value).String() + raw := gjson.GetBytes(body, logAttribute.Value).Raw + var value string + if len(raw) > 2 { + value = raw[1 : len(raw)-1] + } log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) logAttributes[logAttribute.Key] = value default: @@ -474,10 +492,24 @@ func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsCo ctx.SetContext("logAttributes", logAttributes) } -func writeLog(logAttributes map[string]string, log wrapper.Log) { +func writeLog(ctx wrapper.HttpContext, log wrapper.Log) { + logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + if !ok { + log.Error("failed to write log") + } items := []string{} for k, v := range logAttributes { items = append(items, fmt.Sprintf(`"%s":"%s"`, k, v)) } - log.Infof("ai request json log: {%s}", strings.Join(items, ",")) + aiLogField := fmt.Sprintf(`{%s}`, strings.Join(items, ",")) + log.Infof("ai request json log: %s", aiLogField) + jsonMap := map[string]string{ + "ai_log": aiLogField, + } + serialized, _ := json.Marshal(jsonMap) + jsonLogRaw := gjson.GetBytes(serialized, "ai_log").Raw + jsonLog := jsonLogRaw[1 : len(jsonLogRaw)-1] + if err := proxywasm.SetProperty([]string{"ai_log"}, []byte(jsonLog)); err != nil { + log.Errorf("failed to set ai_log in filter state: %v", err) + } } From 
6033011542b1743e0f38dc7eecfbc8c6f0920e5c Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Thu, 12 Sep 2024 15:47:34 +0800 Subject: [PATCH 08/18] update README.md --- .../extensions/ai-statistics/README.md | 104 ++++++++++-------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 211201be26..38967ae0eb 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -1,69 +1,87 @@ # 介绍 -提供AI可观测基础能力,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则只支持openai协议。 +提供AI可观测基础能力,包括 metric, log, trace,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则只支持openai协议。 # 配置说明 +插件提供了以下基础可观测值,用户无需配置: +- metric:提供了输入token、输出token、首个token的rt(流式请求)、请求总rt等指标,分别在网关、路由、服务、模型四个维度上生效 +- log:提供了 input_token, output_token, model, cluster, route, llm_service_duration, llm_first_token_duration 等字段 +- trace:提供了 input_token, output_token, model, cluster, route, llm_service_duration, llm_first_token_duration 等字段 + +用户还可以通过配置的方式对可观测的值进行扩展: | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|------|-----|------------------------| -| `enable` | bool | 必填 | - | 是否开启ai统计功能 | -| `tracing_span` | array | 非必填 | - | 自定义tracing span tag 配置 | +| `spanAttributes` | []Attribute | 非必填 | - | 自定义 ai 请求中日志字段 | +| `logAttributes` | []Attribute | 非必填 | - | 自定义 ai 请求中链路追踪 span attrribute | -## tracing_span 配置说明 +## Attribute 配置说明 | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|-----|-----|------------------------| -| `key` | string | 必填 | - | tracing tag 名称 | -| `value_source` | string | 必填 | - | tag 取值来源 | -| `value` | string | 必填 | - | tag 取值 key value/path | +| `key` | string | 必填 | - | attrribute 名称 | +| `value_source` | string | 必填 | - | attrribute 取值来源,可选值为 `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | +| `value` | string | 必填 | - | attrribute 取值 key value/path | +| 
`rule` | string | 非必填 | - | 从流式响应中提取 attrribute 的规则,可选值为 `first`, `replace`, `append`| + +`value_source` 的各种取值含义如下: +- `fixed_value`:固定值 +- `request_header` : attribute 值通过 http 请求头获取,value 配置为 header key +- `request_body` :attribute 值通过请求 body 获取,value 配置格式为 gjson 的 jsonpath +- `response_header` :attribute 值通过 http 响应头获取,value 配置为header key +- `response_body` :attribute 值通过响应 body 获取,value 配置格式为 gjson 的 jsonpath +- `response_streaming_body` :attribute 值通过流式响应 body 获取,value 配置格式为 gjson 的 jsonpath + -value_source为 tag 值的取值来源,可选配置值有 4 个: -- property : tag 值通过proxywasm.GetProperty()方法获取,value配置GetProperty()方法要提取的key名 -- requeset_header : tag 值通过http请求头获取,value配置为header key -- request_body :tag 值通过请求body获取,value配置格式为 gjson的 GJSON PATH 语法 -- response_header : tag 值通过http响应头获取,value配置为header key +当 `value_source` 为 `response_streaming_body` 时,应当配置 `rule`,用于指定如何从流式body中获取指定值,取值含义如下: +- `first`:(多个chunk中取第一个chunk的值), +- `replace`:(多个chunk中取最后一个chunk的值), +- `append`:(拼接多个chunk中的值,可用于获取回答内容) +## 配置示例 举例如下: ```yaml -tracing_label: -- key: "session_id" - value_source: "requeset_header" - value: "session_id" -- key: "user_content" - value_source: "request_body" - value: "input.messages.1.content" +logAttributes: + - key: consumer # 配合认证鉴权记录consumer + value_source: request_header + value: x-mse-consumer + - key: question # 记录问题 + value_source: request_body + value: messages.@reverse.0.content + - key: answer # 在流式响应中提取大模型的回答 + value_source: response_streaming_body + value: choices.0.delta.content + rule: append + - key: answer # 在非流式响应中提取大模型的回答 + value_source: response_body + value: choices.0.message.content +spanAttributes: + - key: consumer + value_source: request_header + value: x-mse-consumer ``` +## 可观测指标示例 +### Metric 开启后 metrics 示例: ``` -route_upstream_model_input_token{ai_route="openai",ai_cluster="qwen",ai_model="qwen-max"} 21 -route_upstream_model_output_token{ai_route="openai",ai_cluster="qwen",ai_model="qwen-max"} 17
+route_upstream_model_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-max"} 21 +route_upstream_model_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-max"} 17 ``` +### Log 日志示例: ```json { - "model": "qwen-max", - "input_token": "21", + "consumer": "21321r9fncsb2dq", + "route": "llm", "output_token": "17", - "authority": "dashscope.aliyuncs.com", - "bytes_received": "336", - "bytes_sent": "1675", - "duration": "1590", - "istio_policy_status": "-", - "method": "POST", - "path": "/v1/chat/completions", - "protocol": "HTTP/1.1", - "request_id": "5895f5a9-e4e3-425b-98db-6c6a926195b7", - "requested_server_name": "-", - "response_code": "200", - "response_flags": "-", - "route_name": "openai", - "start_time": "2024-06-18T09:37:14.078Z", - "trace_id": "-", - "upstream_cluster": "qwen", - "upstream_service_time": "496", - "upstream_transport_failure_reason": "-", - "user_agent": "PostmanRuntime/7.37.3", - "x_forwarded_for": "-" + "llm_service_duration": "3518", + "answer": "我是来自阿里云的超大规模语言模型,我叫通义千问。", + "request_id": "2d8ffda2-dc43-933d-ad72-7679cfbbaf15", + "question": "你是谁", + "cluster": "outbound|443||qwen.dns", + "model": "qwen-max", + "input_token": "10", + "llm_first_token_duration": "676" } ``` \ No newline at end of file From e73d9a3be5e199d064e19db0efcc18f469571634 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Fri, 20 Sep 2024 10:35:42 +0800 Subject: [PATCH 09/18] update README --- plugins/wasm-go/extensions/ai-statistics/README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 38967ae0eb..44ad9d832d 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -68,10 +68,23 @@ route_upstream_model_output_token{ai_route="llm",ai_cluster="outbound|443||qwen. 
``` ### Log +要想在日志中看到相关统计信息,需要在meshconfig中修改log_format,添加以下字段 +```yaml +access_log: +- name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + log_format: + text_format_source: + inline_string: '{"ai-statistics":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' + path: /dev/stdout +``` + 日志示例: ```json { + "ai-statistics": { "consumer": "21321r9fncsb2dq", "route": "llm", "output_token": "17", @@ -83,5 +96,6 @@ route_upstream_model_output_token{ai_route="llm",ai_cluster="outbound|443||qwen. "model": "qwen-max", "input_token": "10", "llm_first_token_duration": "676" + } } ``` \ No newline at end of file From 2b92116166d99e3e00091df51996d6211787b686 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Fri, 20 Sep 2024 16:16:16 +0800 Subject: [PATCH 10/18] update --- .../extensions/ai-statistics/README.md | 142 +++--- .../wasm-go/extensions/ai-statistics/main.go | 428 ++++++++---------- 2 files changed, 280 insertions(+), 290 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 44ad9d832d..b424eeebca 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -1,18 +1,16 @@ # 介绍 -提供AI可观测基础能力,包括 metric, log, trace,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则只支持openai协议。 +提供AI可观测基础能力,包括 metric, log, trace,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则需要用户进行相应配置才可生效。 # 配置说明 -插件提供了以下基础可观测值,用户无需配置: -- metric:提供了输入token、输出token、首个token的rt(流式请求)、请求总rt等指标,分别在网关、路由、服务、模型四个维度上生效 -- log:提供了 input_token, output_token, model, cluster, route, llm_service_duration, llm_first_token_duration 等字段 -- trace:提供了 input_token, output_token, model, cluster, route, llm_service_duration, llm_first_token_duration 等字段 +插件默认请求符合openai协议格式,并提供了以下基础可观测值,用户无需特殊配置: +- metric:提供了输入token、输出token、首个token的rt(流式请求)、请求总rt等指标,支持在网关、路由、服务、模型四个维度上进行观测 +- log:提供了 input_token, output_token, model, 
llm_service_duration, llm_first_token_duration 等字段 用户还可以通过配置的方式对可观测的值进行扩展: | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|------|-----|------------------------| -| `spanAttributes` | []Attribute | 非必填 | - | 自定义 ai 请求中日志字段 | -| `logAttributes` | []Attribute | 非必填 | - | 自定义 ai 请求中链路追踪 span attrribute | +| `attributes` | []Attribute | 非必填 | - | 用户希望记录在log/span中的信息 | ## Attribute 配置说明 | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | @@ -21,6 +19,8 @@ | `value_source` | string | 必填 | - | attrribute 取值来源,可选值为 `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | | `value` | string | 必填 | - | attrribute 取值 key value/path | | `rule` | string | 非必填 | - | 从流式响应中提取 attrribute 的规则,可选值为 `first`, `replace`, `append`| +| `apply_to_log` | bool | 非必填 | false | 是否将提取的信息记录在日志中 | +| `apply_to_span` | bool | 非必填 | false | 是否将提取的信息记录在链路追踪span中 | `value_source` 的各种取值含义如下: - `fixed_value`:固定值 @@ -37,65 +37,101 @@ - `append`:(拼接多个chunk中的值,可用于获取回答内容) ## 配置示例 +如果希望在网关访问日志中记录ai-statistic相关的统计值,需要修改log_format,在原log_format基础上添加一个新字段,示例如下: + +```yaml +access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + log_format: + text_format_source: + inline_string: '{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' + path: /dev/stdout +``` + +### 空配置 +当不进行任何配置时,效果如下 + +#### 监控 +``` +route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309 +route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69 +``` + +#### 日志 +此配置下日志效果如下: +```json +{ + "ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" +} +``` + +#### 链路追踪 +配置为空时,不会在span中添加额外的attribute + +### 从非openai协议提取token使用信息 +在ai-proxy中设置协议为original时,以百炼为例,可作如下配置指定如何提取model, 
input_token, output_token + +```yaml +attributes: + - key: model + value_source: response_body + value: usage.models.0.model_id + apply_to_log: true + apply_to_span: false + - key: input_token + value_source: response_body + value: usage.models.0.input_tokens + apply_to_log: true + apply_to_span: false + - key: output_token + value_source: response_body + value: usage.models.0.output_tokens + apply_to_log: true + apply_to_span: false +``` +#### 监控 +``` +route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343 +route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153 +``` + +#### 日志 +此配置下日志效果如下: +```json +{ + "ai_log": "{\"model\":\"qwen-max\",\"input_token\":\"343\",\"output_token\":\"153\",\"llm_service_duration\":\"19110\"}" +} +``` + +#### 链路追踪 +链路追踪的 span 中可以看到 model, input_token, output_token 三个额外的 attribute + +### 配合认证鉴权记录consumer 举例如下: ```yaml -logAttributes: +attributes: - key: consumer # 配合认证鉴权记录consumer value_source: request_header value: x-mse-consumer + apply_to_log: true +``` + +### 记录问题与回答 +```yaml +attributes: - key: question # 记录问题 value_source: request_body value: messages.@reverse.0.content + apply_to_log: true - key: answer # 在流式响应中提取大模型的回答 value_source: response_streaming_body value: choices.0.delta.content rule: append + apply_to_log: true - key: answer # 在非流式响应中提取大模型的回答 value_source: response_body value: choices.0.message.content -spanAttributes: - - key: consumer - value_source: request_header - value: x-mse-consumer -``` - -## 可观测指标示例 -### Metric -开启后 metrics 示例: -``` -route_upstream_model_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-max"} 21 -route_upstream_model_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-max"} 17 -``` - -### Log -要想在日志中看到相关统计信息,需要在meshconfig中修改log_format,添加以下字段 -```yaml -access_log: -- name: envoy.access_loggers.file - typed_config: - "@type": 
type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - log_format: - text_format_source: - inline_string: '{"ai-statistics":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' - path: /dev/stdout -``` - -日志示例: - -```json -{ - "ai-statistics": { - "consumer": "21321r9fncsb2dq", - "route": "llm", - "output_token": "17", - "llm_service_duration": "3518", - "answer": "我是来自阿里云的超大规模语言模型,我叫通义千问。", - "request_id": "2d8ffda2-dc43-933d-ad72-7679cfbbaf15", - "question": "你是谁", - "cluster": "outbound|443||qwen.dns", - "model": "qwen-max", - "input_token": "10", - "llm_first_token_duration": "676" - } -} + apply_to_log: true ``` \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 3d0fa9cb9b..49ef049db9 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -34,18 +34,21 @@ const ( // TracingSpan is the tracing span configuration. type Attribute struct { - Key string - ValueSource string - Value string - Rule string + Key string `json:"key"` + ValueSource string `json:"value_source"` + Value string `json:"value"` + Rule string `json:"rule,omitempty"` + ApplyToLog bool `json:"apply_to_log,omitempty"` + ApplyToSpan bool `json:"apply_to_span,omitempty"` } type AIStatisticsConfig struct { // Metrics - counterMetrics map[string]proxywasm.MetricCounter // TODO: add more metrics in Gauge and Histogram format - logAttributes []Attribute - spanAttributes []Attribute + counterMetrics map[string]proxywasm.MetricCounter + // Attributes to be recorded in log & span + attributes []Attribute + // If there exist attributes extracted from streaming body, chunks should be buffered shouldBufferStreamingBody bool } @@ -80,34 +83,20 @@ func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64 func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { // Parse tracing span 
attributes setting. - tracingSpanConfigArray := configJson.Get("spanAttributes").Array() - config.spanAttributes = make([]Attribute, len(tracingSpanConfigArray)) - for i, traceAttributeConfig := range tracingSpanConfigArray { - spanAttribute := Attribute{ - Key: traceAttributeConfig.Get("key").String(), - ValueSource: traceAttributeConfig.Get("value_source").String(), - Value: traceAttributeConfig.Get("value").String(), - Rule: traceAttributeConfig.Get("rule").String(), + attributeConfigs := configJson.Get("attributes").Array() + config.attributes = make([]Attribute, len(attributeConfigs)) + for i, attributeConfig := range attributeConfigs { + attribute := Attribute{} + err := json.Unmarshal([]byte(attributeConfig.Raw), &attribute) + if err != nil { + log.Errorf("parse config failed, %v", err) + return err } - if spanAttribute.ValueSource == "response_streaming_body" { + if attribute.ValueSource == "response_streaming_body" { config.shouldBufferStreamingBody = true } - config.spanAttributes[i] = spanAttribute - } - // Parse log attributes setting. 
- logConfigArray := configJson.Get("logAttributes").Array() - config.logAttributes = make([]Attribute, len(logConfigArray)) - for i, logAttributeConfig := range logConfigArray { - logAttribute := Attribute{ - Key: logAttributeConfig.Get("key").String(), - ValueSource: logAttributeConfig.Get("value_source").String(), - Value: logAttributeConfig.Get("value").String(), - Rule: logAttributeConfig.Get("rule").String(), - } - if logAttribute.ValueSource == "response_streaming_body" { - config.shouldBufferStreamingBody = true - } - config.logAttributes[i] = logAttribute + log.Infof("%v", attribute) + config.attributes[i] = attribute } // Metric settings config.counterMetrics = make(map[string]proxywasm.MetricCounter) @@ -115,21 +104,12 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe } func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - logAttributes := make(map[string]string) - request_id, err := proxywasm.GetHttpRequestHeader("x-request-id") - if err != nil || request_id == "" { - log.Errorf("failed to get request_id, err: %v", err) - } - logAttributes["request_id"] = request_id - ctx.SetContext("logAttributes", logAttributes) - // Set base log & span attributes - setTracingSpanValue("gen_ai.span.kind", "LLM", log) + ctx.SetContext("attributes", map[string]string{}) + ctx.SetContext("logAttributes", map[string]string{}) // Set user defined log & span attributes which type is fixed_value - setTraceAttributeValueBySource(config, "fixed_value", nil, log) - setLogAttributeValueBySource(ctx, config, "fixed_value", nil, log) + setAttributeBySource(ctx, config, "fixed_value", nil, log) // Set user defined log & span attributes which type is request_header - setTraceAttributeValueBySource(config, "request_header", nil, log) - setLogAttributeValueBySource(ctx, config, "request_header", nil, log) + setAttributeBySource(ctx, config, "request_header", nil, log) // Set request start time. 
ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) @@ -138,9 +118,7 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { // Set user defined log & span attributes. - setTraceAttributeValueBySource(config, "request_body", body, log) - setLogAttributeValueBySource(ctx, config, "request_body", body, log) - + setAttributeBySource(ctx, config, "request_body", body, log) return types.ActionContinue } @@ -151,13 +129,19 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, l } // Set user defined log & span attributes. - setTraceAttributeValueBySource(config, "response_header", nil, log) - setLogAttributeValueBySource(ctx, config, "response_header", nil, log) + setAttributeBySource(ctx, config, "response_header", nil, log) return types.ActionContinue } func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte { + // Get attributes from http context + attributes, ok := ctx.GetContext("attributes").(map[string]string) + if !ok { + log.Error("failed to get attributes from http context") + return data + } + // Buffer stream body for record log & span attributes if config.shouldBufferStreamingBody { var streamingBodyBuffer []byte @@ -182,86 +166,76 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat if !ok { firstTokenTime = time.Now().UnixMilli() ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime) - setTracingSpanValue("llm_first_token_duration", fmt.Sprint(firstTokenTime-requestStartTime), log) } // Set infomations about this request route, _ := getRouteName() cluster, _ := getClusterName() if model, inputToken, outputToken, ok := getUsage(data); ok { - // Get logAttributes from http context - logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) - if !ok 
{ - log.Error("failed to get logAttributes from http context") - return data - } // Record Log Attributes - logAttributes["route"] = route - logAttributes["cluster"] = cluster - logAttributes["model"] = model - logAttributes["input_token"] = fmt.Sprint(inputToken) - logAttributes["output_token"] = fmt.Sprint(outputToken) - // Set logAttributes to http context - ctx.SetContext("logAttributes", logAttributes) + attributes["model"] = model + attributes["input_token"] = fmt.Sprint(inputToken) + attributes["output_token"] = fmt.Sprint(outputToken) + // Set attributes to http context + ctx.SetContext("attributes", attributes) } // If the end of the stream is reached, record metrics/logs/spans. if endOfStream { - // Get logAttributes from http context - logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) - if !ok { - log.Error("failed to get logAttributes from http context") - return data - } responseEndTime := time.Now().UnixMilli() llm_first_token_duration := uint64(firstTokenTime - requestStartTime) llm_service_duration := uint64(responseEndTime - requestStartTime) - logAttributes["llm_first_token_duration"] = fmt.Sprint(llm_first_token_duration) - logAttributes["llm_service_duration"] = fmt.Sprint(llm_service_duration) - inputTokenUint64, err := strconv.ParseUint(logAttributes["input_token"], 10, 0) - if err != nil || inputTokenUint64 == 0 { - log.Errorf("input_token convert failed, value is %d, err msg is [%v]", inputTokenUint64, err) - return data - } - outputTokenUint64, err := strconv.ParseUint(logAttributes["output_token"], 10, 0) - if err != nil || outputTokenUint64 == 0 { - log.Errorf("output_token convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) - return data - } - // Set filter states which can be used by other plugins such as ai-token-ratelimit - setFilterState("model", logAttributes["model"], log) - setFilterState("input_token", logAttributes["input_token"], log) - setFilterState("output_token", 
logAttributes["output_token"], log) - // Set metrics - config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "input_token"), inputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "output_token"), outputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_first_token_duration"), llm_first_token_duration) - config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_service_duration"), llm_service_duration) - config.incrementCounter(generateMetricName(route, cluster, logAttributes["model"], "llm_duration_count"), 1) - // Set tracing span attributes. - setTracingSpanValue("gen_ai.model_name", logAttributes["model"], log) - setTracingSpanValue("gen_ai.usage.input_tokens", logAttributes["input_token"], log) - setTracingSpanValue("gen_ai.usage.output_tokens", logAttributes["output_token"], log) - setTracingSpanValue("gen_ai.usage.total_tokens", fmt.Sprint(inputTokenUint64+outputTokenUint64), log) - setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) + // Set user defined log & span attributes. 
if config.shouldBufferStreamingBody { streamingBodyBuffer, ok := ctx.GetContext("streamingBodyBuffer").([]byte) if !ok { return data } - setTraceAttributeValueBySource(config, "response_streaming_body", streamingBodyBuffer, log) - setLogAttributeValueBySource(ctx, config, "response_streaming_body", streamingBodyBuffer, log) + setAttributeBySource(ctx, config, "response_streaming_body", streamingBodyBuffer, log) } + // Get updated(maybe) attributes from http context + attributes, _ := ctx.GetContext("attributes").(map[string]string) + + // Inner filter states which can be used by other plugins such as ai-token-ratelimit + setFilterState("model", attributes["model"], log) + setFilterState("input_token", attributes["input_token"], log) + setFilterState("output_token", attributes["output_token"], log) + + // Inner log attribute + setLogAttribute(ctx, "model", attributes["model"], log) + setLogAttribute(ctx, "input_token", attributes["input_token"], log) + setLogAttribute(ctx, "output_token", attributes["output_token"], log) + setLogAttribute(ctx, "llm_first_token_duration", llm_first_token_duration, log) + setLogAttribute(ctx, "llm_service_duration", llm_service_duration, log) + + // Write log writeLog(ctx, log) + + // Set metrics + inputTokenUint64, err := strconv.ParseUint(attributes["input_token"], 10, 0) + if err != nil || inputTokenUint64 == 0 { + log.Errorf("input_token convert failed, value is %d, err msg is [%v]", inputTokenUint64, err) + return data + } + outputTokenUint64, err := strconv.ParseUint(attributes["output_token"], 10, 0) + if err != nil || outputTokenUint64 == 0 { + log.Errorf("output_token convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) + return data + } + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "input_token"), inputTokenUint64) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "output_token"), outputTokenUint64) + 
config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_first_token_duration"), llm_first_token_duration) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_duration_count"), 1) } return data } func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { - // Get logAttributes from http context - logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + // Get attributes from http context + attributes, ok := ctx.GetContext("attributes").(map[string]string) if !ok { - log.Error("failed to get logAttributes from http context") + log.Error("failed to get attributes from http context") return types.ActionContinue } @@ -279,37 +253,50 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body route, _ := getRouteName() cluster, _ := getClusterName() model, inputToken, outputToken, ok := getUsage(body) - if !ok { - return types.ActionContinue + if ok { + attributes["model"] = model + attributes["input_token"] = fmt.Sprint(inputToken) + attributes["output_token"] = fmt.Sprint(outputToken) + // Update attributes + ctx.SetContext("attributes", attributes) } - // Set filter states which can be used by other plugins such as ai-token-ratelimit - setFilterState("model", model, log) - setFilterState("input_token", inputToken, log) - setFilterState("output_token", outputToken, log) - // Set metrics - config.incrementCounter(generateMetricName(route, cluster, model, "input_token"), uint64(inputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "output_token"), uint64(outputToken)) - config.incrementCounter(generateMetricName(route, cluster, model, "llm_service_duration"), llm_service_duration) - config.incrementCounter(generateMetricName(route, cluster, model, 
"llm_duration_count"), 1) - // Set tracing span tag input_tokens and output_tokens. - setTracingSpanValue("gen_ai.model_name", model, log) - setTracingSpanValue("gen_ai.usage.input_tokens", fmt.Sprint(inputToken), log) - setTracingSpanValue("gen_ai.usage.output_tokens", fmt.Sprint(outputToken), log) - setTracingSpanValue("gen_ai.usage.total_tokens", fmt.Sprint(inputToken+outputToken), log) - setTracingSpanValue("llm_service_duration", fmt.Sprint(responseEndTime-requestStartTime), log) - // Set Log Attributes - logAttributes["route"] = route - logAttributes["cluster"] = cluster - logAttributes["model"] = model - logAttributes["input_token"] = fmt.Sprint(inputToken) - logAttributes["output_token"] = fmt.Sprint(outputToken) - logAttributes["llm_service_duration"] = fmt.Sprint(llm_service_duration) - ctx.SetContext("logAttributes", logAttributes) + // Set user defined log & span attributes. - setTraceAttributeValueBySource(config, "response_body", body, log) - setLogAttributeValueBySource(ctx, config, "response_body", body, log) + setAttributeBySource(ctx, config, "response_body", body, log) + + // Get updated(maybe) attributes from http context + attributes, _ = ctx.GetContext("attributes").(map[string]string) + + // Inner filter states which can be used by other plugins such as ai-token-ratelimit + setFilterState("model", attributes["model"], log) + setFilterState("input_token", attributes["input_token"], log) + setFilterState("output_token", attributes["output_token"], log) + + // Inner log attribute + setLogAttribute(ctx, "model", attributes["model"], log) + setLogAttribute(ctx, "input_token", attributes["input_token"], log) + setLogAttribute(ctx, "output_token", attributes["output_token"], log) + setLogAttribute(ctx, "llm_service_duration", llm_service_duration, log) + // Write log writeLog(ctx, log) + + // Set metrics + inputTokenUint64, err := strconv.ParseUint(attributes["input_token"], 10, 0) + if err != nil || inputTokenUint64 == 0 { + log.Errorf("input_token 
convert failed, value is %d, err msg is [%v]", inputTokenUint64, err) + return types.ActionContinue + } + outputTokenUint64, err := strconv.ParseUint(attributes["output_token"], 10, 0) + if err != nil || outputTokenUint64 == 0 { + log.Errorf("output_token convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) + return types.ActionContinue + } + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "input_token"), inputTokenUint64) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "output_token"), outputTokenUint64) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_duration_count"), 1) + return types.ActionContinue } @@ -338,9 +325,65 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag return } +// fetches the tracing span value from the specified source. 
+func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) { + attributes, ok := ctx.GetContext("attributes").(map[string]string) + if !ok { + log.Error("failed to get attributes from http context") + return + } + for _, attribute := range config.attributes { + if source == attribute.ValueSource { + switch source { + case "fixed_value": + log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, attribute.Value) + attributes[attribute.Key] = attribute.Value + case "request_header": + if value, err := proxywasm.GetHttpRequestHeader(attribute.Value); err == nil { + log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value + } + case "request_body": + raw := gjson.GetBytes(body, attribute.Value).Raw + var value string + if len(raw) > 2 { + value = raw[1 : len(raw)-1] + } + log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value + case "response_header": + if value, err := proxywasm.GetHttpResponseHeader(attribute.Value); err == nil { + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value + } + case "response_streaming_body": + value := extractStreamingBodyByJsonPath(body, attribute.Value, attribute.Rule, log) + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value + case "response_body": + value := gjson.GetBytes(body, attribute.Value).Raw + if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' { + value = value[1 : len(value)-1] + } + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value + default: + log.Errorf("source type %s is error", source) + } + } + if attribute.ApplyToLog { + 
setLogAttribute(ctx, attribute.Key, attributes[attribute.Key], log) + } + if attribute.ApplyToSpan { + setSpanAttribute(attribute.Key, attributes[attribute.Key], log) + } + } + ctx.SetContext("attributes", attributes) +} + func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) string { chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n")) - value := "" + var value string if rule == "first" { for _, chunk := range chunks { jsonObj := gjson.GetBytes(chunk, jsonPath) @@ -360,7 +403,7 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, l // extract llm response for _, chunk := range chunks { raw := gjson.GetBytes(chunk, jsonPath).Raw - if len(raw) > 2 { + if len(raw) > 2 && raw[0] == '"' && raw[len(raw)-1] == '"' { value += raw[1 : len(raw)-1] } } @@ -370,125 +413,36 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, l return value } -func setFilterState(key string, value interface{}, log wrapper.Log) { - if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil { - log.Errorf("failed to set %s in filter state: %v", key, e) - } -} - -// fetches the tracing span value from the specified source. 
-func setTraceAttributeValueBySource(config AIStatisticsConfig, source string, body []byte, log wrapper.Log) { - for _, spanAttribute := range config.spanAttributes { - if source == spanAttribute.ValueSource { - switch source { - case "fixed_value": - log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, spanAttribute.Value) - setTracingSpanValue(spanAttribute.Key, spanAttribute.Value, log) - case "request_header": - if value, err := proxywasm.GetHttpRequestHeader(spanAttribute.Value); err == nil { - log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) - setTracingSpanValue(spanAttribute.Key, value, log) - } - case "request_body": - raw := gjson.GetBytes(body, spanAttribute.Value).Raw - var value string - if len(raw) > 2 { - value = raw[1 : len(raw)-1] - } - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) - setTracingSpanValue(spanAttribute.Key, value, log) - case "response_header": - if value, err := proxywasm.GetHttpResponseHeader(spanAttribute.Value); err == nil { - log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) - setTracingSpanValue(spanAttribute.Key, value, log) - } - case "response_streaming_body": - value := extractStreamingBodyByJsonPath(body, spanAttribute.Value, spanAttribute.Rule, log) - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) - setTracingSpanValue(spanAttribute.Key, value, log) - case "response_body": - raw := gjson.GetBytes(body, spanAttribute.Value).Raw - var value string - if len(raw) > 2 { - value = raw[1 : len(raw)-1] - } - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, spanAttribute.Key, value) - setTracingSpanValue(spanAttribute.Key, value, log) - default: - log.Errorf("source type %s is error", source) - } +func setFilterState(key, value string, log wrapper.Log) { + if 
value != "" { + if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil { + log.Errorf("failed to set %s in filter state: %v", key, e) } + } else { + log.Debugf("failed to write filter state [%s], because it's value is empty") } } // Set the tracing span with value. -func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { - if tracingValue == "" { - tracingValue = "-" - } - - traceSpanTag := TracePrefix + tracingKey - - if raw, err := proxywasm.GetProperty([]string{traceSpanTag}); err == nil { - if raw != nil { - log.Warnf("trace span [%s] already exists, value will be overwrite, orign value: %s.", traceSpanTag, string(raw)) +func setSpanAttribute(key, value string, log wrapper.Log) { + if value != "" { + traceSpanTag := TracePrefix + key + if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(value)); e != nil { + log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e) } + } else { + log.Debugf("failed to write span attribute [%s], because it's value is empty") } - - if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(tracingValue)); e != nil { - log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e) - } - log.Debugf("successed to set trace span [%s] with value [%s].", traceSpanTag, tracingValue) } // fetches the tracing span value from the specified source. 
-func setLogAttributeValueBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) { +func setLogAttribute(ctx wrapper.HttpContext, key string, value interface{}, log wrapper.Log) { logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) if !ok { log.Error("failed to get logAttributes from http context") return } - for _, logAttribute := range config.logAttributes { - if source == logAttribute.ValueSource { - switch source { - case "fixed_value": - log.Debugf("[span attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, logAttribute.Value) - logAttributes[logAttribute.Key] = logAttribute.Value - case "request_header": - if value, err := proxywasm.GetHttpRequestHeader(logAttribute.Value); err == nil { - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) - logAttributes[logAttribute.Key] = value - } - case "request_body": - raw := gjson.GetBytes(body, logAttribute.Value).Raw - var value string - if len(raw) > 2 { - value = raw[1 : len(raw)-1] - } - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) - logAttributes[logAttribute.Key] = value - case "response_header": - if value, err := proxywasm.GetHttpResponseHeader(logAttribute.Value); err == nil { - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) - logAttributes[logAttribute.Key] = value - } - case "response_streaming_body": - value := extractStreamingBodyByJsonPath(body, logAttribute.Value, logAttribute.Rule, log) - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, logAttribute.Key, value) - logAttributes[logAttribute.Key] = value - case "response_body": - raw := gjson.GetBytes(body, logAttribute.Value).Raw - var value string - if len(raw) > 2 { - value = raw[1 : len(raw)-1] - } - log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, 
logAttribute.Key, value) - logAttributes[logAttribute.Key] = value - default: - log.Errorf("source type %s is error", source) - } - } - } + logAttributes[key] = fmt.Sprint(value) ctx.SetContext("logAttributes", logAttributes) } @@ -502,7 +456,7 @@ func writeLog(ctx wrapper.HttpContext, log wrapper.Log) { items = append(items, fmt.Sprintf(`"%s":"%s"`, k, v)) } aiLogField := fmt.Sprintf(`{%s}`, strings.Join(items, ",")) - log.Infof("ai request json log: %s", aiLogField) + // log.Infof("ai request json log: %s", aiLogField) jsonMap := map[string]string{ "ai_log": aiLogField, } From 7a5284247d437783a630593478fc26927c08888c Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Fri, 20 Sep 2024 16:27:48 +0800 Subject: [PATCH 11/18] update README --- plugins/wasm-go/extensions/ai-statistics/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index b424eeebca..6beece39d0 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -55,6 +55,7 @@ access_log: #### 监控 ``` +route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10 route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309 route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69 ``` From 19c6889d67ea597f618f7013537d56199a121258 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Fri, 20 Sep 2024 16:30:09 +0800 Subject: [PATCH 12/18] update README --- plugins/wasm-go/extensions/ai-statistics/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 6beece39d0..f8a097fd73 100644 --- 
a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -56,7 +56,9 @@ access_log: #### 监控 ``` route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10 +route_upstream_model_metric_llm_duration_count{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1 route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309 +route_upstream_model_metric_llm_service_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1955 route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69 ``` @@ -96,6 +98,8 @@ attributes: ``` route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343 route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153 +route_upstream_model_metric_llm_service_duration{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 3725 +route_upstream_model_metric_llm_duration_count{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 1 ``` #### 日志 From 5f8f7a660e798e227cd507f73a7afe23a9570696 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Mon, 23 Sep 2024 10:21:32 +0800 Subject: [PATCH 13/18] extract consts --- .../wasm-go/extensions/ai-statistics/main.go | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 49ef049db9..6536f31b57 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -30,6 +30,12 @@ const ( StatisticsRequestStartTime = "ai-statistics-request-start-time" StatisticsFirstTokenTime = "ai-statistics-first-token-time" TracePrefix = "trace_span_tag." 
+ FixedValue = "fixed_value" + RequestHeader = "request_header" + RequestBody = "request_body" + ResponseHeader = "response_header" + ResponseStreamingBody = "response_streaming_body" + ResponseBody = "response_body" ) // TracingSpan is the tracing span configuration. @@ -92,7 +98,7 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe log.Errorf("parse config failed, %v", err) return err } - if attribute.ValueSource == "response_streaming_body" { + if attribute.ValueSource == ResponseStreamingBody { config.shouldBufferStreamingBody = true } log.Infof("%v", attribute) @@ -107,9 +113,9 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo ctx.SetContext("attributes", map[string]string{}) ctx.SetContext("logAttributes", map[string]string{}) // Set user defined log & span attributes which type is fixed_value - setAttributeBySource(ctx, config, "fixed_value", nil, log) + setAttributeBySource(ctx, config, FixedValue, nil, log) // Set user defined log & span attributes which type is request_header - setAttributeBySource(ctx, config, "request_header", nil, log) + setAttributeBySource(ctx, config, RequestHeader, nil, log) // Set request start time. ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) @@ -118,7 +124,7 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { // Set user defined log & span attributes. - setAttributeBySource(ctx, config, "request_body", body, log) + setAttributeBySource(ctx, config, RequestBody, body, log) return types.ActionContinue } @@ -129,7 +135,7 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, l } // Set user defined log & span attributes. 
- setAttributeBySource(ctx, config, "response_header", nil, log) + setAttributeBySource(ctx, config, ResponseHeader, nil, log) return types.ActionContinue } @@ -182,8 +188,8 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat // If the end of the stream is reached, record metrics/logs/spans. if endOfStream { responseEndTime := time.Now().UnixMilli() - llm_first_token_duration := uint64(firstTokenTime - requestStartTime) - llm_service_duration := uint64(responseEndTime - requestStartTime) + llmFirstTokenDuration := uint64(firstTokenTime - requestStartTime) + llmServiceDuration := uint64(responseEndTime - requestStartTime) // Set user defined log & span attributes. if config.shouldBufferStreamingBody { @@ -191,7 +197,7 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat if !ok { return data } - setAttributeBySource(ctx, config, "response_streaming_body", streamingBodyBuffer, log) + setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer, log) } // Get updated(maybe) attributes from http context attributes, _ := ctx.GetContext("attributes").(map[string]string) @@ -205,8 +211,8 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat setLogAttribute(ctx, "model", attributes["model"], log) setLogAttribute(ctx, "input_token", attributes["input_token"], log) setLogAttribute(ctx, "output_token", attributes["output_token"], log) - setLogAttribute(ctx, "llm_first_token_duration", llm_first_token_duration, log) - setLogAttribute(ctx, "llm_service_duration", llm_service_duration, log) + setLogAttribute(ctx, "llm_first_token_duration", llmFirstTokenDuration, log) + setLogAttribute(ctx, "llm_service_duration", llmServiceDuration, log) // Write log writeLog(ctx, log) @@ -224,8 +230,8 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat } config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "input_token"), 
inputTokenUint64) config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "output_token"), outputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_first_token_duration"), llm_first_token_duration) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_first_token_duration"), llmFirstTokenDuration) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llmServiceDuration) config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_duration_count"), 1) } return data @@ -247,7 +253,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body } responseEndTime := time.Now().UnixMilli() - llm_service_duration := uint64(responseEndTime - requestStartTime) + llmServiceDuration := uint64(responseEndTime - requestStartTime) // Get infomations about this request route, _ := getRouteName() @@ -262,7 +268,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body } // Set user defined log & span attributes. 
- setAttributeBySource(ctx, config, "response_body", body, log) + setAttributeBySource(ctx, config, ResponseBody, body, log) // Get updated(maybe) attributes from http context attributes, _ = ctx.GetContext("attributes").(map[string]string) @@ -276,7 +282,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body setLogAttribute(ctx, "model", attributes["model"], log) setLogAttribute(ctx, "input_token", attributes["input_token"], log) setLogAttribute(ctx, "output_token", attributes["output_token"], log) - setLogAttribute(ctx, "llm_service_duration", llm_service_duration, log) + setLogAttribute(ctx, "llm_service_duration", llmServiceDuration, log) // Write log writeLog(ctx, log) @@ -294,7 +300,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body } config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "input_token"), inputTokenUint64) config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "output_token"), outputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llm_service_duration) + config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llmServiceDuration) config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_duration_count"), 1) return types.ActionContinue @@ -335,15 +341,15 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so for _, attribute := range config.attributes { if source == attribute.ValueSource { switch source { - case "fixed_value": + case FixedValue: log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, attribute.Value) attributes[attribute.Key] = attribute.Value - case "request_header": + case RequestHeader: if value, err := proxywasm.GetHttpRequestHeader(attribute.Value); err == nil { log.Debugf("[attribute] source type: %s, key: 
%s, value: %s", source, attribute.Key, value) attributes[attribute.Key] = value } - case "request_body": + case RequestBody: raw := gjson.GetBytes(body, attribute.Value).Raw var value string if len(raw) > 2 { @@ -351,16 +357,16 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so } log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) attributes[attribute.Key] = value - case "response_header": + case ResponseHeader: if value, err := proxywasm.GetHttpResponseHeader(attribute.Value); err == nil { log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) attributes[attribute.Key] = value } - case "response_streaming_body": + case ResponseStreamingBody: value := extractStreamingBodyByJsonPath(body, attribute.Value, attribute.Rule, log) log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) attributes[attribute.Key] = value - case "response_body": + case ResponseBody: value := gjson.GetBytes(body, attribute.Value).Raw if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' { value = value[1 : len(value)-1] From 063e418094e28b797186ffd9a4b7923a42fdeeff Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Mon, 23 Sep 2024 14:05:38 +0800 Subject: [PATCH 14/18] optimize --- .../wasm-go/extensions/ai-statistics/main.go | 258 ++++++++++-------- 1 file changed, 141 insertions(+), 117 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 6536f31b57..60c78f8d39 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -27,15 +27,35 @@ func main() { } const ( + // Trace span prefix + TracePrefix = "trace_span_tag." 
+ // Context consts StatisticsRequestStartTime = "ai-statistics-request-start-time" StatisticsFirstTokenTime = "ai-statistics-first-token-time" - TracePrefix = "trace_span_tag." - FixedValue = "fixed_value" - RequestHeader = "request_header" - RequestBody = "request_body" - ResponseHeader = "response_header" - ResponseStreamingBody = "response_streaming_body" - ResponseBody = "response_body" + CtxGeneralAtrribute = "attributes" + CtxLogAtrribute = "logAttributes" + CtxStreamingBodyBuffer = "streamingBodyBuffer" + + // Source Type + FixedValue = "fixed_value" + RequestHeader = "request_header" + RequestBody = "request_body" + ResponseHeader = "response_header" + ResponseStreamingBody = "response_streaming_body" + ResponseBody = "response_body" + + // Inner metric & log attributes name + Model = "model" + InputToken = "input_token" + OutputToken = "output_token" + LLMFirstTokenDuration = "llm_first_token_duration" + LLMServiceDuration = "llm_service_duration" + LLMDurationCount = "llm_duration_count" + + // Extract Rule + RuleFirst = "first" + RuleReplace = "replace" + RuleAppend = "append" ) // TracingSpan is the tracing span configuration. @@ -110,14 +130,15 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe } func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - ctx.SetContext("attributes", map[string]string{}) - ctx.SetContext("logAttributes", map[string]string{}) + ctx.SetContext(CtxGeneralAtrribute, map[string]string{}) + ctx.SetContext(CtxLogAtrribute, map[string]string{}) + ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) + // Set user defined log & span attributes which type is fixed_value setAttributeBySource(ctx, config, FixedValue, nil, log) // Set user defined log & span attributes which type is request_header setAttributeBySource(ctx, config, RequestHeader, nil, log) // Set request start time. 
- ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) return types.ActionContinue } @@ -141,23 +162,16 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, l } func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte { - // Get attributes from http context - attributes, ok := ctx.GetContext("attributes").(map[string]string) - if !ok { - log.Error("failed to get attributes from http context") - return data - } - // Buffer stream body for record log & span attributes if config.shouldBufferStreamingBody { var streamingBodyBuffer []byte - streamingBodyBuffer, ok := ctx.GetContext("streamingBodyBuffer").([]byte) + streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte) if !ok { streamingBodyBuffer = data } else { streamingBodyBuffer = append(streamingBodyBuffer, data...) } - ctx.SetContext("streamingBodyBuffer", streamingBodyBuffer) + ctx.SetContext(CtxStreamingBodyBuffer, streamingBodyBuffer) } // Get requestStartTime from http context @@ -168,140 +182,84 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat } // If this is the first chunk, record first token duration metric and span attribute - firstTokenTime, ok := ctx.GetContext(StatisticsFirstTokenTime).(int64) - if !ok { - firstTokenTime = time.Now().UnixMilli() + if ctx.GetContext(StatisticsFirstTokenTime) == nil { + firstTokenTime := time.Now().UnixMilli() ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime) + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + attributes[LLMFirstTokenDuration] = fmt.Sprint(firstTokenTime - requestStartTime) + ctx.SetContext(CtxGeneralAtrribute, attributes) } - // Set infomations about this request - route, _ := getRouteName() - cluster, _ := getClusterName() + // Set information about this request + if model, inputToken, outputToken, ok := getUsage(data); ok { + attributes, _ := 
ctx.GetContext(CtxGeneralAtrribute).(map[string]string) // Record Log Attributes - attributes["model"] = model - attributes["input_token"] = fmt.Sprint(inputToken) - attributes["output_token"] = fmt.Sprint(outputToken) + attributes[Model] = model + attributes[InputToken] = fmt.Sprint(inputToken) + attributes[OutputToken] = fmt.Sprint(outputToken) // Set attributes to http context - ctx.SetContext("attributes", attributes) + ctx.SetContext(CtxGeneralAtrribute, attributes) } // If the end of the stream is reached, record metrics/logs/spans. if endOfStream { responseEndTime := time.Now().UnixMilli() - llmFirstTokenDuration := uint64(firstTokenTime - requestStartTime) - llmServiceDuration := uint64(responseEndTime - requestStartTime) + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime) + ctx.SetContext(CtxGeneralAtrribute, attributes) // Set user defined log & span attributes. if config.shouldBufferStreamingBody { - streamingBodyBuffer, ok := ctx.GetContext("streamingBodyBuffer").([]byte) + streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte) if !ok { return data } setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer, log) } - // Get updated(maybe) attributes from http context - attributes, _ := ctx.GetContext("attributes").(map[string]string) - - // Inner filter states which can be used by other plugins such as ai-token-ratelimit - setFilterState("model", attributes["model"], log) - setFilterState("input_token", attributes["input_token"], log) - setFilterState("output_token", attributes["output_token"], log) - // Inner log attribute - setLogAttribute(ctx, "model", attributes["model"], log) - setLogAttribute(ctx, "input_token", attributes["input_token"], log) - setLogAttribute(ctx, "output_token", attributes["output_token"], log) - setLogAttribute(ctx, "llm_first_token_duration", llmFirstTokenDuration, log) - 
setLogAttribute(ctx, "llm_service_duration", llmServiceDuration, log) + // Write inner filter states which can be used by other plugins such as ai-token-ratelimit + writeFilterStates(ctx, log) // Write log writeLog(ctx, log) - // Set metrics - inputTokenUint64, err := strconv.ParseUint(attributes["input_token"], 10, 0) - if err != nil || inputTokenUint64 == 0 { - log.Errorf("input_token convert failed, value is %d, err msg is [%v]", inputTokenUint64, err) - return data - } - outputTokenUint64, err := strconv.ParseUint(attributes["output_token"], 10, 0) - if err != nil || outputTokenUint64 == 0 { - log.Errorf("output_token convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) - return data - } - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "input_token"), inputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "output_token"), outputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_first_token_duration"), llmFirstTokenDuration) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llmServiceDuration) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_duration_count"), 1) + // Write metrics + writeMetric(ctx, config, log) } return data } func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { // Get attributes from http context - attributes, ok := ctx.GetContext("attributes").(map[string]string) - if !ok { - log.Error("failed to get attributes from http context") - return types.ActionContinue - } + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) // Get requestStartTime from http context - requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64) - if !ok { - log.Error("failed to get requestStartTime from http context") - return 
types.ActionContinue - } + requestStartTime, _ := ctx.GetContext(StatisticsRequestStartTime).(int64) responseEndTime := time.Now().UnixMilli() - llmServiceDuration := uint64(responseEndTime - requestStartTime) + attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime) - // Get infomations about this request - route, _ := getRouteName() - cluster, _ := getClusterName() + // Set information about this request model, inputToken, outputToken, ok := getUsage(body) if ok { - attributes["model"] = model - attributes["input_token"] = fmt.Sprint(inputToken) - attributes["output_token"] = fmt.Sprint(outputToken) + attributes[Model] = model + attributes[InputToken] = fmt.Sprint(inputToken) + attributes[OutputToken] = fmt.Sprint(outputToken) // Update attributes - ctx.SetContext("attributes", attributes) + ctx.SetContext(CtxGeneralAtrribute, attributes) } // Set user defined log & span attributes. setAttributeBySource(ctx, config, ResponseBody, body, log) - // Get updated(maybe) attributes from http context - attributes, _ = ctx.GetContext("attributes").(map[string]string) - - // Inner filter states which can be used by other plugins such as ai-token-ratelimit - setFilterState("model", attributes["model"], log) - setFilterState("input_token", attributes["input_token"], log) - setFilterState("output_token", attributes["output_token"], log) - - // Inner log attribute - setLogAttribute(ctx, "model", attributes["model"], log) - setLogAttribute(ctx, "input_token", attributes["input_token"], log) - setLogAttribute(ctx, "output_token", attributes["output_token"], log) - setLogAttribute(ctx, "llm_service_duration", llmServiceDuration, log) + // Write inner filter states which can be used by other plugins such as ai-token-ratelimit + writeFilterStates(ctx, log) // Write log writeLog(ctx, log) - // Set metrics - inputTokenUint64, err := strconv.ParseUint(attributes["input_token"], 10, 0) - if err != nil || inputTokenUint64 == 0 { - log.Errorf("input_token convert 
failed, value is %d, err msg is [%v]", inputTokenUint64, err) - return types.ActionContinue - } - outputTokenUint64, err := strconv.ParseUint(attributes["output_token"], 10, 0) - if err != nil || outputTokenUint64 == 0 { - log.Errorf("output_token convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) - return types.ActionContinue - } - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "input_token"), inputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "output_token"), outputTokenUint64) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_service_duration"), llmServiceDuration) - config.incrementCounter(generateMetricName(route, cluster, attributes["model"], "llm_duration_count"), 1) + // Write metrics + writeMetric(ctx, config, log) return types.ActionContinue } @@ -333,7 +291,7 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag // fetches the tracing span value from the specified source. 
func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) { - attributes, ok := ctx.GetContext("attributes").(map[string]string) + attributes, ok := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) if !ok { log.Error("failed to get attributes from http context") return @@ -384,13 +342,13 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so setSpanAttribute(attribute.Key, attributes[attribute.Key], log) } } - ctx.SetContext("attributes", attributes) + ctx.SetContext(CtxGeneralAtrribute, attributes) } func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) string { chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n")) var value string - if rule == "first" { + if rule == RuleFirst { for _, chunk := range chunks { jsonObj := gjson.GetBytes(chunk, jsonPath) if jsonObj.Exists() { @@ -398,14 +356,14 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, l break } } - } else if rule == "replace" { + } else if rule == RuleReplace { for _, chunk := range chunks { jsonObj := gjson.GetBytes(chunk, jsonPath) if jsonObj.Exists() { value = jsonObj.String() } } - } else if rule == "append" { + } else if rule == RuleAppend { // extract llm response for _, chunk := range chunks { raw := gjson.GetBytes(chunk, jsonPath).Raw @@ -443,20 +401,86 @@ func setSpanAttribute(key, value string, log wrapper.Log) { // fetches the tracing span value from the specified source. 
func setLogAttribute(ctx wrapper.HttpContext, key string, value interface{}, log wrapper.Log) { - logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) + logAttributes, ok := ctx.GetContext(CtxLogAtrribute).(map[string]string) if !ok { log.Error("failed to get logAttributes from http context") return } logAttributes[key] = fmt.Sprint(value) - ctx.SetContext("logAttributes", logAttributes) + ctx.SetContext(CtxLogAtrribute, logAttributes) } -func writeLog(ctx wrapper.HttpContext, log wrapper.Log) { - logAttributes, ok := ctx.GetContext("logAttributes").(map[string]string) +func writeFilterStates(ctx wrapper.HttpContext, log wrapper.Log) { + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + setFilterState(Model, attributes[Model], log) + setFilterState(InputToken, attributes[InputToken], log) + setFilterState(OutputToken, attributes[OutputToken], log) +} + +func writeMetric(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) { + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + route, _ := getRouteName() + cluster, _ := getClusterName() + model, ok := attributes["model"] if !ok { - log.Error("failed to write log") + log.Errorf("Get model failed") + return + } + if inputToken, ok := attributes[InputToken]; ok { + inputTokenUint64, err := strconv.ParseUint(inputToken, 10, 0) + if err != nil || inputTokenUint64 == 0 { + log.Errorf("inputToken convert failed, value is %d, err msg is [%v]", inputTokenUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, InputToken), inputTokenUint64) + } + if outputToken, ok := attributes[OutputToken]; ok { + outputTokenUint64, err := strconv.ParseUint(outputToken, 10, 0) + if err != nil || outputTokenUint64 == 0 { + log.Errorf("outputToken convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, OutputToken), 
outputTokenUint64) + } + if llmFirstTokenDuration, ok := attributes[LLMFirstTokenDuration]; ok { + llmFirstTokenDurationUint64, err := strconv.ParseUint(llmFirstTokenDuration, 10, 0) + if err != nil || llmFirstTokenDurationUint64 == 0 { + log.Errorf("llmFirstTokenDuration convert failed, value is %d, err msg is [%v]", llmFirstTokenDurationUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, LLMFirstTokenDuration), llmFirstTokenDurationUint64) + } + if llmServiceDuration, ok := attributes[LLMServiceDuration]; ok { + llmServiceDurationUint64, err := strconv.ParseUint(llmServiceDuration, 10, 0) + if err != nil || llmServiceDurationUint64 == 0 { + log.Errorf("llmServiceDuration convert failed, value is %d, err msg is [%v]", llmServiceDurationUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, LLMServiceDuration), llmServiceDurationUint64) + } + config.incrementCounter(generateMetricName(route, cluster, model, LLMDurationCount), 1) +} + +func writeLog(ctx wrapper.HttpContext, log wrapper.Log) { + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + logAttributes, _ := ctx.GetContext(CtxLogAtrribute).(map[string]string) + // Set inner log fields + if attributes[Model] != "" { + logAttributes[Model] = attributes[Model] + } + if attributes[InputToken] != "" { + logAttributes[InputToken] = attributes[InputToken] + } + if attributes[OutputToken] != "" { + logAttributes[OutputToken] = attributes[OutputToken] + } + if attributes[LLMFirstTokenDuration] != "" { + logAttributes[LLMFirstTokenDuration] = attributes[LLMFirstTokenDuration] + } + if attributes[LLMServiceDuration] != "" { + logAttributes[LLMServiceDuration] = attributes[LLMServiceDuration] } + // Traverse log fields items := []string{} for k, v := range logAttributes { items = append(items, fmt.Sprintf(`"%s":"%s"`, k, v)) From 6fd24412a4b9d80b69f10e576d55d138c19708a9 Mon Sep 17 00:00:00 2001 From: rinfx 
<893383980@qq.com> Date: Mon, 23 Sep 2024 14:14:35 +0800 Subject: [PATCH 15/18] parse config optimize --- plugins/wasm-go/extensions/ai-statistics/main.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 60c78f8d39..14fcc4d2ab 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/json" + "errors" "fmt" "strconv" "strings" @@ -121,7 +122,9 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe if attribute.ValueSource == ResponseStreamingBody { config.shouldBufferStreamingBody = true } - log.Infof("%v", attribute) + if attribute.Rule != "" && attribute.Rule != RuleFirst && attribute.Rule != RuleReplace && attribute.Rule != RuleAppend { + return errors.New("value of rule must be one of [nil, first, replace, append]") + } config.attributes[i] = attribute } // Metric settings @@ -332,7 +335,6 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) attributes[attribute.Key] = value default: - log.Errorf("source type %s is error", source) } } if attribute.ApplyToLog { From 9e48586c67b86658f94fbb450dfb5f0c06e98575 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Tue, 24 Sep 2024 11:08:24 +0800 Subject: [PATCH 16/18] update README --- .../extensions/ai-statistics/README.md | 34 ++--- .../extensions/ai-statistics/README_EN.md | 140 ++++++++++++++++++ 2 files changed, 157 insertions(+), 17 deletions(-) create mode 100644 plugins/wasm-go/extensions/ai-statistics/README_EN.md diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index f8a097fd73..61e04969aa 100644 --- 
a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -1,8 +1,15 @@ -# 介绍 +--- +title: AI可观测 +keywords: [higress, AI, observability] +description: AI可观测配置参考 +--- + +## 介绍 提供AI可观测基础能力,包括 metric, log, trace,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则需要用户进行相应配置才可生效。 -# 配置说明 +## 配置说明 插件默认请求符合openai协议格式,并提供了以下基础可观测值,用户无需特殊配置: + - metric:提供了输入token、输出token、首个token的rt(流式请求)、请求总rt等指标,支持在网关、路由、服务、模型四个维度上进行观测 - log:提供了 input_token, output_token, model, llm_service_duration, llm_first_token_duration 等字段 @@ -12,7 +19,8 @@ |----------------|-------|------|-----|------------------------| | `attributes` | []Attribute | 非必填 | - | 用户希望记录在log/span中的信息 | -## Attribute 配置说明 +Attribute 配置说明: + | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|-----|-----|------------------------| | `key` | string | 必填 | - | attrribute 名称 | @@ -23,6 +31,7 @@ | `apply_to_span` | bool | 非必填 | false | 是否将提取的信息记录在链路追踪span中 | `value_source` 的各种取值含义如下: + - `fixed_value`:固定值 - `requeset_header` : attrribute 值通过 http 请求头获取,value 配置为 header key - `request_body` :attrribute 值通过请求 body 获取,value 配置格式为 gjson 的 jsonpath @@ -32,27 +41,19 @@ 当 `value_source` 为 `response_streaming_body` 时,应当配置 `rule`,用于指定如何从流式body中获取指定值,取值含义如下: -- `first`:(多个chunk中取第一个chunk的值), -- `replace`:(多个chunk中取最后一个chunk的值), -- `append`:(拼接多个chunk中的值,可用于获取回答内容) + +- `first`:多个chunk中取第一个有效chunk的值 +- `replace`:多个chunk中取最后一个有效chunk的值 +- `append`:拼接多个有效chunk中的值,可用于获取回答内容 ## 配置示例 如果希望在网关访问日志中记录ai-statistic相关的统计值,需要修改log_format,在原log_format基础上添加一个新字段,示例如下: ```yaml -access_log: - - name: envoy.access_loggers.file - typed_config: - "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - log_format: - text_format_source: - inline_string: '{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' - path: /dev/stdout +'{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' ``` ### 空配置 -当不进行任何配置时,效果如下 - #### 监控 ``` 
route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10 @@ -63,7 +64,6 @@ route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443 ``` #### 日志 -此配置下日志效果如下: ```json { "ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" diff --git a/plugins/wasm-go/extensions/ai-statistics/README_EN.md b/plugins/wasm-go/extensions/ai-statistics/README_EN.md new file mode 100644 index 0000000000..1eccffcacc --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/README_EN.md @@ -0,0 +1,140 @@ +--- +title: AI Statistics +keywords: [higress, AI, observability] +description: AI Statistics plugin configuration reference +--- + +## Introduction +Provides basic AI observability capabilities, including metric, log, and trace. The ai-proxy plug-in needs to be connected afterwards. If the ai-proxy plug-in is not connected, the user needs to configure it accordingly to take effect. + +## Configuration instructions +The default request of the plug-in conforms to the openai protocol format and provides the following basic observable values. Users do not need special configuration: + +- metric: It provides indicators such as input token, output token, rt of the first token (streaming request), total request rt, etc., and supports observation in the four dimensions of gateway, routing, service, and model. 
+- log: Provides input_token, output_token, model, llm_service_duration, llm_first_token_duration and other fields + +Users can also expand observable values ​​through configuration: + +| Name | Type | Required | Default | Description | +|----------------|-------|------|-----|------------------------| +| `attributes` | []Attribute | required | - | Information that the user wants to record in log/span | + +Attribute Configuration instructions: + +| Name | Type | Required | Default | Description | +|----------------|-------|-----|-----|------------------------| +| `key` | string | required | - | attrribute key | +| `value_source` | string | required | - | attrribute value source, optional values ​​are `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | +| `value` | string | required | - | how to get attrribute value | +| `rule` | string | optional | - | Rule to extract attribute from streaming response, optional values ​​are `first`, `replace`, `append`| +| `apply_to_log` | bool | optional | false | Whether to record the extracted information in the log | +| `apply_to_span` | bool | optional | false | Whether to record the extracted information in the link tracking span | + +The meanings of various values for `value_source` ​​are as follows: + +- `fixed_value`: fixed value +- `requeset_header`: The attrribute is obtained through the http request header +- `request_body`: The attrribute is obtained through the http request body +- `response_header`: The attrribute is obtained through the http response header +- `response_body`: The attrribute is obtained through the http response body +- `response_streaming_body`: The attrribute is obtained through the http streaming response body + + +When `value_source` is `response_streaming_body`, `rule` should be configured to specify how to obtain the specified value from the streaming body. 
The meaning of the value is as follows: + +- `first`: extract value from the first valid chunk +- `replace`: extract value from the last valid chunk +- `append`: join value pieces from all valid chunks + +## Configuration example +If you want to record ai-statistic related statistical values ​​​​in the gateway access log, you need to modify log_format and add a new field based on the original log_format. The example is as follows: + +```yaml +'{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' +``` + +### Empty +#### Metric +``` +route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10 +route_upstream_model_metric_llm_duration_count{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1 +route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309 +route_upstream_model_metric_llm_service_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1955 +route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69 +``` + +#### Log +```json +{ + "ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" +} +``` + +#### Trace +When the configuration is empty, no additional attributes will be added to the span. 
+ +### Extract token usage information from non-openai protocols +When setting the protocol to original in ai-proxy, taking Alibaba Cloud Bailian as an example, you can make the following configuration to specify how to extract `model`, `input_token`, `output_token` + +```yaml +attributes: + - key: model + value_source: response_body + value: usage.models.0.model_id + apply_to_log: true + apply_to_span: false + - key: input_token + value_source: response_body + value: usage.models.0.input_tokens + apply_to_log: true + apply_to_span: false + - key: output_token + value_source: response_body + value: usage.models.0.output_tokens + apply_to_log: true + apply_to_span: false +``` +#### Metric +``` +route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343 +route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153 +route_upstream_model_metric_llm_service_duration{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 3725 +route_upstream_model_metric_llm_duration_count{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 1 +``` + +#### Log +```json +{ + "ai_log": "{\"model\":\"qwen-max\",\"input_token\":\"343\",\"output_token\":\"153\",\"llm_service_duration\":\"19110\"}" +} +``` + +#### Trace +Three additional attributes `model`, `input_token`, and `output_token` can be seen in the trace spans. 
+ +### Cooperate with authentication and authentication record consumer +```yaml +attributes: + - key: consumer + value_source: request_header + value: x-mse-consumer + apply_to_log: true +``` + +### Record questions and answers +```yaml +attributes: + - key: question + value_source: request_body + value: messages.@reverse.0.content + apply_to_log: true + - key: answer + value_source: response_streaming_body + value: choices.0.delta.content + rule: append + apply_to_log: true + - key: answer + value_source: response_body + value: choices.0.message.content + apply_to_log: true +``` \ No newline at end of file From 0b8b7fde2c6f92972f74d382d1197828fbe8c6bc Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Tue, 24 Sep 2024 14:03:28 +0800 Subject: [PATCH 17/18] add advanced usage --- .../extensions/ai-statistics/README.md | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 61e04969aa..5409f2c7c4 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -139,4 +139,35 @@ attributes: value_source: response_body value: choices.0.message.content apply_to_log: true +``` + +## 进阶 +配合阿里云SLS数据加工,可以将ai相关的字段进行提取加工,例如原始日志为: + +``` +ai_log:{"question":"用python计算2的3次方","answer":"你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方,即2乘以自己2次,可以用以下代码表示:\n\n```python\nresult = 2 ** 3\nprint(result)\n```\n\n运行这段代码,你会得到输出结果为8,因为2乘以自己两次等于8。","model":"qwen-max","input_token":"16","output_token":"76","llm_service_duration":"5913"} +``` + +使用如下数据加工脚本,可以提取出question和answer: + +``` +e_regex("ai_log", grok("%{EXTRACTJSON}")) +e_set("question", json_select(v("json"), "question", default="-")) +e_set("answer", json_select(v("json"), "answer", default="-")) +``` + +提取后,SLS中会添加question和answer两个字段,示例如下: + +``` +ai_log:{"question":"用python计算2的3次方","answer":"你可以使用 Python 的乘方运算符 `**` 
来计算一个数的次方。计算2的3次方,即2乘以自己2次,可以用以下代码表示:\n\n```python\nresult = 2 ** 3\nprint(result)\n```\n\n运行这段代码,你会得到输出结果为8,因为2乘以自己两次等于8。","model":"qwen-max","input_token":"16","output_token":"76","llm_service_duration":"5913"} + +question:用python计算2的3次方 + +answer:你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方,即2乘以自己2次,可以用以下代码表示: + +result = 2 ** 3 +print(result) + +运行这段代码,你会得到输出结果为8,因为2乘以自己两次等于8。 + ``` \ No newline at end of file From 744d39b81a2804532f12e41a7a2ec1928a732459 Mon Sep 17 00:00:00 2001 From: rinfx <893383980@qq.com> Date: Tue, 24 Sep 2024 18:53:00 +0800 Subject: [PATCH 18/18] update README --- plugins/wasm-go/extensions/ai-statistics/README.md | 5 +++++ plugins/wasm-go/extensions/ai-statistics/README_EN.md | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 5409f2c7c4..31fb207f9e 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -7,6 +7,11 @@ description: AI可观测配置参考 ## 介绍 提供AI可观测基础能力,包括 metric, log, trace,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则需要用户进行相应配置才可生效。 +## 运行属性 + +插件执行阶段:`默认阶段` +插件执行优先级:`200` + ## 配置说明 插件默认请求符合openai协议格式,并提供了以下基础可观测值,用户无需特殊配置: diff --git a/plugins/wasm-go/extensions/ai-statistics/README_EN.md b/plugins/wasm-go/extensions/ai-statistics/README_EN.md index 1eccffcacc..e94544a510 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README_EN.md +++ b/plugins/wasm-go/extensions/ai-statistics/README_EN.md @@ -7,6 +7,11 @@ description: AI Statistics plugin configuration reference ## Introduction Provides basic AI observability capabilities, including metric, log, and trace. The ai-proxy plug-in needs to be connected afterwards. If the ai-proxy plug-in is not connected, the user needs to configure it accordingly to take effect. 
+## Runtime Properties + +Plugin Phase: `CUSTOM` +Plugin Priority: `200` + ## Configuration instructions The default request of the plug-in conforms to the openai protocol format and provides the following basic observable values. Users do not need special configuration: