From 1de851a918c2fab91dfd50f0ad07be44c6b88ed7 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Thu, 4 Aug 2022 13:12:04 +0100 Subject: [PATCH] Refactor v1 release filters (closes #192) --- .../transform-mixed-filtered.hcl | 2 +- pkg/transform/snowplow_enriched_filter.go | 267 +++++++++----- .../snowplow_enriched_filter_test.go | 332 +++++++++++++++--- pkg/transform/snowplow_enriched_util.go | 17 - pkg/transform/snowplow_enriched_util_test.go | 11 - pkg/transform/transform_test_variables.go | 5 +- .../transformconfig/transform_config.go | 75 ++-- .../transformconfig/transform_config_test.go | 82 +++-- 8 files changed, 548 insertions(+), 243 deletions(-) diff --git a/config/test-fixtures/transform-mixed-filtered.hcl b/config/test-fixtures/transform-mixed-filtered.hcl index 9ba1d870..1d6711db 100644 --- a/config/test-fixtures/transform-mixed-filtered.hcl +++ b/config/test-fixtures/transform-mixed-filtered.hcl @@ -8,7 +8,7 @@ transform { transform { use "spEnrichedFilter" { - field = "app_id" + atomic_field = "app_id" regex = "wrong" regex_timeout = 10 } diff --git a/pkg/transform/snowplow_enriched_filter.go b/pkg/transform/snowplow_enriched_filter.go index 3cec9a2c..74b4393d 100644 --- a/pkg/transform/snowplow_enriched_filter.go +++ b/pkg/transform/snowplow_enriched_filter.go @@ -8,8 +8,8 @@ package transform import ( "fmt" - "log" "regexp" + "strconv" "strings" "time" @@ -21,42 +21,17 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/models" ) -func findSpEnrichedFilterValue(queriedField, parsedEventName, eventVer, field string, parsedMessage analytics.ParsedEvent, path []interface{}) ([]interface{}, error) { - var vf interface{} - var valueFound []interface{} - var err error - - switch { - case strings.HasPrefix(queriedField, `contexts_`): - vf, err = parsedMessage.GetContextValue(queriedField, path...) - valueFound = append(valueFound, vf.([]interface{})...) - case strings.HasPrefix(queriedField, `unstruct_event`): - eventNameFull := `unstruct_event_` + parsedEventName - if queriedField == eventNameFull || queriedField == eventNameFull+`_`+eventVer { - vf, err = parsedMessage.GetUnstructEventValue(path...) - valueFound = append(valueFound, vf) - } - default: - vf, err = parsedMessage.GetValue(field) - valueFound = append(valueFound, vf) +func evaluateSpEnrichedFilter(re *regexp2.Regexp, valuesFound []interface{}) bool { + // if valuesFound is nil, we found no value. + // Because negative matches are a thing, we still want to match against an empty string + if valuesFound == nil { + valuesFound = make([]interface{}, 1) } - if err != nil { - // GetValue returns an error if the field requested is empty. Check for that particular error before returning error - if err.Error() == analytics.EmptyFieldErr { - return nil, nil + for _, v := range valuesFound { + if v == nil { + v = "" // because nil gets cast to `` } - return nil, err - } - return valueFound, nil -} -func evaluateSpEnrichedFilter(valuesFound []interface{}, regex string, regexTimeout int) bool { - re, err := regexp2.Compile(regex, 0) - re.MatchTimeout = time.Duration(regexTimeout) * time.Second - if err != nil { - log.Fatal(errors.Wrap(err, `error compiling regex for filter`)) - } - for _, v := range valuesFound { if ok, _ := re.MatchString(fmt.Sprintf("%v", v)); ok { return true } @@ -64,14 +39,21 @@ func evaluateSpEnrichedFilter(valuesFound []interface{}, regex string, regexTime return false } -// createSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event -// and a regex declared by the user. -func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUnstructEvent bool) (TransformationFunction, error) { +func createSpEnrichedFilterFunction(regex string, regexTimeout int, getFunc valueGetter) (TransformationFunction, error) { + if regexTimeout == 0 { + // default timeout for regex is 10 seconds + regexTimeout = 10 + } + + // regexToMatch is what we use to evaluate the actual filter, once we have the value. + regexToMatch, err := regexp2.Compile(regex, 0) + regexToMatch.MatchTimeout = time.Duration(regexTimeout) * time.Second + if err != nil { + return nil, errors.Wrap(err, `error compiling regex for filter`) + } + return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { - if regexTimeout == 0 { - // default timeout for regex is 10 seconds - regexTimeout = 10 - } + // Evaluate intermediateState to parsedEvent parsedMessage, parseErr := IntermediateAsSpEnrichedParsed(intermediateState, message) if parseErr != nil { @@ -79,61 +61,15 @@ func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUns return nil, nil, message, nil } - // This regex retrieves the path fields - // (e.g. field1.field2[0].field3 -> [field1, field2, 0, field3]) - regexWords := `\w+` - re := regexp.MustCompile(regexWords) - - // separate the path string into words using regex - path := re.FindAllString(field, -1) - separatedPath := make([]string, len(path)-1) - for idx, pathField := range path[1:] { - separatedPath[idx] = pathField - } - - var parsedEventName string - var eventMajorVer string - var err error - - // only call SDK functions if an unstruct_event is being filtered - if isUnstructEvent { - // get event name - eventName, err := parsedMessage.GetValue(`event_name`) - if err != nil { - message.SetError(err) - return nil, nil, message, nil - } - parsedEventName = eventName.(string) - // get event version - fullEventVer, err := parsedMessage.GetValue(`event_version`) - if err != nil { - message.SetError(err) - return nil, nil, message, nil - } - // get the major event version - eventMajorVer = strings.Split(fullEventVer.(string), `-`)[0] - if eventMajorVer == `` { - message.SetError(fmt.Errorf(`invalid schema version format: %s`, fullEventVer)) - return nil, nil, message, nil - } - } - - // find the value in the event - valueFound, err := findSpEnrichedFilterValue( - path[0], - parsedEventName, - eventMajorVer, - field, - parsedMessage, - convertPathToInterfaces(separatedPath), - ) + // get the value + valueFound, err := getFunc(parsedMessage) if err != nil { message.SetError(err) return nil, nil, message, nil } // evaluate whether the found value passes the filter, determining if the message should be kept - shouldKeepMessage := evaluateSpEnrichedFilter(valueFound, regex, regexTimeout) + shouldKeepMessage := evaluateSpEnrichedFilter(regexToMatch, valueFound) // if message is not to be kept, return it as a filtered message to be acked in the main function if !shouldKeepMessage { @@ -145,17 +81,158 @@ func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUns }, nil } +// valueGetter is a function that can hold the logic for getting values in the case of base, context, and unstruct fields, +// which respecively require different logic. +type valueGetter func(analytics.ParsedEvent) ([]interface{}, error) + +// Because each type of value requires different arguments, we use these `make` functions to construct them. +// This allows us to unit test each one, plug them into the createSpEnrichedFilterFunction constructor, +// and to construct them so that field names/paths and regexes are handled only once, at startup. + +// makeBaseValueGetter returns a valueGetter for base-level values. +func makeBaseValueGetter(field string) valueGetter { + return func(parsedMessage analytics.ParsedEvent) (value []interface{}, err error) { + // find the value in the event + valueFound, err := parsedMessage.GetValue(field) + // We don't return an error for empty field since this just means the value is nil. + if err != nil && err.Error() != analytics.EmptyFieldErr { + return nil, err + } + return []interface{}{valueFound}, nil + } +} + // NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event. func NewSpEnrichedFilterFunction(field, regex string, regexTimeout int) (TransformationFunction, error) { - return createSpEnrichedFilterFunction(field, regex, regexTimeout, false) + + // getBaseValueForMatch is responsible for retrieving data from the message for base fields + getBaseValueForMatch := makeBaseValueGetter(field) + + return createSpEnrichedFilterFunction(regex, regexTimeout, getBaseValueForMatch) +} + +// makeContextValueGetter creates a valueGetter for context data +func makeContextValueGetter(name string, path []interface{}) valueGetter { + return func(parsedMessage analytics.ParsedEvent) ([]interface{}, error) { + value, err := parsedMessage.GetContextValue(name, path...) + // We don't return an error for empty field since this just means the value is nil. + if err != nil && err.Error() != analytics.EmptyFieldErr { + return nil, err + } + // bug in analytics sdk requires the type casting below. https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/36 + // GetContextValue should always return []interface{} but instead it returns an interface{} which always contains type []interface{} + + // if it's nil, return nil - we just didn't find any value. + if value == nil { + return nil, nil + } + // otherwise, type assertion. + valueFound, ok := value.([]interface{}) + if !ok { + return nil, errors.New(fmt.Sprintf("Context filter encountered unexpected type in getting value for path %v", path)) + } + + return valueFound, nil + } } // NewSpEnrichedFilterFunctionContext returns a TransformationFunction for filtering a context -func NewSpEnrichedFilterFunctionContext(field, regex string, regexTimeout int) (TransformationFunction, error) { - return createSpEnrichedFilterFunction(field, regex, regexTimeout, false) +func NewSpEnrichedFilterFunctionContext(contextFullName, pathToField, regex string, regexTimeout int) (TransformationFunction, error) { + + path, err := parsePathToArguments(pathToField) + if err != nil { + return nil, errors.Wrap(err, "error creating Context filter function") + } + + // getContextValuesForMatch is responsible for retrieving data from the message for context fields + getContextValuesForMatch := makeContextValueGetter(contextFullName, path) + + return createSpEnrichedFilterFunction(regex, regexTimeout, getContextValuesForMatch) +} + +// makeUnstructValueGetter creates a valueGetter for unstruct data. +func makeUnstructValueGetter(eventName string, versionRegex *regexp.Regexp, path []interface{}) valueGetter { + return func(parsedMessage analytics.ParsedEvent) (value []interface{}, err error) { + eventNameFound, err := parsedMessage.GetValue(`event_name`) + if err != nil { // This field can't be empty for a valid event, so we return all errors here + return nil, err + } + if eventNameFound != eventName { // If we don't have an exact match on event name, we return nil value + return nil, nil + } + versionFound, err := parsedMessage.GetValue(`event_version`) + if err != nil { // This field can't be empty for a valid event, so we return all errors here + return nil, err + } + if !versionRegex.MatchString(versionFound.(string)) { // If we don't match the provided version regex, return nil value + return nil, nil + } + + valueFound, err := parsedMessage.GetUnstructEventValue(path...) + // We don't return an error for empty field since this just means the value is nil. + if err != nil && err.Error() != analytics.EmptyFieldErr && !strings.Contains(err.Error(), "not found") { + // This last clause exists because of this: https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/37 + // TODO: Fix that and remove it as soon as possible. + return nil, err + } + + if valueFound == nil { + return nil, nil + } + + return []interface{}{valueFound}, nil + } } // NewSpEnrichedFilterFunctionUnstructEvent returns a TransformationFunction for filtering an unstruct_event -func NewSpEnrichedFilterFunctionUnstructEvent(field, regex string, regexTimeout int) (TransformationFunction, error) { - return createSpEnrichedFilterFunction(field, regex, regexTimeout, true) +func NewSpEnrichedFilterFunctionUnstructEvent(eventNameToMatch, eventVersionToMatch, pathToField, regex string, regexTimeout int) (TransformationFunction, error) { + + path, err := parsePathToArguments(pathToField) + if err != nil { + return nil, errors.Wrap(err, "error creating Unstruct filter function") + } + + versionRegex, err := regexp.Compile(eventVersionToMatch) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprint("Failed to compile regex: ", eventVersionToMatch)) + } + + // getUnstructValuesForMatch is responsible for retrieving data from the message for context fields. + // It also checks that the correct event name and version are provided, and returns nil if not. + getUnstructValuesForMatch := makeUnstructValueGetter(eventNameToMatch, versionRegex, path) + + return createSpEnrichedFilterFunction(regex, regexTimeout, getUnstructValuesForMatch) +} + +// parsePathToArguments parses a string path to custom data (eg. `test1.test2[0].test3`) +// into the slice of interfaces expected by the analytics SDK's Get() methods. +func parsePathToArguments(pathToField string) ([]interface{}, error) { + // validate that an edge case (unmatched opening brace) isn't present + if strings.Count(pathToField, "[") != strings.Count(pathToField, "]") { + return nil, errors.New(fmt.Sprint("unmatched brace in path: ", pathToField)) + } + + // regex to separate path into components + re := regexp.MustCompile(`\[\d+\]|[^\.\[]+`) + parts := re.FindAllString(pathToField, -1) + + // regex to identify arrays + arrayRegex := regexp.MustCompile(`\[\d+\]`) + + convertedPath := make([]interface{}, 0) + for _, part := range parts { + + if arrayRegex.MatchString(part) { // handle arrays first + intPart, err := strconv.Atoi(part[1 : len(part)-1]) // strip braces and convert to int + if err != nil { + return nil, errors.New(fmt.Sprint("error parsing path element: ", part)) + } + + convertedPath = append(convertedPath, intPart) + } else { // handle strings + convertedPath = append(convertedPath, part) + } + + } + return convertedPath, nil } diff --git a/pkg/transform/snowplow_enriched_filter_test.go b/pkg/transform/snowplow_enriched_filter_test.go index 738c5505..acc11583 100644 --- a/pkg/transform/snowplow_enriched_filter_test.go +++ b/pkg/transform/snowplow_enriched_filter_test.go @@ -7,33 +7,35 @@ package transform import ( + "regexp" "testing" + "github.com/dlclark/regexp2" "github.com/stretchr/testify/assert" "github.com/snowplow-devops/stream-replicator/pkg/models" ) -func TestNewSpEnrichedFilterFunction(t *testing.T) { - assert := assert.New(t) +var messageGood = models.Message{ + Data: snowplowTsv3, + PartitionKey: "some-key", +} - var messageGood = models.Message{ - Data: snowplowTsv3, - PartitionKey: "some-key", - } +var messageGoodInt = models.Message{ + Data: snowplowTsv4, + PartitionKey: "some-key", +} - var messageGoodInt = models.Message{ - Data: snowplowTsv4, - PartitionKey: "some-key", - } +var messageWithUnstructEvent = models.Message{ + Data: snowplowTsv1, + PartitionKey: "some-key", +} - var messageWithUnstructEvent = models.Message{ - Data: snowplowTsv1, - PartitionKey: "some-key", - } +func TestNewSpEnrichedFilterFunction(t *testing.T) { + assert := assert.New(t) // Single value cases - aidFilterFuncKeep, _ := NewSpEnrichedFilterFunction("app_id", "test-data3", 0) + aidFilterFuncKeep, _ := NewSpEnrichedFilterFunction("app_id", "^test-data3$", 0) aidKeepIn, aidKeepOut, fail, _ := aidFilterFuncKeep(&messageGood, nil) @@ -50,7 +52,7 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(fail2) // int value - urlPrtFilterFuncKeep, _ := NewSpEnrichedFilterFunction("page_urlport", "80", 10) + urlPrtFilterFuncKeep, _ := NewSpEnrichedFilterFunction("page_urlport", "^80$", 10) urlPrtKeepIn, urlPrtKeepOut, fail, _ := urlPrtFilterFuncKeep(&messageGood, nil) @@ -59,7 +61,7 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(fail) // Multiple value cases - aidFilterFuncKeepWithMultiple, _ := NewSpEnrichedFilterFunction("app_id", "someotherValue|test-data3", 10) + aidFilterFuncKeepWithMultiple, _ := NewSpEnrichedFilterFunction("app_id", "^someotherValue|test-data3$", 10) aidMultipleNegationFailedIn, aidMultipleKeepOut, fail3, _ := aidFilterFuncKeepWithMultiple(&messageGood, nil) @@ -67,7 +69,7 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(aidMultipleKeepOut) assert.Nil(fail3) - aidFilterFuncDiscardWithMultiple, _ := NewSpEnrichedFilterFunction("app_id", "someotherValue|failThis", 10) + aidFilterFuncDiscardWithMultiple, _ := NewSpEnrichedFilterFunction("app_id", "^someotherValue|failThis$", 10) aidNegationMultipleIn, aidMultipleDiscardOut, fail3, _ := aidFilterFuncDiscardWithMultiple(&messageGood, nil) @@ -76,7 +78,6 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(fail3) // Single value negation cases - aidFilterFuncNegationDiscard, _ := NewSpEnrichedFilterFunction("app_id", "^((?!test-data3).)*$", 10) aidNegationIn, aidNegationOut, fail4, _ := aidFilterFuncNegationDiscard(&messageGood, nil) @@ -85,7 +86,7 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Equal(snowplowTsv3, aidNegationOut.Data) assert.Nil(fail4) - aidFilterFuncNegationKeep, _ := NewSpEnrichedFilterFunction("app_id", "^((?!failThis).)*$", 10) + aidFilterFuncNegationKeep, _ := NewSpEnrichedFilterFunction("app_id", "^((?!someValue).)*$", 10) aidNegationFailedIn, aidNegationFailedOut, fail5, _ := aidFilterFuncNegationKeep(&messageGood, nil) @@ -111,10 +112,11 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(fail7) // Filters on a nil field - txnFilterFunctionAffirmation, _ := NewSpEnrichedFilterFunction("txn_id", "something", 10) + txnFilterFunctionAffirmation, _ := NewSpEnrichedFilterFunction("txn_id", "^something$", 10) nilAffirmationIn, nilAffirmationOut, fail8, _ := txnFilterFunctionAffirmation(&messageGood, nil) + // nil doesn't match the regex and should be filtered out. assert.Nil(nilAffirmationIn) assert.Equal(snowplowTsv3, nilAffirmationOut.Data) assert.Nil(fail8) @@ -123,12 +125,27 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { nilNegationIn, nilNegationOut, fail8, _ := txnFilterFunctionNegation(&messageGood, nil) - assert.Nil(nilNegationIn) - assert.Equal(snowplowTsv3, nilNegationOut.Data) + // nil DOES match the negative lookup - it doesn't contain 'something'. So should be kept. + assert.Equal(snowplowTsv3, nilNegationIn.Data) + assert.Nil(nilNegationOut) assert.Nil(fail8) + fieldNotExistsFilter, _ := NewSpEnrichedFilterFunction("nothing", "", 10) + + notExistsIn, notExistsOut, notExistsFail, _ := fieldNotExistsFilter(&messageGood, nil) + + assert.Nil(notExistsIn) + assert.Nil(notExistsOut) + assert.NotNil(notExistsFail) +} + +func TestNewSpEnrichedFilterFunctionContext(t *testing.T) { + assert := assert.New(t) + + // The relevant data in messageGood looks like this: "test1":{"test2":[{"test3":"testValue"}] + // context filter success - contextFuncKeep, _ := NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1.test1.test2[0].test3", "testValue", 10) + contextFuncKeep, _ := NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1", "test1.test2[0].test3", "^testValue$", 10) contextKeepIn, contextKeepOut, fail9, _ := contextFuncKeep(&messageGood, nil) @@ -136,8 +153,10 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(contextKeepOut) assert.Nil(fail9) + // The relevant data in messageGoodInt looks like this: "test1":{"test2":[{"test3":1}] + // context filter success (integer value) - contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1.test1.test2[0].test3", "1", 10) + contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1", "test1.test2[0].test3", "^1$", 10) contextKeepIn, contextKeepOut, fail9, _ = contextFuncKeep(&messageGoodInt, nil) @@ -145,50 +164,55 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Nil(contextKeepOut) assert.Nil(fail9) - // context filter failure - contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_2.test1.test2[0].test3", "testValue", 10) + // context filter wrong path + contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_2", "test1.test2[0].test3", "^testValue$", 10) contextKeepIn, contextKeepOut, fail9, _ = contextFuncKeep(&messageGood, nil) assert.Nil(contextKeepIn) assert.Equal(snowplowTsv3, contextKeepOut.Data) assert.Nil(fail9) +} + +func TestNewSpEnrichedFilterFunctionUnstructEvent(t *testing.T) { + assert := assert.New(t) // event filter success, filtered event name - eventFilterFunCkeep, _ := NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart_1.sku", "item41", 10) + eventFilterFuncKeep, _ := NewSpEnrichedFilterFunctionUnstructEvent("add_to_cart", "1-*-*", "sku", "^item41$", 10) - eventKeepIn, eventKeepOut, fail10, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil) + eventKeepIn, eventKeepOut, fail10, _ := eventFilterFuncKeep(&messageWithUnstructEvent, nil) assert.Equal(snowplowTsv1, eventKeepIn.Data) assert.Nil(eventKeepOut) assert.Nil(fail10) // event filter success, filtered event name, no event ver - eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart.sku", "item41", 10) + eventFilterFuncKeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("add_to_cart", "", "sku", "^item41$", 10) - eventKeepIn, eventKeepOut, fail10, _ = eventFilterFunCkeep(&messageWithUnstructEvent, nil) + eventKeepIn, eventKeepOut, fail10, _ = eventFilterFuncKeep(&messageWithUnstructEvent, nil) assert.Equal(snowplowTsv1, eventKeepIn.Data) assert.Nil(eventKeepOut) assert.Nil(fail10) // event filter failure, wrong event name - eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_wrong_name.sku", "item41", 10) + eventFilterFuncKeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("wrong_name", "", "sku", "^item41$", 10) - eventKeepIn, eventKeepOut, fail11, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil) + eventKeepIn, eventKeepOut, fail11, _ := eventFilterFuncKeep(&messageWithUnstructEvent, nil) assert.Nil(eventKeepIn) assert.Equal(snowplowTsv1, eventKeepOut.Data) assert.Nil(fail11) // event filter failure, field not found - eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart.ska", "item41", 10) + eventFilterFuncKeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("add_to_cart", "", "ska", "item41", 10) - eventNoFieldIn, eventNoFieldOut, fail12, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil) + eventNoFieldIn, eventNoFieldOut, fail12, _ := eventFilterFuncKeep(&messageWithUnstructEvent, nil) assert.Nil(eventNoFieldIn) - assert.Nil(eventNoFieldOut) - assert.NotNil(fail12) + assert.Equal(snowplowTsv1, eventNoFieldOut.Data) + assert.Nil(fail12) + } func TestSpEnrichedFilterFunction_Slice(t *testing.T) { @@ -212,7 +236,7 @@ func TestSpEnrichedFilterFunction_Slice(t *testing.T) { }, } - filterFunc, _ := NewSpEnrichedFilterFunction("app_id", "test-data1", 10) + filterFunc, _ := NewSpEnrichedFilterFunction("app_id", "^test-data1$", 10) filter1 := NewTransformation(filterFunc) filter1Res := filter1(messages) @@ -240,7 +264,7 @@ func TestSpEnrichedFilterFunction_Slice(t *testing.T) { }, } - filterFunc2, _ := NewSpEnrichedFilterFunction("app_id", "test-data1|test-data2", 10) + filterFunc2, _ := NewSpEnrichedFilterFunction("app_id", "^test-data1|test-data2$", 10) filter2 := NewTransformation(filterFunc2) filter2Res := filter2(messages) @@ -269,9 +293,233 @@ func TestSpEnrichedFilterFunction_Slice(t *testing.T) { func TestEvaluateSpEnrichedFilter(t *testing.T) { assert := assert.New(t) + regex, err := regexp2.Compile("^yes$", 0) + if err != nil { + panic(err) + } + valuesFound := []interface{}{"NO", "maybe", "yes"} - assert.True(evaluateSpEnrichedFilter(valuesFound, "yes", 10)) + assert.True(evaluateSpEnrichedFilter(regex, valuesFound)) + + valuesFound2 := []interface{}{"NO", "maybe", "nope", nil} + assert.False(evaluateSpEnrichedFilter(regex, valuesFound2)) - valuesFound = []interface{}{"NO", "maybe", "nope"} - assert.False(evaluateSpEnrichedFilter(valuesFound, "yes", 10)) + regexInt, err := regexp2.Compile("^123$", 0) + if err != nil { + panic(err) + } + + valuesFound3 := []interface{}{123, "maybe", "nope", nil} + assert.True(evaluateSpEnrichedFilter(regexInt, valuesFound3)) + + // This asserts that when any element of the input is nil, we assert against empty string. + // It exists to ensure we don't evaluate against the string `` since we're naively casting values to string. + regexNil, err := regexp2.Compile("^$", 0) + if err != nil { + panic(err) + } + + assert.True(evaluateSpEnrichedFilter(regexNil, []interface{}{nil})) + + // just to make sure the regex only matches empty: + assert.False(evaluateSpEnrichedFilter(regexNil, []interface{}{"a"})) + + // These tests ensures that when getters return a nil slice, we're still asserting against the empty value. + // This is important since we have negative lookaheads. + + assert.True(evaluateSpEnrichedFilter(regexNil, nil)) + + // negative lookahead: + regexNegative, err := regexp2.Compile("^((?!failThis).)*$", 0) + if err != nil { + panic(err) + } + + assert.True(evaluateSpEnrichedFilter(regexNegative, nil)) +} + +func TestMakeBaseValueGetter(t *testing.T) { + assert := assert.New(t) + + // simple app ID + appIDGetter := makeBaseValueGetter("app_id") + + res, err := appIDGetter(spTsv3Parsed) + + assert.Equal([]interface{}{"test-data3"}, res) + assert.Nil(err) + + nonExistentFieldGetter := makeBaseValueGetter("nope") + + res2, err2 := nonExistentFieldGetter(spTsv3Parsed) + + assert.Nil(res2) + assert.NotNil(err2) + if err2 != nil { + assert.Equal("Key nope not a valid atomic field", err2.Error()) + } + // TODO: currently we'll only hit this error while processing data. Ideally we should hit it on startup. +} + +func TestMakeContextValueGetter(t *testing.T) { + assert := assert.New(t) + + contextGetter := makeContextValueGetter("contexts_nl_basjes_yauaa_context_1", []interface{}{"test1", "test2", 0, "test3"}) + + res, err := contextGetter(spTsv3Parsed) + + assert.Equal([]interface{}{"testValue"}, res) + assert.Nil(err) + + res2, err2 := contextGetter(spTsv1Parsed) + + // If the path doesn't exist, we shoud return nil, nil. + assert.Nil(res2) + assert.Nil(err2) + + contextGetterArray := makeContextValueGetter("contexts_com_acme_just_ints_1", []interface{}{"integerField"}) + + res3, err3 := contextGetterArray(spTsv1Parsed) + + assert.Equal([]interface{}{float64(0), float64(1), float64(2)}, res3) + assert.Nil(err3) +} + +func TestMakeUnstructValueGetter(t *testing.T) { + assert := assert.New(t) + + re1 := regexp.MustCompile("1-*-*") + + unstructGetter := makeUnstructValueGetter("add_to_cart", re1, []interface{}{"sku"}) + + res, err := unstructGetter(spTsv1Parsed) + + assert.Equal([]interface{}{"item41"}, res) + assert.Nil(err) + + unstructGetterWrongPath := makeUnstructValueGetter("add_to_cart", re1, []interface{}{"notSku"}) + + // If it's not in the event, both should be nil + res2, err2 := unstructGetterWrongPath(spTsv1Parsed) + + assert.Nil(res2) + assert.Nil(err2) + + // test that wrong schema version behaves appropriately (return nil nil) + re2 := regexp.MustCompile("2-*-*") + + unstructWrongSchemaGetter := makeUnstructValueGetter("add_to_cart", re2, []interface{}{"sku"}) + + res3, err3 := unstructWrongSchemaGetter(spTsv1Parsed) + + assert.Nil(res3) + assert.Nil(err3) + + // test that not specifying a version behaves appropriately (accepts all versions) + re3 := regexp.MustCompile("") + + unstructAnyVersionGetter := makeUnstructValueGetter("add_to_cart", re3, []interface{}{"sku"}) + + res4, err4 := unstructAnyVersionGetter(spTsv1Parsed) + + assert.Equal([]interface{}{"item41"}, res4) + assert.Nil(err4) + + // test that wrong event name behaves appropriately (return nil nil) + + unstructWrongEvnetName := makeUnstructValueGetter("not_add_to_cart_at_all", re3, []interface{}{"sku"}) + + res5, err5 := unstructWrongEvnetName(spTsv1Parsed) + + assert.Nil(res5) + assert.Nil(err5) +} + +func BenchmarkBaseFieldFilter(b *testing.B) { + var messageGood = models.Message{ + Data: snowplowTsv3, + PartitionKey: "some-key", + } + aidFilterFuncKeep, _ := NewSpEnrichedFilterFunction("app_id", "^test-data3$", 0) + + aidFilterFuncNegationKeep, _ := NewSpEnrichedFilterFunction("app_id", "^((?!failThis).)*$", 10) + + for i := 0; i < b.N; i++ { + + aidFilterFuncKeep(&messageGood, nil) + aidFilterFuncNegationKeep(&messageGood, nil) + } +} + +func BenchmarkContextFilterNew(b *testing.B) { + var messageGood = models.Message{ + Data: snowplowTsv3, + PartitionKey: "some-key", + } + + contextFuncAffirm, _ := NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1", "test1.test2[0].test3", "^testValue$", 10) + contextFuncNegate, _ := NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1", "test1.test2[0].test3", "^((?!failThis).)*$", 10) + + for i := 0; i < b.N; i++ { + contextFuncAffirm(&messageGood, nil) + contextFuncNegate(&messageGood, nil) + } +} + +func BenchmarkUnstructFilterNew(b *testing.B) { + var messageGood = models.Message{ + Data: snowplowTsv1, + PartitionKey: "some-key", + } + + unstructFilterFuncAffirm, _ := NewSpEnrichedFilterFunctionUnstructEvent("add_to_cart", "1-*-*", "sku", "^item41$", 10) + unstructFilterFuncNegate, _ := NewSpEnrichedFilterFunctionUnstructEvent("add_to_cart", "1-*-*", "sku", "^((?!failThis).)*$", 10) + + for i := 0; i < b.N; i++ { + unstructFilterFuncAffirm(&messageGood, nil) + unstructFilterFuncNegate(&messageGood, nil) + + } +} + +func TestParsePathToArguments(t *testing.T) { + assert := assert.New(t) + + // Common case + path1, err1 := parsePathToArguments("test1[123].test2[1].test3") + expectedPath1 := []interface{}{"test1", 123, "test2", 1, "test3"} + + assert.Equal(expectedPath1, path1) + assert.Nil(err1) + + // Success edge case - field names with different character + path2, err2 := parsePathToArguments("test-1.test_2[1].test$3") + expectedPath2 := []interface{}{"test-1", "test_2", 1, "test$3"} + + assert.Equal(expectedPath2, path2) + assert.Nil(err2) + + // Success edge case - field name is stringified int + path3, err3 := parsePathToArguments("123.456[1].789") + expectedPath3 := []interface{}{"123", "456", 1, "789"} + + assert.Equal(expectedPath3, path3) + assert.Nil(err3) + + // Success edge case - nested arrays + path4, err4 := parsePathToArguments("test1.test2[1][2].test3") + expectedPath4 := []interface{}{"test1", "test2", 1, 2, "test3"} + + assert.Equal(expectedPath4, path4) + assert.Nil(err4) + + // Failure edge case - unmatched brace in path + // We are validating for this and failing at startup, with the assumption that it must be misconfiguration. + path5, err5 := parsePathToArguments("test1.test[2.test3") + + assert.Nil(path5) + assert.NotNil(err5) + if err5 != nil { + assert.Equal("unmatched brace in path: test1.test[2.test3", err5.Error()) + } } diff --git a/pkg/transform/snowplow_enriched_util.go b/pkg/transform/snowplow_enriched_util.go index d494c823..b29b34cd 100644 --- a/pkg/transform/snowplow_enriched_util.go +++ b/pkg/transform/snowplow_enriched_util.go @@ -7,8 +7,6 @@ package transform import ( - "strconv" - "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" "github.com/snowplow-devops/stream-replicator/pkg/models" @@ -28,18 +26,3 @@ func IntermediateAsSpEnrichedParsed(intermediateState interface{}, message *mode } return parsedMessage, nil } - -// convertPathToInterfaces converts a slice of strings representing a path to a slice of interfaces to be used -// by the SDK Get() function -func convertPathToInterfaces(path []string) []interface{} { - var output []interface{} - for _, pathField := range path { - pathFieldInt, err := strconv.Atoi(pathField) - if err != nil { - output = append(output, pathField) - } else { - output = append(output, pathFieldInt) - } - } - return output -} diff --git a/pkg/transform/snowplow_enriched_util_test.go b/pkg/transform/snowplow_enriched_util_test.go index 47ea5469..d253cb21 100644 --- a/pkg/transform/snowplow_enriched_util_test.go +++ b/pkg/transform/snowplow_enriched_util_test.go @@ -46,14 +46,3 @@ func TestIntermediateAsSpEnrichedParsed(t *testing.T) { assert.Equal("Cannot parse tsv event - wrong number of fields provided: 1", err4.Error()) } } - -// TestConvertPathToInterfaces tests that convertPathToInterfaces returns integers and strings where appropriate -func TestConvertPathToInterfaces(t *testing.T) { - assert := assert.New(t) - - expected := []interface{}{"one", 2, 3, "four", "five", 6} - - res := convertPathToInterfaces([]string{"one", "2", "3", "four", "five", "6"}) - - assert.Equal(expected, res) -} diff --git a/pkg/transform/transform_test_variables.go b/pkg/transform/transform_test_variables.go index 6464bd01..0e48b5bd 100644 --- a/pkg/transform/transform_test_variables.go +++ b/pkg/transform/transform_test_variables.go @@ -12,13 +12,12 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/models" ) -var snowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) +var snowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 0}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 1}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 2}},{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) var spTsv1Parsed, _ = analytics.ParseEvent(string(snowplowTsv1)) -var snowplowJSON1 = []byte(`{"app_id":"test-data1","collector_tstamp":"2019-05-10T14:40:35.972Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:35.972Z","dvce_created_tstamp":"2019-05-10T14:40:35.551Z","dvce_sent_tstamp":"2019-05-10T14:40:35Z","etl_tstamp":"2019-05-10T14:40:37.436Z","event":"unstruct","event_format":"jsonschema","event_id":"e9234345-f042-46ad-b1aa-424464066a33","event_name":"add_to_cart","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"d26822f5-52cc-4292-8f77-14ef6b7a27e2","platform":"pc","unstruct_event_com_snowplowanalytics_snowplow_add_to_cart_1":{"currency":"GBP","quantity":2,"sku":"item41","unitPrice":32.4},"user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) +var snowplowJSON1 = []byte(`{"app_id":"test-data1","collector_tstamp":"2019-05-10T14:40:35.972Z","contexts_com_acme_just_ints_1":[{"integerField":0},{"integerField":1},{"integerField":2}],"contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:35.972Z","dvce_created_tstamp":"2019-05-10T14:40:35.551Z","dvce_sent_tstamp":"2019-05-10T14:40:35Z","etl_tstamp":"2019-05-10T14:40:37.436Z","event":"unstruct","event_format":"jsonschema","event_id":"e9234345-f042-46ad-b1aa-424464066a33","event_name":"add_to_cart","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"d26822f5-52cc-4292-8f77-14ef6b7a27e2","platform":"pc","unstruct_event_com_snowplowanalytics_snowplow_add_to_cart_1":{"currency":"GBP","quantity":2,"sku":"item41","unitPrice":32.4},"user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) var snowplowTsv2 = []byte(`test-data2 pc 2019-05-10 14:40:32.392 2019-05-10 14:40:31.105 2019-05-10 14:40:30.218 transaction_item 5071169f-3050-473f-b03f-9748319b1ef2 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 68220ade-307b-4898-8e25-c4c8ac92f1d7 transaction item58 35.87 1 python-requests/2.21.0 2019-05-10 14:40:30.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:31.105 com.snowplowanalytics.snowplow transaction_item jsonschema 1-0-0 `) var spTsv2Parsed, _ = analytics.ParseEvent(string(snowplowTsv2)) var snowplowJSON2 = []byte(`{"app_id":"test-data2","collector_tstamp":"2019-05-10T14:40:31.105Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:31.105Z","dvce_created_tstamp":"2019-05-10T14:40:30.218Z","dvce_sent_tstamp":"2019-05-10T14:40:30Z","etl_tstamp":"2019-05-10T14:40:32.392Z","event":"transaction_item","event_format":"jsonschema","event_id":"5071169f-3050-473f-b03f-9748319b1ef2","event_name":"transaction_item","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"68220ade-307b-4898-8e25-c4c8ac92f1d7","platform":"pc","ti_orderid":"transaction\u003cbuilt-in function input\u003e","ti_price":35.87,"ti_quantity":1,"ti_sku":"item58","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) - var snowplowTsv3 = []byte(`test-data3 pc 2019-05-10 14:40:30.836 2019-05-10 14:40:29.576 2019-05-10 14:40:29.204 page_view e8aef68d-8533-45c6-a672-26a0f01be9bd py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 b66c4a12-8584-4c7a-9a5d-7c96f59e2556 www.demo-site.com/campaign-landing-page landing-page 80 www.demo-site.com/campaign-landing-page python-requests/2.21.0 2019-05-10 14:40:29.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??","test1":{"test2":[{"test3":"testValue"}]}}}]} 2019-05-10 14:40:29.576 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 `) var spTsv3Parsed, _ = analytics.ParseEvent(string(snowplowTsv3)) var snowplowJSON3 = []byte(`{"app_id":"test-data3","collector_tstamp":"2019-05-10T14:40:29.576Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??","test1":{"test2":[{"test3":"testValue"}]}}],"derived_tstamp":"2019-05-10T14:40:29.576Z","dvce_created_tstamp":"2019-05-10T14:40:29.204Z","dvce_sent_tstamp":"2019-05-10T14:40:29Z","etl_tstamp":"2019-05-10T14:40:30.836Z","event":"page_view","event_format":"jsonschema","event_id":"e8aef68d-8533-45c6-a672-26a0f01be9bd","event_name":"page_view","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"b66c4a12-8584-4c7a-9a5d-7c96f59e2556","page_title":"landing-page","page_url":"www.demo-site.com/campaign-landing-page","page_urlpath":"www.demo-site.com/campaign-landing-page","page_urlport":80,"platform":"pc","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go index 362f153f..fd4d8ee2 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/transformconfig/transform_config.go @@ -20,10 +20,15 @@ import ( // Transformation represents a transformation's configuration type Transformation struct { - Description string `hcl:"description,optional"` - Field string `hcl:"field,optional"` - Regex string `hcl:"regex,optional"` - RegexTimeout int `hcl:"regex_timeout,optional"` + // For native filters + Description string `hcl:"description,optional"` + UnstructEventName string `hcl:"unstruct_event_name,optional"` + UnstructEventVersionRegex string `hcl:"unstruct_event_version_regex,optional"` + ContextFullName string `hcl:"context_full_name,optional"` + CustomFieldPath string `hcl:"custom_field_path,optional"` + AtomicField string `hcl:"atomic_field,optional"` + Regex string `hcl:"regex,optional"` + RegexTimeout int `hcl:"regex_timeout,optional"` // for JS and Lua transformations SourceB64 string `hcl:"source_b64,optional"` TimeoutSec int `hcl:"timeout_sec,optional"` @@ -76,55 +81,55 @@ func ValidateTransformations(transformations []*Transformation) []error { case "spEnrichedToJson": continue case "spEnrichedSetPk": - if transformation.Field == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedSetPk, empty field`, idx)) + if transformation.AtomicField == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedSetPk, empty atomic field`, idx)) continue } case "spEnrichedFilter": - if transformation.Field != `` && transformation.Regex != `` { + if transformation.AtomicField == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilter, empty atomic field`, idx)) + } + if transformation.Regex == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilter, empty regex`, idx)) + } else { _, err := regexp.Compile(transformation.Regex) if err != nil { validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilter, regex does not compile. error: %v`, idx, err)) - continue } - continue } - if transformation.Field == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilter, empty field`, idx)) + continue + case "spEnrichedFilterContext": + if transformation.ContextFullName == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty context full name`, idx)) } - if transformation.Regex == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilter, empty regex`, idx)) + if transformation.CustomFieldPath == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty custom field path`, idx)) } - case "spEnrichedFilterContext": - if transformation.Field != `` && transformation.Regex != `` { + if transformation.Regex == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty regex`, idx)) + } else { _, err := regexp.Compile(transformation.Regex) if err != nil { validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, regex does not compile. error: %v`, idx, err)) - continue } - continue } - if transformation.Field == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty field`, idx)) + continue + case "spEnrichedFilterUnstructEvent": + if transformation.CustomFieldPath == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterUnstructEvent, empty custom field path`, idx)) } - if transformation.Regex == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty regex`, idx)) + if transformation.UnstructEventName == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterUnstructEvent, empty event name`, idx)) } - case "spEnrichedFilterUnstructEvent": - if transformation.Field != `` && transformation.Regex != `` { + if transformation.Regex == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterUnstructEvent, empty regex`, idx)) + } else { _, err := regexp.Compile(transformation.Regex) if err != nil { validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterUnstructEvent, regex does not compile. error: %v`, idx, err)) - continue } - continue - } - if transformation.Field == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterUnstructEvent, empty field`, idx)) - } - if transformation.Regex == `` { - validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterUnstructEvent, empty regex`, idx)) } + continue case "lua": if transformation.Engine.SmokeTest(`main`) != nil { validationErrors = append(validationErrors, fmt.Errorf(`validation error in lua transformation #%d, main() smoke test failed`, idx)) @@ -224,21 +229,21 @@ func GetTransformations(c *config.Config) (transform.TransformationApplyFunction case "spEnrichedToJson": funcs = append(funcs, transform.SpEnrichedToJSON) case "spEnrichedSetPk": - funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(transformation.Field)) + funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(transformation.AtomicField)) case "spEnrichedFilter": - filterFunc, err := transform.NewSpEnrichedFilterFunction(transformation.Field, transformation.Regex, transformation.RegexTimeout) + filterFunc, err := transform.NewSpEnrichedFilterFunction(transformation.AtomicField, transformation.Regex, transformation.RegexTimeout) if err != nil { return nil, err } funcs = append(funcs, filterFunc) case "spEnrichedFilterContext": - filterFunc, err := transform.NewSpEnrichedFilterFunctionContext(transformation.Field, transformation.Regex, transformation.RegexTimeout) + filterFunc, err := transform.NewSpEnrichedFilterFunctionContext(transformation.ContextFullName, transformation.CustomFieldPath, transformation.Regex, transformation.RegexTimeout) if err != nil { return nil, err } funcs = append(funcs, filterFunc) case "spEnrichedFilterUnstructEvent": - filterFunc, err := transform.NewSpEnrichedFilterFunctionUnstructEvent(transformation.Field, transformation.Regex, transformation.RegexTimeout) + filterFunc, err := transform.NewSpEnrichedFilterFunctionUnstructEvent(transformation.UnstructEventName, transformation.UnstructEventVersionRegex, transformation.CustomFieldPath, transformation.Regex, transformation.RegexTimeout) if err != nil { return nil, err } diff --git a/pkg/transform/transformconfig/transform_config_test.go b/pkg/transform/transformconfig/transform_config_test.go index b442a9dd..c42c243e 100644 --- a/pkg/transform/transformconfig/transform_config_test.go +++ b/pkg/transform/transformconfig/transform_config_test.go @@ -160,8 +160,8 @@ function notMain(x) { { Name: "spEnrichedSetPk success", Transformations: []*Transformation{{ - Name: "spEnrichedSetPk", - Field: `app_id`, + Name: "spEnrichedSetPk", + AtomicField: `app_id`, }}, }, { @@ -169,106 +169,111 @@ function notMain(x) { Transformations: []*Transformation{{ Name: "spEnrichedSetPk", }}, - ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedSetPk, empty field")}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedSetPk, empty atomic field")}, }, { Name: "spEnrichedFilter success", Transformations: []*Transformation{{ - Name: "spEnrichedFilter", - Field: "app_id", - Regex: "test.+", + Name: "spEnrichedFilter", + AtomicField: "app_id", + Regex: "test.+", }}, }, { Name: "spEnrichedFilter regexp does not compile", Transformations: []*Transformation{{ - Name: "spEnrichedFilter", - Field: "app_id", - Regex: "?(?=-)", + Name: "spEnrichedFilter", + AtomicField: "app_id", + Regex: "?(?=-)", }}, ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilter, regex does not compile. error: error parsing regexp: missing argument to repetition operator: `?`")}, }, { - Name: "spEnrichedFilter empty field", + Name: "spEnrichedFilter empty atomic field", Transformations: []*Transformation{{ Name: "spEnrichedFilter", Regex: "test.+", }}, - ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilter, empty field")}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilter, empty atomic field")}, }, { Name: "spEnrichedFilter empty regex", Transformations: []*Transformation{{ - Name: "spEnrichedFilter", - Field: "app_id", + Name: "spEnrichedFilter", + AtomicField: "app_id", }}, ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilter, empty regex")}, }, { Name: "spEnrichedFilterContext success", Transformations: []*Transformation{{ - Name: "spEnrichedFilterContext", - Field: "contexts_nl_basjes_yauaa_context_1.test1.test2[0]", - Regex: "test.+", + Name: "spEnrichedFilterContext", + ContextFullName: "contexts_nl_basjes_yauaa_context_1", + CustomFieldPath: "test1.test2[0]", + Regex: "test.+", }}, }, { Name: "spEnrichedFilterContext regexp does not compile", Transformations: []*Transformation{{ - Name: "spEnrichedFilterContext", - Field: "contexts_nl_basjes_yauaa_context_1.test1.test2[0]", - Regex: "?(?=-)", + Name: "spEnrichedFilterContext", + ContextFullName: "contexts_nl_basjes_yauaa_context_1", + CustomFieldPath: "test1.test2[0]", + Regex: "?(?=-)", }}, ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterContext, regex does not compile. error: error parsing regexp: missing argument to repetition operator: `?`")}, }, { - Name: "spEnrichedFilterContext empty field", + Name: "spEnrichedFilterContext empty custom field path", Transformations: []*Transformation{{ Name: "spEnrichedFilterContext", Regex: "test.+", }}, - ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterContext, empty field")}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterContext, empty context full name"), fmt.Errorf("validation error #0 spEnrichedFilterContext, empty custom field path")}, }, { Name: "spEnrichedFilterContext empty regex", Transformations: []*Transformation{{ - Name: "spEnrichedFilterContext", - Field: "contexts_nl_basjes_yauaa_context_1.test1.test2[0]", + Name: "spEnrichedFilterContext", + ContextFullName: "contexts_nl_basjes_yauaa_context_1", + CustomFieldPath: "test1.test2[0]", }}, ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterContext, empty regex")}, }, { Name: "spEnrichedFilterUnstructEvent success", Transformations: []*Transformation{{ - Name: "spEnrichedFilterUnstructEvent", - Field: "unstruct_event_add_to_cart_1.sku", - Regex: "test.+", + Name: "spEnrichedFilterUnstructEvent", + CustomFieldPath: "sku", + Regex: "test.+", + UnstructEventName: "add_to_cart", }}, }, { Name: "spEnrichedFilterUnstructEvent regexp does not compile", Transformations: []*Transformation{{ - Name: "spEnrichedFilterUnstructEvent", - Field: "unstruct_event_add_to_cart_1.sku", - Regex: "?(?=-)", + Name: "spEnrichedFilterUnstructEvent", + CustomFieldPath: "sku", + Regex: "?(?=-)", + UnstructEventName: "add_to_cart", }}, ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, regex does not compile. error: error parsing regexp: missing argument to repetition operator: `?`")}, }, { - Name: "spEnrichedFilterUnstructEvent empty field", + Name: "spEnrichedFilterUnstructEvent empty custom field path and event name", Transformations: []*Transformation{{ Name: "spEnrichedFilterUnstructEvent", Regex: "test.+", }}, - ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, empty field")}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, empty custom field path"), fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, empty event name")}, }, { - Name: "spEnrichedFilterUnstructEvent empty regex", + Name: "spEnrichedFilterUnstructEvent empty regex and event name", Transformations: []*Transformation{{ - Name: "spEnrichedFilterUnstructEvent", - Field: "unstruct_event_add_to_cart_1.sku", + Name: "spEnrichedFilterUnstructEvent", + CustomFieldPath: "sku", }}, - ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, empty regex")}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, empty event name"), fmt.Errorf("validation error #0 spEnrichedFilterUnstructEvent, empty regex")}, }, { Name: "lua success", @@ -321,8 +326,8 @@ function notMain(x) { }, ExpectedErrs: []error{ fmt.Errorf("validation error in js transformation #0, main() smoke test failed"), - fmt.Errorf("validation error #1 spEnrichedFilter, empty field"), - fmt.Errorf("validation error #3 spEnrichedSetPk, empty field"), + fmt.Errorf("validation error #1 spEnrichedFilter, empty atomic field"), + fmt.Errorf("validation error #3 spEnrichedSetPk, empty atomic field"), }, }, } @@ -332,10 +337,9 @@ function notMain(x) { assert := assert.New(t) valErrs := ValidateTransformations(tt.Transformations) - if tt.ExpectedErrs != nil { for idx, valErr := range valErrs { - assert.Equal(valErr.Error(), tt.ExpectedErrs[idx].Error()) + assert.Equal(tt.ExpectedErrs[idx].Error(), valErr.Error()) } } else { assert.Nil(valErrs)