Skip to content

Commit

Permalink
Merge pull request #68 from satti-hari-krishna-reddy/newPipelines
Browse files Browse the repository at this point in the history
feat : made pipeline work as a trigger
  • Loading branch information
frikky authored May 21, 2024
2 parents 3e0099c + 4c621da commit eadc365
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 81 deletions.
9 changes: 5 additions & 4 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8215,10 +8215,11 @@ func savePipelineData(ctx context.Context, pipeline Pipeline) error {
return err
}
} else {
// key := datastore.NameKey(nameKey, pipelineId, nil)
// if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil {
// log.Printf("[ERROR] failed to add pipeline: %s", err)
// return err
key := datastore.NameKey(nameKey, triggerId, nil)
if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil {
log.Printf("[ERROR] failed to add pipeline: %s", err)
return err
}
}

return nil
Expand Down
98 changes: 88 additions & 10 deletions pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package shuffle

import (
"encoding/json"
"errors"
"context"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -92,7 +94,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}
}

if !envFound {
if !envFound && pipeline.Type != "delete"{
log.Printf("[WARNING] Environment '%s' is not available", pipeline.Environment)
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Environment '%s' is not available. Please make it, or change the environment you want to deploy to."}`, pipeline.Environment)))
Expand All @@ -118,11 +120,24 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
return
}

// 1. Add to trigger list
/* TBD */

// Look for PIPELINE_ command that exists in the queue already
startCommand := strings.ToUpper(strings.Split(pipeline.Type, " ")[0])

//check if this is the first time creating the pipeline
pipelineInfo, err := GetPipeline(ctx, pipeline.TriggerId)
if err != nil {
if (startCommand == "DELETE" || startCommand == "STOP") && err.Error() == "pipeline doesn't exist" {
log.Printf("[WARNING] Failed getting pipeline %s, reason: %s", pipeline.TriggerId, err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
} else if startCommand == "START" && err.Error() == "pipeline doesn't exist" {
startCommand = "CREATE"
}
} else if startCommand == "CREATE" {
startCommand = "START"
}

//parsedId := fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pipeline.Environment, " ", "-"), "_", "-")), user.ActiveOrg.Id)
parsedId := strings.ToLower(pipeline.Environment)
formattedType := fmt.Sprintf("PIPELINE_%s", startCommand)
Expand All @@ -147,26 +162,48 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
Priority: 11,
}

if startCommand == "CREATE" {
pipelineData := Pipeline{}

if startCommand == "DELETE" {

err := deletePipeline(ctx, *pipelineInfo)
if err != nil {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Failed deleting the pipeline."}`))
return
}

} else if startCommand == "STOP" {

pipelineInfo.Status = "stopped"
err = setPipelineTrigger(ctx, *pipelineInfo)
if err != nil {
log.Printf("[ERROR] Failed to stop the pipeline with trigger id: %s, reason: %s", pipelineInfo.TriggerId, err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}
log.Printf("[INFO] Stopped the pipeline %s sucessfully", pipelineInfo.TriggerId)
} else {

pipelineData := Pipeline{}
pipelineData.Name = pipeline.Name
pipelineData.Type = startCommand
pipelineData.Command = pipeline.Command
pipelineData.Environment = pipeline.Environment
pipelineData.WorkflowId = pipeline.WorkflowId
pipelineData.OrgId = user.ActiveOrg.Id
pipelineData.Status = "uninitialized"
pipelineData.Owner = user.Id
pipelineData.Status = "running"
pipelineData.TriggerId = pipeline.TriggerId

err = savePipelineData(ctx, pipelineData)
err = setPipelineTrigger(ctx, pipelineData)
if err != nil {
log.Printf("[ERROR] Failed to save the pipeline with trigger id: %s into the db: %s", pipeline.TriggerId, err)
log.Printf("[ERROR] Failed to create the pipeline with trigger id: %s, reason: %s", pipeline.TriggerId, err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}
log.Printf("[INFO] Successfully saved the pipeline info")
log.Printf("[INFO] Set up pipeline with trigger ID %s and environment %s", pipeline.TriggerId, pipeline.Environment)
}

err = SetWorkflowQueue(ctx, execRequest, parsedId)
Expand All @@ -180,3 +217,44 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline will be created"}`)))
}


func setPipelineTrigger(ctx context.Context, pipeline Pipeline) error{

input := pipeline.Command
index := strings.Index(input, "to ")

if index == -1 {
return errors.New("url not found")
}
extractedURL := input[index+len("to "):]
extractedURL = strings.TrimSpace(extractedURL)

pipeline.Url = extractedURL
err := savePipelineData(ctx, pipeline)

if err != nil {
return err
}

return nil
}

func deletePipeline(ctx context.Context, pipeline Pipeline) error {

pipeline.Status = "stopped"
err := savePipelineData(ctx, pipeline)
if err != nil {
log.Printf("[WARNING] Failed saving pipeline: %s", err)
return err
}

err = DeleteKey(ctx, "pipelines", pipeline.TriggerId)
if err != nil {
log.Printf("[WARNING] Error deleting pipeline %s, reason: %s", pipeline.TriggerId)
return err
}

log.Printf("[INFO] Successfully deleted pipeline %s", pipeline.TriggerId)
return nil
}
108 changes: 53 additions & 55 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -24079,61 +24079,59 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) {
resp.Write(respBody)
}

func HandleSavePipelineInfo(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
if cors {
return
}

// How do I make sure that Orborus is the one that made this request?

var requestBody Pipeline
err := json.NewDecoder(request.Body).Decode(&requestBody)
if err != nil {
log.Printf("[WARNING] Failed to decode request body: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}
if len(requestBody.TriggerId) == 0 || len(requestBody.PipelineId) == 0 || len(requestBody.Status) == 0 {
log.Printf("[WARNING] Missing fields in the request body")
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false, "reason": "Missing fields in the request body"}`))
return
}

ctx := GetContext(request)
pipeline, err := GetPipeline(ctx, requestBody.TriggerId)
log.Printf("[HARI TESTING] trigger id is %s", requestBody.TriggerId)
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err),"pipeline doesn't exist"){
log.Printf("[DEBUG] no matching document found for Pipeline: %s", requestBody.PipelineId)
resp.WriteHeader(404)
resp.Write([]byte(`{"success": false, "reason": "pipeline not found"}`))
return
} else {
log.Printf("[WARNING] Failed getting pipeline: %s due to %s", requestBody.PipelineId, err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}
}

pipeline.PipelineId = requestBody.PipelineId
pipeline.Status = requestBody.Status


err = savePipelineData(ctx, *pipeline)
if err != nil {
log.Printf("[WARNING] Failed updating pipeline with ID: %s due to %s", pipeline.PipelineId, err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}
log.Printf("[INFO] Sucessfully saved pipeline: %s", pipeline.PipelineId)
resp.WriteHeader(200)
resp.Write([]byte(`{"success": true}`))
}
// func HandleSavePipelineInfo(resp http.ResponseWriter, request *http.Request) {
// cors := HandleCors(resp, request)
// if cors {
// return
// }

// // How do I make sure that Orborus is the one that made this request?

// var requestBody Pipeline
// err := json.NewDecoder(request.Body).Decode(&requestBody)
// if err != nil {
// log.Printf("[WARNING] Failed to decode request body: %s", err)
// resp.WriteHeader(401)
// resp.Write([]byte(`{"success": false}`))
// return
// }
// if len(requestBody.TriggerId) == 0 || len(requestBody.PipelineId) == 0 || len(requestBody.Status) == 0 {
// log.Printf("[WARNING] Missing fields in the request body")
// resp.WriteHeader(400)
// resp.Write([]byte(`{"success": false, "reason": "Missing fields in the request body"}`))
// return
// }

// ctx := GetContext(request)
// pipeline, err := GetPipeline(ctx, requestBody.TriggerId)
// if err != nil {
// if strings.Contains(fmt.Sprintf("%s", err),"pipeline doesn't exist"){
// log.Printf("[DEBUG] no matching document found for Pipeline: %s", requestBody.PipelineId)
// resp.WriteHeader(404)
// resp.Write([]byte(`{"success": false, "reason": "pipeline not found"}`))
// return
// } else {
// log.Printf("[WARNING] Failed getting pipeline: %s due to %s", requestBody.PipelineId, err)
// resp.WriteHeader(500)
// resp.Write([]byte(`{"success": false}`))
// return
// }
// }

// pipeline.PipelineId = requestBody.PipelineId
// pipeline.Status = requestBody.Status

// err = savePipelineData(ctx, *pipeline)
// if err != nil {
// log.Printf("[WARNING] Failed updating pipeline with ID: %s due to %s", pipeline.PipelineId, err)
// resp.WriteHeader(500)
// resp.Write([]byte(`{"success": false}`))
// return
// }
// log.Printf("[INFO] Sucessfully saved pipeline: %s", pipeline.PipelineId)
// resp.WriteHeader(200)
// resp.Write([]byte(`{"success": true}`))
// }

func LoadUsecases(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
Expand Down
27 changes: 15 additions & 12 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,27 @@ type PipelineRequest struct {
Command string `json:"command"`
Environment string `json:"environment"`
WorkflowId string `json:"workflow_id"`
StartNode string `json:"start_node"`

PipelineId string `json:"pipeline_id"`
TriggerId string `json:"trigger_id"`
}

type Pipeline struct {
Name string `json:"name"`
Type string `json:"type"`
Command string `json:"command"`
Environment string `json:"environment"`
WorkflowId string `json:"workflow_id"`
StartNode string `json:"start_node"`
OrgId string `json:"org_id"`
Status string `json:"status"`
Errors []string `json:"errors"`

PipelineId string `json:"pipeline_id"`
TriggerId string `json:"trigger_id"`
Name string `json:"name" datastore:"name"`
Type string `json:"type" datastore:"type"`
Command string `json:"command" datastore:"command"`
Environment string `json:"environment" datastore:"environment"`
WorkflowId string `json:"workflow_id" datastore:"workflow_id"`
StartNode string `json:"start_node" datastore:"start_node"`
OrgId string `json:"org_id" datastore:"org_id"`
Status string `json:"status" datastore:"status"`
Errors []string `json:"errors" datastore:"errors"`
Url string `json:"url" datastore:"url"`
Owner string `json:"owner" datastore:"owner"`

PipelineId string `json:"pipeline_id" datastore:"pipeline_id"`
TriggerId string `json:"trigger_id" datastore:"trigger_id"`
}

type PipelineWrapper struct {
Expand Down

0 comments on commit eadc365

Please sign in to comment.