diff --git a/internal/handler/main.go b/internal/handler/main.go index c4e48a1..c8c119d 100644 --- a/internal/handler/main.go +++ b/internal/handler/main.go @@ -14,7 +14,6 @@ import ( "net/http" "net/url" "os" - "sort" "strconv" "strings" "time" @@ -201,28 +200,6 @@ type ResourceDestination struct { Format string } -// Messaging is used for the config and client information for the messaging queue. -type Messaging struct { - Config mq.Config - LagoonAPI LagoonAPI - S3Config S3 - ConnectionAttempts int - ConnectionRetryInterval int - EnableDebug bool -} - -// NewMessaging returns a messaging with config -func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool) *Messaging { - return &Messaging{ - Config: config, - LagoonAPI: lagoonAPI, - S3Config: s3, - ConnectionAttempts: startupAttempts, - ConnectionRetryInterval: startupInterval, - EnableDebug: enableDebug, - } -} - // Consumer handles consuming messages sent to the queue that this action handler is connected to and processes them accordingly func (h *Messaging) Consumer() { var messageQueue mq.MQ @@ -285,139 +262,6 @@ func (t *authedTransport) RoundTrip(req *http.Request) (*http.Response, error) { return t.wrapped.RoundTrip(req) } -func (h *Messaging) processMessageQueue(message mq.Message) { - var insights InsightsData - var resource ResourceDestination - - // set up defer to ack the message after we're done processing - defer func(message mq.Message) { - // Ack to remove from queue - err := message.Ack(false) - if err != nil { - fmt.Printf("Failed to acknowledge message: %s\n", err.Error()) - } - }(message) - - incoming := &InsightsMessage{} - json.Unmarshal(message.Body(), incoming) - - // if we have direct problems or facts, we process them differently - skipping all - // the extra processing below. - if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" { - resp := processItemsDirectly(message, h) - log.Println(resp) - return - } - - // Check labels for insights data from message - if incoming.Labels != nil { - labelKeys := make([]string, 0, len(incoming.Labels)) - for k := range incoming.Labels { - labelKeys = append(labelKeys, k) - } - sort.Strings(labelKeys) - - // Set some insight data defaults - insights = InsightsData{ - LagoonType: Facts, - OutputFileExt: "json", - OutputFileMIMEType: "application/json", - } - - for _, label := range labelKeys { - if label == "lagoon.sh/project" { - resource.Project = incoming.Labels["lagoon.sh/project"] - } - if label == "lagoon.sh/environment" { - resource.Environment = incoming.Labels["lagoon.sh/environment"] - } - if label == "lagoon.sh/service" { - resource.Service = incoming.Labels["lagoon.sh/service"] - } - - if label == "lagoon.sh/insightsType" { - insights.InputType = incoming.Labels["lagoon.sh/insightsType"] - } - if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" { - insights.LagoonType = ImageFacts - } - if label == "lagoon.sh/insightsOutputCompressed" { - compressed, _ := strconv.ParseBool(incoming.Labels["lagoon.sh/insightsOutputCompressed"]) - insights.OutputCompressed = compressed - } - if label == "lagoon.sh/insightsOutputFileMIMEType" { - insights.OutputFileMIMEType = incoming.Labels["lagoon.sh/insightsOutputFileMIMEType"] - } - if label == "lagoon.sh/insightsOutputFileExt" { - insights.OutputFileExt = incoming.Labels["lagoon.sh/insightsOutputFileExt"] - } - } - } - - // Define insights type from incoming 'insightsType' label - if insights.InputType != "" { - switch insights.InputType { - case "sbom", "sbom-gz": - insights.InsightsType = Sbom - case "image", "image-gz": - insights.InsightsType = Image - case "direct": - insights.InsightsType = Direct - default: - insights.InsightsType = Raw - } - } - - // Determine incoming payload type - if incoming.Payload == nil && incoming.BinaryPayload == nil { - if h.EnableDebug { - log.Printf("[DEBUG] no payload was found") - } - err := message.Reject(false) - if err != nil { - fmt.Printf("Unable to reject payload: %s\n", err.Error()) - } - return - } - if len(incoming.Payload) != 0 { - insights.InputPayload = Payload - } - if len(incoming.BinaryPayload) != 0 { - insights.InputPayload = BinaryPayload - } - - // Debug - if h.EnableDebug { - log.Println("[DEBUG] insights:", insights) - log.Println("[DEBUG] target:", resource) - } - - // Process s3 upload - if !h.S3Config.Disabled { - if insights.InsightsType != Direct { - err := h.sendToLagoonS3(incoming, insights, resource) - if err != nil { - log.Printf("Unable to send to S3: %s", err.Error()) - } - } - } - - // Process Lagoon API integration - if !h.LagoonAPI.Disabled { - if insights.InsightsType != Sbom && - insights.InsightsType != Image && - insights.InsightsType != Raw && - insights.InsightsType != Direct { - log.Println("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing") - } else { - err := h.sendToLagoonAPI(incoming, resource, insights) - if err != nil { - log.Printf("Unable to send to the api: %s", err.Error()) - } - } - } -} - func processItemsDirectly(message mq.Message, h *Messaging) string { var directFacts DirectFacts json.Unmarshal(message.Body(), &directFacts) diff --git a/internal/handler/main_test.go b/internal/handler/main_test.go index a0f5b34..b33bfbf 100644 --- a/internal/handler/main_test.go +++ b/internal/handler/main_test.go @@ -60,7 +60,7 @@ func Test_processingIncomingMessageQueue(t *testing.T) { } func Test_processDirectFacts(t *testing.T) { - err := godotenv.Load("../../.env") + err := godotenv.Load("../../.env.example") if err != nil { fmt.Println(err) panic("Error loading .env file") diff --git a/internal/handler/messaging.go b/internal/handler/messaging.go new file mode 100644 index 0000000..7def8e3 --- /dev/null +++ b/internal/handler/messaging.go @@ -0,0 +1,166 @@ +package handler + +import ( + "encoding/json" + "fmt" + "github.com/cheshir/go-mq" + "log" + "sort" + "strconv" +) + +// Messaging is used for the config and client information for the messaging queue, including processing the queue itself. +type Messaging struct { + Config mq.Config + LagoonAPI LagoonAPI + S3Config S3 + ConnectionAttempts int + ConnectionRetryInterval int + EnableDebug bool +} + +// NewMessaging returns a messaging with config +func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool) *Messaging { + return &Messaging{ + Config: config, + LagoonAPI: lagoonAPI, + S3Config: s3, + ConnectionAttempts: startupAttempts, + ConnectionRetryInterval: startupInterval, + EnableDebug: enableDebug, + } +} + +// processMessageQueue reads in a rabbitMQ item and dispatches it to the appropriate function to process +func (h *Messaging) processMessageQueue(message mq.Message) { + var insights InsightsData + var resource ResourceDestination + + // set up defer to ack the message after we're done processing + defer func(message mq.Message) { + // Ack to remove from queue + err := message.Ack(false) + if err != nil { + fmt.Printf("Failed to acknowledge message: %s\n", err.Error()) + } + }(message) + + incoming := &InsightsMessage{} + json.Unmarshal(message.Body(), incoming) + + // if we have direct problems or facts, we process them differently - skipping all + // the extra processing below. + if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" { + resp := processItemsDirectly(message, h) + log.Println(resp) + return + } + + // Check labels for insights data from message + if incoming.Labels != nil { + labelKeys := make([]string, 0, len(incoming.Labels)) + for k := range incoming.Labels { + labelKeys = append(labelKeys, k) + } + sort.Strings(labelKeys) + + // Set some insight data defaults + insights = InsightsData{ + LagoonType: Facts, + OutputFileExt: "json", + OutputFileMIMEType: "application/json", + } + + for _, label := range labelKeys { + if label == "lagoon.sh/project" { + resource.Project = incoming.Labels["lagoon.sh/project"] + } + if label == "lagoon.sh/environment" { + resource.Environment = incoming.Labels["lagoon.sh/environment"] + } + if label == "lagoon.sh/service" { + resource.Service = incoming.Labels["lagoon.sh/service"] + } + + if label == "lagoon.sh/insightsType" { + insights.InputType = incoming.Labels["lagoon.sh/insightsType"] + } + if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" { + insights.LagoonType = ImageFacts + } + if label == "lagoon.sh/insightsOutputCompressed" { + compressed, _ := strconv.ParseBool(incoming.Labels["lagoon.sh/insightsOutputCompressed"]) + insights.OutputCompressed = compressed + } + if label == "lagoon.sh/insightsOutputFileMIMEType" { + insights.OutputFileMIMEType = incoming.Labels["lagoon.sh/insightsOutputFileMIMEType"] + } + if label == "lagoon.sh/insightsOutputFileExt" { + insights.OutputFileExt = incoming.Labels["lagoon.sh/insightsOutputFileExt"] + } + } + } + + // Define insights type from incoming 'insightsType' label + if insights.InputType != "" { + switch insights.InputType { + case "sbom", "sbom-gz": + insights.InsightsType = Sbom + case "image", "image-gz": + insights.InsightsType = Image + case "direct": + insights.InsightsType = Direct + default: + insights.InsightsType = Raw + } + } + + // Determine incoming payload type + if incoming.Payload == nil && incoming.BinaryPayload == nil { + if h.EnableDebug { + log.Printf("[DEBUG] no payload was found") + } + err := message.Reject(false) + if err != nil { + fmt.Printf("Unable to reject payload: %s\n", err.Error()) + } + return + } + if len(incoming.Payload) != 0 { + insights.InputPayload = Payload + } + if len(incoming.BinaryPayload) != 0 { + insights.InputPayload = BinaryPayload + } + + // Debug + if h.EnableDebug { + log.Println("[DEBUG] insights:", insights) + log.Println("[DEBUG] target:", resource) + } + + // Process s3 upload + if !h.S3Config.Disabled { + if insights.InsightsType != Direct { + err := h.sendToLagoonS3(incoming, insights, resource) + if err != nil { + log.Printf("Unable to send to S3: %s", err.Error()) + } + } + } + + // Process Lagoon API integration + if !h.LagoonAPI.Disabled { + if insights.InsightsType != Sbom && + insights.InsightsType != Image && + insights.InsightsType != Raw && + insights.InsightsType != Direct { + log.Println("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing") + } else { + err := h.sendToLagoonAPI(incoming, resource, insights) + if err != nil { + log.Printf("Unable to send to the api: %s", err.Error()) + } + } + } +}