Skip to content

Commit

Permalink
Merge pull request #39 from 0x0elliot/0x0elliot/bugs-fix-ops
Browse files Browse the repository at this point in the history
Ops bugs fix
  • Loading branch information
0x0elliot authored Oct 12, 2023
2 parents 8ee5738 + 43bbc46 commit c19f79e
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 36 deletions.
228 changes: 192 additions & 36 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"os"
"strings"
"time"

"github.com/google/uuid"
)

type appConfig struct {
Expand Down Expand Up @@ -480,6 +478,12 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
// get last health check from database
healths, err := GetPlatformHealth(ctx, 1)

if len(healths) == 0 {
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false, "reason": "Health check has never been run before! Nothing to display!"}`))
return
}

health := healths[0]

if err == nil {
Expand Down Expand Up @@ -507,13 +511,15 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
workflowHealthChannel := make(chan WorkflowHealth)
// appHealthChannel := make(chan AppHealth)
go func() {
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
workflowHealth, err := RunOpsWorkflow(apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Failed workflow health check: %s", err)
}

if workflowHealth.Create == true {
log.Printf("[DEBUG] Deleting created ops workflow")
err = deleteWorkflow(workflowHealth, apiKey)
err = deleteOpsWorkflow(workflowHealth, apiKey)
if err != nil {
log.Printf("[ERROR] Failed deleting workflow: %s", err)
} else {
Expand All @@ -539,7 +545,11 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
// platformHealth.Apps = <-appHealthChannel
platformHealth.Workflows = <-workflowHealthChannel

platformHealth.Success = true
if platformHealth.Workflows.Create == true && platformHealth.Workflows.Delete == true && platformHealth.Workflows.Run == true && platformHealth.Workflows.RunFinished == true {
log.Printf("[DEBUG] Platform health check successful! All necessary values are true.")
platformHealth.Success = true
}

platformHealth.Updated = time.Now().Unix()

var HealthCheck HealthCheckDB
Expand Down Expand Up @@ -573,8 +583,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
// for now, the limit is last 100 runs
limit := 100

healthChecks := []HealthCheckDB{}
ctx := GetContext(request)

healthChecks, err := GetPlatformHealth(ctx, limit)
Expand All @@ -583,7 +591,7 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false, "reason": "Failed getting platform health from database."}`))
return
}
}

healthChecksData, err := json.Marshal(healthChecks)
if err != nil {
Expand All @@ -597,19 +605,27 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
resp.Write(healthChecksData)
}

func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (error) {
func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) (error) {
baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) == 0 {
log.Printf("[DEBUG] Base url not set. Setting to default: for delete")
baseUrl = "https://shuffler.io"
}

if project.Environment == "onprem" {
log.Printf("[DEBUG] Onprem environment. Setting base url to localhost: for delete")
baseUrl = "http://localhost:5001"
}

id := workflowHealth.ExecutionId
if workflowHealth.Create == false || len(workflowHealth.WorkflowId) == 0 {
log.Printf("[DEBUG] Seems like workflow wasn't created properly, and then delete workflow was called.")
log.Printf("[DEBUG] Returning without deleting workflow. WorkflowHealth: %#v", workflowHealth)
return errors.New("Workflow wasn't created properly")
}

id := workflowHealth.WorkflowId

// 4. Delete workflow
// 4. Delete workflow
// make a DELETE request to https://shuffler.io/api/v1/workflows/<workflow_id>
url := baseUrl + "/api/v1/workflows/" + id
log.Printf("[DEBUG] Deleting workflow with id: %s", id)
Expand All @@ -633,8 +649,13 @@ func deleteWorkflow(workflowHealth WorkflowHealth , apiKey string) (error) {
}

if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed deleting the health check workflow: %s. The status code was: %d", err, resp.StatusCode)
return err
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading HTTP response body: %s", err)
} else {
log.Printf("[ERROR] Failed deleting the health check workflow. The status code was: %d and body was: %s", resp.StatusCode, body)
}
return errors.New("Failed deleting the health check workflow")
}

defer resp.Body.Close()
Expand All @@ -653,14 +674,17 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
Delete: false,
RunStatus: "",
ExecutionId: "",
WorkflowId: "",
}

baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) == 0 {
log.Printf("[DEBUG] Base url not set. Setting to default")
baseUrl = "https://shuffler.io"
}

if project.Environment == "onprem" {
log.Printf("[DEBUG] Onprem environment. Setting base url to localhost")
baseUrl = "http://localhost:5001"
}

Expand All @@ -671,6 +695,11 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
return workflowHealth, err
}

if len(opsWorkflowID) == 0 {
log.Printf("[ERROR] Failed creating Health check workflow. Exiting..")
return workflowHealth, err
}

workflowPtr, err := GetWorkflow(ctx, opsWorkflowID)
if err != nil {
log.Printf("[ERROR] Failed getting Health check workflow: %s", err)
Expand All @@ -679,6 +708,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
}

workflowHealth.Create = true
workflowHealth.WorkflowId = opsWorkflowID
updateCache(workflowHealth)

workflow := *workflowPtr
Expand Down Expand Up @@ -736,6 +766,9 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
log.Printf("[ERROR] Health check running Workflow Response: %s", respBodyErr)
}

log.Printf("[DEBUG] Setting workflowHealth.Create = false")
workflowHealth.Create = false

return workflowHealth, err
}

Expand Down Expand Up @@ -872,8 +905,9 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
return "", errors.New("Error creating HTTP request: " + err.Error())
}

// Send the HTTP request using the default HTTP client
resp, err := http.DefaultClient.Do(req)
// send the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("[ERROR] sending Ops fetch app HTTP request:", err)
return "", errors.New("Error sending HTTP request: " + err.Error())
Expand All @@ -888,6 +922,8 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
return "", errors.New("Error reading HTTP App response response body: " + err.Error())
}

log.Printf("[DEBUG] Successfully fetched workflow! Now creating a copy workflow for ops dashboard")

// Unmarshal the JSON data into a Workflow instance
var workflowData Workflow
err = json.Unmarshal(body, &workflowData)
Expand All @@ -906,21 +942,34 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
}

// fix workflow org
workflowData.OrgId = opsDashboardOrg.Id
workflowData.Owner = user.Id
workflowData.Public = false
workflowData.Status = ""
workflowData.Name = "Ops Dashboard Workflow"
workflowData.Hidden = true

miniOrg := OrgMini{
Id: opsDashboardOrg.Id,
Name: opsDashboardOrg.Name,
Users: []UserMini{},
}

workflowData.Org = []OrgMini{}
workflowData.Org = append(workflowData.Org, miniOrg)

var actions []Action
// var blacklisted = []string{"Date_to_epoch", "input_data", "Compare_timestamps", "Get_current_timestamp"}

for actionIndex, _ := range workflowData.Actions {
action := workflowData.Actions[actionIndex]

if action.Environment != "Shuffle" {
action.Environment = "Shuffle"
if project.Environment == "onprem" {
if action.Environment != "Shuffle" {
action.Environment = "Shuffle"
}
} else {
if action.Environment != "Cloud" {
action.Environment = "Cloud"
}
}

workflowData.Actions[actionIndex] = action
Expand All @@ -936,34 +985,141 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {

workflowData.Actions = actions

workflowData.ExecutingOrg = OrgMini{
Id: opsDashboardOrg.Id,
Name: opsDashboardOrg.Name,
Users: []UserMini{},
// // Save the workflow
// err = SetWorkflow(ctx, workflowData, workflowData.ID)

// if err != nil {
// log.Println("[ERROR] saving ops dashboard workflow:", err)
// return "", errors.New("Error saving ops dashboard workflow: " + err.Error())
// }

// create an empty workflow
// make a POST request to https://shuffler.io/api/v1/workflows
baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) == 0 {
log.Printf("[DEBUG] Base url not set. Setting to default")
baseUrl = "https://shuffler.io"
}

workflowData.WorkflowVariables = variables
if project.Environment == "onprem" {
log.Printf("[DEBUG] Onprem environment. Setting base url to localhost")
baseUrl = "http://localhost:5001"
}

// {"name":"demo","description":"demo","blogpost":"","status":"test","default_return_value":"","usecase_ids":[]}
jsonData := `{"name":"demo","description":"demo","blogpost":"","status":"test","default_return_value":"","usecase_ids":[]}`

// res, err := http.Post(url, "application/json", bytes.NewBuffer([]byte(jsonData)))
req, err = http.NewRequest("POST", baseUrl + "/api/v1/workflows", bytes.NewBuffer([]byte(jsonData)))
log.Printf("[SANITY CHECK] req method is: %s", req.Method)

if err != nil {
log.Println("[ERROR] creating HTTP request:", err)
return "", errors.New("Error creating HTTP request: " + err.Error())
}

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)
req.Header.Set("Org-Id", opsDashboardOrgId)

// send the request
client = &http.Client{}
resp, err = client.Do(req)
if err != nil {
log.Println("[ERROR] sending Ops create workflow HTTP request:", err)
return "", errors.New("Error sending HTTP request: " + err.Error())
}

uniqueCheck := false
for uniqueCheck == false {
// generate a random UUID for the workflow
randomUUID := uuid.New().String()
log.Printf("[DEBUG] Random UUID generated for Ops dashboard: %s", randomUUID)
if err != nil {
log.Println("[ERROR] sending Ops create workflow HTTP request:", err)
return "", errors.New("Error sending HTTP request: " + err.Error())
}

// check if workflow with id randomUUID exists
_, err = GetWorkflow(ctx, randomUUID)
if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed creating ops dashboard workflow: %s. The status code was: %d", err, resp.StatusCode)
// print the response body
respBodyErr, err := ioutil.ReadAll(resp.Body)
if err != nil {
uniqueCheck = true
workflowData.ID = randomUUID
log.Printf("[ERROR] Failed reading HTTP response body: %s", err)
} else {
log.Printf("[ERROR] Ops dashboard creating Workflow Response: %s", respBodyErr)
}
return "", errors.New("Failed creating ops dashboard workflow")
}

defer resp.Body.Close()

// Read the response body
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("[ERROR] reading HTTP response body:", err)
return "", errors.New("Error reading HTTP response response body: " + err.Error())
}

log.Printf("[DEBUG] body is: %s", body)

var tmpworkflow Workflow

// Unmarshal the JSON data into a Workflow instance
err = json.Unmarshal(body, &tmpworkflow)

if err != nil {
log.Println("[ERROR] unmarshalling Ops workflowData JSON data:", err)
return "", errors.New("Error unmarshalling JSON data: " + err.Error())
}

workflowData.ID = tmpworkflow.ID
workflowData.Org = tmpworkflow.Org
workflowData.OrgId = tmpworkflow.OrgId
workflowData.Owner = tmpworkflow.Owner
workflowData.ExecutingOrg = tmpworkflow.ExecutingOrg

// Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true

req, err = http.NewRequest("PUT", baseUrl + "/api/v1/workflows/" + workflowData.ID + "?skip_save=true", nil)
if err != nil {
log.Println("[ERROR] creating HTTP request:", err)
return "", errors.New("Error creating HTTP request: " + err.Error())
}

// Save the workflow
err = SetWorkflow(ctx, workflowData, workflowData.ID)
// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)

// convert the body to JSON
workflowDataJSON, err := json.Marshal(workflowData)
if err != nil {
log.Println("[ERROR] saving ops dashboard workflow:", err)
return "", errors.New("Error saving ops dashboard workflow: " + err.Error())
log.Printf("[ERROR] Failed marshalling workflow data: %s", err)
return "", err
}

log.Printf("[DEBUG] Sending workflow JSON data: %s", workflowDataJSON)

// set the body
req.Body = ioutil.NopCloser(bytes.NewBuffer(workflowDataJSON))

// send the request
client = &http.Client{}
resp, err = client.Do(req)
if err != nil {
log.Printf("[ERROR] Failed sending HTTP request: %s", err)
return "", err
}

defer resp.Body.Close()


if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed saving ops dashboard workflow: %s. The status code was: %d", err, resp.StatusCode)
// print the response body
respBodyErr, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading HTTP response body: %s", err)
} else {
log.Printf("[ERROR] Ops dashboard saving Workflow Response: %s", respBodyErr)
}
return "", errors.New("Failed saving ops dashboard workflow")
}

log.Printf("[INFO] Ops dashboard workflow saved successfully with ID: %s", workflowData.ID)
Expand Down
1 change: 1 addition & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3496,6 +3496,7 @@ type WorkflowHealth struct {
RunStatus string `json:"run_status"`
Delete bool `json:"delete"`
ExecutionId string `json:"execution_id"`
WorkflowId string `json:"workflow_id"`
}

type HealthCheck struct {
Expand Down

0 comments on commit c19f79e

Please sign in to comment.