diff --git a/app_upload/stitcher.go b/app_upload/stitcher.go index 2e7b11dd..d1fa87b9 100755 --- a/app_upload/stitcher.go +++ b/app_upload/stitcher.go @@ -844,8 +844,8 @@ func main() { bucketName = os.Args[5] } - appname := "email" - appversion := "1.3.0" + appname := "shuffle-ai" + appversion := "1.0.0" err := deployConfigToBackend(appfolder, appname, appversion) if err != nil { log.Printf("[WARNING] Failed uploading config: %s", err) diff --git a/codegen.go b/codegen.go index f181d76e..a2e92004 100755 --- a/codegen.go +++ b/codegen.go @@ -2,7 +2,6 @@ package shuffle import ( "archive/zip" - "sort" "bytes" "context" "crypto/md5" @@ -14,11 +13,13 @@ import ( "log" "os" "regexp" + "sort" "strconv" "strings" "cloud.google.com/go/storage" "github.com/frikky/kin-openapi/openapi3" + //"github.com/satori/go.uuid" "gopkg.in/yaml.v2" ) @@ -839,6 +840,375 @@ func MakePythoncode(swagger *openapi3.Swagger, name, url, method string, paramet return functionname, data } +func GetCustomActionCode(swagger *openapi3.Swagger, api WorkflowApp) string{ + + authenticationParameter := "" + authenticationSetup := "" + authenticationAddin := "" + + if swagger.Components.SecuritySchemes != nil { + if swagger.Components.SecuritySchemes["BearerAuth"] != nil { + authenticationParameter = ", apikey" + authenticationSetup = "if apikey != \" \" and not apikey.startswith(\"Bearer\"): parsed_headers[\"Authorization\"] = f\"Bearer {apikey}\"" + + } else if swagger.Components.SecuritySchemes["BasicAuth"] != nil { + authenticationParameter = ", username_basic, password_basic" + authenticationSetup = "auth=None\n if username_basic or password_basic:\n if \"Authorization\" not in headers and \"Basic\" not in headers and not \"Bearer\" in headers:\n auth = requests.auth.HTTPBasicAuth(username_basic, password_basic)" + authenticationAddin = ", auth=auth" + + } else if swagger.Components.SecuritySchemes["ApiKeyAuth"] != nil { + authenticationParameter = ", apikey" + + if swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.In == "header" { + + authenticationSetup = fmt.Sprintf(`if apikey != " ": parsed_headers["%s"] = apikey`, swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Name) + + if len(swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Description) > 0 { + trimmedDescription := strings.Trim(swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Description, " ") + + authenticationSetup = fmt.Sprintf("if apikey != \" \":\n if apikey.startswith(\"%s\"):\n parsed_headers[\"%s\"] = apikey\n else:\n apikey = apikey.replace(\"%s\", \"\", -1).strip()\n parsed_headers[\"%s\"] = f\"%s{apikey}\"", trimmedDescription, swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Name, swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Description, swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Name, swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Description) + } + + } else if swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.In == "query" { + + authenticationSetup = fmt.Sprintf("if apikey != \" \": parsed_queries[\"%s\"] = requests.utils.quote(apikey)", swagger.Components.SecuritySchemes["ApiKeyAuth"].Value.Name) + } + + } else if swagger.Components.SecuritySchemes["Oauth2"] != nil { + + authenticationParameter = ", access_token" + authenticationSetup = fmt.Sprintf("if access_token != \" \": parsed_headers[\"Authorization\"] = f\"Bearer {access_token}\"\n #parsed_headers[\"Content-Type\"] = \"application/json\"") + + } else if swagger.Components.SecuritySchemes["jwt"] != nil { + authenticationParameter = ", username_basic, password_basic" + authenticationSetup = fmt.Sprintf("authret = requests.get(f\"{url}%s\", headers=parsed_headers, auth=(username_basic, password_basic), verify=False)\n if 'access_token' in authret.text:\n parsed_headers[\"Authorization\"] = f\"Bearer {authret.json()['access_token']}\"\n elif 'jwt' in authret.text:\n parsed_headers[\"Authorization\"] = f\"Bearer {authret.json()['jwt']}\"\n elif 'accessToken' in authret.text:\n parsed_headers[\"Authorization\"] = f\"Bearer {authret.json()['accessToken']}\"\n else:\n parsed_headers[\"Authorization\"] = f\"Bearer {authret.text}\"\n print(f\"Found Bearer auth: {authret.text}\")", api.Authentication.TokenUri) + } + + } + + pythonCode := fmt.Sprintf(` + def fix_url(self, url): + if "hhttp" in url: + url = url.replace("hhttp", "http") + + if "http:/" in url and not "http://" in url: + url = url.replace("http:/", "http://", -1) + if "https:/" in url and not "https://" in url: + url = url.replace("https:/", "https://", -1) + if "http:///" in url: + url = url.replace("http:///", "http://", -1) + if "https:///" in url: + url = url.replace("https:///", "https://", -1) + if not "http://" in url and not "http" in url: + url = f"http://{url}" + + return url + + + def checkverify(self, verify): + if str(verify).lower().strip() == "false": + return False + elif verify is None: + return False + elif verify: + return True + elif not verify: + return False + else: + return True + + + def is_valid_method(self, method): + valid_methods = ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"] + method = method.upper() + + if method in valid_methods: + return method + else: + raise ValueError(f"Invalid HTTP method: {method}") + + + def parse_headers(self, headers): + parsed_headers = {} + if headers: + split_headers = headers.split("\n") + self.logger.info(split_headers) + for header in split_headers: + if ":" in header: + splititem = ":" + elif "=" in header: + splititem = "=" + else: + continue + + splitheader = header.split(splititem) + if len(splitheader) >= 2: + parsed_headers[splitheader[0].strip()] = splititem.join( + splitheader[1:] + ).strip() + else: + continue + + return parsed_headers + + + def parse_queries(self, queries): + parsed_queries = {} + + if not queries: + return parsed_queries + + cleaned_queries = queries.strip() + + if not cleaned_queries: + return parsed_queries + + cleaned_queries = " ".join(cleaned_queries.split()) + splitted_queries = cleaned_queries.split("&") + self.logger.info(splitted_queries) + for query in splitted_queries: + + if "=" not in query: + self.logger.info("Skipping as there is no = in the query") + continue + key, value = query.split("=") + if not key.strip() or not value.strip(): + self.logger.info( + "Skipping because either key or value is not present in query" + ) + continue + parsed_queries[key.strip()] = value.strip() + + return parsed_queries + + + def custom_action(self%s, method="", url="", headers="", queries="", path="", ssl_verify=False, body=""): + url = self.fix_url(url) + + try: + method = self.is_valid_method(method) + except ValueError as e: + self.logger.error(e) + return {"error": str(e)} + + if path and not path.startswith('/'): + path = '/' + path + + url += path + + parsed_headers = self.parse_headers(headers) + parsed_queries = self.parse_queries(queries) + + %s + + ssl_verify = self.checkverify(ssl_verify) + + if isinstance(body, dict): + try: + body = json.dumps(body) + except json.JSONDecodeError as e: + self.logger.error(f"error : {e}") + return {"error: Invalid JSON format for request body"} + + try: + response = requests.request(method, url, headers=parsed_headers, params=parsed_queries, data=body, verify=ssl_verify%s) + response.raise_for_status() + return response.json() + + except requests.RequestException as e: + self.logger.error(f"Request failed: {e}") + return {"error": f"Request failed: {e}"} + `, authenticationParameter, authenticationSetup, authenticationAddin) + + return pythonCode +} + +func AddCustomAction(swagger *openapi3.Swagger, api WorkflowApp) (WorkflowAppAction, string) { + + parameters := []WorkflowAppActionParameter{} + pyCode := GetCustomActionCode(swagger, api) + + securitySchemes := swagger.Components.SecuritySchemes + if securitySchemes != nil { + + if securitySchemes["BearerAuth"] != nil { + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "apikey", + Description: "The apikey to use", + Multiline: false, + Required: true, + Example: "The API key to use. Space = skip", + Configuration: true, + Schema: SchemaDefinition{ + Type: "string", + }, + }) + } else if securitySchemes["ApiKeyAuth"] != nil { + + extraParam := WorkflowAppActionParameter{ + Name: "apikey", + Description: "The apikey to use", + Multiline: false, + Required: true, + Example: "**********", + Configuration: true, + Schema: SchemaDefinition{ + Type: "string", + }, + } + + if len(securitySchemes["ApiKeyAuth"].Value.Description) > 0 { + extraParam.Description = fmt.Sprintf("Start with %s", securitySchemes["ApiKeyAuth"].Value.Description) + } + + parameters = append(parameters, extraParam) + + } else if securitySchemes["jwt"] != nil { + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "username_basic", + Description: "The username to use", + Multiline: false, + Required: true, + Example: "The username to use", + Configuration: true, + Schema: SchemaDefinition{ + Type: "string", + }, + }) + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "password_basic", + Description: "The password to use", + Multiline: false, + Required: true, + Example: "***********", + Configuration: true, + Schema: SchemaDefinition{ + Type: "string", + }, + }) + } else if securitySchemes["BasicAuth"] != nil { + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "username_basic", + Description: "The username to use", + Multiline: false, + Required: true, + Example: "The username to use", + Configuration: true, + Schema: SchemaDefinition{ + Type: "string", + }, + }) + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "password_basic", + Description: "The password to use", + Multiline: false, + Required: true, + Example: "***********", + Configuration: true, + Schema: SchemaDefinition{ + Type: "string", + }, + }) + } + } + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "method", + Description: "The http method to use", + Multiline: false, + Required: true, + Options: []string{"GET","POST","PUT","DELETE","PATCH"}, + Example: "GET", + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "url", + Description: "The URL of the API", + Multiline: false, + Required: true, + Example: "https://api.example.com", + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "path", + Description: "the path to add to the base url", + Multiline: false, + Required: false, + Example: "/users/profile", + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "headers", + Description: "Add or edit headers", + Multiline: true, + Required: false, + Example: "Content-Type:application/json\nAccept:application/json", + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "queries", + Description: "Add or edit queries", + Multiline: true, + Required: false, + Example: "view=basic&redirect=test", + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "ssl_verify", + Description: "Check if you want to verify request", + Multiline: false, + Options: []string{"False","True"}, + Required: false, + Example: "False", + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + parameters = append(parameters, WorkflowAppActionParameter{ + Name: "body", + Description: "The body to use", + Multiline: true, + Required: false, + Example: `{"username": "example_user", "email": "user@example.com"}`, + Schema: SchemaDefinition{ + Type: "string", + }, + }) + + action := WorkflowAppAction{ + Description: "add a custom action for your app", + Name: "custom_action", + NodeType: "action", + Environment: "Shuffle", + Parameters: parameters, + } + + action.Returns.Schema.Type = "string" + + return action, pyCode + +} + func GenerateYaml(swagger *openapi3.Swagger, newmd5 string) (*openapi3.Swagger, WorkflowApp, []string, error) { api := WorkflowApp{} //log.Printf("%#v", swagger.Info) @@ -1319,7 +1689,8 @@ func GenerateYaml(swagger *openapi3.Swagger, newmd5 string) (*openapi3.Swagger, Type: "string", }, }) - + + // Fixing parameters with : newExtraParams := []WorkflowAppActionParameter{} newOptionalParams := []WorkflowAppActionParameter{} @@ -1382,6 +1753,7 @@ func GenerateYaml(swagger *openapi3.Swagger, newmd5 string) (*openapi3.Swagger, pythonFunctions = append(pythonFunctions, curCode) } + // Has to be here because its used differently above. // FIXING this is done during export instead? //log.Printf("OLDPATH: %s", actualPath) @@ -1393,6 +1765,10 @@ func GenerateYaml(swagger *openapi3.Swagger, newmd5 string) (*openapi3.Swagger, //newPaths[actualPath] = path } + action, curCode := AddCustomAction(swagger, api) + api.Actions = append(api.Actions, action) + pythonFunctions = append(pythonFunctions, curCode) + return swagger, api, pythonFunctions, nil } diff --git a/db-connector.go b/db-connector.go index a68d25ee..b4d3807d 100755 --- a/db-connector.go +++ b/db-connector.go @@ -1656,7 +1656,7 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e if err != nil { log.Printf("[DEBUG][%s] Failed to parse in execution file value for exec argument: %s (3)", workflowExecution.ExecutionId, err) } else { - log.Printf("[DEBUG][%s] Found a new value to parse with exec argument", workflowExecution.ExecutionId) + //log.Printf("[DEBUG][%s] Found a new value to parse with exec argument", workflowExecution.ExecutionId) workflowExecution.ExecutionArgument = newValue } } @@ -1721,7 +1721,7 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e // A workaround for large bits of information for execution argument if strings.Contains(workflowExecution.ExecutionArgument, "Result too large to handle") { - log.Printf("[DEBUG] Found prefix %s to be replaced for exec argument (3)", workflowExecution.ExecutionArgument) + //log.Printf("[DEBUG] Found prefix %s to be replaced for exec argument (3)", workflowExecution.ExecutionArgument) baseArgument := &ActionResult{ Result: workflowExecution.ExecutionArgument, Action: Action{ID: "execution_argument"}, @@ -1730,7 +1730,7 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e if err != nil { log.Printf("[DEBUG] Failed to parse in execution file value for exec argument: %s (4)", err) } else { - log.Printf("[DEBUG] Found a new value to parse with exec argument") + //log.Printf("[DEBUG] Found a new value to parse with exec argument") workflowExecution.ExecutionArgument = newValue } } @@ -1865,6 +1865,10 @@ func GetApp(ctx context.Context, id string, user User, skipCache bool) (*Workflo return workflowApp, errors.New("No ID provided to get an app") } + if id == "integration" { + return workflowApp, errors.New("Integration is for the integration framework. Uses the Shuffle-ai app") + } + nameKey := "workflowapp" cacheKey := fmt.Sprintf("%s_%s", nameKey, id) @@ -2357,7 +2361,7 @@ func FindSimilarFile(ctx context.Context, md5, orgId string) ([]File, error) { } } } else { - query := datastore.NewQuery(nameKey).Filter("md5_sum =", md5).Limit(25) + query := datastore.NewQuery(nameKey).Filter("md5_sum =", md5).Limit(250) _, err := project.Dbclient.GetAll(ctx, query, &files) if err != nil { log.Printf("[WARNING] Failed getting deals for org: %s", orgId) @@ -3168,7 +3172,7 @@ func GetAllWorkflowsByQuery(ctx context.Context, user User) ([]Workflow, error) for { innerWorkflow := Workflow{} - _, err := it.Next(&innerWorkflow) + _, err = it.Next(&innerWorkflow) if err != nil { if strings.Contains(fmt.Sprintf("%s", err), "cannot load field") { log.Printf("[ERROR] Fixing workflow %s to have proper org (0.8.74)", innerWorkflow.ID) @@ -3197,8 +3201,8 @@ func GetAllWorkflowsByQuery(ctx context.Context, user User) ([]Workflow, error) } if err != iterator.Done { - //log.Printf("[INFO] Failed fetching results: %v", err) - //break + log.Printf("[INFO] Failed fetching workflow results: %v", err) + break } // Get the cursor for the next page of results. @@ -3217,8 +3221,7 @@ func GetAllWorkflowsByQuery(ctx context.Context, user User) ([]Workflow, error) } } - //log.Printf("[INFO] Appending suborg distribution workflows for organization %s (%s)", user.ActiveOrg.Name, user.ActiveOrg.Id) - + log.Printf("[INFO] Appending suborg distribution workflows for organization %s (%s)", user.ActiveOrg.Name, user.ActiveOrg.Id) cursorStr = "" query = datastore.NewQuery(nameKey).Filter("suborg_distribution =", user.ActiveOrg.Id) for { @@ -3226,7 +3229,9 @@ func GetAllWorkflowsByQuery(ctx context.Context, user User) ([]Workflow, error) for { innerWorkflow := Workflow{} - _, err := it.Next(&innerWorkflow) + _, err = it.Next(&innerWorkflow) + //log.Printf("[DEBUG] SUBFLOW: %#v", innerWorkflow.ID) + if err != nil { if strings.Contains(fmt.Sprintf("%s", err), "cannot load field") { log.Printf("[ERROR] Error in workflow loading. Migrating workflow to new workflow handler (1): %s", err) @@ -6167,7 +6172,7 @@ func fixAppAppend(allApps []WorkflowApp, innerApp WorkflowApp) ([]WorkflowApp, W func GetUserApps(ctx context.Context, userId string) ([]WorkflowApp, error) { wrapper := []WorkflowApp{} - var err error + //var err error cacheKey := fmt.Sprintf("userapps-%s", userId) if project.CacheDb { @@ -6256,6 +6261,7 @@ func GetUserApps(ctx context.Context, userId string) ([]WorkflowApp, error) { cursorStr := "" log.Printf("[DEBUG] Getting user apps for %s", userId) + var err error queries := []datastore.Query{} q := datastore.NewQuery(indexName).Filter("contributors =", userId) @@ -6290,9 +6296,10 @@ func GetUserApps(ctx context.Context, userId string) ([]WorkflowApp, error) { } if err != nil { - log.Printf("[ERROR] Failed fetching user apps (1): %v", err) if !strings.Contains(fmt.Sprintf("%s", err), "cannot load field") { + log.Printf("[ERROR] Failed fetching user apps (1): %v", err) + if strings.Contains("no matching index found", fmt.Sprintf("%s", err)) { log.Printf("[ERROR] No more apps for %s in user app load? Breaking: %s.", userId, err) } else { @@ -6314,7 +6321,7 @@ func GetUserApps(ctx context.Context, userId string) ([]WorkflowApp, error) { if err != nil { if !strings.Contains(fmt.Sprintf("%s", err), "no more items") { - log.Printf("[ERROR] Failed fetching user apps (1): %v", err) + log.Printf("[ERROR] Failed fetching user apps (3): %v", err) } break @@ -8281,10 +8288,11 @@ func savePipelineData(ctx context.Context, pipeline Pipeline) error { return err } } else { - // key := datastore.NameKey(nameKey, pipelineId, nil) - // if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil { - // log.Printf("[ERROR] failed to add pipeline: %s", err) - // return err + key := datastore.NameKey(nameKey, triggerId, nil) + if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil { + log.Printf("[ERROR] failed to add pipeline: %s", err) + return err + } } return nil @@ -8594,9 +8602,11 @@ func SetFile(ctx context.Context, file File) error { file.CreatedAt = timeNow } + /* if !strings.HasPrefix(file.Id, "file_") { return errors.New("Invalid file ID. Must start with file_") } + */ cacheKey := fmt.Sprintf("%s_%s", nameKey, file.Id) diff --git a/files.go b/files.go index bbfa0368..e244bbd7 100755 --- a/files.go +++ b/files.go @@ -391,14 +391,14 @@ func HandleDeleteFile(resp http.ResponseWriter, request *http.Request) { file.Status = "deleted" err = SetFile(ctx, *file) if err != nil { - log.Printf("[ERROR] Failed setting file to deleted") + log.Printf("[ERROR] Failed setting file to deleted: %s", err) resp.WriteHeader(500) resp.Write([]byte(`{"success": false, "reason": "Failed setting file to deleted"}`)) return } outputFiles, err := FindSimilarFile(ctx, file.Md5sum, file.OrgId) - log.Printf("[INFO] Found %d similar files", len(outputFiles)) + log.Printf("[INFO] Found %d similar files for Md5 '%s'", len(outputFiles), file.Md5sum) if len(outputFiles) > 0 { for _, item := range outputFiles { item.Status = "deleted" @@ -596,7 +596,7 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { // also be environment variables / input arguments filename, filenameOk := request.URL.Query()["filename"] if filenameOk && ArrayContains(reservedCategoryNames, namespace) { - log.Printf("[DEBUG] Filename '%s' in URL with reserved category name: %s. Listlength: %d", filename[0], namespace, len(fileResponse.List)) + //log.Printf("[DEBUG] Filename '%s' in URL with reserved category name: %s. Listlength: %d", filename[0], namespace, len(fileResponse.List)) // Load from Github repo https://github.com/Shuffle/standards filenameFound := false @@ -650,7 +650,7 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { continue } - log.Printf("\n\n\n[DEBUG] Decoded file '%s' with content:\n%s\n\n\n", *item.Path, string(decoded)) + //log.Printf("[DEBUG] Decoded Github file '%s' with content:\n%s", *item.Path, string(decoded)) timeNow := time.Now().Unix() fileId := "file_"+uuid.NewV4().String() @@ -698,7 +698,7 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { continue } - log.Printf("\n\n[DEBUG] Uploaded file %s with ID %s in category %#v\n\n", file.Filename, fileId, namespace) + log.Printf("[DEBUG] Uploaded file %#v with ID %s in category %#v", file.Filename, fileId, namespace) fileResponse.List = append(fileResponse.List, BaseFile{ Name: file.Filename, @@ -1641,7 +1641,7 @@ func HandleCreateFile(resp http.ResponseWriter, request *http.Request) { orgId := user.ActiveOrg.Id files, err := FindSimilarFilename(ctx, curfile.Filename, orgId) if err != nil { - log.Printf("[ERROR] Failed finding similar files: %s", err) + //log.Printf("[ERROR] Couldn't find any similar files: %s", err) } else { for _, item := range files { diff --git a/go.mod b/go.mod index c012da9a..85a31bef 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module github.com/shuffle/shuffle-shared -// Keep on 1.11 until AppEngine supports 1.17 or higher - go 1.11 //replace github.com/frikky/kin-openapi => ../kin-openapi -replace github.com/shuffle/opensearch-go => ../opensearch-go +//replace github.com/shuffle/opensearch-go => ../opensearch-go require ( cloud.google.com/go/datastore v1.4.0 @@ -16,7 +14,7 @@ require ( github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 github.com/bradfitz/slice v0.0.0-20180809154707-2b758aa73013 github.com/frikky/kin-openapi v0.41.0 - github.com/frikky/schemaless v0.0.9 + github.com/frikky/schemaless v0.0.11 github.com/go-git/go-billy/v5 v5.5.0 github.com/go-git/go-git/v5 v5.11.0 github.com/google/go-github/v28 v28.1.1 diff --git a/kms.go b/kms.go index 65667678..19f59537 100644 --- a/kms.go +++ b/kms.go @@ -23,7 +23,8 @@ import ( openai "github.com/sashabaranov/go-openai" ) -var model = "gpt-4-turbo-preview" +//var model = "gpt-4-turbo-preview" +var model = "gpt-4o" func GetKmsCache(ctx context.Context, auth AppAuthenticationStorage, key string) (string, error) { //log.Printf("\n\n[DEBUG] Getting KMS cache for key %s\n\n", key) @@ -682,30 +683,7 @@ func RunSelfCorrectingRequest(action Action, status int, additionalInfo, outputB log.Printf("\n\nTOKENS (AUTOFIX API~): In: %d, Out: %d\n\n", (len(systemMessage)+len(inputData))/4, len(contentOutput)/4) - if strings.Contains(contentOutput, "```") { - // Handle ```json - start := strings.Index(contentOutput, "```json") - end := strings.Index(contentOutput, "```") - if start != -1 { - end = strings.Index(contentOutput[start+7:], "```") - } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+7 : end+7] - } - } - - if strings.Contains(contentOutput, "```") { - start := strings.Index(contentOutput, "```") - end := strings.Index(contentOutput[start+3:], "```") - if start != -1 { - end = strings.Index(contentOutput[start+3:], "```") - } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+3 : end+3] - } - } + contentOutput = FixContentOutput(contentOutput) log.Printf("[INFO] Autocorrect output: %s", contentOutput) @@ -925,29 +903,7 @@ func UpdateActionBody(action WorkflowAppAction) (string, error) { return "", err } - if strings.Contains(contentOutput, "```json") { - start := strings.Index(contentOutput, "```json") - end := strings.Index(contentOutput, "```") - if start != -1 { - end = strings.Index(contentOutput[start+8:], "```") - } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+7 : end+7] - } - } - - if strings.Contains(contentOutput, "```") { - start := strings.Index(contentOutput, "```") - end := strings.Index(contentOutput[start+3:], "```") - if start != -1 { - end = strings.Index(contentOutput[start+3:], "```") - } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+3 : end+3] - } - } + contentOutput = FixContentOutput(contentOutput) output := map[string]interface{}{} err = json.Unmarshal([]byte(contentOutput), &output) @@ -1213,3 +1169,32 @@ func uploadParameterBase(ctx context.Context, orgId, appId, actionName, paramNam return nil } + +func FixContentOutput(contentOutput string) string { + if strings.Contains(contentOutput, "```") { + // Handle ```json + start := strings.Index(contentOutput, "```json") + end := strings.Index(contentOutput, "```") + if start != -1 { + end = strings.Index(contentOutput[start+7:], "```") + } + + if start != -1 && end != -1 { + contentOutput = contentOutput[start+7 : end+7] + } + } + + if strings.Contains(contentOutput, "```") { + start := strings.Index(contentOutput, "```") + end := strings.Index(contentOutput[start+3:], "```") + if start != -1 { + end = strings.Index(contentOutput[start+3:], "```") + } + + if start != -1 && end != -1 { + contentOutput = contentOutput[start+3 : end+3] + } + } + + return contentOutput +} diff --git a/oauth2.go b/oauth2.go index 7a644226..32ba9e37 100755 --- a/oauth2.go +++ b/oauth2.go @@ -5,6 +5,7 @@ package shuffle import ( "bytes" "context" + //"regexp" "crypto/sha256" "encoding/base64" "encoding/json" diff --git a/pipelines.go b/pipelines.go index 53150d11..ed6b696b 100644 --- a/pipelines.go +++ b/pipelines.go @@ -2,6 +2,8 @@ package shuffle import ( "encoding/json" + "errors" + "context" "fmt" "io/ioutil" "log" @@ -92,7 +94,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) } } - if !envFound { + if !envFound && pipeline.Type != "delete"{ log.Printf("[WARNING] Environment '%s' is not available", pipeline.Environment) resp.WriteHeader(400) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Environment '%s' is not available. Please make it, or change the environment you want to deploy to."}`, pipeline.Environment))) @@ -118,11 +120,24 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) return } - // 1. Add to trigger list - /* TBD */ - // Look for PIPELINE_ command that exists in the queue already startCommand := strings.ToUpper(strings.Split(pipeline.Type, " ")[0]) + + //check if this is the first time creating the pipeline + pipelineInfo, err := GetPipeline(ctx, pipeline.TriggerId) + if err != nil { + if (startCommand == "DELETE" || startCommand == "STOP") && err.Error() == "pipeline doesn't exist" { + log.Printf("[WARNING] Failed getting pipeline %s, reason: %s", pipeline.TriggerId, err) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } else if startCommand == "START" && err.Error() == "pipeline doesn't exist" { + startCommand = "CREATE" + } + } else if startCommand == "CREATE" { + startCommand = "START" + } + //parsedId := fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pipeline.Environment, " ", "-"), "_", "-")), user.ActiveOrg.Id) parsedId := strings.ToLower(pipeline.Environment) formattedType := fmt.Sprintf("PIPELINE_%s", startCommand) @@ -147,26 +162,48 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) Priority: 11, } - if startCommand == "CREATE" { + pipelineData := Pipeline{} + + if startCommand == "DELETE" { + + err := deletePipeline(ctx, *pipelineInfo) + if err != nil { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Failed deleting the pipeline."}`)) + return + } + + } else if startCommand == "STOP" { + + pipelineInfo.Status = "stopped" + err = setPipelineTrigger(ctx, *pipelineInfo) + if err != nil { + log.Printf("[ERROR] Failed to stop the pipeline with trigger id: %s, reason: %s", pipelineInfo.TriggerId, err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false}`)) + return + } + log.Printf("[INFO] Stopped the pipeline %s sucessfully", pipelineInfo.TriggerId) + } else { - pipelineData := Pipeline{} pipelineData.Name = pipeline.Name pipelineData.Type = startCommand pipelineData.Command = pipeline.Command pipelineData.Environment = pipeline.Environment pipelineData.WorkflowId = pipeline.WorkflowId pipelineData.OrgId = user.ActiveOrg.Id - pipelineData.Status = "uninitialized" + pipelineData.Owner = user.Id + pipelineData.Status = "running" pipelineData.TriggerId = pipeline.TriggerId - err = savePipelineData(ctx, pipelineData) + err = setPipelineTrigger(ctx, pipelineData) if err != nil { - log.Printf("[ERROR] Failed to save the pipeline with trigger id: %s into the db: %s", pipeline.TriggerId, err) + log.Printf("[ERROR] Failed to create the pipeline with trigger id: %s, reason: %s", pipeline.TriggerId, err) resp.WriteHeader(500) resp.Write([]byte(`{"success": false}`)) return } - log.Printf("[INFO] Successfully saved the pipeline info") + log.Printf("[INFO] Set up pipeline with trigger ID %s and environment %s", pipeline.TriggerId, pipeline.Environment) } err = SetWorkflowQueue(ctx, execRequest, parsedId) @@ -180,3 +217,44 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) resp.WriteHeader(200) resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline will be created"}`))) } + + +func setPipelineTrigger(ctx context.Context, pipeline Pipeline) error{ + + input := pipeline.Command + index := strings.Index(input, "to ") + + if index == -1 { + return errors.New("url not found") + } + extractedURL := input[index+len("to "):] + extractedURL = strings.TrimSpace(extractedURL) + + pipeline.Url = extractedURL + err := savePipelineData(ctx, pipeline) + + if err != nil { + return err + } + + return nil +} + +func deletePipeline(ctx context.Context, pipeline Pipeline) error { + + pipeline.Status = "stopped" + err := savePipelineData(ctx, pipeline) + if err != nil { + log.Printf("[WARNING] Failed saving pipeline: %s", err) + return err + } + + err = DeleteKey(ctx, "pipelines", pipeline.TriggerId) + if err != nil { + log.Printf("[WARNING] Error deleting pipeline %s, reason: %s", pipeline.TriggerId) + return err + } + + log.Printf("[INFO] Successfully deleted pipeline %s", pipeline.TriggerId) + return nil +} \ No newline at end of file diff --git a/shared.go b/shared.go index ef71578b..346f5ef1 100755 --- a/shared.go +++ b/shared.go @@ -3786,6 +3786,7 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) { ctx := GetContext(request) var workflows []Workflow + /* cacheKey := fmt.Sprintf("%s_workflows", user.ActiveOrg.Id) cache, err := GetCache(ctx, cacheKey) if err == nil { @@ -3799,17 +3800,18 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) { } else { //log.Printf("[INFO] Failed getting cache for workflows for user %s", user.Id) } + */ workflows, err = GetAllWorkflowsByQuery(ctx, user) if err != nil { log.Printf("[WARNING] Failed getting workflows for user %s (0): %s", user.Username, err) - resp.WriteHeader(401) + resp.WriteHeader(400) resp.Write([]byte(`{"success": false}`)) return } if len(workflows) == 0 { - log.Printf("[INFO] No workflows found for user %s", user.Username) + log.Printf("[INFO] No workflows found for user %s (%s) in org %s (%s)", user.Username, user.Id, user.ActiveOrg.Name, user.ActiveOrg.Id) resp.WriteHeader(200) resp.Write([]byte("[]")) return @@ -3826,6 +3828,10 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) { continue } + if project.Environment == "cloud" && workflow.ExecutionEnvironment == "onprem" { + continue + } + newActions := []Action{} for _, action := range workflow.Actions { // Removed because of exports. These are needed there. @@ -3835,18 +3841,43 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) { newActions = append(newActions, action) } - workflow.Actions = newActions + //workflow.Actions = newActions + // Skipping these as they're related to onprem workflows in cloud (orborus) - if project.Environment == "cloud" && workflow.ExecutionEnvironment == "onprem" { - continue - } usecaseIds = append(usecaseIds, workflow.UsecaseIds...) newWorkflows = append(newWorkflows, workflow) } - workflows = newWorkflows + //log.Printf("[DEBUG] Env: %s, workflows: %d", project.Environment, len(newWorkflows)) + if project.Environment == "cloud" && len(newWorkflows) > 15 { + log.Printf("[DEBUG] Removed workflow actions & images for user %s (%s) in org %s (%s)", user.Username, user.Id, user.ActiveOrg.Name, user.ActiveOrg.Id) + + for workflowIndex, _ := range newWorkflows { + newWorkflows[workflowIndex].Actions = []Action{} + newWorkflows[workflowIndex].Triggers = []Trigger{} + newWorkflows[workflowIndex].Branches = []Branch{} + newWorkflows[workflowIndex].VisualBranches = []Branch{} + newWorkflows[workflowIndex].Image = "" + + newWorkflows[workflowIndex].Description = "" + newWorkflows[workflowIndex].Blogpost = "" + + if len(newWorkflows[workflowIndex].Org) > 0 { + for orgIndex, _ := range newWorkflows[workflowIndex].Org { + newWorkflows[workflowIndex].Org[orgIndex].Image = "" + } + } + + newWorkflows[workflowIndex].ExecutingOrg.Image = "" + } + + // Add header that this is a limited response + resp.Header().Set("X-Shuffle-Truncated", "true") + } else { + log.Printf("[DEBUG] Loading without truncating for user %s (%s) in org %s (%s)", user.Username, user.Id, user.ActiveOrg.Name, user.ActiveOrg.Id) + } // Get the org as well to manage priorities // Only happens on first load, so it's like once per session~ @@ -3876,17 +3907,19 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) { //log.Printf("[INFO] Returning %d workflows", len(newWorkflows)) newjson, err := json.Marshal(newWorkflows) if err != nil { - resp.WriteHeader(401) + resp.WriteHeader(500) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflows"}`))) return } + /* if project.CacheDb { err = SetCache(ctx, cacheKey, newjson, 30) if err != nil { - log.Printf("[WARNING] Failed updating workflow cache: %s", err) + log.Printf("[ERROR] Failed updating workflow cache for org %s: %s", user.ActiveOrg.Id, err) } } + */ resp.WriteHeader(200) resp.Write(newjson) @@ -4349,6 +4382,47 @@ func HandleGetSchedules(resp http.ResponseWriter, request *http.Request) { resp.Write(newjson) } +func HandleGetHooks(resp http.ResponseWriter, request *http.Request) { + cors := HandleCors(resp, request) + if cors { + return + } + + user, err := HandleApiAuthentication(resp, request) + if err != nil { + log.Printf("[WARNING] Api authentication failed in get hooks: %s", err) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } + + if user.Role != "admin" { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Admin required"}`)) + return + } + + ctx := GetContext(request) + hooks, err := GetHooks(ctx, user.ActiveOrg.Id) + if err != nil { + log.Printf("[WARNING] Failed getting hooks: %s", err) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Couldn't get hooks"}`)) + return + } + + newjson, err := json.Marshal(hooks) + if err != nil { + log.Printf("Failed unmarshal: %s", err) + resp.WriteHeader(401) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking environments"}`))) + return + } + + resp.WriteHeader(200) + resp.Write(newjson) +} + func HandleUpdateUser(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) if cors { @@ -7208,7 +7282,9 @@ func HandleGetUsers(resp http.ResponseWriter, request *http.Request) { item.Executions = ExecutionInfo{} item.Limits = UserLimits{} item.PrivateApps = []WorkflowApp{} - item.MFA = MFAInfo{} + item.MFA = MFAInfo{ + Active: item.MFA.Active, + } if !user.SupportAccess { item.LoginInfo = []LoginInfo{} @@ -10220,6 +10296,11 @@ func HandleDeleteHook(resp http.ResponseWriter, request *http.Request) { fileId = location[4] } + // Check if fileId has the prefix "webhook_" + if strings.HasPrefix(fileId, "webhook_") { + fileId = strings.TrimPrefix(fileId, "webhook_") + } + if len(fileId) != 36 { resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Workflow ID when deleting hook is not valid"}`)) @@ -10395,7 +10476,7 @@ func GetWorkflowAppConfig(resp http.ResponseWriter, request *http.Request) { //log.Printf("SHARING: %s. PUBLIC: %s", app.Sharing, app.Public) if app.Sharing || app.Public { if openapiok && len(openapi) > 0 && strings.ToLower(openapi[0]) == "false" { - log.Printf("Should return WITHOUT openapi") + log.Printf("[DEBUG] Returning app '%s' without OpenAPI", fileId) } else { //log.Printf("CAN SHARE APP!") parsedApi, err := GetOpenApiDatastore(ctx, fileId) @@ -10976,10 +11057,50 @@ func HandleLogin(resp http.ResponseWriter, request *http.Request) { if userdata.ActiveOrg.Id == "" { if len(userdata.Orgs) == 0 { - log.Printf("[ERROR] User %s (%s) has no chosen org. ID: %s, Name: %s", userdata.Username, userdata.Id, userdata.ActiveOrg.Id, userdata.ActiveOrg.Name) - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "No organization available. Please contact your team, or the shuffle if you think there has been a mistake: support@shuffler.io"}`)) - return + log.Printf("[WARNING] User %s (%s) has no chosen org. ID: %s, Name: %s. Creating a default org", userdata.Username, userdata.Id, userdata.ActiveOrg.Id, userdata.ActiveOrg.Name) + orgSetupName := "default" + orgId := uuid.NewV4().String() + newOrg := Org{ + Name: orgSetupName, + Id: orgId, + Org: orgSetupName, + Users: []User{userdata}, + Roles: userdata.Roles, + CloudSync: false, + } + + err := SetOrg(ctx, newOrg, newOrg.Id) + + if err != nil { + log.Printf("[ERROR] Failed setting default org for the user: %s", userdata.Username) + } else { + log.Printf("[DEBUG] Successfully created the default org!") + + defaultEnv := os.Getenv("ORG_ID") + if len(defaultEnv) == 0 { + defaultEnv = "Shuffle" + log.Printf("[DEBUG] Setting default environment for org to %s", defaultEnv) + } + + item := Environment{ + Name: defaultEnv, + Type: "onperm", + OrgId: orgId, + Default: true, + Id: uuid.NewV4().String(), + } + + err := SetEnvironment(ctx, &item) + if err != nil { + log.Printf("[ERROR] Failed setting up new environment for new org: %s", err) + } + + userdata.Orgs = append(userdata.Orgs, newOrg.Id) + } + + // resp.WriteHeader(400) + // resp.Write([]byte(`{"success": false, "reason": "No organization available. Please contact your team, or the shuffle if you think there has been a mistake: support@shuffler.io"}`)) + // return } userdata.ActiveOrg.Id = userdata.Orgs[0] @@ -11977,7 +12098,16 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut setWorkflow := false if strings.ToLower(actionResult.Action.Environment) != "cloud" { if project.Environment == "worker" { - log.Printf("\n\n\n[DEBUG] NOT modifying workflow based on User Input as we are in worker\n\n\n") + + if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" { + //log.Printf("\n\n\n[DEBUG] MODIFYING workflow based on User Input as we are in swarm\n\n\n") + workflowExecution.Status = "WAITING" + workflowExecution.Results = append(workflowExecution.Results, actionResult) + setWorkflow = true + } else { + log.Printf("\n\n\n[DEBUG] NOT modifying workflow based on User Input as we are in worker\n\n\n") + } + } else { // Find the waiting node and change it to this result workflowExecution.Status = "WAITING" @@ -17240,7 +17370,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h } result.CompletedAt = int64(time.Now().Unix()) * 1000 - log.Printf("\n\n[INFO][%s] Setting result to %s\n\n", oldExecution.ExecutionId, result.Action.Label) + log.Printf("[INFO][%s] Setting result to %s", oldExecution.ExecutionId, result.Action.Label) sendSelfRequest := false if answer[0] == "false" { @@ -17300,7 +17430,6 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h } } - log.Printf("\n\n\n[DEBUG][%s] SelfReq: %t, Env: %s\n\n\n", result.ExecutionId, sendSelfRequest, result.Action.Environment) if sendSelfRequest || strings.ToLower(result.Action.Environment) == "cloud" { log.Printf("[DEBUG][%s] Sending User Input result to self because we are on cloud env/action is skipped", result.ExecutionId) @@ -20859,7 +20988,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) { return } - log.Printf("[INFO] Running category-action '%s' in category '%s' for org %s (%s)", value.Label, value.Category, user.ActiveOrg.Name, user.ActiveOrg.Id) + log.Printf("\n\n\n[INFO] Running category-action '%s' in category '%s' for org %s (%s)\n\n\n", value.Label, value.Category, user.ActiveOrg.Name, user.ActiveOrg.Id) if len(value.Query) > 0 { // Check if app authentication. If so, check if intent is to actually authenticate, or find the actual intent @@ -20868,6 +20997,15 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) { } } + if value.Label == "use_app" { + for _, field := range value.Fields { + if field.Key == "action" { + log.Printf("[INFO] NOT IMPLEMENTED: Changing to action label '%s' from use_app", field.Value) + //value.Label = field.Value + break + } + } + } threadId := value.WorkflowId @@ -20922,7 +21060,8 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) { _ = labelIndex - //log.Printf("[INFO] Found label '%s' in category '%s'. Indexes for category: %d, and label: %d", value.Label, value.Category, foundIndex, labelIndex) + log.Printf("\n\n[INFO] Found label '%s' in category '%s'. Indexes for category: %d, and label: %d\n\n", value.Label, value.Category, foundIndex, labelIndex) + newapps, err := GetPrioritizedApps(ctx, user) if err != nil { log.Printf("[WARNING] Failed getting apps in category action: %s", err) @@ -21028,7 +21167,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) { if strings.ReplaceAll(strings.ToLower(action.CategoryLabel[0]), " ", "_") == value.Label { selectedAction = action - //log.Printf("[INFO] Found label %s in app %s. ActionName: %s", value.Label, app.Name, action.Name) + log.Printf("[INFO] Found label %s in app %s. ActionName: %s", value.Label, app.Name, action.Name) break } } @@ -21257,13 +21396,18 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) { // FIXME: Check if ALL fields for the target app can be fullfiled // E.g. for Jira: Org_id is required. + + /* if foundFields != len(baseFields) { log.Printf("[WARNING] Not all required fields were found in category action. Want: %#v, have: %#v", baseFields, value.Fields) + resp.WriteHeader(400) - resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Not all required fields are set", "label": "%s", "missing_fields": "%s"}`, value.Label, strings.Join(missingFields, ",")))) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Not all required fields are set. This can be autocompleted from fields you fille in", "label": "%s", "missing_fields": "%s"}`, value.Label, strings.Join(missingFields, ",")))) return } + */ + _ = baseFields /* @@ -24055,60 +24199,59 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { resp.Write(respBody) } -func HandleSavePipelineInfo(resp http.ResponseWriter, request *http.Request) { - cors := HandleCors(resp, request) - if cors { - return - } - - // How do I make sure that Orborus is the one that made this request? - - var requestBody Pipeline - err := json.NewDecoder(request.Body).Decode(&requestBody) - if err != nil { - log.Printf("[WARNING] Failed to decode request body: %s", err) - resp.WriteHeader(401) - resp.Write([]byte(`{"success": false}`)) - return - } - if len(requestBody.TriggerId) == 0 || len(requestBody.PipelineId) == 0 || len(requestBody.Status) == 0 { - log.Printf("[WARNING] Missing fields in the request body") - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "Missing fields in the request body"}`)) - return - } - - ctx := GetContext(request) - pipeline, err := GetPipeline(ctx, requestBody.TriggerId) - log.Printf("[HARI TESTING] trigger id is %s", requestBody.TriggerId) - if err != nil { - if strings.Contains(fmt.Sprintf("%s", err), "pipeline doesn't exist") { - log.Printf("[DEBUG] no matching document found for Pipeline: %s", requestBody.PipelineId) - resp.WriteHeader(404) - resp.Write([]byte(`{"success": false, "reason": "pipeline not found"}`)) - return - } else { - log.Printf("[WARNING] Failed getting pipeline: %s due to %s", requestBody.PipelineId, err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false}`)) - return - } - } - - pipeline.PipelineId = requestBody.PipelineId - pipeline.Status = requestBody.Status - - err = savePipelineData(ctx, *pipeline) - if err != nil { - log.Printf("[WARNING] Failed updating pipeline with ID: %s due to %s", pipeline.PipelineId, err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false}`)) - return - } - log.Printf("[INFO] Sucessfully saved pipeline: %s", pipeline.PipelineId) - resp.WriteHeader(200) - resp.Write([]byte(`{"success": true}`)) -} +// func HandleSavePipelineInfo(resp http.ResponseWriter, request *http.Request) { +// cors := HandleCors(resp, request) +// if cors { +// return +// } + +// // How do I make sure that Orborus is the one that made this request? + +// var requestBody Pipeline +// err := json.NewDecoder(request.Body).Decode(&requestBody) +// if err != nil { +// log.Printf("[WARNING] Failed to decode request body: %s", err) +// resp.WriteHeader(401) +// resp.Write([]byte(`{"success": false}`)) +// return +// } +// if len(requestBody.TriggerId) == 0 || len(requestBody.PipelineId) == 0 || len(requestBody.Status) == 0 { +// log.Printf("[WARNING] Missing fields in the request body") +// resp.WriteHeader(400) +// resp.Write([]byte(`{"success": false, "reason": "Missing fields in the request body"}`)) +// return +// } + +// ctx := GetContext(request) +// pipeline, err := GetPipeline(ctx, requestBody.TriggerId) +// if err != nil { +// if strings.Contains(fmt.Sprintf("%s", err),"pipeline doesn't exist"){ +// log.Printf("[DEBUG] no matching document found for Pipeline: %s", requestBody.PipelineId) +// resp.WriteHeader(404) +// resp.Write([]byte(`{"success": false, "reason": "pipeline not found"}`)) +// return +// } else { +// log.Printf("[WARNING] Failed getting pipeline: %s due to %s", requestBody.PipelineId, err) +// resp.WriteHeader(500) +// resp.Write([]byte(`{"success": false}`)) +// return +// } +// } + +// pipeline.PipelineId = requestBody.PipelineId +// pipeline.Status = requestBody.Status + +// err = savePipelineData(ctx, *pipeline) +// if err != nil { +// log.Printf("[WARNING] Failed updating pipeline with ID: %s due to %s", pipeline.PipelineId, err) +// resp.WriteHeader(500) +// resp.Write([]byte(`{"success": false}`)) +// return +// } +// log.Printf("[INFO] Sucessfully saved pipeline: %s", pipeline.PipelineId) +// resp.WriteHeader(200) +// resp.Write([]byte(`{"success": true}`)) +// } func LoadUsecases(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) @@ -24524,8 +24667,8 @@ func GetStandardDestWorkflow(app *WorkflowApp, action string, enrich bool) *Work TriggerType: "SUBFLOW", Position: Position{ - X: 0, - Y: 150, + X: 150, + Y: 150, }, Parameters: []WorkflowAppActionParameter{ diff --git a/structs.go b/structs.go index 3d8f9fd8..e7a60095 100755 --- a/structs.go +++ b/structs.go @@ -20,24 +20,27 @@ type PipelineRequest struct { Command string `json:"command"` Environment string `json:"environment"` WorkflowId string `json:"workflow_id"` + StartNode string `json:"start_node"` PipelineId string `json:"pipeline_id"` TriggerId string `json:"trigger_id"` } type Pipeline struct { - Name string `json:"name"` - Type string `json:"type"` - Command string `json:"command"` - Environment string `json:"environment"` - WorkflowId string `json:"workflow_id"` - StartNode string `json:"start_node"` - OrgId string `json:"org_id"` - Status string `json:"status"` - Errors []string `json:"errors"` - - PipelineId string `json:"pipeline_id"` - TriggerId string `json:"trigger_id"` + Name string `json:"name" datastore:"name"` + Type string `json:"type" datastore:"type"` + Command string `json:"command" datastore:"command"` + Environment string `json:"environment" datastore:"environment"` + WorkflowId string `json:"workflow_id" datastore:"workflow_id"` + StartNode string `json:"start_node" datastore:"start_node"` + OrgId string `json:"org_id" datastore:"org_id"` + Status string `json:"status" datastore:"status"` + Errors []string `json:"errors" datastore:"errors"` + Url string `json:"url" datastore:"url"` + Owner string `json:"owner" datastore:"owner"` + + PipelineId string `json:"pipeline_id" datastore:"pipeline_id"` + TriggerId string `json:"trigger_id" datastore:"trigger_id"` } type PipelineWrapper struct { @@ -3803,6 +3806,7 @@ type StructuredCategoryAction struct { WorkflowId string `json:"workflow_id"` ExecutionId string `json:"execution_id"` Action string `json:"action"` + Label string `json:"label"` Category string `json:"category"` Apps []WorkflowApp `json:"apps"` @@ -3811,6 +3815,7 @@ type StructuredCategoryAction struct { AvailableLabels []string `json:"available_labels"` ThreadId string `json:"thread_id"` RunId string `json:"run_id"` + MissingFields []string `json:"missing_fields"` Translated bool `json:"translated"` }