Skip to content

Commit

Permalink
Merge pull request #136 from meetwangdk/dk/tkp
Browse files Browse the repository at this point in the history
Add the TKP feature item.
  • Loading branch information
hnhbwlp authored Sep 5, 2024
2 parents 6f2ce75 + 7ca8c4c commit b19cf06
Show file tree
Hide file tree
Showing 18 changed files with 1,080 additions and 50 deletions.
10 changes: 9 additions & 1 deletion cmd/grafanadi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"flag"
tkpReqProvider "github.com/alipay/container-observability-service/pkg/tkp_provider"
"os"
"os/signal"
"syscall"
Expand All @@ -25,7 +26,7 @@ var (

func newRootCmd() *cobra.Command {
config := &server.ServerConfig{}
var cfgFile, kubeConfigFile string
var cfgFile, kubeConfigFile, tkpRefCfgFile string

cmd := &cobra.Command{
Use: "grafanadi",
Expand All @@ -49,6 +50,12 @@ func newRootCmd() *cobra.Command {
panic(err.Error())
}

err = tkpReqProvider.InitTkpReqConfig(tkpRefCfgFile)
if err != nil {
klog.Errorf("failed to init tkp config [%s] err:%s", tkpRefCfgFile, err.Error())
panic(err.Error())
}

serverConfig := &server.ServerConfig{
ListenAddr: config.ListenAddr,
Storage: storage,
Expand All @@ -70,6 +77,7 @@ func newRootCmd() *cobra.Command {
// for storage
cmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "", "/app/storage-config.yaml", "storage config file")
cmd.PersistentFlags().StringVarP(&service.GrafanaUrl, "grafana-url", "", "", "grafana url")
cmd.PersistentFlags().StringVarP(&tkpRefCfgFile, "tkp-req-config-file", "", "/app/tkp-req-config-file.json", "tkp req config file")

// kubeconfig for k8s client
cmd.PersistentFlags().StringVarP(&kubeConfigFile, "kubeconfig", "", "/etc/kubernetes/kubeconfig/admin.kubeconfig", "Path to kubeconfig file with authorization and apiserver information.")
Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/lunettes/templates/grafanadi/grafanadi-cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ data:
endpoint: "http://es-cluster-svc.{{ .Values.namespace }}:9200"
username: {{ .Values.esUser }}
password: {{ .Values.esPassword }}
tkp-req-config-file.json: |
{
"staging":"http://alipay-tkp-manager.tkp.svc.cluster.local:9999"
}
kind: ConfigMap
metadata:
name: grafanadi-cm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ spec:
name: logs
- mountPath: /var/grafana
name: grafana-pv
- mountPath: /app/config-file.yaml
- mountPath: /app
name: cm-vol
readOnly: true
subPath: config-file.yaml
volumes:
- name: grafana-pv
hostPath:
Expand Down
214 changes: 214 additions & 0 deletions internal/grafanadi/handler/owner_podmap_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package handler

import (
"context"
"fmt"
eavesmodel "github.com/alipay/container-observability-service/internal/grafanadi/model"
"github.com/alipay/container-observability-service/internal/grafanadi/service"
interutils "github.com/alipay/container-observability-service/internal/grafanadi/utils"
"github.com/alipay/container-observability-service/pkg/dal/storage-client/data_access"
"github.com/alipay/container-observability-service/pkg/dal/storage-client/model"
"github.com/alipay/container-observability-service/pkg/metrics"
"github.com/alipay/container-observability-service/pkg/utils"
"github.com/olivere/elastic/v7"
"k8s.io/klog/v2"
"net/http"
"time"
)

type OwnerPodMapHandler struct {
request *http.Request
writer http.ResponseWriter
requestParams *OwnerPodMapParams
storage data_access.StorageInterface
}

func (handler *OwnerPodMapHandler) GetOwnerPodMap(debugfrom, key, value string) (int, interface{}, error) {
sloTraceData := make([]*model.SloTraceData, 0)
result := []model.SloTraceData{}
podYamls := make([]*model.PodYaml, 0)
begin := time.Now()
defer func() {
cost := utils.TimeSinceInMilliSeconds(begin)
metrics.QueryMethodDurationMilliSeconds.WithLabelValues("GetOwnerPodMap").Observe(cost)
}()
if debugfrom == "pod" {
// get owneref pod with pod key/value
util := interutils.Util{
Storage: handler.storage,
}
py, err := util.GetPodYaml(podYamls, key, value)
if err != nil || len(py) == 0 {
return http.StatusOK, eavesmodel.DataFrame{}, err
}
if py[0].Pod == nil {
return http.StatusOK, eavesmodel.DataFrame{}, err
}

if len(py[0].Pod.OwnerReferences) != 0 {
or := py[0].Pod.OwnerReferences[0]
value = string(or.UID)
}
} else {
switch key {
case "name":
uid, err := findUniqueId(value, handler.storage)
klog.Info("uid is %s", uid)
if err != nil {
klog.Errorf("findUniqueId error, error is %s", err)
return http.StatusOK, eavesmodel.DataFrame{}, err
}
value = uid
default:
fmt.Println("currently only supports uid or name")
return http.StatusOK, eavesmodel.DataFrame{}, nil
}
}
if value == "" {
return http.StatusOK, eavesmodel.DataFrame{}, nil
}
err := handler.storage.QuerySloTraceDataWithOwnerId(&sloTraceData, value,
model.WithFrom(handler.requestParams.From),
model.WithTo(handler.requestParams.To),
model.WithLimit(1000))
if err != nil {
return http.StatusOK, eavesmodel.DataFrame{}, fmt.Errorf("QuerySloTraceDataWithOwnerId error, error is %s", err)
}
for _, std := range sloTraceData {
if std.Type == "create" || std.Type == "delete" {
found := false
for i, pod := range result {
if pod.PodUID == std.PodUID {
if std.Type == "create" {
result[i].CreatedTime = std.CreatedTime
result[i].OwnerRefStr = std.OwnerRefStr
if std.RunningAt.After(std.ReadyAt) {
std.ReadyAt = std.RunningAt
}
result[i].ReadyAt = std.ReadyAt
result[i].SLOViolationReason = std.SLOViolationReason
} else {
result[i].DeletedTime = std.CreatedTime
result[i].DeleteEndTime = std.DeleteEndTime
result[i].DeleteResult = std.DeleteResult
}
found = true
}
}
if !found {
if std.RunningAt.After(std.ReadyAt) {
std.ReadyAt = std.RunningAt
}
if std.Type == "delete" {
std.DeletedTime = std.CreatedTime
}
result = append(result, *std)
}
}
}
return http.StatusOK, service.ConvertSloDataTrace2Graph(result), nil
}

type OwnerPodMapParams struct {
Key string
Value string
DebugFrom string

From time.Time // range query
To time.Time // range query

}

func (handler *OwnerPodMapHandler) RequestParams() interface{} {
return handler.requestParams
}

func (handler *OwnerPodMapHandler) ParseRequest() error {
params := OwnerPodMapParams{}
if handler.request.Method == http.MethodGet {
key := handler.request.URL.Query().Get("searchkey")
value := handler.request.URL.Query().Get("searchvalue")
debugfrom := handler.request.URL.Query().Get("debugfrom")
params.Key = key
params.Value = value
params.DebugFrom = debugfrom

setTPLayout(handler.request.URL.Query(), "from", &params.From)
setTPLayout(handler.request.URL.Query(), "to", &params.To)
}

handler.requestParams = &params
return nil
}

func (handler *OwnerPodMapHandler) ValidRequest() error {

return nil
}

func OwnerPodMapFactory(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) Handler {
return &OwnerPodMapHandler{
request: r,
writer: w,
storage: storage,
}
}

func (handler *OwnerPodMapHandler) Process() (int, interface{}, error) {
defer utils.IgnorePanic("ContainerlifecycleHandler.Process ")

var result interface{}
var err error
var httpStatus int

httpStatus, result, err = handler.GetOwnerPodMap(handler.requestParams.DebugFrom, handler.requestParams.Key, handler.requestParams.Value)

return httpStatus, result, err
}

func findUniqueId(workloadName string, storage data_access.StorageInterface) (uid string, err error) {
esClient, ok := storage.(*data_access.StorageEsImpl)
if !ok {
err = fmt.Errorf("parse errror")
return
}
query := elastic.NewBoolQuery().
Must(
elastic.NewTermQuery("ExtraProperties.ownerref.name.Value.keyword", workloadName),
elastic.NewExistsQuery("ExtraProperties.ownerref.uid.Value.keyword"),
elastic.NewExistsQuery("ExtraProperties.ownerref.name.Value.keyword"),
)
aggs := elastic.NewTermsAggregation().
Field("ExtraProperties.ownerref.name.Value.keyword").
SubAggregation("group_by_ownerref_uid", elastic.NewTermsAggregation().Field("ExtraProperties.ownerref.uid.Value.keyword"))

searchResult, err := esClient.DB.Search().
Index("slo_trace_data_daily").
Query(query).
Size(0).
Aggregation("group_by_ownerref_name", aggs).
Do(context.Background())
if err != nil {
err = fmt.Errorf("failed to execute search query: %v", err)
klog.Errorf("Failed to execute search query: %v", err)
return
}

if agg, found := searchResult.Aggregations.Terms("group_by_ownerref_name"); found {
for _, bucket := range agg.Buckets {
if uidAgg, uidFound := bucket.Aggregations.Terms("group_by_ownerref_uid"); uidFound {
for _, detail := range uidAgg.Buckets {
if strKey, ok := detail.Key.(string); ok {
return strKey, nil
} else {
return "", fmt.Errorf("workload uid key is not a string")
}
}
}
break
}
} else {
klog.Infof("No aggs aggregation found")
}
return
}
Loading

0 comments on commit b19cf06

Please sign in to comment.