diff --git a/db-connector.go b/db-connector.go index 289c02db..fe4b28ea 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5288,9 +5288,8 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error { return nil } -func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) { +func GetPlatformHealth(ctx context.Context, beforeTimestamp int, afterTimestamp int, limit int) ([]HealthCheckDB, error) { nameKey := "platform_health" - // sort by "updated", and get the first one health := []HealthCheckDB{} @@ -5302,7 +5301,44 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) "order": "desc", }, }, - "size": limit, + } + + if limit != 0 { + query["size"] = limit + } + + if beforeTimestamp > 0 || afterTimestamp > 0 { + query["query"] = map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{}, + }, + } + } + + if beforeTimestamp > 0 { + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append( + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), + map[string]interface{}{ + "range": map[string]interface{}{ + "updated": map[string]interface{}{ + "gt": beforeTimestamp, + }, + }, + }, + ) + } + + if afterTimestamp > 0 { + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append( + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), + map[string]interface{}{ + "range": map[string]interface{}{ + "updated": map[string]interface{}{ + "lt": afterTimestamp, + }, + }, + }, + ) } if err := json.NewEncoder(&buf).Encode(query); err != nil { @@ -5352,16 +5388,31 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) return health, err } - if len(wrapped.Hits.Hits) == 0 { - return health, errors.New("No healthchecks found") - } - for _, hit := range wrapped.Hits.Hits { health = append(health, hit.Source) } } else { - q := datastore.NewQuery(nameKey).Order("-Updated").Limit(limit) + q := datastore.NewQuery(nameKey) + + + if beforeTimestamp != 0 { + // Modify the query to filter for "before" timestamp. + q = q.Filter("Updated >", beforeTimestamp) + } + + if afterTimestamp != 0 { + // Modify the query to filter for "after" timestamp. + q = q.Filter("Updated <", afterTimestamp) + } + + if limit != 0 { + log.Printf("Limiting platform health to %d", limit) + q = q.Limit(limit) + } + + q = q.Order("-Updated") + _, err := project.Dbclient.GetAll(ctx, q, &health) if err != nil { diff --git a/health.go b/health.go index 620caff8..5ec2a8a2 100644 --- a/health.go +++ b/health.go @@ -11,6 +11,7 @@ import ( "log" "net/http" "os" + "strconv" "strings" "time" ) @@ -36,7 +37,7 @@ func updateCache(workflowHealth WorkflowHealth) { cacheKey := fmt.Sprintf("ops-health-check") ctx := context.Background() - if project.CacheDb { + if project.CacheDb { platformHealthCheck := HealthCheck{} platformHealthCheck.Updated = time.Now().Unix() platformHealthCheck.Workflows = workflowHealth @@ -47,7 +48,7 @@ func updateCache(workflowHealth WorkflowHealth) { err = SetCache(ctx, cacheKey, platformData, 15) if err != nil { log.Printf("[WARNING] Failed setting cache ops health: %s", err) - } + } } } @@ -143,7 +144,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer " + apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -201,7 +202,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -247,7 +248,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { } req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -305,7 +306,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -363,7 +364,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer " + apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -458,25 +459,21 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { userInfo, err := HandleApiAuthentication(resp, request) if err != nil { - log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err) - - resp.WriteHeader(401) - resp.Write([]byte(`{"success": false}`)) - return + log.Printf("[WARNING] Api authentication failed in handleInfo: %s. Continuing anyways here..", err) } if project.Environment == "onprem" && userInfo.Role != "admin" { resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) return - } else if project.Environment == "Cloud" && userInfo.ApiKey != os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") { - resp.WriteHeader(401) + } else if project.Environment == "Cloud" && !(userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") || userInfo.SupportAccess) { + resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) return } } else if force != "true" { // get last health check from database - healths, err := GetPlatformHealth(ctx, 1) + healths, err := GetPlatformHealth(ctx, 0, 0, 1) if len(healths) == 0 { resp.WriteHeader(500) @@ -511,7 +508,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { workflowHealthChannel := make(chan WorkflowHealth) // appHealthChannel := make(chan AppHealth) go func() { - log.Printf("[DEBUG] Running workflowHealthChannel goroutine") + log.Printf("[DEBUG] Running workflowHealthChannel goroutine") workflowHealth, err := RunOpsWorkflow(apiKey, orgId) if err != nil { log.Printf("[ERROR] Failed workflow health check: %s", err) @@ -581,11 +578,34 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { - // for now, the limit is last 100 runs - limit := 100 ctx := GetContext(request) - healthChecks, err := GetPlatformHealth(ctx, limit) + log.Printf("[DEBUG] request URL query:", request.URL.Query()) + + limit := request.URL.Query().Get("limit") + before := request.URL.Query().Get("before") + after := request.URL.Query().Get("after") + + // convert all to int64 + limitInt, err := strconv.Atoi(limit) + if err != nil { + log.Printf("[ERROR] Failed converting limit to int64: %s", err) + limitInt = 0 + } + + beforeInt, err := strconv.Atoi(before) + if err != nil { + log.Printf("[ERROR] Failed converting before to int64: %s", err) + beforeInt = 0 + } + + afterInt, err := strconv.Atoi(after) + if err != nil { + log.Printf("[ERROR] Failed converting after to int64: %s", err) + afterInt = 0 + } + + healthChecks, err := GetPlatformHealth(ctx, afterInt, beforeInt, limitInt) if err != nil { log.Printf("[ERROR] Failed getting platform health from database: %s", err) resp.WriteHeader(500) @@ -605,13 +625,13 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { resp.Write(healthChecksData) } -func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) (error) { +func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) error { baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") if len(baseUrl) == 0 { log.Printf("[DEBUG] Base url not set. Setting to default: for delete") baseUrl = "https://shuffler.io" } - + if project.Environment == "onprem" { log.Printf("[DEBUG] Onprem environment. Setting base url to localhost: for delete") baseUrl = "http://localhost:5001" @@ -674,7 +694,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { Delete: false, RunStatus: "", ExecutionId: "", - WorkflowId: "", + WorkflowId: "", } baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") @@ -731,7 +751,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28" // body := map[string]string{"execution_argument": "", "start": startId} @@ -787,6 +807,8 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { updateCache(workflowHealth) + timeout := time.After(5 * time.Minute) + // 3. Check if workflow ran successfully // ping /api/v1/streams/results/ while workflowHealth.RunFinished is false // if workflowHealth.RunFinished is true, return workflowHealth @@ -800,7 +822,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer " + apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // convert the body to JSON reqBody := map[string]string{"execution_id": execution.ExecutionId, "authorization": os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")} @@ -847,6 +869,17 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { updateCache(workflowHealth) log.Printf("[DEBUG] Workflow Health execution Result Status: %#v for executionID: %s", executionResults.Status, workflowHealth.ExecutionId) + + // check if timeout + select { + case <-timeout: + log.Printf("[ERROR] Timeout reached for workflow health check. Returning") + workflowHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK" + return workflowHealth, errors.New("Timeout reached for workflow health check") + default: + // do nothing + } + log.Printf("[DEBUG] Waiting 2 seconds before retrying") time.Sleep(2 * time.Second) } @@ -864,7 +897,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { } - if len(opsDashboardOrgId) == 0 { log.Printf("[WARNING] Ops dashboard org not set. Not setting up ops workflow") return "", errors.New("Ops dashboard org not set") @@ -1010,7 +1042,7 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { jsonData := `{"name":"demo","description":"demo","blogpost":"","status":"test","default_return_value":"","usecase_ids":[]}` // res, err := http.Post(url, "application/json", bytes.NewBuffer([]byte(jsonData))) - req, err = http.NewRequest("POST", baseUrl + "/api/v1/workflows", bytes.NewBuffer([]byte(jsonData))) + req, err = http.NewRequest("POST", baseUrl+"/api/v1/workflows", bytes.NewBuffer([]byte(jsonData))) log.Printf("[SANITY CHECK] req method is: %s", req.Method) if err != nil { @@ -1077,7 +1109,7 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { // Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true - req, err = http.NewRequest("PUT", baseUrl + "/api/v1/workflows/" + workflowData.ID + "?skip_save=true", nil) + req, err = http.NewRequest("PUT", baseUrl+"/api/v1/workflows/"+workflowData.ID+"?skip_save=true", nil) if err != nil { log.Println("[ERROR] creating HTTP request:", err) return "", errors.New("Error creating HTTP request: " + err.Error()) @@ -1109,7 +1141,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { defer resp.Body.Close() - if resp.StatusCode != 200 { log.Printf("[ERROR] Failed saving ops dashboard workflow: %s. The status code was: %d", err, resp.StatusCode) // print the response body @@ -1124,4 +1155,4 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { log.Printf("[INFO] Ops dashboard workflow saved successfully with ID: %s", workflowData.ID) return workflowData.ID, nil -} \ No newline at end of file +}