Skip to content

Commit

Permalink
Fixes merge issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bomoko committed Sep 18, 2023
2 parents c1ca533 + eae878e commit 1955056
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 204 deletions.
207 changes: 4 additions & 203 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -201,6 +199,7 @@ type ResourceDestination struct {
Format string
}

<<<<<<< HEAD
// Messaging is used for the config and client information for the messaging queue.
type Messaging struct {
Config mq.Config
Expand All @@ -227,6 +226,8 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts
}
}

=======
>>>>>>> main
// Consumer handles consuming messages sent to the queue that this action handler is connected to and processes them accordingly
func (h *Messaging) Consumer() {
var messageQueue mq.MQ
Expand Down Expand Up @@ -263,7 +264,7 @@ func (h *Messaging) Consumer() {

// Handle any tasks that go to the queue
log.Println("Listening for messages in queue lagoon-insights:items")
err = messageQueue.SetConsumerHandler("items-queue", processingIncomingMessageQueueFactory(h))
err = messageQueue.SetConsumerHandler("items-queue", h.processMessageQueue)
if err != nil {
log.Println(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "items-queue", err))
}
Expand All @@ -289,206 +290,6 @@ func (t *authedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return t.wrapped.RoundTrip(req)
}

func processingIncomingMessageQueueFactory(h *Messaging) func(mq.Message) {
//if h.ProblemsFromSBOM { //let's start the queue
SetUpQueue(*h, h.GrypeBinaryLocation)
go processQueue()
//}
return func(message mq.Message) {
var insights InsightsData
var resource ResourceDestination

// set up defer to ack the message after we're done processing
defer func(message mq.Message) {
// Ack to remove from queue
err := message.Ack(false)
if err != nil {
fmt.Printf("Failed to acknowledge message: %s\n", err.Error())
}
}(message)

incoming := &InsightsMessage{}
json.Unmarshal(message.Body(), incoming)

// if we have direct problems or facts, we process them differently - skipping all
// the extra processing below.
if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" {
resp := processItemsDirectly(message, h)
log.Println(resp)
return
}

// Check labels for insights data from message
if incoming.Labels != nil {
labelKeys := make([]string, 0, len(incoming.Labels))
for k := range incoming.Labels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)

// Set some insight data defaults
insights = InsightsData{
LagoonType: Facts,
OutputFileExt: "json",
OutputFileMIMEType: "application/json",
}

for _, label := range labelKeys {
if label == "lagoon.sh/project" {
resource.Project = incoming.Labels["lagoon.sh/project"]
}
if label == "lagoon.sh/environment" {
resource.Environment = incoming.Labels["lagoon.sh/environment"]
}
if label == "lagoon.sh/service" {
resource.Service = incoming.Labels["lagoon.sh/service"]
}

if label == "lagoon.sh/insightsType" {
insights.InputType = incoming.Labels["lagoon.sh/insightsType"]
}
if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" {
insights.LagoonType = ImageFacts
}
if label == "lagoon.sh/insightsOutputCompressed" {
compressed, _ := strconv.ParseBool(incoming.Labels["lagoon.sh/insightsOutputCompressed"])
insights.OutputCompressed = compressed
}
if label == "lagoon.sh/insightsOutputFileMIMEType" {
insights.OutputFileMIMEType = incoming.Labels["lagoon.sh/insightsOutputFileMIMEType"]
}
if label == "lagoon.sh/insightsOutputFileExt" {
insights.OutputFileExt = incoming.Labels["lagoon.sh/insightsOutputFileExt"]
}
}
}

// Define insights type from incoming 'insightsType' label
if insights.InputType != "" {
switch insights.InputType {
case "sbom", "sbom-gz":
insights.InsightsType = Sbom
case "image", "image-gz":
insights.InsightsType = Image
case "direct":
insights.InsightsType = Direct
default:
insights.InsightsType = Raw
}
}

// Determine incoming payload type
if incoming.Payload == nil && incoming.BinaryPayload == nil {
if h.EnableDebug {
log.Printf("[DEBUG] no payload was found")
}
err := message.Reject(false)
if err != nil {
fmt.Printf("Unable to reject payload: %s\n", err.Error())
}
return
}
if len(incoming.Payload) != 0 {
insights.InputPayload = Payload
}
if len(incoming.BinaryPayload) != 0 {
insights.InputPayload = BinaryPayload
}

// Debug
if h.EnableDebug {
log.Println("[DEBUG] insights:", insights)
log.Println("[DEBUG] target:", resource)
}

// Process s3 upload
if !h.S3Config.Disabled {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
log.Printf("Unable to send to S3: %s", err.Error())
}
}
}

// Process Lagoon API integration
if !h.LagoonAPI.Disabled {
if insights.InsightsType != Sbom &&
insights.InsightsType != Image &&
insights.InsightsType != Raw &&
insights.InsightsType != Direct {
log.Println("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing")
} else {
err := h.sendToLagoonAPI(incoming, resource, insights)
if err != nil {
log.Printf("Unable to send to the api: %s", err.Error())
}
}
}
}
}

func processItemsDirectly(message mq.Message, h *Messaging) string {
var directFacts DirectFacts
json.Unmarshal(message.Body(), &directFacts)
err := json.Unmarshal(message.Body(), &directFacts)
if err != nil {
log.Println("Error unmarshaling JSON:", err)
return "exciting, unable to process direct facts"
}

// since its useful to allow int and string json definitions, we need to convert strings here to ints.
environmentId, err := strconv.Atoi(directFacts.EnvironmentId.String())
if err != nil {
log.Println("Error converting EnvironmentId to int:", err)
return "exciting, unable to process direct facts"
}

if h.EnableDebug {
log.Print("[DEBUG] facts", directFacts)
}

apiClient := graphql.NewClient(h.LagoonAPI.Endpoint, &http.Client{Transport: &authedTransport{wrapped: http.DefaultTransport, h: h}})

factSources := map[string]string{}

processedFacts := make([]lagoonclient.AddFactInput, len(directFacts.Facts))
for i, fact := range directFacts.Facts {

vartypeString := FactTypeText
if fact.Type == FactTypeText || fact.Type == FactTypeSemver || fact.Type == FactTypeUrl {
vartypeString = fact.Type
}

processedFacts[i] = lagoonclient.AddFactInput{
Environment: environmentId,
Name: fact.Name,
Value: fact.Value,
Source: fact.Source,
Description: fact.Description,
KeyFact: false,
Type: lagoonclient.FactType(vartypeString),
Category: fact.Category,
}
factSources[fact.Source] = fact.Source
}

for _, s := range factSources {
_, err = lagoonclient.DeleteFactsFromSource(context.TODO(), apiClient, environmentId, s)
if err != nil {
log.Println(err)
}
log.Printf("Deleted facts on '%v:%v' for source %v", directFacts.ProjectName, directFacts.EnvironmentName, s)
}

facts, err := lagoonclient.AddFacts(context.TODO(), apiClient, processedFacts)
if err != nil {
log.Println(err)
}

return facts
}

// Incoming payload may contain facts or problems, so we need to handle these differently
func (h *Messaging) sendToLagoonAPI(incoming *InsightsMessage, resource ResourceDestination, insights InsightsData) (err error) {
apiClient := h.getApiClient()
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Test_processingIncomingMessageQueue(t *testing.T) {
}

func Test_processDirectFacts(t *testing.T) {
err := godotenv.Load("../../.env")
err := godotenv.Load("../../.env.example")
if err != nil {
fmt.Println(err)
panic("Error loading .env file")
Expand Down
Loading

0 comments on commit 1955056

Please sign in to comment.