Skip to content

Commit

Permalink
Refactor v1 release filters (closes #192)
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Aug 12, 2022
1 parent 33210be commit 1de851a
Show file tree
Hide file tree
Showing 8 changed files with 548 additions and 243 deletions.
2 changes: 1 addition & 1 deletion config/test-fixtures/transform-mixed-filtered.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ transform {

transform {
use "spEnrichedFilter" {
field = "app_id"
atomic_field = "app_id"
regex = "wrong"
regex_timeout = 10
}
Expand Down
267 changes: 172 additions & 95 deletions pkg/transform/snowplow_enriched_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package transform

import (
"fmt"
"log"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -21,119 +21,55 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/models"
)

func findSpEnrichedFilterValue(queriedField, parsedEventName, eventVer, field string, parsedMessage analytics.ParsedEvent, path []interface{}) ([]interface{}, error) {
var vf interface{}
var valueFound []interface{}
var err error

switch {
case strings.HasPrefix(queriedField, `contexts_`):
vf, err = parsedMessage.GetContextValue(queriedField, path...)
valueFound = append(valueFound, vf.([]interface{})...)
case strings.HasPrefix(queriedField, `unstruct_event`):
eventNameFull := `unstruct_event_` + parsedEventName
if queriedField == eventNameFull || queriedField == eventNameFull+`_`+eventVer {
vf, err = parsedMessage.GetUnstructEventValue(path...)
valueFound = append(valueFound, vf)
}
default:
vf, err = parsedMessage.GetValue(field)
valueFound = append(valueFound, vf)
func evaluateSpEnrichedFilter(re *regexp2.Regexp, valuesFound []interface{}) bool {
// if valuesFound is nil, we found no value.
// Because negative matches are a thing, we still want to match against an empty string
if valuesFound == nil {
valuesFound = make([]interface{}, 1)
}
if err != nil {
// GetValue returns an error if the field requested is empty. Check for that particular error before returning error
if err.Error() == analytics.EmptyFieldErr {
return nil, nil
for _, v := range valuesFound {
if v == nil {
v = "" // because nil gets cast to `<nil>`
}
return nil, err
}
return valueFound, nil
}

func evaluateSpEnrichedFilter(valuesFound []interface{}, regex string, regexTimeout int) bool {
re, err := regexp2.Compile(regex, 0)
re.MatchTimeout = time.Duration(regexTimeout) * time.Second
if err != nil {
log.Fatal(errors.Wrap(err, `error compiling regex for filter`))
}
for _, v := range valuesFound {
if ok, _ := re.MatchString(fmt.Sprintf("%v", v)); ok {
return true
}
}
return false
}

// createSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event
// and a regex declared by the user.
func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUnstructEvent bool) (TransformationFunction, error) {
func createSpEnrichedFilterFunction(regex string, regexTimeout int, getFunc valueGetter) (TransformationFunction, error) {
if regexTimeout == 0 {
// default timeout for regex is 10 seconds
regexTimeout = 10
}

// regexToMatch is what we use to evaluate the actual filter, once we have the value.
regexToMatch, err := regexp2.Compile(regex, 0)
regexToMatch.MatchTimeout = time.Duration(regexTimeout) * time.Second
if err != nil {
return nil, errors.Wrap(err, `error compiling regex for filter`)
}

return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
if regexTimeout == 0 {
// default timeout for regex is 10 seconds
regexTimeout = 10
}

// Evaluate intermediateState to parsedEvent
parsedMessage, parseErr := IntermediateAsSpEnrichedParsed(intermediateState, message)
if parseErr != nil {
message.SetError(parseErr)
return nil, nil, message, nil
}

// This regex retrieves the path fields
// (e.g. field1.field2[0].field3 -> [field1, field2, 0, field3])
regexWords := `\w+`
re := regexp.MustCompile(regexWords)

// separate the path string into words using regex
path := re.FindAllString(field, -1)
separatedPath := make([]string, len(path)-1)
for idx, pathField := range path[1:] {
separatedPath[idx] = pathField
}

var parsedEventName string
var eventMajorVer string
var err error

// only call SDK functions if an unstruct_event is being filtered
if isUnstructEvent {
// get event name
eventName, err := parsedMessage.GetValue(`event_name`)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
parsedEventName = eventName.(string)
// get event version
fullEventVer, err := parsedMessage.GetValue(`event_version`)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
// get the major event version
eventMajorVer = strings.Split(fullEventVer.(string), `-`)[0]
if eventMajorVer == `` {
message.SetError(fmt.Errorf(`invalid schema version format: %s`, fullEventVer))
return nil, nil, message, nil
}
}

// find the value in the event
valueFound, err := findSpEnrichedFilterValue(
path[0],
parsedEventName,
eventMajorVer,
field,
parsedMessage,
convertPathToInterfaces(separatedPath),
)
// get the value
valueFound, err := getFunc(parsedMessage)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}

// evaluate whether the found value passes the filter, determining if the message should be kept
shouldKeepMessage := evaluateSpEnrichedFilter(valueFound, regex, regexTimeout)
shouldKeepMessage := evaluateSpEnrichedFilter(regexToMatch, valueFound)

// if message is not to be kept, return it as a filtered message to be acked in the main function
if !shouldKeepMessage {
Expand All @@ -145,17 +81,158 @@ func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUns
}, nil
}

// valueGetter is a function that can hold the logic for getting values in the case of base, context, and unstruct fields,
// which respecively require different logic.
type valueGetter func(analytics.ParsedEvent) ([]interface{}, error)

// Because each type of value requires different arguments, we use these `make` functions to construct them.
// This allows us to unit test each one, plug them into the createSpEnrichedFilterFunction constructor,
// and to construct them so that field names/paths and regexes are handled only once, at startup.

// makeBaseValueGetter returns a valueGetter for base-level values.
func makeBaseValueGetter(field string) valueGetter {
return func(parsedMessage analytics.ParsedEvent) (value []interface{}, err error) {
// find the value in the event
valueFound, err := parsedMessage.GetValue(field)
// We don't return an error for empty field since this just means the value is nil.
if err != nil && err.Error() != analytics.EmptyFieldErr {
return nil, err
}
return []interface{}{valueFound}, nil
}
}

// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
func NewSpEnrichedFilterFunction(field, regex string, regexTimeout int) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(field, regex, regexTimeout, false)

// getBaseValueForMatch is responsible for retrieving data from the message for base fields
getBaseValueForMatch := makeBaseValueGetter(field)

return createSpEnrichedFilterFunction(regex, regexTimeout, getBaseValueForMatch)
}

// makeContextValueGetter creates a valueGetter for context data
func makeContextValueGetter(name string, path []interface{}) valueGetter {
return func(parsedMessage analytics.ParsedEvent) ([]interface{}, error) {
value, err := parsedMessage.GetContextValue(name, path...)
// We don't return an error for empty field since this just means the value is nil.
if err != nil && err.Error() != analytics.EmptyFieldErr {
return nil, err
}
// bug in analytics sdk requires the type casting below. https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/36
// GetContextValue should always return []interface{} but instead it returns an interface{} which always contains type []interface{}

// if it's nil, return nil - we just didn't find any value.
if value == nil {
return nil, nil
}
// otherwise, type assertion.
valueFound, ok := value.([]interface{})
if !ok {
return nil, errors.New(fmt.Sprintf("Context filter encountered unexpected type in getting value for path %v", path))
}

return valueFound, nil
}
}

// NewSpEnrichedFilterFunctionContext returns a TransformationFunction for filtering a context
func NewSpEnrichedFilterFunctionContext(field, regex string, regexTimeout int) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(field, regex, regexTimeout, false)
func NewSpEnrichedFilterFunctionContext(contextFullName, pathToField, regex string, regexTimeout int) (TransformationFunction, error) {

path, err := parsePathToArguments(pathToField)
if err != nil {
return nil, errors.Wrap(err, "error creating Context filter function")
}

// getContextValuesForMatch is responsible for retrieving data from the message for context fields
getContextValuesForMatch := makeContextValueGetter(contextFullName, path)

return createSpEnrichedFilterFunction(regex, regexTimeout, getContextValuesForMatch)
}

// makeUnstructValueGetter creates a valueGetter for unstruct data.
func makeUnstructValueGetter(eventName string, versionRegex *regexp.Regexp, path []interface{}) valueGetter {
return func(parsedMessage analytics.ParsedEvent) (value []interface{}, err error) {
eventNameFound, err := parsedMessage.GetValue(`event_name`)
if err != nil { // This field can't be empty for a valid event, so we return all errors here
return nil, err
}
if eventNameFound != eventName { // If we don't have an exact match on event name, we return nil value
return nil, nil
}
versionFound, err := parsedMessage.GetValue(`event_version`)
if err != nil { // This field can't be empty for a valid event, so we return all errors here
return nil, err
}
if !versionRegex.MatchString(versionFound.(string)) { // If we don't match the provided version regex, return nil value
return nil, nil
}

valueFound, err := parsedMessage.GetUnstructEventValue(path...)
// We don't return an error for empty field since this just means the value is nil.
if err != nil && err.Error() != analytics.EmptyFieldErr && !strings.Contains(err.Error(), "not found") {
// This last clause exists because of this: https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/37
// TODO: Fix that and remove it as soon as possible.
return nil, err
}

if valueFound == nil {
return nil, nil
}

return []interface{}{valueFound}, nil
}
}

// NewSpEnrichedFilterFunctionUnstructEvent returns a TransformationFunction for filtering an unstruct_event
func NewSpEnrichedFilterFunctionUnstructEvent(field, regex string, regexTimeout int) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(field, regex, regexTimeout, true)
func NewSpEnrichedFilterFunctionUnstructEvent(eventNameToMatch, eventVersionToMatch, pathToField, regex string, regexTimeout int) (TransformationFunction, error) {

path, err := parsePathToArguments(pathToField)
if err != nil {
return nil, errors.Wrap(err, "error creating Unstruct filter function")
}

versionRegex, err := regexp.Compile(eventVersionToMatch)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprint("Failed to compile regex: ", eventVersionToMatch))
}

// getUnstructValuesForMatch is responsible for retrieving data from the message for context fields.
// It also checks that the correct event name and version are provided, and returns nil if not.
getUnstructValuesForMatch := makeUnstructValueGetter(eventNameToMatch, versionRegex, path)

return createSpEnrichedFilterFunction(regex, regexTimeout, getUnstructValuesForMatch)
}

// parsePathToArguments parses a string path to custom data (eg. `test1.test2[0].test3`)
// into the slice of interfaces expected by the analytics SDK's Get() methods.
func parsePathToArguments(pathToField string) ([]interface{}, error) {
// validate that an edge case (unmatched opening brace) isn't present
if strings.Count(pathToField, "[") != strings.Count(pathToField, "]") {
return nil, errors.New(fmt.Sprint("unmatched brace in path: ", pathToField))
}

// regex to separate path into components
re := regexp.MustCompile(`\[\d+\]|[^\.\[]+`)
parts := re.FindAllString(pathToField, -1)

// regex to identify arrays
arrayRegex := regexp.MustCompile(`\[\d+\]`)

convertedPath := make([]interface{}, 0)
for _, part := range parts {

if arrayRegex.MatchString(part) { // handle arrays first
intPart, err := strconv.Atoi(part[1 : len(part)-1]) // strip braces and convert to int
if err != nil {
return nil, errors.New(fmt.Sprint("error parsing path element: ", part))
}

convertedPath = append(convertedPath, intPart)
} else { // handle strings
convertedPath = append(convertedPath, part)
}

}
return convertedPath, nil
}
Loading

0 comments on commit 1de851a

Please sign in to comment.