Skip to content

Commit

Permalink
Fixed Opensearch workflow debug search parameters to make them work p…
Browse files Browse the repository at this point in the history
…roperly opensearch
  • Loading branch information
frikky committed Nov 16, 2023
1 parent ce0e4d4 commit d27c989
Showing 1 changed file with 37 additions and 40 deletions.
77 changes: 37 additions & 40 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func IncrementCacheDump(ctx context.Context, orgId, dataType string) {
log.Printf("[ERROR] Failed indexing org STATS body (2): %s", err)
}

log.Printf("[DEBUG] Incremented org stats for %s", orgId)
//log.Printf("[DEBUG] Incremented org stats for %s", orgId)
} else {
tx, err := project.Dbclient.NewTransaction(ctx)
if err != nil {
Expand Down Expand Up @@ -10080,7 +10080,7 @@ func GetStorage() *storage.Client {
}

func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowSearch) ([]WorkflowExecution, string, error) {
nameKey := "workflowexecution"
index := "workflowexecution"

var executions []WorkflowExecution
totalMaxSize := 11184810
Expand All @@ -10095,40 +10095,46 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
if project.DbType == "opensearch" {
var buf bytes.Buffer
query := map[string]interface{}{
"size": maxLimit,
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []interface{}{
map[string]interface{}{
"must": []map[string]interface{}{
{
"match": map[string]interface{}{
"execution_org": orgId,
},
},
},
},
},
"sort": []interface{}{
map[string]interface{}{
"started_at": map[string]interface{}{
"order": "desc",
},
"sort": map[string]interface{}{
"started_at": map[string]interface{}{
"order": "desc",
},
},
"size": maxLimit,
}

if len(search.WorkflowId) > 0 {
//log.Printf("[DEBUG] Filtering on workflow_id: %s", search.WorkflowId)
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]interface{}), map[string]interface{}{
"match": map[string]interface{}{
"workflow_id": search.WorkflowId,
// Change out the "must" part entirely to contain the workflow id as well
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = []map[string]interface{}{
{
"match": map[string]interface{}{
"execution_org": orgId,
},
},
})
{
"match": map[string]interface{}{
"workflow_id": search.WorkflowId,
},
},
}
}

if len(search.Status) > 0 {
log.Printf("[DEBUG] Filtering on status: %s", search.Status)

query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]interface{}), map[string]interface{}{
// Change out the "must" part entirely to contain the workflow id as well
// Append map[string]interface{} to the "must" part
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), map[string]interface{}{
"match": map[string]interface{}{
"status": search.Status,
},
Expand All @@ -10138,10 +10144,10 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
// String to timestamp for search.SearchFrom (string)
startTimestamp, err := time.Parse(time.RFC3339, search.SearchFrom)
if err != nil {
log.Printf("[WARNING] Failed parsing start time: %s", err)
//log.Printf("[WARNING] Failed parsing start time: %s", err)
} else {
// Make it into a number instead of a string
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]interface{}), map[string]interface{}{
// Make sure to add map[string]interface{} to the "must" part
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), map[string]interface{}{
"range": map[string]interface{}{
"started_at": map[string]interface{}{
"gte": startTimestamp.Unix(),
Expand All @@ -10153,10 +10159,9 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
// String to timestamp for search.SearchTo (string)
endTimestamp, err := time.Parse(time.RFC3339, search.SearchUntil)
if err != nil {
log.Printf("[WARNING] Failed parsing end time: %s", err)
//log.Printf("[WARNING] Failed parsing end time: %s", err)
} else {
// Make it into a number instead of a string
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]interface{}), map[string]interface{}{
query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"].([]map[string]interface{}), map[string]interface{}{
"range": map[string]interface{}{
"started_at": map[string]interface{}{
"lte": endTimestamp.Unix(),
Expand All @@ -10170,11 +10175,15 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
query["search_after"] = []interface{}{inputcursor}
}

log.Printf("[DEBUG] Query: %+v", query)
// Run the query
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Printf("[WARNING] Error encoding executions query: %s", err)
return executions, cursor, err
}

// Perform the search request.
res, err := project.Es.Search(
project.Es.Search.WithContext(ctx),
project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(nameKey))),
project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(index))),
project.Es.Search.WithBody(&buf),
project.Es.Search.WithTrackTotalHits(true),
)
Expand All @@ -10185,18 +10194,11 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
}

defer res.Body.Close()

if res.IsError() {
log.Printf("[WARNING] Failed executing query: %s", res.String())
return executions, "", errors.New(res.String())
}

var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("[WARNING] Failed decoding response: %s", err)
return executions, "", err
}

if res.StatusCode != 200 && res.StatusCode != 201 {
return executions, "", errors.New(fmt.Sprintf("Bad statuscode: %d", res.StatusCode))
}
Expand All @@ -10217,14 +10219,11 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
executions = append(executions, hit.Source)
}



return executions, "", errors.New("Not implemented yet")
//return executions, "", errors.New("Not implemented yet")
} else {
query := datastore.NewQuery(nameKey).Filter("execution_org=", orgId).Order("-started_at").Limit(5)
query := datastore.NewQuery(index).Filter("execution_org=", orgId).Order("-started_at").Limit(5)

if len(search.WorkflowId) > 0 {
//log.Printf("[DEBUG] Filtering on workflow_id: %s", search.WorkflowId)
query = query.Filter("workflow_id =", search.WorkflowId)
}

Expand Down Expand Up @@ -10378,8 +10377,6 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
}
}

log.Printf("[DEBUG] Returning %d executions", len(executions))

// Find difference between what's in the list and what is in cache

removeIndexes := []int{}
Expand Down

0 comments on commit d27c989

Please sign in to comment.