Skip to content

Commit

Permalink
Adds error rets to message queue processing
Browse files Browse the repository at this point in the history
  • Loading branch information
bomoko committed Nov 23, 2023
1 parent 6c6a9ce commit a52f994
Showing 1 changed file with 36 additions and 9 deletions.
45 changes: 36 additions & 9 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,62 +283,81 @@ func (h *Messaging) sendToLagoonAPI(incoming *InsightsMessage, resource Resource

if insights.InputPayload == Payload && insights.LagoonType == Facts {
for _, p := range incoming.Payload {
parserFilterLoopForPayloads(insights, p, h, apiClient, resource)
err := parserFilterLoopForPayloads(insights, p, h, apiClient, resource)
if err != nil {
return err
}
}
}

if insights.InputPayload == BinaryPayload && insights.LagoonType == Facts {
for _, p := range incoming.BinaryPayload {
parserFilterLoopForBinaryPayloads(insights, p, h, apiClient, resource)
err := parserFilterLoopForBinaryPayloads(insights, p, h, apiClient, resource)
if err != nil {
return err
}
}
}

return nil
}

func parserFilterLoopForBinaryPayloads(insights InsightsData, p string, h *Messaging, apiClient graphql.Client, resource ResourceDestination) {
func parserFilterLoopForBinaryPayloads(insights InsightsData, p string, h *Messaging, apiClient graphql.Client, resource ResourceDestination) error {
for _, filter := range parserFilters {

result, source, err := filter(h, insights, p, apiClient, resource)
if err != nil {
slog.Error("Error running filter", "error", err.Error())
return err
}

processResultset(result, err, h, apiClient, resource, source)
err = processResultset(result, err, h, apiClient, resource, source)
if err != nil {
return err
}
}
return nil
}

func parserFilterLoopForPayloads(insights InsightsData, p PayloadInput, h *Messaging, apiClient graphql.Client, resource ResourceDestination) {
func parserFilterLoopForPayloads(insights InsightsData, p PayloadInput, h *Messaging, apiClient graphql.Client, resource ResourceDestination) error {
for _, filter := range parserFilters {
var result []interface{}
var source string

json, err := json.Marshal(p)
if err != nil {
slog.Error("Error marshalling data", "error", err.Error())
return err
}

result, source, err = filter(h, insights, fmt.Sprintf("%s", json), apiClient, resource)
if err != nil {
slog.Error("Error Filtering payload", "error", err.Error())
return err
}

processResultset(result, err, h, apiClient, resource, source)
err = processResultset(result, err, h, apiClient, resource, source)
if err != nil {
return err
}
}
return nil
}

// processResultset will send results as facts to the lagoon api after processing via a parser filter
func processResultset(result []interface{}, err error, h *Messaging, apiClient graphql.Client, resource ResourceDestination, source string) {
func processResultset(result []interface{}, err error, h *Messaging, apiClient graphql.Client, resource ResourceDestination, source string) error {
project, environment, apiErr := determineResourceFromLagoonAPI(apiClient, resource)
if apiErr != nil {
log.Println(apiErr)
return apiErr
}

// Even if we don't find any new facts, we need to delete the existing ones
// since these may be the end product of a filter process
apiErr = h.deleteExistingFactsBySource(apiClient, environment, source, project)
if apiErr != nil {
log.Printf("%s", apiErr.Error())
return apiErr
}

for _, r := range result {
Expand All @@ -347,15 +366,23 @@ func processResultset(result []interface{}, err error, h *Messaging, apiClient g
err = h.sendFactsToLagoonAPI([]LagoonFact{fact}, apiClient, resource, source)
if err != nil {
slog.Error("Error sending facts to Lagoon API", "error", err.Error())
return err
}
} else if facts, ok := r.([]LagoonFact); ok {
// Handle slice of facts
h.sendFactsToLagoonAPI(facts, apiClient, resource, source)
err = h.sendFactsToLagoonAPI(facts, apiClient, resource, source)
if err != nil {
slog.Error("Error sending facts to Lagoon API", "error", err.Error())
return err
}
} else {
// Unexpected type returned from filter()
slog.Error(fmt.Sprintf("unexpected type returned from filter(): %T\n", r))
err := fmt.Errorf("unexpected type returned from filter(): %T\n", r)
slog.Error(err.Error())
return err
}
}
return nil
}

func (h *Messaging) sendFactsToLagoonAPI(facts []LagoonFact, apiClient graphql.Client, resource ResourceDestination, source string) error {
Expand Down

0 comments on commit a52f994

Please sign in to comment.