diff --git a/db-connector.go b/db-connector.go index 39a0f24..c9a7868 100755 --- a/db-connector.go +++ b/db-connector.go @@ -8215,10 +8215,11 @@ func savePipelineData(ctx context.Context, pipeline Pipeline) error { return err } } else { - // key := datastore.NameKey(nameKey, pipelineId, nil) - // if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil { - // log.Printf("[ERROR] failed to add pipeline: %s", err) - // return err + key := datastore.NameKey(nameKey, triggerId, nil) + if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil { + log.Printf("[ERROR] failed to add pipeline: %s", err) + return err + } } return nil diff --git a/pipelines.go b/pipelines.go index 53150d1..ed6b696 100644 --- a/pipelines.go +++ b/pipelines.go @@ -2,6 +2,8 @@ package shuffle import ( "encoding/json" + "errors" + "context" "fmt" "io/ioutil" "log" @@ -92,7 +94,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) } } - if !envFound { + if !envFound && pipeline.Type != "delete"{ log.Printf("[WARNING] Environment '%s' is not available", pipeline.Environment) resp.WriteHeader(400) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Environment '%s' is not available. Please make it, or change the environment you want to deploy to."}`, pipeline.Environment))) @@ -118,11 +120,24 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) return } - // 1. Add to trigger list - /* TBD */ - // Look for PIPELINE_ command that exists in the queue already startCommand := strings.ToUpper(strings.Split(pipeline.Type, " ")[0]) + + //check if this is the first time creating the pipeline + pipelineInfo, err := GetPipeline(ctx, pipeline.TriggerId) + if err != nil { + if (startCommand == "DELETE" || startCommand == "STOP") && err.Error() == "pipeline doesn't exist" { + log.Printf("[WARNING] Failed getting pipeline %s, reason: %s", pipeline.TriggerId, err) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false}`)) + return + } else if startCommand == "START" && err.Error() == "pipeline doesn't exist" { + startCommand = "CREATE" + } + } else if startCommand == "CREATE" { + startCommand = "START" + } + //parsedId := fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pipeline.Environment, " ", "-"), "_", "-")), user.ActiveOrg.Id) parsedId := strings.ToLower(pipeline.Environment) formattedType := fmt.Sprintf("PIPELINE_%s", startCommand) @@ -147,26 +162,48 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) Priority: 11, } - if startCommand == "CREATE" { + pipelineData := Pipeline{} + + if startCommand == "DELETE" { + + err := deletePipeline(ctx, *pipelineInfo) + if err != nil { + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Failed deleting the pipeline."}`)) + return + } + + } else if startCommand == "STOP" { + + pipelineInfo.Status = "stopped" + err = setPipelineTrigger(ctx, *pipelineInfo) + if err != nil { + log.Printf("[ERROR] Failed to stop the pipeline with trigger id: %s, reason: %s", pipelineInfo.TriggerId, err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false}`)) + return + } + log.Printf("[INFO] Stopped the pipeline %s sucessfully", pipelineInfo.TriggerId) + } else { - pipelineData := Pipeline{} pipelineData.Name = pipeline.Name pipelineData.Type = startCommand pipelineData.Command = pipeline.Command pipelineData.Environment = pipeline.Environment pipelineData.WorkflowId = pipeline.WorkflowId pipelineData.OrgId = user.ActiveOrg.Id - pipelineData.Status = "uninitialized" + pipelineData.Owner = user.Id + pipelineData.Status = "running" pipelineData.TriggerId = pipeline.TriggerId - err = savePipelineData(ctx, pipelineData) + err = setPipelineTrigger(ctx, pipelineData) if err != nil { - log.Printf("[ERROR] Failed to save the pipeline with trigger id: %s into the db: %s", pipeline.TriggerId, err) + log.Printf("[ERROR] Failed to create the pipeline with trigger id: %s, reason: %s", pipeline.TriggerId, err) resp.WriteHeader(500) resp.Write([]byte(`{"success": false}`)) return } - log.Printf("[INFO] Successfully saved the pipeline info") + log.Printf("[INFO] Set up pipeline with trigger ID %s and environment %s", pipeline.TriggerId, pipeline.Environment) } err = SetWorkflowQueue(ctx, execRequest, parsedId) @@ -180,3 +217,44 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) resp.WriteHeader(200) resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline will be created"}`))) } + + +func setPipelineTrigger(ctx context.Context, pipeline Pipeline) error{ + + input := pipeline.Command + index := strings.Index(input, "to ") + + if index == -1 { + return errors.New("url not found") + } + extractedURL := input[index+len("to "):] + extractedURL = strings.TrimSpace(extractedURL) + + pipeline.Url = extractedURL + err := savePipelineData(ctx, pipeline) + + if err != nil { + return err + } + + return nil +} + +func deletePipeline(ctx context.Context, pipeline Pipeline) error { + + pipeline.Status = "stopped" + err := savePipelineData(ctx, pipeline) + if err != nil { + log.Printf("[WARNING] Failed saving pipeline: %s", err) + return err + } + + err = DeleteKey(ctx, "pipelines", pipeline.TriggerId) + if err != nil { + log.Printf("[WARNING] Error deleting pipeline %s, reason: %s", pipeline.TriggerId) + return err + } + + log.Printf("[INFO] Successfully deleted pipeline %s", pipeline.TriggerId) + return nil +} \ No newline at end of file diff --git a/shared.go b/shared.go index 442bd76..5f4fd18 100755 --- a/shared.go +++ b/shared.go @@ -24079,61 +24079,59 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { resp.Write(respBody) } -func HandleSavePipelineInfo(resp http.ResponseWriter, request *http.Request) { - cors := HandleCors(resp, request) - if cors { - return - } - - // How do I make sure that Orborus is the one that made this request? - - var requestBody Pipeline - err := json.NewDecoder(request.Body).Decode(&requestBody) - if err != nil { - log.Printf("[WARNING] Failed to decode request body: %s", err) - resp.WriteHeader(401) - resp.Write([]byte(`{"success": false}`)) - return - } - if len(requestBody.TriggerId) == 0 || len(requestBody.PipelineId) == 0 || len(requestBody.Status) == 0 { - log.Printf("[WARNING] Missing fields in the request body") - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "Missing fields in the request body"}`)) - return - } - - ctx := GetContext(request) - pipeline, err := GetPipeline(ctx, requestBody.TriggerId) - log.Printf("[HARI TESTING] trigger id is %s", requestBody.TriggerId) - if err != nil { - if strings.Contains(fmt.Sprintf("%s", err),"pipeline doesn't exist"){ - log.Printf("[DEBUG] no matching document found for Pipeline: %s", requestBody.PipelineId) - resp.WriteHeader(404) - resp.Write([]byte(`{"success": false, "reason": "pipeline not found"}`)) - return - } else { - log.Printf("[WARNING] Failed getting pipeline: %s due to %s", requestBody.PipelineId, err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false}`)) - return - } - } - - pipeline.PipelineId = requestBody.PipelineId - pipeline.Status = requestBody.Status - - - err = savePipelineData(ctx, *pipeline) - if err != nil { - log.Printf("[WARNING] Failed updating pipeline with ID: %s due to %s", pipeline.PipelineId, err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false}`)) - return - } - log.Printf("[INFO] Sucessfully saved pipeline: %s", pipeline.PipelineId) - resp.WriteHeader(200) - resp.Write([]byte(`{"success": true}`)) -} +// func HandleSavePipelineInfo(resp http.ResponseWriter, request *http.Request) { +// cors := HandleCors(resp, request) +// if cors { +// return +// } + +// // How do I make sure that Orborus is the one that made this request? + +// var requestBody Pipeline +// err := json.NewDecoder(request.Body).Decode(&requestBody) +// if err != nil { +// log.Printf("[WARNING] Failed to decode request body: %s", err) +// resp.WriteHeader(401) +// resp.Write([]byte(`{"success": false}`)) +// return +// } +// if len(requestBody.TriggerId) == 0 || len(requestBody.PipelineId) == 0 || len(requestBody.Status) == 0 { +// log.Printf("[WARNING] Missing fields in the request body") +// resp.WriteHeader(400) +// resp.Write([]byte(`{"success": false, "reason": "Missing fields in the request body"}`)) +// return +// } + +// ctx := GetContext(request) +// pipeline, err := GetPipeline(ctx, requestBody.TriggerId) +// if err != nil { +// if strings.Contains(fmt.Sprintf("%s", err),"pipeline doesn't exist"){ +// log.Printf("[DEBUG] no matching document found for Pipeline: %s", requestBody.PipelineId) +// resp.WriteHeader(404) +// resp.Write([]byte(`{"success": false, "reason": "pipeline not found"}`)) +// return +// } else { +// log.Printf("[WARNING] Failed getting pipeline: %s due to %s", requestBody.PipelineId, err) +// resp.WriteHeader(500) +// resp.Write([]byte(`{"success": false}`)) +// return +// } +// } + +// pipeline.PipelineId = requestBody.PipelineId +// pipeline.Status = requestBody.Status + +// err = savePipelineData(ctx, *pipeline) +// if err != nil { +// log.Printf("[WARNING] Failed updating pipeline with ID: %s due to %s", pipeline.PipelineId, err) +// resp.WriteHeader(500) +// resp.Write([]byte(`{"success": false}`)) +// return +// } +// log.Printf("[INFO] Sucessfully saved pipeline: %s", pipeline.PipelineId) +// resp.WriteHeader(200) +// resp.Write([]byte(`{"success": true}`)) +// } func LoadUsecases(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) diff --git a/structs.go b/structs.go index 5d54104..33693ca 100755 --- a/structs.go +++ b/structs.go @@ -20,24 +20,27 @@ type PipelineRequest struct { Command string `json:"command"` Environment string `json:"environment"` WorkflowId string `json:"workflow_id"` + StartNode string `json:"start_node"` PipelineId string `json:"pipeline_id"` TriggerId string `json:"trigger_id"` } type Pipeline struct { - Name string `json:"name"` - Type string `json:"type"` - Command string `json:"command"` - Environment string `json:"environment"` - WorkflowId string `json:"workflow_id"` - StartNode string `json:"start_node"` - OrgId string `json:"org_id"` - Status string `json:"status"` - Errors []string `json:"errors"` - - PipelineId string `json:"pipeline_id"` - TriggerId string `json:"trigger_id"` + Name string `json:"name" datastore:"name"` + Type string `json:"type" datastore:"type"` + Command string `json:"command" datastore:"command"` + Environment string `json:"environment" datastore:"environment"` + WorkflowId string `json:"workflow_id" datastore:"workflow_id"` + StartNode string `json:"start_node" datastore:"start_node"` + OrgId string `json:"org_id" datastore:"org_id"` + Status string `json:"status" datastore:"status"` + Errors []string `json:"errors" datastore:"errors"` + Url string `json:"url" datastore:"url"` + Owner string `json:"owner" datastore:"owner"` + + PipelineId string `json:"pipeline_id" datastore:"pipeline_id"` + TriggerId string `json:"trigger_id" datastore:"trigger_id"` } type PipelineWrapper struct {