From 6245d44fc1b77abb1d0cef7a76e0861bc0c0a10b Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sun, 3 Sep 2023 20:35:07 +0530 Subject: [PATCH 1/2] fix: adding better error messages --- health.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/health.go b/health.go index 0ff5cf0..dc02e89 100644 --- a/health.go +++ b/health.go @@ -389,20 +389,18 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { //log.Printf("CACHEDATA: %s", cacheData) err = json.Unmarshal(cacheData, &platformHealth) if err == nil { - // FIXME: Check if last updated is less than 5 minutes with platformHealth.Edited in unix time - // If yes, return cached result, else run health check - if platformHealth.Updated+300 < time.Now().Unix() { - log.Printf("Platform health returned: %#v", platformHealth) - marshalledData, err := json.Marshal(platformHealth) - if err == nil { - resp.WriteHeader(200) - resp.Write(marshalledData) - return - } else { - log.Printf("[ERROR] Failed marshalling cached platform health data: %s", err) - } + log.Printf("Platform health returned: %#v", platformHealth) + marshalledData, err := json.Marshal(platformHealth) + if err == nil { + resp.WriteHeader(200) + resp.Write(marshalledData) + return + } else { + log.Printf("[ERROR] Failed marshalling cached platform health data: %s", err) } } + } else { + log.Printf("[WARNING] Failed getting cache ops health on first time: %s", err) } } From 0e721cc144a1af7f6b11ede422147c94a5c62b80 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Sun, 3 Sep 2023 21:48:30 +0530 Subject: [PATCH 2/2] fix: fixing cache issues --- health.go | 116 ++++++++++++++++++++++-------------------------------- 1 file changed, 47 insertions(+), 69 deletions(-) diff --git a/health.go b/health.go index dc02e89..c750a65 100644 --- a/health.go +++ b/health.go @@ -3,6 +3,7 @@ package shuffle import ( "bytes" "context" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -10,9 +11,9 @@ import ( "log" "net/http" "os" - "time" - "encoding/base64" "strings" + "time" + "github.com/google/uuid" ) @@ -23,14 +24,33 @@ type appConfig struct { } type genericResp struct { - Success bool `json:"success"` - ID string `json:"id"` + Success bool `json:"success"` + ID string `json:"id"` } type executionResult struct { - Success bool `json:"success"` - Result string `json:"result"` - ID string `json:"id"` + Success bool `json:"success"` + Result string `json:"result"` + ID string `json:"id"` +} + +func updateCache(workflowHealth WorkflowHealth) { + cacheKey := fmt.Sprintf("ops-health-check") + ctx := context.Background() + + if project.CacheDb { + platformHealthCheck := HealthCheck{} + platformHealthCheck.Updated = time.Now().Unix() + platformHealthCheck.Workflows = workflowHealth + + platformData, err := json.Marshal(platformHealthCheck) + + // Set cache + err = SetCache(ctx, cacheKey, platformData, 15) + if err != nil { + log.Printf("[WARNING] Failed setting cache ops health: %s", err) + } + } } func base64StringToString(base64String string) (string, error) { @@ -44,20 +64,20 @@ func base64StringToString(base64String string) (string, error) { func RunOpsAppHealthCheck() (AppHealth, error) { log.Printf("[DEBUG] Running app health check") appHealth := AppHealth{ - Create: false, - Run: false, - Delete: false, - Read: false, - Validate: false, - AppId: "", - Result: "", + Create: false, + Run: false, + Delete: false, + Read: false, + Validate: false, + AppId: "", + Result: "", ExecutionID: "", } // 1. Get App baseURL := os.Getenv("SHUFFLE_CLOUDRUN_URL") // if len(baseURL) == 0 { - baseURL = "https://shuffler.io" + baseURL = "https://shuffler.io" // } url := baseURL + "/api/v1/apps/edaa73d40238ee60874a853dc3ccaa6f/config" @@ -247,7 +267,6 @@ func RunOpsAppHealthCheck() (AppHealth, error) { return appHealth, err } - appHealth.Create = true id = validatedResp.ID @@ -268,8 +287,8 @@ func RunOpsAppHealthCheck() (AppHealth, error) { Value: os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"), }, { - Name: "url", - Value: baseURL, + Name: "url", + Value: baseURL, Configuration: true, }, } @@ -313,9 +332,8 @@ func RunOpsAppHealthCheck() (AppHealth, error) { return appHealth, err } - // Unmarshal the JSON data into a Workflow instance - var runResponse executionResult + var runResponse executionResult err = json.Unmarshal(respBody, &runResponse) @@ -389,6 +407,8 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { //log.Printf("CACHEDATA: %s", cacheData) err = json.Unmarshal(cacheData, &platformHealth) if err == nil { + // FIXME: Check if last updated is less than 5 minutes with platformHealth.Edited in unix time + // If yes, return cached result, else run health check log.Printf("Platform health returned: %#v", platformHealth) marshalledData, err := json.Marshal(platformHealth) if err == nil { @@ -400,7 +420,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } } } else { - log.Printf("[WARNING] Failed getting cache ops health on first time: %s", err) + log.Print("[WARNING] Failed getting cache ops health on first try: %s", err) } } @@ -443,10 +463,10 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } if project.CacheDb { - // Set cache + // Set cache err = SetCache(ctx, cacheKey, platformData, 15) if err != nil { - log.Printf("[WARNING] Failed setting cache ops health: %s", err) + log.Printf("[WARNING] Failed setting cache ops health at last: %s", err) } } @@ -478,10 +498,11 @@ func RunOpsWorkflow() (WorkflowHealth, error) { if err != nil { log.Printf("[ERROR] Failed getting Health check workflow: %s", err) log.Printf("[DEBUG] Creating health check workflow") - // Create workflow + return workflowHealth, err } workflowHealth.Create = true + updateCache(workflowHealth) workflow := *workflowPtr @@ -561,36 +582,7 @@ func RunOpsWorkflow() (WorkflowHealth, error) { workflowHealth.Run = true workflowHealth.ExecutionId = execution.ExecutionId - var platformHealthCheck HealthCheck - cacheKey := fmt.Sprintf("ops-health-check") - if project.CacheDb { - // Get cache - cache, err := GetCache(ctx, cacheKey) - if err == nil { - cacheData := []byte(cache.([]uint8)) - //log.Printf("CACHEDATA: %s", cacheData) - err = json.Unmarshal(cacheData, &platformHealthCheck) - if err != nil { - log.Printf("[ERROR] Failed unmarshalling cached platform health data: %s", err) - } - platformHealthCheck.Workflows = workflowHealth - platformHealthCheck.Updated = time.Now().Unix() - - platformData, err := json.Marshal(platformHealthCheck) - if err != nil { - log.Printf("[ERROR] Failed marshalling platform health data: %s", err) - } - - // Set cache - err = SetCache(ctx, cacheKey, platformData, 15) - if err != nil { - log.Printf("[WARNING] Failed setting cache ops health: %s", err) - } - - } else { - log.Printf("[WARNING] Failed setting cache ops health: %s", err) - } - } + updateCache(workflowHealth) // 3. Check if workflow ran successfully // ping /api/v1/streams/results/ while workflowHealth.RunFinished is false @@ -649,21 +641,7 @@ func RunOpsWorkflow() (WorkflowHealth, error) { workflowHealth.RunStatus = executionResults.Status } - if project.CacheDb { - platformHealthCheck.Workflows = workflowHealth - platformHealthCheck.Updated = time.Now().Unix() - - platformData, err := json.Marshal(platformHealthCheck) - if err != nil { - log.Printf("[ERROR] Failed marshalling platform health data: %s", err) - } - - // Set cache - err = SetCache(ctx, cacheKey, platformData, 15) - if err != nil { - log.Printf("[WARNING] Failed setting cache ops health: %s", err) - } - } + updateCache(workflowHealth) log.Printf("[DEBUG] Workflow Health execution Result Status: %#v", executionResults.Status) }