Skip to content

Commit

Permalink
Added more error handling inside workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Sep 26, 2023
1 parent d22a44c commit beb4669
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 26 deletions.
3 changes: 2 additions & 1 deletion db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
)

var requestCache *cache.Cache
//var requestCache *cache.Cache
var requestCache = cache.New(60*time.Minute, 60*time.Minute)
var memcached = os.Getenv("SHUFFLE_MEMCACHED")
var mc = gomemcache.New(memcached)

Expand Down
105 changes: 87 additions & 18 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4574,11 +4574,20 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
org := &Org{}

workflowapps, apperr := GetPrioritizedApps(ctx, user)
allNames := []string{}
for _, action := range workflow.Actions {
if action.SourceWorkflow != workflow.ID && len(action.SourceWorkflow) > 0 {
continue
}

if len(action.Label) > 0 && ArrayContains(allNames, action.Label) {
parsedError := fmt.Sprintf("Multiple actions with name '%s'", action.Label)
if !ArrayContains(workflow.Errors, parsedError) {
workflow.Errors = append(workflow.Errors, parsedError)
}
}

allNames = append(allNames, action.Label)
allNodes = append(allNodes, action.ID)
if workflow.Start == action.ID {
//log.Printf("[INFO] FOUND STARTNODE %d", workflow.Start)
Expand Down Expand Up @@ -5020,9 +5029,10 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
if email == "[email protected]" {
log.Printf("Email isn't specified during save.")
if workflow.PreviouslySaved {
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Email field in user input can't be empty"}`)))
return
workflow.Errors = append(workflow.Errors, "Email field in user input can't be empty")
continue
//resp.WriteHeader(401)
//resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Email field in user input can't be empty"}`)))
}
}

Expand All @@ -5033,9 +5043,8 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
if sms == "0000000" {
log.Printf("Email isn't specified during save.")
if workflow.PreviouslySaved {
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "SMS field in user input can't be empty"}`)))
return
workflow.Errors = append(workflow.Errors, "SMS field in user input can't be empty")
continue
}
}

Expand Down Expand Up @@ -5407,16 +5416,14 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}

action.IsValid = false

//if workflow.PreviouslySaved {
// resp.WriteHeader(401)
// resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Action %s in app %s doesn't exist"}`, action.Name, curapp.Name)))
// return
//}
}

// FIXME - check all parameters to see if they're valid
// Includes checking required fields

selectedAuth := AppAuthenticationStorage{}
if len(action.AuthenticationId) > 0 && autherr == nil {
for _, auth := range allAuths {
Expand All @@ -5427,6 +5434,38 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

// Check if it uses oauth2 and if it's authenticated or not
if selectedAuth.Id == "" && len(action.AuthenticationId) == 0 {
authRequired := false
fieldsFilled := 0
for _, param := range curappaction.Parameters {
if param.Configuration {
if len(param.Value) > 0 {
fieldsFilled += 1
}

authRequired = true
break
}
}

if authRequired && fieldsFilled > 1 {
foundErr := fmt.Sprintf("%s requires authentication", strings.Replace(action.AppName, "_", " ", -1))

if !ArrayContains(workflow.Errors, foundErr) {
workflow.Errors = append(workflow.Errors, foundErr)
continue
}
} else if authRequired && fieldsFilled == 1 {
foundErr := fmt.Sprintf("%s requires authentication", strings.Replace(action.AppName, "_", " ", -1))

if !ArrayContains(workflow.Errors, foundErr) {
workflow.Errors = append(workflow.Errors, foundErr)
continue
}
}
}

newParams := []WorkflowAppActionParameter{}
for _, param := range curappaction.Parameters {
paramFound := false
Expand Down Expand Up @@ -5455,15 +5494,33 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}

//log.Printf("[WARNING] Appaction %s with required param '%s' is empty. Can't save.", action.Name, param.Name)
thisError := fmt.Sprintf("%s is missing required parameter %s", action.Label, param.Name)
if handleOauth {
//log.Printf("[WARNING] Handling oauth2 app saving, hence not throwing warnings (1)")
//workflow.Errors = append(workflow.Errors, fmt.Sprintf("Debug: Handling one Oauth2 app (%s). May cause issues during initial configuration (1)", action.Name))
} else {
action.Errors = append(action.Errors, thisError)

thisError := fmt.Sprintf("Action %s is missing required parameter %s", action.Label, param.Name)
if actionParam.Configuration && len(action.AuthenticationId) == 0 {
thisError = fmt.Sprintf("%s requires authentication", strings.Replace(action.AppName, "_", " ", -1))
}

action.Errors = append(action.Errors, thisError)

errorFound := false
for errIndex, oldErr := range workflow.Errors {
if oldErr == thisError {
errorFound = true
break
}

if strings.Contains(oldErr, action.Label) && strings.Contains(oldErr, "missing required parameter") {
workflow.Errors[errIndex] += ", " + param.Name
errorFound = true
break
}
}

if !errorFound {
workflow.Errors = append(workflow.Errors, thisError)
action.IsValid = false
}

action.IsValid = false
}

if actionParam.Variant == "" {
Expand Down Expand Up @@ -5502,6 +5559,15 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

for _, trigger := range workflow.Triggers {
if trigger.Status != "running" && trigger.TriggerType != "SUBFLOW" && trigger.TriggerType != "USERINPUT" {
errorInfo := fmt.Sprintf("Trigger %s needs to be started", trigger.Name)
if !ArrayContains(workflow.Errors, errorInfo) {
workflow.Errors = append(workflow.Errors, errorInfo)
}
}
}

if !workflow.PreviouslySaved {
log.Printf("[WORKFLOW INIT] NOT PREVIOUSLY SAVED - SET ACTION AUTH!")
timeNow := int64(time.Now().Unix())
Expand Down Expand Up @@ -7471,7 +7537,7 @@ func HandleCreateSubOrg(resp http.ResponseWriter, request *http.Request) {

if user.Role != "admin" {
log.Printf("[WARNING] Can't make suborg without being admin: %s (%s).", user.Username, user.Id)
resp.WriteHeader(401)
resp.WriteHeader(403)
resp.Write([]byte(`{"success": false, "reason": "Not admin"}`))
return
}
Expand Down Expand Up @@ -15761,7 +15827,7 @@ func RunExecuteAccessValidation(request *http.Request, workflow *Workflow) (bool
//log.Printf("[DEBUG] Got source exec %s", sourceExecution)
newExec, err := GetWorkflowExecution(ctx, sourceExecution[0])
if err != nil {
log.Printf("[INFO] Failed getting source_execution in test validation based on %s", sourceExecution[0])
log.Printf("[INFO] Failed getting source_execution in test validation based on '%s'", sourceExecution[0])
return false, ""
} else {
workflowExecution = newExec
Expand Down Expand Up @@ -19639,7 +19705,10 @@ func HandleDeleteOrg(resp http.ResponseWriter, request *http.Request) {
return
}


// Get workflows
user.ActiveOrg.Id = org.Id
user.ActiveOrg.Name = org.Name
workflows, err := GetAllWorkflowsByQuery(ctx, user)
if err != nil {
log.Printf("[WARNING] Failed getting workflows for user %s (0): %s", user.Username, err)
Expand Down
18 changes: 11 additions & 7 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ type WorkflowApp struct {
Name string `json:"name" datastore:"name" yaml:"name"`
Url string `json:"url" datastore:"url" yaml:"url"`
} `json:"contact_info" datastore:"contact_info" yaml:"contact_info" required:false`
ReferenceInfo struct {
DocumentationUrl string `json:"documentation_url" datastore:"documentation_url"`
GithubUrl string `json:"github_url" datastore:"github_url"`
} `json:"reference_info" datastore:"reference_info"`
FolderMount struct {
FolderMount bool `json:"folder_mount" datastore:"folder_mount"`
SourceFolder string `json:"source_folder" datastore:"source_folder"`
Expand All @@ -128,12 +124,20 @@ type WorkflowApp struct {
Documentation string `json:"documentation" datastore:"documentation,noindex"`
Description string `json:"description" datastore:"description,noindex"`
DocumentationDownloadUrl string `json:"documentation_download_url" datastore:"documentation_download_url"`
Blogpost string `json:"blogpost" yaml:"blogpost" datastore:"blogpost"`
Video string `json:"video" yaml:"video" datastore:"video"`
PrimaryUsecases []string `json:"primary_usecases" yaml:"primary_usecases" datastore:"primary_usecases"`
CompanyURL string `json:"company_url" datastore:"company_url" required:false yaml:"company_url"`

SkippedBuild bool `json:"skipped_build" yaml:"skipped_build" required:false datastore:"skipped_build"`
//SelectedTemplate WorkflowApp `json:"selected_template" datastore:"selected_template,noindex"`

ReferenceInfo struct {
IsPartner bool `json:"is_partner" datastore:"is_partner"`
PartnerContacts string `json:"partner_contacts" datastore:"partner_contacts"`
DocumentationUrl string `json:"documentation_url" datastore:"documentation_url"`
GithubUrl string `json:"github_url" datastore:"github_url"`
} `json:"reference_info" datastore:"reference_info"`
Blogpost string `json:"blogpost" yaml:"blogpost" datastore:"blogpost"`
Video string `json:"video" yaml:"video" datastore:"video"`
CompanyURL string `json:"company_url" datastore:"company_url" required:false yaml:"company_url"`
}

type AppVersion struct {
Expand Down

0 comments on commit beb4669

Please sign in to comment.