Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: more stats things #41

Merged
merged 4 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5288,9 +5288,8 @@ func SetNewValue(ctx context.Context, newvalue NewValue) error {
return nil
}

func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error) {
func GetPlatformHealth(ctx context.Context, beforeTimestamp int, afterTimestamp int, limit int) ([]HealthCheckDB, error) {
nameKey := "platform_health"

// sort by "updated", and get the first one
health := []HealthCheckDB{}

Expand All @@ -5302,7 +5301,44 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error)
"order": "desc",
},
},
"size": limit,
}

if limit != 0 {
query["size"] = limit
}

if beforeTimestamp > 0 || afterTimestamp > 0 {
query["query"] = map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{},
},
}
}

if beforeTimestamp > 0 {
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}),
map[string]interface{}{
"range": map[string]interface{}{
"updated": map[string]interface{}{
"gt": beforeTimestamp,
},
},
},
)
}

if afterTimestamp > 0 {
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}),
map[string]interface{}{
"range": map[string]interface{}{
"updated": map[string]interface{}{
"lt": afterTimestamp,
},
},
},
)
}

if err := json.NewEncoder(&buf).Encode(query); err != nil {
Expand Down Expand Up @@ -5352,16 +5388,31 @@ func GetPlatformHealth(ctx context.Context, limit int) ([]HealthCheckDB, error)
return health, err
}

if len(wrapped.Hits.Hits) == 0 {
return health, errors.New("No healthchecks found")
}

for _, hit := range wrapped.Hits.Hits {
health = append(health, hit.Source)
}

} else {
q := datastore.NewQuery(nameKey).Order("-Updated").Limit(limit)
q := datastore.NewQuery(nameKey)


if beforeTimestamp != 0 {
// Modify the query to filter for "before" timestamp.
q = q.Filter("Updated >", beforeTimestamp)
}

if afterTimestamp != 0 {
// Modify the query to filter for "after" timestamp.
q = q.Filter("Updated <", afterTimestamp)
}

if limit != 0 {
log.Printf("Limiting platform health to %d", limit)
q = q.Limit(limit)
}

q = q.Order("-Updated")


_, err := project.Dbclient.GetAll(ctx, q, &health)
if err != nil {
Expand Down
89 changes: 60 additions & 29 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
Expand All @@ -36,7 +37,7 @@
cacheKey := fmt.Sprintf("ops-health-check")
ctx := context.Background()

if project.CacheDb {
if project.CacheDb {
platformHealthCheck := HealthCheck{}
platformHealthCheck.Updated = time.Now().Unix()
platformHealthCheck.Workflows = workflowHealth
Expand All @@ -47,7 +48,7 @@
err = SetCache(ctx, cacheKey, platformData, 15)
if err != nil {
log.Printf("[WARNING] Failed setting cache ops health: %s", err)
}
}
}
}

Expand Down Expand Up @@ -143,7 +144,7 @@

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer " + apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -201,7 +202,7 @@

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -247,7 +248,7 @@
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -305,7 +306,7 @@

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -363,7 +364,7 @@

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer " + apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// send the request
client = &http.Client{}
Expand Down Expand Up @@ -458,25 +459,21 @@

userInfo, err := HandleApiAuthentication(resp, request)
if err != nil {
log.Printf("[WARNING] Api authentication failed in handleInfo: %s", err)

resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
log.Printf("[WARNING] Api authentication failed in handleInfo: %s. Continuing anyways here..", err)

Check failure

Code scanning / CodeQL

Log entries created from user input High

This log entry depends on a user-provided value.
This log entry depends on a user-provided value.
}

if project.Environment == "onprem" && userInfo.Role != "admin" {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`))
return
} else if project.Environment == "Cloud" && userInfo.ApiKey != os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") {
resp.WriteHeader(401)
} else if project.Environment == "Cloud" && !(userInfo.ApiKey == os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY") || userInfo.SupportAccess) {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Only admins can run health check!"}`))
return
}
} else if force != "true" {
// get last health check from database
healths, err := GetPlatformHealth(ctx, 1)
healths, err := GetPlatformHealth(ctx, 0, 0, 1)

if len(healths) == 0 {
resp.WriteHeader(500)
Expand Down Expand Up @@ -511,7 +508,7 @@
workflowHealthChannel := make(chan WorkflowHealth)
// appHealthChannel := make(chan AppHealth)
go func() {
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
workflowHealth, err := RunOpsWorkflow(apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Failed workflow health check: %s", err)
Expand Down Expand Up @@ -581,11 +578,34 @@
}

func GetOpsDashboardStats(resp http.ResponseWriter, request *http.Request) {
// for now, the limit is last 100 runs
limit := 100
ctx := GetContext(request)

healthChecks, err := GetPlatformHealth(ctx, limit)
log.Printf("[DEBUG] request URL query:", request.URL.Query())

Check failure

Code scanning / CodeQL

Log entries created from user input High

This log entry depends on a user-provided value.

limit := request.URL.Query().Get("limit")
before := request.URL.Query().Get("before")
after := request.URL.Query().Get("after")

// convert all to int64
limitInt, err := strconv.Atoi(limit)
if err != nil {
log.Printf("[ERROR] Failed converting limit to int64: %s", err)
limitInt = 0
}

beforeInt, err := strconv.Atoi(before)
if err != nil {
log.Printf("[ERROR] Failed converting before to int64: %s", err)
beforeInt = 0
}

afterInt, err := strconv.Atoi(after)
if err != nil {
log.Printf("[ERROR] Failed converting after to int64: %s", err)
afterInt = 0
}

healthChecks, err := GetPlatformHealth(ctx, afterInt, beforeInt, limitInt)
if err != nil {
log.Printf("[ERROR] Failed getting platform health from database: %s", err)
resp.WriteHeader(500)
Expand All @@ -605,13 +625,13 @@
resp.Write(healthChecksData)
}

func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) (error) {
func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string) error {
baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) == 0 {
log.Printf("[DEBUG] Base url not set. Setting to default: for delete")
baseUrl = "https://shuffler.io"
}

if project.Environment == "onprem" {
log.Printf("[DEBUG] Onprem environment. Setting base url to localhost: for delete")
baseUrl = "http://localhost:5001"
Expand Down Expand Up @@ -674,7 +694,7 @@
Delete: false,
RunStatus: "",
ExecutionId: "",
WorkflowId: "",
WorkflowId: "",
}

baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
Expand Down Expand Up @@ -731,7 +751,7 @@

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+ apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// startId := "98713d6a-dd6b-4bd6-a11c-9778b80f2a28"
// body := map[string]string{"execution_argument": "", "start": startId}
Expand Down Expand Up @@ -787,6 +807,8 @@

updateCache(workflowHealth)

timeout := time.After(5 * time.Minute)

// 3. Check if workflow ran successfully
// ping /api/v1/streams/results/<execution_id> while workflowHealth.RunFinished is false
// if workflowHealth.RunFinished is true, return workflowHealth
Expand All @@ -800,7 +822,7 @@

// set the headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer " + apiKey)
req.Header.Set("Authorization", "Bearer "+apiKey)

// convert the body to JSON
reqBody := map[string]string{"execution_id": execution.ExecutionId, "authorization": os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")}
Expand Down Expand Up @@ -847,6 +869,17 @@
updateCache(workflowHealth)

log.Printf("[DEBUG] Workflow Health execution Result Status: %#v for executionID: %s", executionResults.Status, workflowHealth.ExecutionId)

// check if timeout
select {
case <-timeout:
log.Printf("[ERROR] Timeout reached for workflow health check. Returning")
workflowHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK"
return workflowHealth, errors.New("Timeout reached for workflow health check")
default:
// do nothing
}

log.Printf("[DEBUG] Waiting 2 seconds before retrying")
time.Sleep(2 * time.Second)
}
Expand All @@ -864,7 +897,6 @@

}


if len(opsDashboardOrgId) == 0 {
log.Printf("[WARNING] Ops dashboard org not set. Not setting up ops workflow")
return "", errors.New("Ops dashboard org not set")
Expand Down Expand Up @@ -1010,7 +1042,7 @@
jsonData := `{"name":"demo","description":"demo","blogpost":"","status":"test","default_return_value":"","usecase_ids":[]}`

// res, err := http.Post(url, "application/json", bytes.NewBuffer([]byte(jsonData)))
req, err = http.NewRequest("POST", baseUrl + "/api/v1/workflows", bytes.NewBuffer([]byte(jsonData)))
req, err = http.NewRequest("POST", baseUrl+"/api/v1/workflows", bytes.NewBuffer([]byte(jsonData)))
log.Printf("[SANITY CHECK] req method is: %s", req.Method)

if err != nil {
Expand Down Expand Up @@ -1077,7 +1109,7 @@

// Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true

req, err = http.NewRequest("PUT", baseUrl + "/api/v1/workflows/" + workflowData.ID + "?skip_save=true", nil)
req, err = http.NewRequest("PUT", baseUrl+"/api/v1/workflows/"+workflowData.ID+"?skip_save=true", nil)
if err != nil {
log.Println("[ERROR] creating HTTP request:", err)
return "", errors.New("Error creating HTTP request: " + err.Error())
Expand Down Expand Up @@ -1109,7 +1141,6 @@

defer resp.Body.Close()


if resp.StatusCode != 200 {
log.Printf("[ERROR] Failed saving ops dashboard workflow: %s. The status code was: %d", err, resp.StatusCode)
// print the response body
Expand All @@ -1124,4 +1155,4 @@

log.Printf("[INFO] Ops dashboard workflow saved successfully with ID: %s", workflowData.ID)
return workflowData.ID, nil
}
}
Loading