Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error handling in message processing #35

Merged
merged 5 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions internal/handler/insightsFactsParserFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ func processFactsInsightsData(h *Messaging, insights InsightsData, v string, api
slog.Error("Error reading insights data", "Error", err)
}

facts := processFactsFromJSON(logger, res, source)
facts, err := processFactsFromJSON(logger, res, source)
if err != nil {
return nil, "", err
}

facts, err = KeyFactsFilter(facts)
if err != nil {
return nil, "", err
Expand All @@ -46,18 +50,17 @@ func processFactsInsightsData(h *Messaging, insights InsightsData, v string, api
return nil, "", nil
}

func processFactsFromJSON(logger *slog.Logger, facts []byte, source string) []LagoonFact {
func processFactsFromJSON(logger *slog.Logger, facts []byte, source string) ([]LagoonFact, error) {
var factsInput []LagoonFact

var factsPayload FactsPayload
err := json.Unmarshal(facts, &factsPayload)
if err != nil {
logger.Error(err.Error())
panic("Can't unmarshal facts")
return factsInput, err
}

if len(factsPayload.Facts) == 0 {
return factsInput
return factsInput, nil
}

var filteredFacts []LagoonFact
Expand All @@ -82,10 +85,13 @@ func processFactsFromJSON(logger *slog.Logger, facts []byte, source string) []La
Type: FactTypeText,
}
logger.Debug("Processing fact", "name", f.Name, "value", f.Value)
fact, _ = ProcessLagoonFactAgainstRegisteredFilters(fact, f)
fact, err = ProcessLagoonFactAgainstRegisteredFilters(fact, f)
if err != nil {
return factsInput, err
}
factsInput = append(factsInput, fact)
}
return factsInput
return factsInput, nil
}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/insightsParserFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func processSbomInsightsData(h *Messaging, insights InsightsData, v string, apiC

decoder := cdx.NewBOMDecoder(bytes.NewReader(b), cdx.BOMFileFormatJSON)
if err = decoder.Decode(bom); err != nil {
panic(err)
return nil, "", err
}
}

Expand Down
63 changes: 45 additions & 18 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ func (h *Messaging) Consumer() {
var err error
messageQueue, err = mq.New(h.Config)
if err != nil {
log.Println(err,
fmt.Sprintf(
"Failed to initialize message queue manager, retrying in %d seconds, attempt %d/%d",
h.ConnectionRetryInterval,
attempt,
h.ConnectionAttempts,
),
slog.Error(fmt.Sprintf(
"Failed to initialize message queue manager, retrying in %d seconds, attempt %d/%d",
h.ConnectionRetryInterval,
attempt,
h.ConnectionAttempts,
),
"error", err.Error(),
)
time.Sleep(time.Duration(h.ConnectionRetryInterval) * time.Second)
}
Expand Down Expand Up @@ -283,62 +283,81 @@ func (h *Messaging) sendToLagoonAPI(incoming *InsightsMessage, resource Resource

if insights.InputPayload == Payload && insights.LagoonType == Facts {
for _, p := range incoming.Payload {
parserFilterLoopForPayloads(insights, p, h, apiClient, resource)
err := parserFilterLoopForPayloads(insights, p, h, apiClient, resource)
if err != nil {
return err
}
}
}

if insights.InputPayload == BinaryPayload && insights.LagoonType == Facts {
for _, p := range incoming.BinaryPayload {
parserFilterLoopForBinaryPayloads(insights, p, h, apiClient, resource)
err := parserFilterLoopForBinaryPayloads(insights, p, h, apiClient, resource)
if err != nil {
return err
}
}
}

return nil
}

func parserFilterLoopForBinaryPayloads(insights InsightsData, p string, h *Messaging, apiClient graphql.Client, resource ResourceDestination) {
func parserFilterLoopForBinaryPayloads(insights InsightsData, p string, h *Messaging, apiClient graphql.Client, resource ResourceDestination) error {
for _, filter := range parserFilters {

result, source, err := filter(h, insights, p, apiClient, resource)
if err != nil {
slog.Error("Error running filter", "error", err.Error())
return err
}

processResultset(result, err, h, apiClient, resource, source)
err = processResultset(result, err, h, apiClient, resource, source)
if err != nil {
return err
}
}
return nil
}

func parserFilterLoopForPayloads(insights InsightsData, p PayloadInput, h *Messaging, apiClient graphql.Client, resource ResourceDestination) {
func parserFilterLoopForPayloads(insights InsightsData, p PayloadInput, h *Messaging, apiClient graphql.Client, resource ResourceDestination) error {
for _, filter := range parserFilters {
var result []interface{}
var source string

json, err := json.Marshal(p)
if err != nil {
slog.Error("Error marshalling data", "error", err.Error())
return err
}

result, source, err = filter(h, insights, fmt.Sprintf("%s", json), apiClient, resource)
if err != nil {
slog.Error("Error Filtering payload", "error", err.Error())
return err
}

processResultset(result, err, h, apiClient, resource, source)
err = processResultset(result, err, h, apiClient, resource, source)
if err != nil {
return err
}
}
return nil
}

// processResultset will send results as facts to the lagoon api after processing via a parser filter
func processResultset(result []interface{}, err error, h *Messaging, apiClient graphql.Client, resource ResourceDestination, source string) {
func processResultset(result []interface{}, err error, h *Messaging, apiClient graphql.Client, resource ResourceDestination, source string) error {
project, environment, apiErr := determineResourceFromLagoonAPI(apiClient, resource)
if apiErr != nil {
log.Println(apiErr)
slog.Error(apiErr.Error())
return apiErr
}

// Even if we don't find any new facts, we need to delete the existing ones
// since these may be the end product of a filter process
apiErr = h.deleteExistingFactsBySource(apiClient, environment, source, project)
if apiErr != nil {
log.Printf("%s", apiErr.Error())
slog.Error(apiErr.Error())
return apiErr
}

for _, r := range result {
Expand All @@ -347,15 +366,23 @@ func processResultset(result []interface{}, err error, h *Messaging, apiClient g
err = h.sendFactsToLagoonAPI([]LagoonFact{fact}, apiClient, resource, source)
if err != nil {
slog.Error("Error sending facts to Lagoon API", "error", err.Error())
return err
}
} else if facts, ok := r.([]LagoonFact); ok {
// Handle slice of facts
h.sendFactsToLagoonAPI(facts, apiClient, resource, source)
err = h.sendFactsToLagoonAPI(facts, apiClient, resource, source)
if err != nil {
slog.Error("Error sending facts to Lagoon API", "error", err.Error())
return err
}
} else {
// Unexpected type returned from filter()
slog.Error(fmt.Sprintf("unexpected type returned from filter(): %T\n", r))
err := fmt.Errorf("unexpected type returned from filter(): %T\n", r)
slog.Error(err.Error())
return err
}
}
return nil
}

func (h *Messaging) sendFactsToLagoonAPI(facts []LagoonFact, apiClient graphql.Client, resource ResourceDestination, source string) error {
Expand Down
1 change: 0 additions & 1 deletion internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
err := h.sendToLagoonAPI(incoming, resource, insights)

if err != nil {
//log.Printf("Unable to send to the api: %s", err.Error())
slog.Error("Unable to send to the API", "Error", err.Error())
rejectMessage(false)
return
Expand Down
Loading