Skip to content

Commit

Permalink
Split out messaging into its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
bomoko committed Sep 17, 2023
1 parent 8e62205 commit e8f6133
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 157 deletions.
156 changes: 0 additions & 156 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -201,28 +200,6 @@ type ResourceDestination struct {
Format string
}

// Messaging is used for the config and client information for the messaging queue.
type Messaging struct {
Config mq.Config
LagoonAPI LagoonAPI
S3Config S3
ConnectionAttempts int
ConnectionRetryInterval int
EnableDebug bool
}

// NewMessaging returns a messaging with config
func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool) *Messaging {
return &Messaging{
Config: config,
LagoonAPI: lagoonAPI,
S3Config: s3,
ConnectionAttempts: startupAttempts,
ConnectionRetryInterval: startupInterval,
EnableDebug: enableDebug,
}
}

// Consumer handles consuming messages sent to the queue that this action handler is connected to and processes them accordingly
func (h *Messaging) Consumer() {
var messageQueue mq.MQ
Expand Down Expand Up @@ -285,139 +262,6 @@ func (t *authedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return t.wrapped.RoundTrip(req)
}

func (h *Messaging) processMessageQueue(message mq.Message) {
var insights InsightsData
var resource ResourceDestination

// set up defer to ack the message after we're done processing
defer func(message mq.Message) {
// Ack to remove from queue
err := message.Ack(false)
if err != nil {
fmt.Printf("Failed to acknowledge message: %s\n", err.Error())
}
}(message)

incoming := &InsightsMessage{}
json.Unmarshal(message.Body(), incoming)

// if we have direct problems or facts, we process them differently - skipping all
// the extra processing below.
if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" {
resp := processItemsDirectly(message, h)
log.Println(resp)
return
}

// Check labels for insights data from message
if incoming.Labels != nil {
labelKeys := make([]string, 0, len(incoming.Labels))
for k := range incoming.Labels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)

// Set some insight data defaults
insights = InsightsData{
LagoonType: Facts,
OutputFileExt: "json",
OutputFileMIMEType: "application/json",
}

for _, label := range labelKeys {
if label == "lagoon.sh/project" {
resource.Project = incoming.Labels["lagoon.sh/project"]
}
if label == "lagoon.sh/environment" {
resource.Environment = incoming.Labels["lagoon.sh/environment"]
}
if label == "lagoon.sh/service" {
resource.Service = incoming.Labels["lagoon.sh/service"]
}

if label == "lagoon.sh/insightsType" {
insights.InputType = incoming.Labels["lagoon.sh/insightsType"]
}
if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" {
insights.LagoonType = ImageFacts
}
if label == "lagoon.sh/insightsOutputCompressed" {
compressed, _ := strconv.ParseBool(incoming.Labels["lagoon.sh/insightsOutputCompressed"])
insights.OutputCompressed = compressed
}
if label == "lagoon.sh/insightsOutputFileMIMEType" {
insights.OutputFileMIMEType = incoming.Labels["lagoon.sh/insightsOutputFileMIMEType"]
}
if label == "lagoon.sh/insightsOutputFileExt" {
insights.OutputFileExt = incoming.Labels["lagoon.sh/insightsOutputFileExt"]
}
}
}

// Define insights type from incoming 'insightsType' label
if insights.InputType != "" {
switch insights.InputType {
case "sbom", "sbom-gz":
insights.InsightsType = Sbom
case "image", "image-gz":
insights.InsightsType = Image
case "direct":
insights.InsightsType = Direct
default:
insights.InsightsType = Raw
}
}

// Determine incoming payload type
if incoming.Payload == nil && incoming.BinaryPayload == nil {
if h.EnableDebug {
log.Printf("[DEBUG] no payload was found")
}
err := message.Reject(false)
if err != nil {
fmt.Printf("Unable to reject payload: %s\n", err.Error())
}
return
}
if len(incoming.Payload) != 0 {
insights.InputPayload = Payload
}
if len(incoming.BinaryPayload) != 0 {
insights.InputPayload = BinaryPayload
}

// Debug
if h.EnableDebug {
log.Println("[DEBUG] insights:", insights)
log.Println("[DEBUG] target:", resource)
}

// Process s3 upload
if !h.S3Config.Disabled {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
log.Printf("Unable to send to S3: %s", err.Error())
}
}
}

// Process Lagoon API integration
if !h.LagoonAPI.Disabled {
if insights.InsightsType != Sbom &&
insights.InsightsType != Image &&
insights.InsightsType != Raw &&
insights.InsightsType != Direct {
log.Println("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing")
} else {
err := h.sendToLagoonAPI(incoming, resource, insights)
if err != nil {
log.Printf("Unable to send to the api: %s", err.Error())
}
}
}
}

func processItemsDirectly(message mq.Message, h *Messaging) string {
var directFacts DirectFacts
json.Unmarshal(message.Body(), &directFacts)
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Test_processingIncomingMessageQueue(t *testing.T) {
}

func Test_processDirectFacts(t *testing.T) {
err := godotenv.Load("../../.env")
err := godotenv.Load("../../.env.example")
if err != nil {
fmt.Println(err)
panic("Error loading .env file")
Expand Down
166 changes: 166 additions & 0 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package handler

import (
"encoding/json"
"fmt"
"github.com/cheshir/go-mq"
"log"
"sort"
"strconv"
)

// Messaging is used for the config and client information for the messaging queue, including processing the queue itself.
type Messaging struct {
Config mq.Config
LagoonAPI LagoonAPI
S3Config S3
ConnectionAttempts int
ConnectionRetryInterval int
EnableDebug bool
}

// NewMessaging returns a messaging with config
func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool) *Messaging {
return &Messaging{
Config: config,
LagoonAPI: lagoonAPI,
S3Config: s3,
ConnectionAttempts: startupAttempts,
ConnectionRetryInterval: startupInterval,
EnableDebug: enableDebug,
}
}

// processMessageQueue reads in a rabbitMQ item and dispatches it to the appropriate function to process
func (h *Messaging) processMessageQueue(message mq.Message) {
var insights InsightsData
var resource ResourceDestination

// set up defer to ack the message after we're done processing
defer func(message mq.Message) {
// Ack to remove from queue
err := message.Ack(false)
if err != nil {
fmt.Printf("Failed to acknowledge message: %s\n", err.Error())
}
}(message)

incoming := &InsightsMessage{}
json.Unmarshal(message.Body(), incoming)

// if we have direct problems or facts, we process them differently - skipping all
// the extra processing below.
if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" {
resp := processItemsDirectly(message, h)
log.Println(resp)
return
}

// Check labels for insights data from message
if incoming.Labels != nil {
labelKeys := make([]string, 0, len(incoming.Labels))
for k := range incoming.Labels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)

// Set some insight data defaults
insights = InsightsData{
LagoonType: Facts,
OutputFileExt: "json",
OutputFileMIMEType: "application/json",
}

for _, label := range labelKeys {
if label == "lagoon.sh/project" {
resource.Project = incoming.Labels["lagoon.sh/project"]
}
if label == "lagoon.sh/environment" {
resource.Environment = incoming.Labels["lagoon.sh/environment"]
}
if label == "lagoon.sh/service" {
resource.Service = incoming.Labels["lagoon.sh/service"]
}

if label == "lagoon.sh/insightsType" {
insights.InputType = incoming.Labels["lagoon.sh/insightsType"]
}
if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" {
insights.LagoonType = ImageFacts
}
if label == "lagoon.sh/insightsOutputCompressed" {
compressed, _ := strconv.ParseBool(incoming.Labels["lagoon.sh/insightsOutputCompressed"])
insights.OutputCompressed = compressed
}
if label == "lagoon.sh/insightsOutputFileMIMEType" {
insights.OutputFileMIMEType = incoming.Labels["lagoon.sh/insightsOutputFileMIMEType"]
}
if label == "lagoon.sh/insightsOutputFileExt" {
insights.OutputFileExt = incoming.Labels["lagoon.sh/insightsOutputFileExt"]
}
}
}

// Define insights type from incoming 'insightsType' label
if insights.InputType != "" {
switch insights.InputType {
case "sbom", "sbom-gz":
insights.InsightsType = Sbom
case "image", "image-gz":
insights.InsightsType = Image
case "direct":
insights.InsightsType = Direct
default:
insights.InsightsType = Raw
}
}

// Determine incoming payload type
if incoming.Payload == nil && incoming.BinaryPayload == nil {
if h.EnableDebug {
log.Printf("[DEBUG] no payload was found")
}
err := message.Reject(false)
if err != nil {
fmt.Printf("Unable to reject payload: %s\n", err.Error())
}
return
}
if len(incoming.Payload) != 0 {
insights.InputPayload = Payload
}
if len(incoming.BinaryPayload) != 0 {
insights.InputPayload = BinaryPayload
}

// Debug
if h.EnableDebug {
log.Println("[DEBUG] insights:", insights)
log.Println("[DEBUG] target:", resource)
}

// Process s3 upload
if !h.S3Config.Disabled {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
log.Printf("Unable to send to S3: %s", err.Error())
}
}
}

// Process Lagoon API integration
if !h.LagoonAPI.Disabled {
if insights.InsightsType != Sbom &&
insights.InsightsType != Image &&
insights.InsightsType != Raw &&
insights.InsightsType != Direct {
log.Println("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing")
} else {
err := h.sendToLagoonAPI(incoming, resource, insights)
if err != nil {
log.Printf("Unable to send to the api: %s", err.Error())
}
}
}
}

0 comments on commit e8f6133

Please sign in to comment.