Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Oct 12, 2023
2 parents d964620 + 3b1a53a commit 3a7b309
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 37 deletions.
67 changes: 59 additions & 8 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5288,9 +5288,8 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error {
return nil
}

func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) {
func GetPlatformHealth(ctx context.Context, beforeTimestamp int, afterTimestamp int, limit int) ([]HealthCheckDB, error) {
nameKey := "platform_health"

// sort by "updated" in descending order; the result size is only capped when limit is non-zero
health := []HealthCheckDB{}

Expand All @@ -5302,7 +5301,44 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error)
"order": "desc",
},
},
"size": limit,
}

if limit != 0 {
query["size"] = limit
}

if beforeTimestamp > 0 || afterTimestamp > 0 {
query["query"] = map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{},
},
}
}

if beforeTimestamp > 0 {
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}),
map[string]interface{}{
"range": map[string]interface{}{
"updated": map[string]interface{}{
"gt": beforeTimestamp,
},
},
},
)
}

if afterTimestamp > 0 {
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}),
map[string]interface{}{
"range": map[string]interface{}{
"updated": map[string]interface{}{
"lt": afterTimestamp,
},
},
},
)
}

if err := json.NewEncoder(&buf).Encode(query); err != nil {
Expand Down Expand Up @@ -5352,16 +5388,31 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error)
return health, err
}

if len(wrapped.Hits.Hits) == 0 {
return health, errors.New("No healthchecks found")
}

for _, hit := range wrapped.Hits.Hits {
health = append(health, hit.Source)
}

} else {
q := datastore.NewQuery(nameKey).Order("-Updated").Limit(limit)
q := datastore.NewQuery(nameKey)


if beforeTimestamp != 0 {
// Keep only entries whose Updated value is greater than beforeTimestamp (i.e. newer than it).
q = q.Filter("Updated >", beforeTimestamp)
}

if afterTimestamp != 0 {
// Keep only entries whose Updated value is less than afterTimestamp (i.e. older than it).
q = q.Filter("Updated <", afterTimestamp)
}

if limit != 0 {
log.Printf("Limiting platform health to %d", limit)
q = q.Limit(limit)
}

q = q.Order("-Updated")


_, err := project.Dbclient.GetAll(ctx, q, &health)
if err != nil {
Expand Down
89 changes: 60 additions & 29 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
Expand All @@ -36,7 +37,7 @@ func updateCache(workflowHealth WorkflowHealth) {
cacheKey := fmt.Sprintf("ops-health-check")
ctx := context.Background()

if project.CacheDb {
if project.CacheDb {
platformHealthCheck := HealthCheck{}
platformHealthCheck.Updated = time.Now().Unix()
platformHealthCheck.Workflows = workflowHealth
Expand All @@ -47,7 +48,7 @@ func updateCache(workflowHealth WorkflowHealth) {
err = SetCache(ctx, cacheKey, platformData, 15)
if err != nil {
log.Printf("[WARNING] Failed setting cache ops health: %s", err)
}
}
}
}

Expand Down Expand Up @@ -143,7 +144,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer " + apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -201,7 +202,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -247,7 +248,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -305,7 +306,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -363,7 +364,7 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer " + apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -458,25 +459,21 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {

userInfo, err := HandleApiAuthentication(resp, request)
if err != nil {
log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err)

resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
log.Printf("[WARNING] Api authentication failed in handleInfo: %s. Continuing anyways here..", err)
}

if project.Environment == "onprem" && userInfo.Role != "admin" {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`))
return
} else if project.Environment == "Cloud" && userInfo.ApiKey != os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") {
resp.WriteHeader(401)
} else if project.Environment == "Cloud" && !(userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") || userInfo.SupportAccess) {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`))
return
}
} else if force != "true" {
// get last health check from database
healths, err := GetPlatformHealth(ctx, 1)
healths, err := GetPlatformHealth(ctx, 0, 0, 1)

if len(healths) == 0 {
resp.WriteHeader(500)
Expand Down Expand Up @@ -511,7 +508,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
workflowHealthChannel := make(chan WorkflowHealth)
// appHealthChannel := make(chan AppHealth)
go func() {
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
workflowHealth, err := RunOpsWorkflow(apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Failed workflow health check: %s", err)
Expand Down Expand Up @@ -581,11 +578,34 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
}

func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
// for now, the limit is last 100 runs
limit := 100
ctx := GetContext(request)

healthChecks, err := GetPlatformHealth(ctx, limit)
log.Printf("[DEBUG] request URL query:", request.URL.Query())

limit := request.URL.Query().Get("limit")
before := request.URL.Query().Get("before")
after := request.URL.Query().Get("after")

// convert all to int (strconv.Atoi returns int, not int64); unparsable or missing values fall back to 0
limitInt, err := strconv.Atoi(limit)
if err != nil {
log.Printf("[ERROR] Failed converting limit to int64: %s", err)
limitInt = 0
}

beforeInt, err := strconv.Atoi(before)
if err != nil {
log.Printf("[ERROR] Failed converting before to int64: %s", err)
beforeInt = 0
}

afterInt, err := strconv.Atoi(after)
if err != nil {
log.Printf("[ERROR] Failed converting after to int64: %s", err)
afterInt = 0
}

healthChecks, err := GetPlatformHealth(ctx, afterInt, beforeInt, limitInt)
if err != nil {
log.Printf("[ERROR] Failed getting platform health from database: %s", err)
resp.WriteHeader(500)
Expand All @@ -605,13 +625,13 @@ func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
resp.Write(healthChecksData)
}

func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) (error) {
func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) error {
baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) == 0 {
log.Printf("[DEBUG] Base url not set. Setting to default: for delete")
baseUrl = "https://shuffler.io"
}

if project.Environment == "onprem" {
log.Printf("[DEBUG] Onprem environment. Setting base url to localhost: for delete")
baseUrl = "http://localhost:5001"
Expand Down Expand Up @@ -674,7 +694,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
Delete: false,
RunStatus: "",
ExecutionId: "",
WorkflowId: "",
WorkflowId: "",
}

baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
Expand Down Expand Up @@ -731,7 +751,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28"
// body := map[string]string{"execution_argument": "", "start": startId}
Expand Down Expand Up @@ -787,6 +807,8 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {

updateCache(workflowHealth)

timeout := time.After(5 * time.Minute)

// 3. Check if workflow ran successfully
// ping /api/v1/streams/results/<execution_id> while workflowHealth.RunFinished is false
// if workflowHealth.RunFinished is true, return workflowHealth
Expand All @@ -800,7 +822,7 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer " + apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// convert the body to JSON
reqBody := map[string]string{"execution_id": execution.ExecutionId, "authorization": os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")}
Expand Down Expand Up @@ -847,6 +869,17 @@ func RunOpsWorkflow(apiKey string, orgId string) (WorkflowHealth, error) {
updateCache(workflowHealth)

log.Printf("[DEBUG] Workflow Health execution Result Status: %#v for executionID: %s", executionResults.Status, workflowHealth.ExecutionId)

// check if timeout
select {
case <-timeout:
log.Printf("[ERROR] Timeout reached for workflow health check. Returning")
workflowHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK"
return workflowHealth, errors.New("Timeout reached for workflow health check")
default:
// do nothing
}

log.Printf("[DEBUG] Waiting 2 seconds before retrying")
time.Sleep(2 * time.Second)
}
Expand All @@ -864,7 +897,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {

}


if len(opsDashboardOrgId) == 0 {
log.Printf("[WARNING] Ops dashboard org not set. Not setting up ops workflow")
return "", errors.New("Ops dashboard org not set")
Expand Down Expand Up @@ -1010,7 +1042,7 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
jsonData := `{"name":"demo","description":"demo","blogpost":"","status":"test","default_return_value":"","usecase_ids":[]}`

// res, err := http.Post(url, "application/json", bytes.NewBuffer([]byte(jsonData)))
req, err = http.NewRequest("POST", baseUrl + "/api/v1/workflows", bytes.NewBuffer([]byte(jsonData)))
req, err = http.NewRequest("POST", baseUrl+"/api/v1/workflows", bytes.NewBuffer([]byte(jsonData)))
log.Printf("[SANITY CHECK] req method is: %s", req.Method)

if err != nil {
Expand Down Expand Up @@ -1077,7 +1109,7 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {

// Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true

req, err = http.NewRequest("PUT", baseUrl + "/api/v1/workflows/" + workflowData.ID + "?skip_save=true", nil)
req, err = http.NewRequest("PUT", baseUrl+"/api/v1/workflows/"+workflowData.ID+"?skip_save=true", nil)
if err != nil {
log.Println("[ERROR] creating HTTP request:", err)
return "", errors.New("Error creating HTTP request: " + err.Error())
Expand Down Expand Up @@ -1109,7 +1141,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {

defer resp.Body.Close()


if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed saving ops dashboard workflow: %s. The status code was: %d", err, resp.StatusCode)
// print the response body
Expand All @@ -1124,4 +1155,4 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {

log.Printf("[INFO] Ops dashboard workflow saved successfully with ID: %s", workflowData.ID)
return workflowData.ID, nil
}
}

0 comments on commit 3a7b309

Please sign in to comment.