From 9c44a81a678391ceb28069ec25eef05f9a98ec8f Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:22:24 +0530 Subject: [PATCH 1/8] fix: auto fix workflow mappings for first set ups --- health.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 15 deletions(-) diff --git a/health.go b/health.go index 50d58015..7b8019e6 100644 --- a/health.go +++ b/health.go @@ -690,6 +690,74 @@ func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) error { return nil } +func fixOpensearch() error { + // Define the index mapping + mapping := `{ + "properties": { + "workflow": { + "properties": { + "actions": { + "properties": { + "parameters": { + "properties": { + "value": { + "type": "text" + }, + "example": { + "type": "text" + }, + } + } + } + } + } + } + } + }` + + // Get the username and password from environment variables + username := os.Getenv("OPENSEARCH_USERNAME") + password := os.Getenv("OPENSEARCH_PASSWORD") + + // Create a new request + req, err := http.NewRequest("PUT", "http://shuffle-opensearch:9200/workflowexecution/_mapping", bytes.NewBufferString(mapping)) + if err != nil { + log.Fatalf("Error creating the request: %s", err) + } + + // Set the request headers + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(username, password) + + // Create a new HTTP client + client := &http.Client{} + +// Send the request in a loop until a 200 status code is received + res, err := client.Do(req) + if err != nil { + log.Printf("Error sending the request while fixing execution body: %s", err) + return err + } + + // Read the response body + body, err := ioutil.ReadAll(res.Body) + if err != nil { + log.Printf("Error reading the response body while fixing execution body: %s", err) + return err + } + res.Body.Close() + + if res.StatusCode == 200 { + log.Printf("Index created successfully: %s. Opensearch mappings should be fixed.", body) + return nil + } else { + log.Printf("Failed to create index, retrying: %s", body) + return errors.New("Failed index mapping") + } + + return nil +} + func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { // run workflow with id 602c7cf5-500e-4bd1-8a97-aa5bc8a554e6 ctx := context.Background() @@ -740,7 +808,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { workflow := *workflowPtr - log.Printf("[DEBUG] Running health check workflow") + log.Printf("[DEBUG] Running health check workflow. workflowHealth till now: %#v", workflowHealth) // 2. Run workflow id := workflow.ID @@ -784,7 +852,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { defer resp.Body.Close() if resp.StatusCode != 200 { - log.Printf("[ERROR] Failed running health check workflow: %s. The status code is: %d", err, resp.StatusCode) + log.Printf("[ERROR] Failed running health check workflow: %s. The status code is: %d", id, resp.StatusCode) // print the response body respBodyErr, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -792,9 +860,15 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) { } else { log.Printf("[ERROR] Health check running Workflow Response: %s", respBodyErr) } - - log.Printf("[DEBUG] Setting workflowHealth.Create = false") - workflowHealth.Create = false + if project.Environment == "onprem" { + log.Printf("Trying to fix opensearch mappings") + err = fixOpensearch() + if err != nil { + log.Printf("[ERROR] Failed fixing opensearch mappings: %s", err) + } else { + log.Printf("[DEBUG] Fixed opensearch mappings successfully! Maybe try ops dashboard again?") + } + } return workflowHealth, err } @@ -1012,12 +1086,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { } workflowData.Actions[actionIndex] = action - // if ArrayContains(blacklisted, action.Label) { - // // dates keep failing in opensearch - // // this is a grander issue, but for now, we'll just skip these actions - // log.Printf("[WARNING] Skipping action %s", action.Label) - // continue - // } actions = append(actions, action) } @@ -1096,8 +1164,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { return "", errors.New("Error reading HTTP response response body: " + err.Error()) } - log.Printf("[DEBUG] body is: %s", body) - var tmpworkflow Workflow // Unmarshal the JSON data into a Workflow instance @@ -1133,8 +1199,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) { return "", err } - log.Printf("[DEBUG] Sending workflow JSON data: %s", workflowDataJSON) - // set the body req.Body = ioutil.NopCloser(bytes.NewBuffer(workflowDataJSON)) From 031647bab76508f3d594a25a51e4eb64c06f6e3b Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:23:57 +0530 Subject: [PATCH 2/8] fix: making opensearch url get auto picked up --- health.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/health.go b/health.go index 7b8019e6..696ad699 100644 --- a/health.go +++ b/health.go @@ -718,9 +718,11 @@ func fixOpensearch() error { // Get the username and password from environment variables username := os.Getenv("OPENSEARCH_USERNAME") password := os.Getenv("OPENSEARCH_PASSWORD") + opensearchUrl := os.Getenv("OPENSEARCH_URL") + apiUrl := opensearchUrl + "/workflowexecution/_mapping" // Create a new request - req, err := http.NewRequest("PUT", "http://shuffle-opensearch:9200/workflowexecution/_mapping", bytes.NewBufferString(mapping)) + req, err := http.NewRequest("PUT", apiUrl, bytes.NewBufferString(mapping)) if err != nil { log.Fatalf("Error creating the request: %s", err) } From 49b67f5ac197f9e3e6bb7f60f3fa719529905dd5 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:25:01 +0530 Subject: [PATCH 3/8] fix: better stat logs --- health.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/health.go b/health.go index 696ad699..de63b812 100644 --- a/health.go +++ b/health.go @@ -596,19 +596,19 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { // convert all to int64 limitInt, err := strconv.Atoi(limit) if err != nil { - log.Printf("[ERROR] Failed converting limit to int64: %s", err) + log.Printf("[ERROR] Failed converting limit to int: %s", err) limitInt = 0 } beforeInt, err := strconv.Atoi(before) if err != nil { - log.Printf("[ERROR] Failed converting before to int64: %s", err) + log.Printf("[ERROR] Failed converting before to int: %s", err) beforeInt = 0 } afterInt, err := strconv.Atoi(after) if err != nil { - log.Printf("[ERROR] Failed converting after to int64: %s", err) + log.Printf("[ERROR] Failed converting after to int: %s", err) afterInt = 0 } From 75ce9617dfdb8982b70a20c63bdcddbbf1f41266 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:30:12 +0530 Subject: [PATCH 4/8] fix: empty stats on first run for opensource version --- health.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/health.go b/health.go index de63b812..54b6e1f9 100644 --- a/health.go +++ b/health.go @@ -488,6 +488,13 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } + if err == "Bad statuscode: 404" && project.Environment == "onprem" { + log.Printf("[WARNING] Failed getting platform health from database: %s. Probably because no workflowexecutions have been done",err) + resp.WriteHeader(200) + resp.Write([]byte(`[]`)) + return + } + health := healths[0] if err == nil { From 6f668cea407b47226138110db1500b4e9f326f11 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:31:02 +0530 Subject: [PATCH 5/8] fix: minor bug --- health.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/health.go b/health.go index 54b6e1f9..462eaa89 100644 --- a/health.go +++ b/health.go @@ -488,7 +488,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if err == "Bad statuscode: 404" && project.Environment == "onprem" { + if err.error() == "Bad statuscode: 404" && project.Environment == "onprem" { log.Printf("[WARNING] Failed getting platform health from database: %s. Probably because no workflowexecutions have been done",err) resp.WriteHeader(200) resp.Write([]byte(`[]`)) From 85c0293800781bb12611d6ba043cc5e0be4bdd8c Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:31:35 +0530 Subject: [PATCH 6/8] fix: minor bug --- health.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/health.go b/health.go index 462eaa89..c1e89e9b 100644 --- a/health.go +++ b/health.go @@ -488,7 +488,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if err.error() == "Bad statuscode: 404" && project.Environment == "onprem" { + if err.Error() == "Bad statuscode: 404" && project.Environment == "onprem" { log.Printf("[WARNING] Failed getting platform health from database: %s. Probably because no workflowexecutions have been done",err) resp.WriteHeader(200) resp.Write([]byte(`[]`)) From b405bbb1baf8783798c8e627cd03bcec8ac0c32a Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:35:04 +0530 Subject: [PATCH 7/8] fix: minor bug --- health.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/health.go b/health.go index c1e89e9b..21eacbb8 100644 --- a/health.go +++ b/health.go @@ -488,13 +488,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { return } - if err.Error() == "Bad statuscode: 404" && project.Environment == "onprem" { - log.Printf("[WARNING] Failed getting platform health from database: %s. Probably because no workflowexecutions have been done",err) - resp.WriteHeader(200) - resp.Write([]byte(`[]`)) - return - } - health := healths[0] if err == nil { @@ -620,6 +613,14 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) { } healthChecks, err := GetPlatformHealth(ctx, afterInt, beforeInt, limitInt) + + if strings.Contains(err.Error(), "Bad statuscode: 404") && project.Environment == "onprem" { + log.Printf("[WARNING] Failed getting platform health from database: %s. Probably because no workflowexecutions have been done",err) + resp.WriteHeader(200) + resp.Write([]byte(`[]`)) + return + } + if err != nil { log.Printf("[ERROR] Failed getting platform health from database: %s", err) resp.WriteHeader(500) From f5b64850af841a5f8bf573f5b1cf81cfcdc873cd Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sat, 14 Oct 2023 15:49:12 +0530 Subject: [PATCH 8/8] fix: fixed #43 --- health.go | 53 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/health.go b/health.go index 21eacbb8..964b72f4 100644 --- a/health.go +++ b/health.go @@ -702,33 +702,50 @@ func fixOpensearch() error { // Define the index mapping mapping := `{ "properties": { - "workflow": { + "workflow": { + "properties": { + "actions": { "properties": { - "actions": { - "properties": { - "parameters": { - "properties": { - "value": { - "type": "text" - }, - "example": { - "type": "text" - }, - } - } - } + "parameters": { + "properties": { + "value": { + "type": "text" + }, + "example": { + "type": "text" + } } + } } + } } + } } - }` + }` // Get the username and password from environment variables - username := os.Getenv("OPENSEARCH_USERNAME") - password := os.Getenv("OPENSEARCH_PASSWORD") - opensearchUrl := os.Getenv("OPENSEARCH_URL") + username := os.Getenv("SHUFFLE_OPENSEARCH_USERNAME") + if len(username) == 0 { + log.Printf("[DEBUG] Opensearch username not set. Setting to default") + username = "admin" + } + + password := os.Getenv("SHUFFLE_OPENSEARCH_PASSWORD") + if len(password) == 0 { + log.Printf("[DEBUG] Opensearch password not set. Setting to default") + password = "admin" + } + + opensearchUrl := os.Getenv("SHUFFLE_OPENSEARCH_URL") + if len(opensearchUrl) == 0 { + log.Printf("[DEBUG] Opensearch url not set. Setting to default") + opensearchUrl = "http://localhost:9200" + } + apiUrl := opensearchUrl + "/workflowexecution/_mapping" + log.Printf("[DEBUG] apiurl for fixing opensearch: %s", apiUrl) + // Create a new request req, err := http.NewRequest("PUT", apiUrl, bytes.NewBufferString(mapping)) if err != nil {