Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Health check addition #30

Merged
merged 4 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 157 additions & 94 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package shuffle

import (
"context"
"net/http"
"encoding/json"
"fmt"
"os"
"io/ioutil"
"strconv"
"time"
"log"
"encoding/json"
"net/http"
"os"
"time"
"bytes"
)

func RunOpsAppHealthCheck() (AppHealth, error) {
appHealth := AppHealth{
Create: false,
Run: false,
Delete: false,
Run: false,
Delete: false,
}

return appHealth, nil
Expand All @@ -28,10 +28,11 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
return
}

log.Printf("cacheDb %#v", project.CacheDb)
// Check cache if health check was run in last 5 minutes
// If yes, return cached result, else run health check
ctx := GetContext(request)
platformHealth := HealthCheck{}
platformHealth := HealthCheck{}
cacheKey := fmt.Sprintf("ops-health-check")
if project.CacheDb {
cache, err := GetCache(ctx, cacheKey)
Expand All @@ -42,20 +43,22 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
if err == nil {
// FIXME: Check if last updated is less than 5 minutes with platformHealth.Edited in unix time
// If yes, return cached result, else run health check
if platformHealth.Updated + 300 < time.Now().Unix() {
marshalledData, err := json.Marshal(platformHealth)
if platformHealth.Updated+300 < time.Now().Unix() {
log.Printf("Platform health returned: %#v", platformHealth)
marshalledData, err := json.Marshal(platformHealth)
if err == nil {
resp.WriteHeader(200)
resp.Write(marshalledData)
return
}
} else {
log.Printf("[ERROR] Failed marshalling cached platform health data: %s", err)
}
}

log.Printf("[ERROR] Failed marshalling cached platform health data: %s", err)
}
}
}

log.Printf("channel making started")

// Use channel for getting RunOpsWorkflow function results
workflowHealthChannel := make(chan WorkflowHealth)
Expand All @@ -79,8 +82,8 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
}()

// Use channel for getting RunOpsWorkflow function results
platformHealth.Apps = <- appHealthChannel
platformHealth.Workflows = <- workflowHealthChannel
platformHealth.Apps = <-appHealthChannel
platformHealth.Workflows = <-workflowHealthChannel

platformHealth.Success = true
platformHealth.Updated = time.Now().Unix()
Expand Down Expand Up @@ -110,10 +113,11 @@ func RunOpsWorkflow() (WorkflowHealth, error) {
ctx := context.Background()

workflowHealth := WorkflowHealth{
Create: false,
Run: false,
Create: false,
Run: false,
RunFinished: false,
Delete: false,
Delete: false,
RunStatus: "",
}

// 1. Get workflow
Expand All @@ -124,75 +128,145 @@ func RunOpsWorkflow() (WorkflowHealth, error) {
return workflowHealth, err
}

workflowHealth.Create = true

workflow := *workflowPtr

// 2. Check if workflow ran in last SHUFFLE_OPS_WORKFLOW_RUN_TIME seconds
opsShuffleRunTime := os.Getenv("SHUFFLE_OPS_WORKFLOW_RUN_TIME")
if len(opsShuffleRunTime) == 0 {
opsShuffleRunTime = "3600"
// 2. Run workflow
id := workflow.ID
orgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG")
_ = id
_ = orgId


workflowJson, _ := json.Marshal(workflow)
baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) == 0 {
baseUrl = "https://shuffler.io"
}

opsShuffleRunTimeInt, err := strconv.Atoi(opsShuffleRunTime)
// prepare the request
url := baseUrl + "/api/v1/workflows/" + id + "/execute"
req, err := http.NewRequest("POST", url, bytes.NewBuffer(workflowJson))
if err != nil {
log.Printf("[ERROR] Failed converting SHUFFLE_OPS_WORKFLOW_RUN_TIME to int: %s", err)
log.Printf("[ERROR] Failed creating HTTP request: %s", err)
return workflowHealth, err
}

// 2.1 Get workflow executions
workflowExecutions, err := GetAllWorkflowExecutions(ctx, opsWorkflowID, 1)
// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"))

startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28"
body := map[string]string{"execution_argument": "", "start": startId}

// 2.2 Check if workflow ran in last SHUFFLE_OPS_WORKFLOW_RUN_TIME seconds
if len(workflowExecutions) != 0 {
// get last workflow execution
lastWorkflowExecution := workflowExecutions[len(workflowExecutions)-1]
// convert the body to JSON
bodyJson, err := json.Marshal(body)
if err != nil {
log.Printf("[ERROR] Failed marshalling body: %s", err)
return workflowHealth, err
}

lastWorkflowExecutionTimeInt := lastWorkflowExecution.StartedAt
// set the body
req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyJson))

// send the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR] Failed sending HTTP request: %s", err)
return workflowHealth, err
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed running workflow: %s. The status code is: %d", err, resp.StatusCode)
// print the response body
respBodyErr, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[ERROR] Failed converting last workflow execution time to int: %s", err)
return workflowHealth, err
log.Printf("[ERROR] Failed reading HTTP response body: %s", err)
} else {
log.Printf("[ERROR] Workflow Response: %s", respBodyErr)
}

// check if last workflow execution time is less than SHUFFLE_OPS_WORKFLOW_RUN_TIME seconds
if lastWorkflowExecutionTimeInt+int64(opsShuffleRunTimeInt) > int64(time.Now().Unix()) {
log.Printf("[INFO] Last workflow execution time is less than SHUFFLE_OPS_WORKFLOW_RUN_TIME seconds. Returning last result")
//lastWorkflowExecutionResultsJson, err := json.Marshal(lastWorkflowExecution.Results)
//if err != nil {
// log.Printf("[ERROR] Failed marshalling last workflow execution results: %s", err)
// return workflowHealth, err
//}
return workflowHealth, err
}

return workflowHealth, nil
}
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading HTTP response body: %s", err)
return workflowHealth, err
}

// 3. Run workflow
id := workflow.ID
orgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG")
_ = id
_ = orgId
// Unmarshal the JSON data into a Workflow instance
var execution WorkflowExecution
err = json.Unmarshal(respBody, &execution)

workflowHealth.Run = true

log.Printf("[DEBUG] Execution: %#v", execution)

/*
@0x0eliot: Replace this with an API call to do this.
The point is to test the API, not just the function
*/
/*
execution, _, err := handleExecution(id, workflow, request, orgId)
if err != nil {
log.Printf("[ERROR] Failed running workflow: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Failed running workflow"}`))
return
}
// 3. Check if workflow ran successfully
// ping /api/v1/streams/results/<execution_id> while workflowHealth.RunFinished is false
// if workflowHealth.RunFinished is true, return workflowHealth
for workflowHealth.RunFinished == false {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wtf is this how while loops work in Go? Never used it :P

url := baseUrl + "/api/v1/streams/results"
req, err := http.NewRequest("POST", url, nil)
if err != nil {
log.Printf("[ERROR] Failed creating HTTP request: %s", err)
return workflowHealth, err
}

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY"))

// convert the body to JSON
reqBody := map[string]string{"execution_id": execution.ExecutionId, "authorization": os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")}
reqBodyJson, err := json.Marshal(reqBody)

// set the body
req.Body = ioutil.NopCloser(bytes.NewBuffer(reqBodyJson))

// send the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR] Failed sending HTTP request: %s", err)
return workflowHealth, err
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed checking results for the workflow: %s. The status code was: %d", err, resp.StatusCode)
return workflowHealth, err
}

respBody, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading HTTP response body: %s", err)
return workflowHealth, err
}

// Unmarshal the JSON data into a Workflow instance
var executionResults WorkflowExecution
err = json.Unmarshal(respBody, &executionResults)

if err != nil {
log.Printf("[ERROR] Failed unmarshalling JSON data: %s", err)
return workflowHealth, err
}

if executionResults.Status != "EXECUTING" {
workflowHealth.RunFinished = true
workflowHealth.RunStatus = executionResults.Status
}

// JSONify execution.Results
executionResultsJson, _ := json.Marshal(execution.Results)
*/
log.Printf("[DEBUG] Execution Result Status: %#v", executionResults.Status)
}

//resp.WriteHeader(200)
//resp.Write([]byte(fmt.Sprintf(`{"success": true, "result": %s}`, executionResultsJson)))
return workflowHealth, nil
}

Expand Down Expand Up @@ -222,7 +296,7 @@ func InitOpsWorkflow() {
log.Printf("[ERROR] Error in finding user: %s", err)
return
}

if len(user.Id) == 0 && len(user.Username) == 0 {
fmt.Println("[ERROR] Ops dashboard user not found. Not setting up ops workflow")
return
Expand All @@ -233,7 +307,6 @@ func InitOpsWorkflow() {
return
}


// make a GET request to https://shuffler.io/api/v1/workflows/602c7cf5-500e-4bd1-8a97-aa5bc8a554e6
// to get the workflow
workflow, err := GetWorkflow(ctx, "602c7cf5-500e-4bd1-8a97-aa5bc8a554e6")
Expand All @@ -258,21 +331,21 @@ func InitOpsWorkflow() {
return
}

// Send the HTTP request using the default HTTP client
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("[ERROR] sending HTTP request:", err)
return
}
// Send the HTTP request using the default HTTP client
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("[ERROR] sending HTTP request:", err)
return
}

defer resp.Body.Close()

// Read the response body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("[ERROR] reading HTTP response body:", err)
return
}
// Read the response body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("[ERROR] reading HTTP response body:", err)
return
}

// Unmarshal the JSON data into a Workflow instance
var workflowData Workflow
Expand Down Expand Up @@ -303,43 +376,33 @@ func InitOpsWorkflow() {

for actionIndex, _ := range workflowData.Actions {
action := workflowData.Actions[actionIndex]

// capitalise the first letter of the environment
if project.Environment != "cloud" {
action.Environment = "Shuffle"
} else {
action.Environment = "Cloud"
}

if action.Position.X == 206 {
action.Position.X = 206.1
}

action.Position.X = float64(action.Position.X)
action.Position.Y = float64(action.Position.Y)

log.Println(action.Position.X)
log.Println(action.Position.Y)

workflowData.Actions[actionIndex] = action
if ArrayContains(blacklisted, action.Label) {
// dates keep failing in opensearch
// this is a grander issue, but for now, we'll just skip these actions
log.Printf("[WARNING] Skipping action %s", action.Label)
continue
}
}

actions = append(actions, action)
}

workflowData.Actions = actions

workflowData.ExecutingOrg = OrgMini{
Id: opsDashboardOrg.Id,
Name: opsDashboardOrg.Name,
Id: opsDashboardOrg.Id,
Name: opsDashboardOrg.Name,
Users: []UserMini{},
}

workflowData.WorkflowVariables = variables

// Save the workflow
Expand Down
1 change: 1 addition & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3451,6 +3451,7 @@ type WorkflowHealth struct {
Create bool `json:"create"`
Run bool `json:"run"`
RunFinished bool `json:"run_finished"`
RunStatus string `json:"run_status"`
Delete bool `json:"delete"`
}

Expand Down