From 61449cee51c040004e3f24f618974c357846df15 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Thu, 12 Oct 2023 19:48:57 +0530 Subject: [PATCH 1/4] fix: adding stats searching for datastore --- db-connector.go | 21 ++++++++++++++++++--- health.go | 30 ++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/db-connector.go b/db-connector.go index 289c02db..75d2aa1a 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5288,9 +5288,8 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error { return nil } -func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) { +func GetPlatformHealth(ctx context.Context, limit int, beforeTimestamp int, afterTimestamp int) ([]HealthCheckDB, error) { nameKey := "platform_health" - // sort by "updated", and get the first one health := []HealthCheckDB{} @@ -5361,7 +5360,23 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) } } else { - q := datastore.NewQuery(nameKey).Order("-Updated").Limit(limit) + q := datastore.NewQuery(nameKey).Order("-Updated") + + + if beforeTimestamp != 0 { + // Modify the query to filter for "before" timestamp. + q = q.Filter("Updated <", beforeTimestamp) + } + + if afterTimestamp != 0 { + // Modify the query to filter for "after" timestamp. + q = q.Filter("Updated >", afterTimestamp) + } + + if limit != 0 { + q.Limit(limit) + } + _, err := project.Dbclient.GetAll(ctx, q, &health) if err != nil { diff --git a/health.go b/health.go index 620caff8..627a1981 100644 --- a/health.go +++ b/health.go @@ -13,6 +13,7 @@ import ( "os" "strings" "time" + "strconv" ) type appConfig struct { @@ -476,7 +477,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } } else if force != "true" { // get last health check from database - healths, err := GetPlatformHealth(ctx, 1) + healths, err := GetPlatformHealth(ctx, 0, 0, 1) if len(healths) == 0 { resp.WriteHeader(500) @@ -581,11 +582,32 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { - // for now, the limit is last 100 runs - limit := 100 ctx := GetContext(request) - healthChecks, err := GetPlatformHealth(ctx, limit) + limit := request.URL.Query().Get("limit") + before := request.URL.Query().Get("before") + after := request.URL.Query().Get("after") + + // convert all to int64 + limitInt, err := strconv.Atoi(limit) + if err != nil { + log.Printf("[ERROR] Failed converting limit to int64: %s", err) + limitInt = 0 + } + + beforeInt, err := strconv.Atoi(before) + if err != nil { + log.Printf("[ERROR] Failed converting before to int64: %s", err) + beforeInt = 0 + } + + afterInt, err := strconv.Atoi(after) + if err != nil { + log.Printf("[ERROR] Failed converting after to int64: %s", err) + afterInt = 0 + } + + healthChecks, err := GetPlatformHealth(ctx, afterInt, beforeInt, limitInt) if err != nil { log.Printf("[ERROR] Failed getting platform health from database: %s", err) resp.WriteHeader(500) From e08544775fd49591210af27a166e8afa37d568eb Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Thu, 12 Oct 2023 20:07:53 +0530 Subject: [PATCH 2/4] fix: stat query for cloud --- db-connector.go | 13 ++++++++----- health.go | 2 ++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/db-connector.go b/db-connector.go index 75d2aa1a..0bf9c3c3 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5288,7 +5288,7 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error { return nil } -func GetPlatformHealth(ctx context.Context, limit int, beforeTimestamp int, afterTimestamp int) ([]HealthCheckDB, error) { +func GetPlatformHealth(ctx context.Context, beforeTimestamp int, afterTimestamp int, limit int) ([]HealthCheckDB, error) { nameKey := "platform_health" // sort by "updated", and get the first one health := []HealthCheckDB{} @@ -5360,22 +5360,25 @@ func GetPlatformHealth(ctx context.Context, limit int, beforeTimestamp int, afte } } else { - q := datastore.NewQuery(nameKey).Order("-Updated") + q := datastore.NewQuery(nameKey) if beforeTimestamp != 0 { // Modify the query to filter for "before" timestamp. - q = q.Filter("Updated <", beforeTimestamp) + q = q.Filter("Updated >", beforeTimestamp) } if afterTimestamp != 0 { // Modify the query to filter for "after" timestamp. - q = q.Filter("Updated >", afterTimestamp) + q = q.Filter("Updated <", afterTimestamp) } if limit != 0 { - q.Limit(limit) + log.Printf("Limiting platform health to %d", limit) + q = q.Limit(limit) } + + q = q.Order("-Updated") _, err := project.Dbclient.GetAll(ctx, q, &health) diff --git a/health.go b/health.go index 627a1981..ae1542ee 100644 --- a/health.go +++ b/health.go @@ -584,6 +584,8 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { ctx := GetContext(request) + log.Printf("[DEBUG] request URL query:", request.URL.Query()) + limit := request.URL.Query().Get("limit") before := request.URL.Query().Get("before") after := request.URL.Query().Get("after") From 3f1cf6f9859d00728280ea101ba66c2084a38660 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Thu, 12 Oct 2023 20:33:10 +0530 Subject: [PATCH 3/4] fix: stat searching fixed for both opensearch and cloud --- db-connector.go | 43 ++++++++++++++++++++++++++++++++++++++----- health.go | 38 ++++++++++++++++++-------------------- 2 files changed, 56 insertions(+), 25 deletions(-) diff --git a/db-connector.go b/db-connector.go index 0bf9c3c3..fe4b28ea 100755 --- a/db-connector.go +++ b/db-connector.go @@ -5301,7 +5301,44 @@ func GetPlatformHealth(ctx context.Context, beforeTimestamp int, afterTimestamp "order": "desc", }, }, - "size": limit, + } + + if limit != 0 { + query["size"] = limit + } + + if beforeTimestamp > 0 || afterTimestamp > 0 { + query["query"] = map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{}, + }, + } + } + + if beforeTimestamp > 0 { + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append( + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), + map[string]interface{}{ + "range": map[string]interface{}{ + "updated": map[string]interface{}{ + "gt": beforeTimestamp, + }, + }, + }, + ) + } + + if afterTimestamp > 0 { + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append( + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), + map[string]interface{}{ + "range": map[string]interface{}{ + "updated": map[string]interface{}{ + "lt": afterTimestamp, + }, + }, + }, + ) } if err := json.NewEncoder(&buf).Encode(query); err != nil { @@ -5351,10 +5388,6 @@ func GetPlatformHealth(ctx context.Context, beforeTimestamp int, afterTimestamp return health, err } - if len(wrapped.Hits.Hits) == 0 { - return health, errors.New("No healthchecks found") - } - for _, hit := range wrapped.Hits.Hits { health = append(health, hit.Source) } diff --git a/health.go b/health.go index ae1542ee..34afd888 100644 --- a/health.go +++ b/health.go @@ -11,9 +11,9 @@ import ( "log" "net/http" "os" + "strconv" "strings" "time" - "strconv" ) type appConfig struct { @@ -37,7 +37,7 @@ func updateCache(workflowHealth WorkflowHealth) { cacheKey := fmt.Sprintf("ops-health-check") ctx := context.Background() - if project.CacheDb { + if project.CacheDb { platformHealthCheck := HealthCheck{} platformHealthCheck.Updated = time.Now().Unix() platformHealthCheck.Workflows = workflowHealth @@ -48,7 +48,7 @@ func updateCache(workflowHealth WorkflowHealth) { err = SetCache(ctx, cacheKey, platformData, 15) if err != nil { log.Printf("[WARNING] Failed setting cache ops health: %s", err) - } + } } } @@ -144,7 +144,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer " + apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -202,7 +202,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -248,7 +248,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { } req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -306,7 +306,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -364,7 +364,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer " + apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // send the request client = &http.Client{} @@ -460,7 +460,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { userInfo, err := HandleApiAuthentication(resp, request) if err != nil { log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err) - + resp.WriteHeader(401) resp.Write([]byte(`{"success": false}`)) return @@ -512,7 +512,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { workflowHealthChannel := make(chan WorkflowHealth) // appHealthChannel := make(chan AppHealth) go func() { - log.Printf("[DEBUG] Running workflowHealthChannel goroutine") + log.Printf("[DEBUG] Running workflowHealthChannel goroutine") workflowHealth, err := RunOpsWorkflow(apiKey, orgId) if err != nil { log.Printf("[ERROR] Failed workflow health check: %s", err) @@ -629,13 +629,13 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { resp.Write(healthChecksData) } -func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) (error) { +func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) error { baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") if len(baseUrl) == 0 { log.Printf("[DEBUG] Base url not set. Setting to default: for delete") baseUrl = "https://shuffler.io" } - + if project.Environment == "onprem" { log.Printf("[DEBUG] Onprem environment. Setting base url to localhost: for delete") baseUrl = "http://localhost:5001" @@ -698,7 +698,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { Delete: false, RunStatus: "", ExecutionId: "", - WorkflowId: "", + WorkflowId: "", } baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") @@ -755,7 +755,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+ apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28" // body := map[string]string{"execution_argument": "", "start": startId} @@ -824,7 +824,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // set the headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer " + apiKey) + req.Header.Set("Authorization", "Bearer "+apiKey) // convert the body to JSON reqBody := map[string]string{"execution_id": execution.ExecutionId, "authorization": os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")} @@ -888,7 +888,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { } - if len(opsDashboardOrgId) == 0 { log.Printf("[WARNING] Ops dashboard org not set. Not setting up ops workflow") return "", errors.New("Ops dashboard org not set") @@ -1034,7 +1033,7 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { jsonData := `{"name":"demo","description":"demo","blogpost":"","status":"test","default_return_value":"","usecase_ids":[]}` // res, err := http.Post(url, "application/json", bytes.NewBuffer([]byte(jsonData))) - req, err = http.NewRequest("POST", baseUrl + "/api/v1/workflows", bytes.NewBuffer([]byte(jsonData))) + req, err = http.NewRequest("POST", baseUrl+"/api/v1/workflows", bytes.NewBuffer([]byte(jsonData))) log.Printf("[SANITY CHECK] req method is: %s", req.Method) if err != nil { @@ -1101,7 +1100,7 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { // Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true - req, err = http.NewRequest("PUT", baseUrl + "/api/v1/workflows/" + workflowData.ID + "?skip_save=true", nil) + req, err = http.NewRequest("PUT", baseUrl+"/api/v1/workflows/"+workflowData.ID+"?skip_save=true", nil) if err != nil { log.Println("[ERROR] creating HTTP request:", err) return "", errors.New("Error creating HTTP request: " + err.Error()) @@ -1133,7 +1132,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { defer resp.Body.Close() - if resp.StatusCode != 200 { log.Printf("[ERROR] Failed saving ops dashboard workflow: %s. The status code was: %d", err, resp.StatusCode) // print the response body @@ -1148,4 +1146,4 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { log.Printf("[INFO] Ops dashboard workflow saved successfully with ID: %s", workflowData.ID) return workflowData.ID, nil -} \ No newline at end of file +} From d7d8da3839d3782a1529e2bc64098a55a057e8e8 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Thu, 12 Oct 2023 21:34:14 +0530 Subject: [PATCH 4/4] fix: finalising some more things --- health.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/health.go b/health.go index 34afd888..5ec2a8a2 100644 --- a/health.go +++ b/health.go @@ -459,19 +459,15 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { userInfo, err := HandleApiAuthentication(resp, request) if err != nil { - log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err) - - resp.WriteHeader(401) - resp.Write([]byte(`{"success": false}`)) - return + log.Printf("[WARNING] Api authentication failed in handleInfo: %s. Continuing anyways here..", err) } if project.Environment == "onprem" && userInfo.Role != "admin" { resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) return - } else if project.Environment == "Cloud" && userInfo.ApiKey != os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") { - resp.WriteHeader(401) + } else if project.Environment == "Cloud" && !(userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") || userInfo.SupportAccess) { + resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`)) return } @@ -811,6 +807,8 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { updateCache(workflowHealth) + timeout := time.After(5 * time.Minute) + // 3. Check if workflow ran successfully // ping /api/v1/streams/results/ while workflowHealth.RunFinished is false // if workflowHealth.RunFinished is true, return workflowHealth @@ -871,6 +869,17 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { updateCache(workflowHealth) log.Printf("[DEBUG] Workflow Health execution Result Status: %#v for executionID: %s", executionResults.Status, workflowHealth.ExecutionId) + + // check if timeout + select { + case <-timeout: + log.Printf("[ERROR] Timeout reached for workflow health check. Returning") + workflowHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK" + return workflowHealth, errors.New("Timeout reached for workflow health check") + default: + // do nothing + } + log.Printf("[DEBUG] Waiting 2 seconds before retrying") time.Sleep(2 * time.Second) }