diff --git a/cmd/grafanadi/main.go b/cmd/grafanadi/main.go index 3f5c8dc8..907ec7ee 100644 --- a/cmd/grafanadi/main.go +++ b/cmd/grafanadi/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + tkpReqProvider "github.com/alipay/container-observability-service/pkg/tkp_provider" "os" "os/signal" "syscall" @@ -25,7 +26,7 @@ var ( func newRootCmd() *cobra.Command { config := &server.ServerConfig{} - var cfgFile, kubeConfigFile string + var cfgFile, kubeConfigFile, tkpRefCfgFile string cmd := &cobra.Command{ Use: "grafanadi", @@ -49,6 +50,12 @@ func newRootCmd() *cobra.Command { panic(err.Error()) } + err = tkpReqProvider.InitTkpReqConfig(tkpRefCfgFile) + if err != nil { + klog.Errorf("failed to init tkp config [%s] err:%s", tkpRefCfgFile, err.Error()) + panic(err.Error()) + } + serverConfig := &server.ServerConfig{ ListenAddr: config.ListenAddr, Storage: storage, @@ -70,6 +77,7 @@ func newRootCmd() *cobra.Command { // for storage cmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "", "/app/storage-config.yaml", "storage config file") cmd.PersistentFlags().StringVarP(&service.GrafanaUrl, "grafana-url", "", "", "grafana url") + cmd.PersistentFlags().StringVarP(&tkpRefCfgFile, "tkp-req-config-file", "", "/app/tkp-req-config-file.json", "tkp req config file") // kubeconfig for k8s client cmd.PersistentFlags().StringVarP(&kubeConfigFile, "kubeconfig", "", "/etc/kubernetes/kubeconfig/admin.kubeconfig", "Path to kubeconfig file with authorization and apiserver information.") diff --git a/deploy/helm/lunettes/templates/grafanadi/grafanadi-cm.yaml b/deploy/helm/lunettes/templates/grafanadi/grafanadi-cm.yaml index 8c105900..daa842a9 100644 --- a/deploy/helm/lunettes/templates/grafanadi/grafanadi-cm.yaml +++ b/deploy/helm/lunettes/templates/grafanadi/grafanadi-cm.yaml @@ -8,6 +8,10 @@ data: endpoint: "http://es-cluster-svc.{{ .Values.namespace }}:9200" username: {{ .Values.esUser }} password: {{ .Values.esPassword }} + tkp-req-config-file.json: | + { + 
"staging":"http://alipay-tkp-manager.tkp.svc.cluster.local:9999" + } kind: ConfigMap metadata: name: grafanadi-cm diff --git a/deploy/helm/lunettes/templates/grafanadi/grafanadi-deploy.yaml b/deploy/helm/lunettes/templates/grafanadi/grafanadi-deploy.yaml index 098e21f2..627c7ee7 100644 --- a/deploy/helm/lunettes/templates/grafanadi/grafanadi-deploy.yaml +++ b/deploy/helm/lunettes/templates/grafanadi/grafanadi-deploy.yaml @@ -68,10 +68,9 @@ spec: name: logs - mountPath: /var/grafana name: grafana-pv - - mountPath: /app/config-file.yaml + - mountPath: /app name: cm-vol readOnly: true - subPath: config-file.yaml volumes: - name: grafana-pv hostPath: diff --git a/internal/grafanadi/handler/owner_podmap_handler.go b/internal/grafanadi/handler/owner_podmap_handler.go new file mode 100644 index 00000000..cc559f4e --- /dev/null +++ b/internal/grafanadi/handler/owner_podmap_handler.go @@ -0,0 +1,214 @@ +package handler + +import ( + "context" + "fmt" + eavesmodel "github.com/alipay/container-observability-service/internal/grafanadi/model" + "github.com/alipay/container-observability-service/internal/grafanadi/service" + interutils "github.com/alipay/container-observability-service/internal/grafanadi/utils" + "github.com/alipay/container-observability-service/pkg/dal/storage-client/data_access" + "github.com/alipay/container-observability-service/pkg/dal/storage-client/model" + "github.com/alipay/container-observability-service/pkg/metrics" + "github.com/alipay/container-observability-service/pkg/utils" + "github.com/olivere/elastic/v7" + "k8s.io/klog/v2" + "net/http" + "time" +) + +type OwnerPodMapHandler struct { + request *http.Request + writer http.ResponseWriter + requestParams *OwnerPodMapParams + storage data_access.StorageInterface +} + +func (handler *OwnerPodMapHandler) GetOwnerPodMap(debugfrom, key, value string) (int, interface{}, error) { + sloTraceData := make([]*model.SloTraceData, 0) + result := []model.SloTraceData{} + podYamls := make([]*model.PodYaml, 0) + 
begin := time.Now() + defer func() { + cost := utils.TimeSinceInMilliSeconds(begin) + metrics.QueryMethodDurationMilliSeconds.WithLabelValues("GetOwnerPodMap").Observe(cost) + }() + if debugfrom == "pod" { + // get owneref pod with pod key/value + util := interutils.Util{ + Storage: handler.storage, + } + py, err := util.GetPodYaml(podYamls, key, value) + if err != nil || len(py) == 0 { + return http.StatusOK, eavesmodel.DataFrame{}, err + } + if py[0].Pod == nil { + return http.StatusOK, eavesmodel.DataFrame{}, err + } + + if len(py[0].Pod.OwnerReferences) != 0 { + or := py[0].Pod.OwnerReferences[0] + value = string(or.UID) + } + } else { + switch key { + case "name": + uid, err := findUniqueId(value, handler.storage) + klog.Info("uid is %s", uid) + if err != nil { + klog.Errorf("findUniqueId error, error is %s", err) + return http.StatusOK, eavesmodel.DataFrame{}, err + } + value = uid + default: + fmt.Println("currently only supports uid or name") + return http.StatusOK, eavesmodel.DataFrame{}, nil + } + } + if value == "" { + return http.StatusOK, eavesmodel.DataFrame{}, nil + } + err := handler.storage.QuerySloTraceDataWithOwnerId(&sloTraceData, value, + model.WithFrom(handler.requestParams.From), + model.WithTo(handler.requestParams.To), + model.WithLimit(1000)) + if err != nil { + return http.StatusOK, eavesmodel.DataFrame{}, fmt.Errorf("QuerySloTraceDataWithOwnerId error, error is %s", err) + } + for _, std := range sloTraceData { + if std.Type == "create" || std.Type == "delete" { + found := false + for i, pod := range result { + if pod.PodUID == std.PodUID { + if std.Type == "create" { + result[i].CreatedTime = std.CreatedTime + result[i].OwnerRefStr = std.OwnerRefStr + if std.RunningAt.After(std.ReadyAt) { + std.ReadyAt = std.RunningAt + } + result[i].ReadyAt = std.ReadyAt + result[i].SLOViolationReason = std.SLOViolationReason + } else { + result[i].DeletedTime = std.CreatedTime + result[i].DeleteEndTime = std.DeleteEndTime + result[i].DeleteResult = 
std.DeleteResult + } + found = true + } + } + if !found { + if std.RunningAt.After(std.ReadyAt) { + std.ReadyAt = std.RunningAt + } + if std.Type == "delete" { + std.DeletedTime = std.CreatedTime + } + result = append(result, *std) + } + } + } + return http.StatusOK, service.ConvertSloDataTrace2Graph(result), nil +} + +type OwnerPodMapParams struct { + Key string + Value string + DebugFrom string + + From time.Time // range query + To time.Time // range query + +} + +func (handler *OwnerPodMapHandler) RequestParams() interface{} { + return handler.requestParams +} + +func (handler *OwnerPodMapHandler) ParseRequest() error { + params := OwnerPodMapParams{} + if handler.request.Method == http.MethodGet { + key := handler.request.URL.Query().Get("searchkey") + value := handler.request.URL.Query().Get("searchvalue") + debugfrom := handler.request.URL.Query().Get("debugfrom") + params.Key = key + params.Value = value + params.DebugFrom = debugfrom + + setTPLayout(handler.request.URL.Query(), "from", ¶ms.From) + setTPLayout(handler.request.URL.Query(), "to", ¶ms.To) + } + + handler.requestParams = ¶ms + return nil +} + +func (handler *OwnerPodMapHandler) ValidRequest() error { + + return nil +} + +func OwnerPodMapFactory(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) Handler { + return &OwnerPodMapHandler{ + request: r, + writer: w, + storage: storage, + } +} + +func (handler *OwnerPodMapHandler) Process() (int, interface{}, error) { + defer utils.IgnorePanic("ContainerlifecycleHandler.Process ") + + var result interface{} + var err error + var httpStatus int + + httpStatus, result, err = handler.GetOwnerPodMap(handler.requestParams.DebugFrom, handler.requestParams.Key, handler.requestParams.Value) + + return httpStatus, result, err +} + +func findUniqueId(workloadName string, storage data_access.StorageInterface) (uid string, err error) { + esClient, ok := storage.(*data_access.StorageEsImpl) + if !ok { + err = fmt.Errorf("parse errror") + 
return + } + query := elastic.NewBoolQuery(). + Must( + elastic.NewTermQuery("ExtraProperties.ownerref.name.Value.keyword", workloadName), + elastic.NewExistsQuery("ExtraProperties.ownerref.uid.Value.keyword"), + elastic.NewExistsQuery("ExtraProperties.ownerref.name.Value.keyword"), + ) + aggs := elastic.NewTermsAggregation(). + Field("ExtraProperties.ownerref.name.Value.keyword"). + SubAggregation("group_by_ownerref_uid", elastic.NewTermsAggregation().Field("ExtraProperties.ownerref.uid.Value.keyword")) + + searchResult, err := esClient.DB.Search(). + Index("slo_trace_data_daily"). + Query(query). + Size(0). + Aggregation("group_by_ownerref_name", aggs). + Do(context.Background()) + if err != nil { + err = fmt.Errorf("failed to execute search query: %v", err) + klog.Errorf("Failed to execute search query: %v", err) + return + } + + if agg, found := searchResult.Aggregations.Terms("group_by_ownerref_name"); found { + for _, bucket := range agg.Buckets { + if uidAgg, uidFound := bucket.Aggregations.Terms("group_by_ownerref_uid"); uidFound { + for _, detail := range uidAgg.Buckets { + if strKey, ok := detail.Key.(string); ok { + return strKey, nil + } else { + return "", fmt.Errorf("workload uid key is not a string") + } + } + } + break + } + } else { + klog.Infof("No aggs aggregation found") + } + return +} diff --git a/internal/grafanadi/handler/tkp_handler.go b/internal/grafanadi/handler/tkp_handler.go new file mode 100644 index 00000000..bd3f4d74 --- /dev/null +++ b/internal/grafanadi/handler/tkp_handler.go @@ -0,0 +1,135 @@ +package handler + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/alipay/container-observability-service/pkg/dal/storage-client/data_access" + "github.com/alipay/container-observability-service/pkg/metrics" + tkpReqProvider "github.com/alipay/container-observability-service/pkg/tkp_provider" + "github.com/alipay/container-observability-service/pkg/utils" + "io" + "k8s.io/klog" + "log" + "net/http" + "time" +) + +const ( + 
tkpNamespace = "lunettes" + tkpSvcName = "alipay-tkp-manager" +) + +type TkpHandler struct { + request *http.Request + writer http.ResponseWriter + requestParams *TkpParams + storage data_access.StorageInterface +} + +type TkpParams struct { + Cluster string `json:"cluster"` + Namespace string `json:"pod_namespace"` + PodName string `json:"pod_name"` +} + +func (handler *TkpHandler) RequestParams() interface{} { + return handler.requestParams +} + +func (handler *TkpHandler) ParseRequest() error { + params := TkpParams{} + if handler.request.Method == http.MethodPost { + err := json.NewDecoder(handler.request.Body).Decode(¶ms) + if err != nil { + klog.Errorf("parse request body error: %s", err) + return err + } + } + handler.requestParams = ¶ms + klog.Infof("tkp request params: %+v", params) + return nil +} + +func (handler *TkpHandler) ValidRequest() error { + return nil +} + +type TkpResp struct { + Message string `json:"message"` + Code int `json:"code"` + Data TkpBody `json:"data"` +} + +type TkpBody struct { + PodNamespace string `json:"pod_namespace"` + PodName string `json:"pod_name"` + //被托管的pod 生成的vtk 对应的名称 + Vtkp string `json:"vtkp"` +} + +func buildReqUrl(cluster, uri string) string { + return fmt.Sprintf("%s%s", tkpReqProvider.GetTkpReqUrl(cluster), uri) +} + +func (handler *TkpHandler) Tkp(params *TkpParams) (int, interface{}, error) { + var tkpResp TkpResp + begin := time.Now() + defer func() { + cost := utils.TimeSinceInMilliSeconds(begin) + metrics.QueryMethodDurationMilliSeconds.WithLabelValues(" Tkp").Observe(cost) + }() + reqUrl := buildReqUrl(params.Cluster, "/apis/v2/turnkeypods/pods") + klog.Infof("tkp request url: %s", reqUrl) + jsonData, err := json.Marshal(params) + if err != nil { + fmt.Println("Error encoding JSON:", err) + return http.StatusOK, nil, err + } + //post tkp + req, err := http.NewRequest(http.MethodPost, reqUrl, bytes.NewBuffer(jsonData)) + if err != nil { + log.Printf("new request: %s error: %s\n", reqUrl, err) + return 
http.StatusOK, nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("get url: %s error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("ReadAll req %s body error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + + err = json.Unmarshal(body, &tkpResp) + if err != nil { + log.Printf("Unmarshal req %s body error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + return http.StatusOK, tkpResp, nil + +} + +func (handler *TkpHandler) Process() (int, interface{}, error) { + defer utils.IgnorePanic(" TkpHandler.Process ") + + var result interface{} + var err error + var httpStatus int + + httpStatus, result, err = handler.Tkp(handler.requestParams) + + return httpStatus, result, err +} + +func TkpFactory(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) Handler { + return &TkpHandler{ + request: r, + writer: w, + storage: storage, + } +} diff --git a/internal/grafanadi/handler/vtkp_detail_handler.go b/internal/grafanadi/handler/vtkp_detail_handler.go new file mode 100644 index 00000000..62528977 --- /dev/null +++ b/internal/grafanadi/handler/vtkp_detail_handler.go @@ -0,0 +1,200 @@ +package handler + +import ( + "encoding/json" + "github.com/alipay/container-observability-service/pkg/dal/storage-client/data_access" + "github.com/alipay/container-observability-service/pkg/metrics" + "github.com/alipay/container-observability-service/pkg/utils" + "io" + "k8s.io/klog" + "net/http" + "net/url" + "time" +) + +type VTkpDetailHandler struct { + request *http.Request + writer http.ResponseWriter + requestParams *VTkpDetailParams + storage data_access.StorageInterface +} + +type VTkpDetailParams struct { + Cluster string + VtkpNamespace string + VtkpName string +} + +func (handler *VTkpDetailHandler) RequestParams() interface{} { + return handler.requestParams +} + +func (handler *VTkpDetailHandler) 
ParseRequest() error { + params := VTkpDetailParams{} + if handler.request.Method == http.MethodGet { + klog.Infof("vtkpDetail request params: %+v", handler.request.URL.Query()) + params.Cluster = handler.request.URL.Query().Get("cluster") + params.VtkpNamespace = handler.request.URL.Query().Get("vtkp_namespace") + params.VtkpName = handler.request.URL.Query().Get("vtkp_name") + } + handler.requestParams = ¶ms + klog.Infof("vtkpDetail request params: %+v", params) + return nil +} + +func (handler *VTkpDetailHandler) ValidRequest() error { + return nil +} + +type VTkpDetailResp struct { + Message string `json:"message"` + Code int `json:"code"` + Data VTkpData `json:"data"` +} + +type VTkpData struct { + VtkpNamespace string `json:"vtkp_namespace"` + VtkpId string `json:"vtkp_uid"` + VtkpName string `json:"vtkp_name"` + PeerPod string `json:"peer_pod"` + PodInfos []TkpPodInfo `json:"pod_infos"` +} + +type TkpPodInfo struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + Age string `json:"age"` + Ip string `json:"ip"` + Node string `json:"node"` + Ready string `json:"ready"` + Status string `json:"status"` + Restarts int `json:"restarts"` + OwnerRefs []OwnerRefs `json:"ownerRefs"` +} + +type OwnerRefs struct { + ApiVersion string `json:"apiVersion"` + Kind string `json:"kind"` + Name string `json:"name"` + Uid string `json:"uid"` +} + +func (handler *VTkpDetailHandler) VTkpDetail(params *VTkpDetailParams) (int, interface{}, error) { + var tkpResp VTkpDetailResp + begin := time.Now() + defer func() { + cost := utils.TimeSinceInMilliSeconds(begin) + metrics.QueryMethodDurationMilliSeconds.WithLabelValues(" VTkpDetail ").Observe(cost) + }() + reqUrl := buildReqUrl(params.Cluster, "/apis/v2/turnkeypods/profiles") + tkpDetailReq, err := url.Parse(reqUrl) + if err != nil { + klog.Errorf("url parse error: %s\n", err) + return http.StatusOK, nil, err + } + queryParams := url.Values{} + queryParams.Set("vtkp_namespace", params.VtkpNamespace) + 
queryParams.Set("vtkp_name", params.VtkpName) + tkpDetailReq.RawQuery = queryParams.Encode() + klog.Infof("vTkpDetailHandler req: %s\n", tkpDetailReq.String()) + + req, err := http.NewRequest(http.MethodGet, tkpDetailReq.String(), nil) + if err != nil { + klog.Errorf("new request: %s error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + klog.Errorf("get url: %s error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + klog.Errorf("ReadAll req %s body error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + + err = json.Unmarshal(body, &tkpResp) + if err != nil { + klog.Errorf("Unmarshal req %s body error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + klog.Infof("vtkpDetailResp: %+v\n", tkpResp) + //结构转换 + return http.StatusOK, convertTkpForGData(&tkpResp), nil +} + +type PodType string + +type GVTkpDetail struct { + PodList []GPodInfos `json:"podList"` + WorkloadList []OwnerRefs `json:"workloadList"` +} + +type GPodInfos struct { + RefId []string `json:"refIds"` + PodInfo TkpPodInfo `json:"detail"` +} + +func convertTkpForGData(v *VTkpDetailResp) GVTkpDetail { + var gVTkpDetail GVTkpDetail + // HTTP code check + if v.Code < 200 || v.Code >= 300 { + klog.Errorf("http code error: %d\n", v.Code) + return gVTkpDetail + } + workloadMap := make(map[string]OwnerRefs) + for _, podInfo := range v.Data.PodInfos { + var refIds []string + for _, ownerRefDetail := range podInfo.OwnerRefs { + if _, exists := workloadMap[ownerRefDetail.Uid]; !exists { + workloadMap[ownerRefDetail.Uid] = ownerRefDetail + } + refIds = append(refIds, ownerRefDetail.Uid) + } + gVTkpDetail.PodList = append(gVTkpDetail.PodList, GPodInfos{ + RefId: refIds, + PodInfo: podInfo, + }) + } + + for _, ownerRefDetail := range workloadMap { + gVTkpDetail.WorkloadList = append(gVTkpDetail.WorkloadList, ownerRefDetail) + } + return 
gVTkpDetail +} + +func DeduplicateByUid(refs []OwnerRefs) []OwnerRefs { + uidMap := make(map[string]bool) + var uniqueRefs []OwnerRefs + + for _, ref := range refs { + if _, exists := uidMap[ref.Uid]; !exists { + uidMap[ref.Uid] = true + uniqueRefs = append(uniqueRefs, ref) + } + } + return uniqueRefs +} + +func (handler *VTkpDetailHandler) Process() (int, interface{}, error) { + defer utils.IgnorePanic(" VTkpDetailHandler.Process ") + + var result interface{} + var err error + var httpStatus int + + httpStatus, result, err = handler.VTkpDetail(handler.requestParams) + + return httpStatus, result, err +} + +func VTkpDetailFactory(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) Handler { + return &VTkpDetailHandler{ + request: r, + writer: w, + storage: storage, + } +} diff --git a/internal/grafanadi/handler/vtkp_status_handler.go b/internal/grafanadi/handler/vtkp_status_handler.go new file mode 100644 index 00000000..6c540a20 --- /dev/null +++ b/internal/grafanadi/handler/vtkp_status_handler.go @@ -0,0 +1,191 @@ +package handler + +import ( + "encoding/json" + "github.com/alipay/container-observability-service/pkg/dal/storage-client/data_access" + "github.com/alipay/container-observability-service/pkg/metrics" + "github.com/alipay/container-observability-service/pkg/utils" + "io" + "k8s.io/klog" + "net/http" + "net/url" + "time" +) + +type VTkpStatusHandler struct { + request *http.Request + writer http.ResponseWriter + requestParams *VTkpStatusParams + storage data_access.StorageInterface +} + +type VTkpStatusParams struct { + Cluster string `json:"cluster"` + VtkpNamespace string `json:"vtkpNamespace"` + VtkpName string `json:"vtkpName"` +} + +func (handler *VTkpStatusHandler) RequestParams() interface{} { + return handler.requestParams +} + +func (handler *VTkpStatusHandler) ParseRequest() error { + params := VTkpStatusParams{} + if handler.request.Method == http.MethodGet { + params.Cluster = 
handler.request.URL.Query().Get("cluster") + params.VtkpNamespace = handler.request.URL.Query().Get("vtkp_namespace") + params.VtkpName = handler.request.URL.Query().Get("vtkp_name") + } + handler.requestParams = ¶ms + klog.Infof("vtkpStatus request params: %+v", params) + return nil +} + +func (handler *VTkpStatusHandler) ValidRequest() error { + return nil +} + +type VTkpStatusResp struct { + Message string `json:"message"` + Code int `json:"code"` + Data VTkpStatusData `json:"data"` +} + +type VTkpStatusData struct { + Type string `json:"type"` + Status string `json:"status"` + CurPhase string `json:"cur_phase"` + TotalPhases []string `json:"total_phases"` + CompletedPhases []string `json:"completed_phases"` +} + +func (handler *VTkpStatusHandler) VTkpStatus(params *VTkpStatusParams) (int, interface{}, error) { + var tkpResp VTkpStatusResp + begin := time.Now() + defer func() { + cost := utils.TimeSinceInMilliSeconds(begin) + metrics.QueryMethodDurationMilliSeconds.WithLabelValues(" VTkpStatus ").Observe(cost) + }() + reqUrl := buildReqUrl(params.Cluster, "/apis/v2/turnkeypods/progress") + tkpStatusReq, err := url.Parse(reqUrl) + if err != nil { + klog.Errorf("url parse error: %s\n", err) + return http.StatusOK, nil, err + } + queryParams := url.Values{} + queryParams.Set("vtkp_namespace", params.VtkpNamespace) + queryParams.Set("vtkp_name", params.VtkpName) + tkpStatusReq.RawQuery = queryParams.Encode() + klog.Infof("VTkpStatusHandler req : %s\n", tkpStatusReq.String()) + + req, err := http.NewRequest(http.MethodGet, tkpStatusReq.String(), nil) + if err != nil { + klog.Errorf("new request: %s error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + klog.Errorf("get url: %s error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + klog.Errorf("ReadAll req %s body error: %s\n", reqUrl, err) + return 
http.StatusOK, nil, err + } + + err = json.Unmarshal(body, &tkpResp) + if err != nil { + klog.Errorf("Unmarshal req %s body error: %s\n", reqUrl, err) + return http.StatusOK, nil, err + } + klog.Infof("vtkpStatusResp: %+v\n", tkpResp) + //结构转换 + return http.StatusOK, convertTkpStatusForGData(&tkpResp), nil +} + +type GVTkpStatus struct { + PhaseList []Phase `json:"phase_list"` +} + +type Phase struct { + Name string `json:"name"` + Status string `json:"status"` + Trigger bool `json:"trigger"` + Reason string `json:"reason"` +} + +func convertTkpStatusForGData(v *VTkpStatusResp) interface{} { + var PhaseList []Phase + // HTTP code check + if v.Code < 200 || v.Code >= 300 { + klog.Errorf("http code error: %d\n", v.Code) + return PhaseList + } + var set []string + mapPhase := make(map[string]Phase, 4) + for _, phaseName := range v.Data.TotalPhases { + set = append(set, phaseName) + mapPhase[phaseName] = Phase{ + Name: phaseName, + Status: "", + Trigger: false, + Reason: "", + } + } + + for _, compPhase := range v.Data.CompletedPhases { + mapPhase[compPhase] = Phase{ + Name: compPhase, + Status: "success", + Trigger: true, + Reason: "", + } + } + mapPhase[v.Data.CurPhase] = Phase{ + Name: v.Data.CurPhase, + Status: v.Data.Status, + Trigger: true, + Reason: "", + } + + //根据 set 返回 + for _, phaseName := range set { + value, exists := mapPhase[phaseName] + if exists { + PhaseList = append(PhaseList, value) + } + } + return PhaseList +} + +func FindStringIndex(slice []string, str string) int { + for i, v := range slice { + if v == str { + return i + } + } + return -1 +} + +func (handler *VTkpStatusHandler) Process() (int, interface{}, error) { + defer utils.IgnorePanic(" VTkpStatusHandler.Process ") + + var result interface{} + var err error + var httpStatus int + + httpStatus, result, err = handler.VTkpStatus(handler.requestParams) + + return httpStatus, result, err +} + +func VTkpStatusFactory(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) 
Handler { + return &VTkpStatusHandler{ + request: r, + writer: w, + storage: storage, + } +} diff --git a/internal/grafanadi/model/querypodlist.go b/internal/grafanadi/model/querypodlist.go index 61e0bba2..8ba73088 100644 --- a/internal/grafanadi/model/querypodlist.go +++ b/internal/grafanadi/model/querypodlist.go @@ -1,13 +1,23 @@ package model type QueryPodListTable struct { - Podname string `json:"podname,omitempty"` - PodIP string `json:"podip,omitempty"` - Cluster string `json:"cluster,omitempty"` - PodUID string `json:"poduid,omitempty"` - NodeIP string `json:"nodeip,omitempty"` - CreateTime string `json:"createTime,omitempty"` - Namespace string `json:"namespace,omitempty"` - State string `json:"state,omitempty"` - PodPhase string `json:"podphase,omitempty"` + Podname string `json:"podname,omitempty"` + PodIP string `json:"podip,omitempty"` + Cluster string `json:"cluster,omitempty"` + PodUID string `json:"poduid,omitempty"` + NodeIP string `json:"nodeip,omitempty"` + NodeName string `json:"nodename,omitempty"` + CreateTime string `json:"createTime,omitempty"` + Namespace string `json:"namespace,omitempty"` + State string `json:"state,omitempty"` + PodPhase string `json:"podphase,omitempty"` + WorkloadInfo WorkloadTable `json:"workloadInfo,omitempty"` +} + +type WorkloadTable struct { + ClusterName string `json:"ClusterName,omitempty"` + Namespace string `json:"Namespace,omitempty"` + Name string `json:"Name,omitempty"` + UID string `json:"Uid,omitempty"` + Kind string `json:"Kind,omitempty"` } diff --git a/internal/grafanadi/server/server.go b/internal/grafanadi/server/server.go index 8201dc72..303c2ab4 100644 --- a/internal/grafanadi/server/server.go +++ b/internal/grafanadi/server/server.go @@ -66,10 +66,16 @@ func (s *Server) StartServer(stopCh chan struct{}) { r.Path("/podyamlgraphedges").HandlerFunc(handlerWrapper(handler.NodeGraphParamsFactory, s.Storage)) r.Path("/elasticaggregations").HandlerFunc(corsWrapper(interutils.ServeSLOGrafanaDI, s.Storage)) 
r.Path("/rawdata").HandlerFunc(handlerWrapper(handler.RawdataFactory, s.Storage)) + r.Path("/ownerpodmaps").HandlerFunc(handlerWrapper(handler.OwnerPodMapFactory, s.Storage)) // federation api r.Path("/apis/v1/querypodlist").HandlerFunc(handlerWrapper(handler.QueryPodListFactory, s.Storage)) + //tkp + r.Path("/apis/v1/tkp").HandlerFunc(handlerWrapper(handler.TkpFactory, s.Storage)) + r.Path("/apis/v1/tkp_detail").HandlerFunc(handlerWrapper(handler.VTkpDetailFactory, s.Storage)) + r.Path("/apis/v1/tkp_status").HandlerFunc(handlerWrapper(handler.VTkpStatusFactory, s.Storage)) + //lunettes meta api r.Path("/apis/v1/lunettes-meta").HandlerFunc(handlerWrapper(handler.LunettesLatencyFactory, s.Storage)) diff --git a/internal/grafanadi/service/esgrafana.go b/internal/grafanadi/service/esgrafana.go new file mode 100644 index 00000000..d2bfa73c --- /dev/null +++ b/internal/grafanadi/service/esgrafana.go @@ -0,0 +1,150 @@ +package service + +import ( + "encoding/json" + "fmt" + eavesmodel "github.com/alipay/container-observability-service/internal/grafanadi/model" + "github.com/alipay/container-observability-service/pkg/dal/storage-client/model" + "time" +) + +func InsertTimeStamp(ts time.Time, timeAry []time.Time) []time.Time { + if len(timeAry) == 0 { + return []time.Time{ts} + } + + tmpAry := []time.Time{} + idx := 0 + insert := false + for ; idx < len(timeAry); idx++ { + if !timeAry[idx].Before(ts) && !insert { + tmpAry = append(tmpAry, ts) + insert = true + } + tmpAry = append(tmpAry, timeAry[idx]) + } + + if !insert { + tmpAry = append(tmpAry, ts) + } + return tmpAry +} +func ConvertSloDataTrace2Graph(sdts []model.SloTraceData) eavesmodel.DataFrame { + if len(sdts) == 0 { + return eavesmodel.DataFrame{} + } + var ( + ts []time.Time + needChangeFlag = true + ) + for _, sdt := range sdts { + if sdt.DeletedTime.IsZero() && needChangeFlag { + ts = InsertTimeStamp(time.Now(), ts) + needChangeFlag = false + } + ts = InsertTimeStamp(sdt.CreatedTime, ts) + if 
sdt.ReadyAt.After(sdt.CreatedTime) { + ts = InsertTimeStamp(sdt.ReadyAt, ts) + } + if sdt.DeletedTime.After(sdt.CreatedTime) { + ts = InsertTimeStamp(sdt.DeletedTime, ts) + } + if sdt.DeleteEndTime.After(sdt.CreatedTime) { + ts = InsertTimeStamp(sdt.DeleteEndTime, ts) + } + } + + s, _ := json.Marshal(sdts) + fmt.Println(string(s)) + b, _ := json.Marshal(ts) + fmt.Println(string(b)) + + fields := []eavesmodel.FieldType{ + {Name: "timestamp", Type: "time"}, + } + var values []interface{} + values = append(values, ts) + for _, sdt := range sdts { + spanAry := make([]string, len(ts)) + + idx := 0 + + // skip time spot before create + for ; idx < len(ts) && ts[idx].Before(sdt.CreatedTime); idx++ { + } + // set the create span + curStat := "Creating" + set := false + if sdt.ReadyAt.After(sdt.CreatedTime) { + for ; idx < len(ts) && ts[idx].Before(sdt.ReadyAt); idx++ { + spanAry[idx] = curStat + } + set = true + curStat = "Running" + } + // set the running span + + if sdt.DeletedTime.IsZero() { + for ; idx < len(ts); idx++ { + spanAry[idx] = curStat + } + } + + if sdt.DeletedTime.After(sdt.CreatedTime) { + for ; idx < len(ts) && ts[idx].Before(sdt.DeletedTime); idx++ { + spanAry[idx] = curStat + } + set = true + curStat = "Deleting" + } + // set the deleting span + if sdt.DeleteEndTime.IsZero() { + for ; idx < len(ts); idx++ { + spanAry[idx] = curStat + } + } + if sdt.DeleteEndTime.After(sdt.CreatedTime) { + for ; idx < len(ts) && ts[idx].Before(sdt.DeleteEndTime); idx++ { + spanAry[idx] = curStat + } + set = true + } + if !set { + for ; idx < len(ts); idx++ { + spanAry[idx] = curStat + } + } + + for ; idx < len(ts); idx++ { + spanAry[idx] = "" + } + spanAry[idx-1] = "" + + values = append(values, spanAry) + fields = append(fields, eavesmodel.FieldType{Name: sdt.PodName, Type: "string"}) + } + + sortMap := make(map[string]int, len(values)) + + for i := 1; i < len(values); i++ { + if strSlice, ok := values[i].([]string); ok { + var runIndex int + for ri, status := range 
strSlice { + if status == "Creating" { + runIndex = ri + break + } + } + sortMap[fields[i].Name] = runIndex + } + } + + for i := 2; i < len(fields); i++ { + for j := i; j > 0 && sortMap[fields[j].Name] < sortMap[fields[j-1].Name]; j-- { + fields[j], fields[j-1] = fields[j-1], fields[j] + values[j], values[j-1] = values[j-1], values[j] + } + } + + return eavesmodel.DataFrame{Schema: eavesmodel.SchemaType{Fields: fields}, Data: eavesmodel.DataType{Values: values}} +} diff --git a/internal/grafanadi/service/querypodlist.go b/internal/grafanadi/service/querypodlist.go index b1ff3887..4cbbdff6 100644 --- a/internal/grafanadi/service/querypodlist.go +++ b/internal/grafanadi/service/querypodlist.go @@ -32,6 +32,16 @@ func ConvertPodyamls2Table(podyamls []*storagemodel.PodYaml) []*model.QueryPodLi State: state, PodPhase: string(v.Pod.Status.Phase), } + workloadInfo := &model.WorkloadTable{ + ClusterName: v.ClusterName, + Namespace: v.Namespace, + } + if len(v.Pod.OwnerReferences) > 0 { + workloadInfo.Name = v.Pod.OwnerReferences[0].Name + workloadInfo.UID = string(v.Pod.OwnerReferences[0].UID) + workloadInfo.Kind = v.Pod.OwnerReferences[0].Kind + } + t.WorkloadInfo = *workloadInfo tables = append(tables, t) } diff --git a/internal/grafanadi/utils/utils.go b/internal/grafanadi/utils/utils.go index 091a4172..e92ee2a0 100644 --- a/internal/grafanadi/utils/utils.go +++ b/internal/grafanadi/utils/utils.go @@ -43,6 +43,23 @@ func (u *Util) GetUid(podYamls []*model.PodYaml, key string, value *string) { } } +func (u *Util) GetPodYaml(podYamls []*model.PodYaml, key string, value string) ([]*model.PodYaml, error) { + + var err error + switch key { + case "name": + err = u.Storage.QueryPodUIDListByPodName(&podYamls, value) + case "hostname": + err = u.Storage.QueryPodUIDListByHostname(&podYamls, value) + case "podip": + err = u.Storage.QueryPodUIDListByPodIP(&podYamls, value) + case "uid": + err = u.Storage.QueryPodYamlsWithPodUID(&podYamls, value) + } + + return podYamls, err +} + 
func ServeSLOGrafanaDI(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) { switch r.Method { case http.MethodOptions: diff --git a/pkg/dal/storage-client/data_access/ealsticsearch.go b/pkg/dal/storage-client/data_access/ealsticsearch.go index ee5406ae..6e145fce 100644 --- a/pkg/dal/storage-client/data_access/ealsticsearch.go +++ b/pkg/dal/storage-client/data_access/ealsticsearch.go @@ -1534,3 +1534,42 @@ func (s *StorageEsImpl) QueryLunettesLatency(data interface{}, opts ...model.Opt } return nil } + +func (s *StorageEsImpl) QuerySloTraceDataWithOwnerId(data interface{}, ownerid string, opts ...model.OptionFunc) error { + options := &model.Options{ + Limit: 5, + } + for _, do := range opts { + do(options) + } + if len(ownerid) == 0 { + return fmt.Errorf("the params is error, uid is nil") + } + begin := time.Now() + defer func() { + metrics.ObserveQueryMethodDuration("QuerySloTraceData", begin) + }() + _, esTableName, esType, err := utils.GetMetaName(data) + if err != nil { + return err + } + stringQuery := elastic.NewQueryStringQuery(fmt.Sprintf("ExtraProperties.ownerref.uid.Value.keyword: \"%s\"", ownerid)) + query := elastic.NewBoolQuery().Must(stringQuery) + searchResult, err := s.DB.Search().Index(esTableName).Type(esType).Query(query).Size(options.Limit).Do(context.Background()) + if err != nil { + return fmt.Errorf("error%v", err) + } + var hits []json.RawMessage + for _, hit := range searchResult.Hits.Hits { + hits = append(hits, hit.Source) + } + hitsStr, err := json.Marshal(hits) + if err != nil { + return err + } + err = json.Unmarshal(hitsStr, data) + if err != nil { + return err + } + return nil +} diff --git a/pkg/dal/storage-client/data_access/mysql.go b/pkg/dal/storage-client/data_access/mysql.go index f0babe9a..a9d63425 100644 --- a/pkg/dal/storage-client/data_access/mysql.go +++ b/pkg/dal/storage-client/data_access/mysql.go @@ -676,6 +676,10 @@ func (s *StorageSqlImpl) QueryEventWithTimeRange(data interface{}, from, to 
type (
	// Options collects the optional settings of a storage query: the field
	// to order by, the ordering direction, the maximum number of results and
	// the time window [From, To].
	Options struct {
		OrderBy   string
		Ascending bool
		Limit     int
		From      time.Time
		To        time.Time
	}

	// OptionFunc mutates an Options value; queries apply a list of them on
	// top of their own defaults.
	OptionFunc func(*Options)
)

// WithLimit returns an OptionFunc that stores limint in Options.Limit.
func WithLimit(limint int) OptionFunc {
	setLimit := func(target *Options) { target.Limit = limint }
	return setLimit
}

// WithOrderBy returns an OptionFunc that stores orderBy in Options.OrderBy.
func WithOrderBy(orderBy string) OptionFunc {
	setOrderBy := func(target *Options) { target.OrderBy = orderBy }
	return setOrderBy
}

// WithDescending returns an OptionFunc that copies descending verbatim into
// Options.Ascending.
//
// NOTE(review): WithDescending(true) therefore yields Ascending == true,
// which reads as the opposite of the function name. Existing callers may rely
// on this, so confirm with them before inverting it.
func WithDescending(descending bool) OptionFunc {
	setDirection := func(target *Options) { target.Ascending = descending }
	return setDirection
}

// WithFrom returns an OptionFunc that stores from in Options.From.
func WithFrom(from time.Time) OptionFunc {
	setFrom := func(target *Options) { target.From = from }
	return setFrom
}

// WithTo returns an OptionFunc that stores to in Options.To.
func WithTo(to time.Time) OptionFunc {
	setTo := func(target *Options) { target.To = to }
	return setTo
}
a/pkg/dal/storage-client/model/type.go b/pkg/dal/storage-client/model/type.go index 5da3eb83..ae7ec507 100644 --- a/pkg/dal/storage-client/model/type.go +++ b/pkg/dal/storage-client/model/type.go @@ -27,3 +27,38 @@ type PodParams struct { From time.Time To time.Time } + +type Options struct { + OrderBy string + Ascending bool + Limit int + From time.Time + To time.Time +} +type OptionFunc func(*Options) + +func WithLimit(limint int) OptionFunc { + return func(o *Options) { + o.Limit = limint + } +} +func WithOrderBy(orderBy string) OptionFunc { + return func(o *Options) { + o.OrderBy = orderBy + } +} +func WithDescending(descending bool) OptionFunc { + return func(o *Options) { + o.Ascending = descending + } +} +func WithFrom(from time.Time) OptionFunc { + return func(o *Options) { + o.From = from + } +} +func WithTo(to time.Time) OptionFunc { + return func(o *Options) { + o.To = to + } +} diff --git a/pkg/tkp_provider/config.go b/pkg/tkp_provider/config.go new file mode 100644 index 00000000..d5ac9d65 --- /dev/null +++ b/pkg/tkp_provider/config.go @@ -0,0 +1,45 @@ +package tkpReqProvider + +import ( + "encoding/json" + "fmt" + "os" + + "k8s.io/klog" +) + +var tkpRequestConfigMap map[string]string + +func InitTkpReqConfig(cfgFile string) error { + jsonFile, err := os.ReadFile(cfgFile) + if err != nil { + klog.Errorf("Failed to read tkpReqConfig file %s: %v", cfgFile, err) + return err + } + + var dataMap map[string]interface{} + if err := json.Unmarshal(jsonFile, &dataMap); err != nil { + klog.Errorf("Failed to parse tkpReqConfig JSON content from %s: %v", cfgFile, err) + return err + } + + stringMap := make(map[string]string) + for key, value := range dataMap { + if strValue, ok := value.(string); ok { + stringMap[key] = strValue + } else { + stringMap[key] = fmt.Sprintf("%v", value) + } + } + + tkpRequestConfigMap = stringMap + return nil +} + +func GetTkpReqUrl(key string) string { + if tkpRequestConfigMap == nil { + klog.Warningf("tkpReqConfig Configuration map 
is uninitialized. Returning empty string for key '%s'.", key) + return "" + } + return tkpRequestConfigMap[key] +}