From d87907486aee252e8d5b94399d9c21859f9c860b Mon Sep 17 00:00:00 2001 From: TiganeteaRobert Date: Thu, 28 Apr 2022 14:28:36 +0300 Subject: [PATCH] =?UTF-8?q?Extend=20filtering=20to=20use=20custom=20data?= =?UTF-8?q?=C2=A0(closes=20#176)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 12 ++ config/config_test.go | 2 +- go.mod | 2 +- go.sum | 11 +- pkg/transform/snowplow_enriched_filter.go | 143 +++++++++++++++--- .../snowplow_enriched_filter_test.go | 75 ++++++++- pkg/transform/snowplow_enriched_util.go | 16 ++ pkg/transform/transform_test_variables.go | 6 +- 8 files changed, 230 insertions(+), 37 deletions(-) diff --git a/config/config.go b/config/config.go index ff82e5ed..0b524773 100644 --- a/config/config.go +++ b/config/config.go @@ -320,6 +320,18 @@ func (c *Config) GetTransformations() (transform.TransformationApplyFunction, er return nil, err } funcs = append(funcs, filterFunc) + case "spEnrichedFilterContext": + filterFunc, err := transform.NewSpEnrichedFilterFunctionContext(funcOpts[1]) + if err != nil { + return nil, err + } + funcs = append(funcs, filterFunc) + case "spEnrichedFilterUnstructEvent": + filterFunc, err := transform.NewSpEnrichedFilterFunctionUnstructEvent(funcOpts[1]) + if err != nil { + return nil, err + } + funcs = append(funcs, filterFunc) case "none": default: return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Data.Transformation)) diff --git a/config/config_test.go b/config/config_test.go index ec242f8e..ce03c7d8 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -110,7 +110,7 @@ func TestNewConfig_FilterFailure(t *testing.T) { transformation, err := c.GetTransformations() assert.Nil(transformation) assert.NotNil(err) - assert.Equal(`Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error()) + assert.Equal(`invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error()) } func TestNewConfig_InvalidTarget(t *testing.T) { diff --git a/go.mod b/go.mod index 28ee2387..5cd3b36e 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,6 @@ require ( github.com/smira/go-statsd v1.3.2 github.com/snowplow-devops/go-retry v0.0.0-20210106090855-8989bbdbae1c github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a - github.com/snowplow/snowplow-golang-analytics-sdk v0.1.0 github.com/stretchr/testify v1.7.0 github.com/twinj/uuid v1.0.0 github.com/twitchscience/kinsumer v0.0.0-20210611163023-da24975e2c91 @@ -54,6 +53,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 github.com/hashicorp/hcl/v2 v2.11.1 + github.com/snowplow/snowplow-golang-analytics-sdk v0.2.2 github.com/zclconf/go-cty v1.10.0 ) diff --git a/go.sum b/go.sum index 23ae8f5f..78450e5e 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,6 @@ github.com/aws/aws-sdk-go v1.25.19/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpi github.com/aws/aws-sdk-go v1.40.22 h1:iit4tJ1hjL2GlNCrbE4aJza6jTmvEE2pDTnShct/yyY= github.com/aws/aws-sdk-go v1.40.22/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= -github.com/caarlos0/env/v6 v6.6.2 h1:BypLXDWQTA32rS4UM7pBz+/0BOuvs6C7LSeQAxMwyvI= -github.com/caarlos0/env/v6 v6.6.2/go.mod h1:P0BVSgU9zfkxfSpFUs6KsO3uWR4k3Ac0P66ibAGTybM= github.com/caarlos0/env/v6 v6.9.1 h1:zOkkjM0F6ltnQ5eBX6IPI41UP/KDGEK7rRPwGCNos8k= github.com/caarlos0/env/v6 v6.9.1/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= github.com/cactus/go-statsd-client/statsd v0.0.0-20190922113730-52b467de415c/go.mod h1:D4RDtP0MffJ3+R36OkGul0LwJLIN8nRb0Ac6jZmJCmo= @@ -208,6 +206,7 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= +github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= @@ -379,12 +378,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= -github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= -github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -441,6 +439,7 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= +github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -457,8 +456,8 @@ github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a h1:9 github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a/go.mod h1:7/jMxl0yrvgiUlv5L37fw6pql71aNh55sKQc4kBFj5s= github.com/snowplow-devops/kinsumer v1.3.0 h1:uN8PPG8EffKjcfTcDqsHWnnsTFvYGMU39XlDPULIQcA= github.com/snowplow-devops/kinsumer v1.3.0/go.mod h1:SebvcasLweQnOygk9SOFkM/JjBtXFviUxoAq19CwrHQ= -github.com/snowplow/snowplow-golang-analytics-sdk v0.1.0 h1:FA8xHSHzoshF3fJDK9tqUDnuBmyqTiGPRLvIaRQMk2I= -github.com/snowplow/snowplow-golang-analytics-sdk v0.1.0/go.mod h1:Z8ZW805JGCYhnq1wnHe2PIiamUnvoNtAtXPWNyS0mV8= +github.com/snowplow/snowplow-golang-analytics-sdk v0.2.2 h1:ehPNYJ4tOq+n4Lj8jtentKS4UzzvRv5iQ8vlESQj8qw= +github.com/snowplow/snowplow-golang-analytics-sdk v0.2.2/go.mod h1:Z8ZW805JGCYhnq1wnHe2PIiamUnvoNtAtXPWNyS0mV8= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= diff --git a/pkg/transform/snowplow_enriched_filter.go b/pkg/transform/snowplow_enriched_filter.go index 8a807bb6..b52a5bd5 100644 --- a/pkg/transform/snowplow_enriched_filter.go +++ b/pkg/transform/snowplow_enriched_filter.go @@ -13,21 +13,67 @@ import ( "strings" "github.com/snowplow-devops/stream-replicator/pkg/models" + "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) -// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event. +func findSpEnrichedFilterValue(queriedField, parsedEventName, eventVer, field string, parsedMessage analytics.ParsedEvent, path []interface{}) ([]interface{}, error) { + var vf interface{} + var valueFound []interface{} + var err error + + switch { + case strings.HasPrefix(queriedField, `contexts_`): + vf, err = parsedMessage.GetContextValue(queriedField, path...) + valueFound = append(valueFound, vf.([]interface{})...) + case strings.HasPrefix(queriedField, `unstruct_event`): + eventNameFull := `unstruct_event_` + parsedEventName + if queriedField == eventNameFull || queriedField == eventNameFull+`_`+eventVer { + vf, err = parsedMessage.GetUnstructEventValue(path...) + valueFound = append(valueFound, vf) + } + default: + vf, err = parsedMessage.GetValue(field) + valueFound = append(valueFound, vf) + } + if err != nil { + // GetValue returns an error if the field requested is empty. Check for that particular error before returning error + if err.Error() == analytics.EmptyFieldErr { + return nil, nil + } + return nil, err + } + return valueFound, nil +} + +func evaluateSpEnrichedFilter(valuesToMatch string, valuesFound []interface{}, isNegationFilter, shouldKeepMessage *bool) { + for _, valueToMatch := range strings.Split(valuesToMatch, "|") { + for _, v := range valuesFound { + if fmt.Sprintf("%v", v) == valueToMatch { + // Once config value is matched once, change shouldKeepMessage, and stop looking for matches + if *isNegationFilter { + *shouldKeepMessage = false + } else { + *shouldKeepMessage = true + } + return + + } + } + } +} + +// createSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event. // The filterconfig should describe the conditions for including a message. // For example "aid=abc|def" includes all events with app IDs of abc or def, and filters out the rest. // aid!=abc|def includes all events whose app IDs do not match abc or def, and filters out the rest. -func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, error) { - +func createSpEnrichedFilterFunction(filterConfig string, isUnstructEvent bool, isContext bool) (TransformationFunction, error) { // This regex prevents whitespace characters in the value provided regex := `\S+(!=|==)[^\s\|]+((?:\|[^\s|]+)*)$` re := regexp.MustCompile(regex) if !(re.MatchString(filterConfig)) { // If invalid, return an error which will be returned by the main function - return nil, errors.New("Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]") + return nil, errors.New("invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]") } // Check for a negation condition first @@ -48,43 +94,90 @@ func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, e // Start by resetting shouldKeepMessage to isNegationFilter shouldKeepMessage := isNegationFilter - // Evalute intermediateState to parsedEvent + // Evaluate intermediateState to parsedEvent parsedMessage, parseErr := intermediateAsSpEnrichedParsed(intermediateState, message) if parseErr != nil { message.SetError(parseErr) return nil, nil, message, nil } - valueFound, err := parsedMessage.GetValue(keyValues[0]) + // This regex retrieves the path fields + // (e.g. field1.field2[0].field3 -> [field1, field2, 0, field3]) + regex = `\w+` + re = regexp.MustCompile(regex) - // GetValue returns an error if the field requested is empty. Check for that particular error before failing the message. - if err != nil && err.Error() == fmt.Sprintf("Field %s is empty", keyValues[0]) { - valueFound = nil - } else if err != nil { - message.SetError(err) - return nil, nil, message, nil + // separate the path string into words using regex + path := re.FindAllString(keyValues[0], -1) + separatedPath := make([]string, len(path)-1) + for idx, pathField := range path[1:] { + separatedPath[idx] = pathField } - evaluation: - for _, valueToMatch := range strings.Split(keyValues[1], "|") { - if valueToMatch == fmt.Sprintf("%v", valueFound) { // coerce to string as valueFound may be any type found in a Snowplow event - if isNegationFilter { - shouldKeepMessage = false - } else { - shouldKeepMessage = true - } - break evaluation - // Once config value is matched once, change shouldKeepMessage, and stop looking for matches + var parsedEventName string + var eventMajorVer string + var err error + + // only call SDK functions if an unstruct_event is being filtered + if isUnstructEvent { + // get event name + eventName, err := parsedMessage.GetValue(`event_name`) + if err != nil { + message.SetError(err) + return nil, nil, message, nil + } + parsedEventName = eventName.(string) + // get event version + fullEventVer, err := parsedMessage.GetValue(`event_version`) + if err != nil { + message.SetError(err) + return nil, nil, message, nil + } + // get the major event version + eventMajorVer = strings.Split(fullEventVer.(string), `-`)[0] + if eventMajorVer == `` { + message.SetError(fmt.Errorf(`invalid schema version format: %s`, fullEventVer)) + return nil, nil, message, nil } } - // If message is not to be kept, return it as a filtered message to be acked in the main function - if !shouldKeepMessage { + // find the value in the event + valueFound, err := findSpEnrichedFilterValue( + path[0], + parsedEventName, + eventMajorVer, + keyValues[0], + parsedMessage, + convertPathToInterfaces(separatedPath), + ) + if err != nil { + message.SetError(err) + return nil, nil, message, nil + } + + // evaluate whether the found value passes the filter, determining if the message should be kept + evaluateSpEnrichedFilter(keyValues[1], valueFound, &isNegationFilter, &shouldKeepMessage) + // if message is not to be kept, return it as a filtered message to be acked in the main function + if !shouldKeepMessage { return nil, message, nil, nil } - // Otherwise, return the message and intermediateState for further processing. + // otherwise, return the message and intermediateState for further processing. return message, nil, nil, parsedMessage }, nil } + +// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event. +func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, error) { + return createSpEnrichedFilterFunction(filterConfig, false, false) +} + +// NewSpEnrichedFilterFunctionContext returns a TransformationFunction for filtering a context +func NewSpEnrichedFilterFunctionContext(filterConfig string) (TransformationFunction, error) { + return createSpEnrichedFilterFunction(filterConfig, false, true) +} + +// NewSpEnrichedFilterFunctionUnstructEvent returns a TransformationFunction for filtering an unstruct_event +func NewSpEnrichedFilterFunctionUnstructEvent(filterConfig string) (TransformationFunction, error) { + return createSpEnrichedFilterFunction(filterConfig, true, false) +} diff --git a/pkg/transform/snowplow_enriched_filter_test.go b/pkg/transform/snowplow_enriched_filter_test.go index 32da7ac2..e8c0f825 100644 --- a/pkg/transform/snowplow_enriched_filter_test.go +++ b/pkg/transform/snowplow_enriched_filter_test.go @@ -21,6 +21,16 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { PartitionKey: "some-key", } + var messageGoodInt = models.Message{ + Data: snowplowTsv4, + PartitionKey: "some-key", + } + + var messageWithUnstructEvent = models.Message{ + Data: snowplowTsv1, + PartitionKey: "some-key", + } + // Single value cases aidFilterFuncKeep, _ := NewSpEnrichedFilterFunction("app_id==test-data3") @@ -116,11 +126,74 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) { assert.Equal(snowplowTsv3, nilNegationIn.Data) assert.Nil(nilNegationOut) assert.Nil(fail8) + + // context filter success + contextFuncKeep, _ := NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1.test1.test2[0].test3==testValue") + + contextKeepIn, contextKeepOut, fail9, _ := contextFuncKeep(&messageGood, nil) + + assert.Equal(snowplowTsv3, contextKeepIn.Data) + assert.Nil(contextKeepOut) + assert.Nil(fail9) + + // context filter success (integer value) + contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1.test1.test2[0].test3==1") + + contextKeepIn, contextKeepOut, fail9, _ = contextFuncKeep(&messageGoodInt, nil) + + assert.Equal(snowplowTsv4, contextKeepIn.Data) + assert.Nil(contextKeepOut) + assert.Nil(fail9) + + // context filter failure + contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_2.test1.test2[0].test3==testValue") + + contextKeepIn, contextKeepOut, fail9, _ = contextFuncKeep(&messageGood, nil) + + assert.Nil(contextKeepIn) + assert.Equal(snowplowTsv3, contextKeepOut.Data) + assert.Nil(fail9) + + // event filter success, filtered event name + eventFilterFunCkeep, _ := NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart_1.sku==item41") + + eventKeepIn, eventKeepOut, fail10, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil) + + assert.Equal(snowplowTsv1, eventKeepIn.Data) + assert.Nil(eventKeepOut) + assert.Nil(fail10) + + // event filter success, filtered event name, no event ver + eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart.sku==item41") + + eventKeepIn, eventKeepOut, fail10, _ = eventFilterFunCkeep(&messageWithUnstructEvent, nil) + + assert.Equal(snowplowTsv1, eventKeepIn.Data) + assert.Nil(eventKeepOut) + assert.Nil(fail10) + + // event filter failure, wrong event name + eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_wrong_name.sku==item41") + + eventKeepIn, eventKeepOut, fail11, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil) + + assert.Nil(eventKeepIn) + assert.Equal(snowplowTsv1, eventKeepOut.Data) + assert.Nil(fail11) + + // event filter failure, field not found + eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart.ska==item41") + + eventNoFieldIn, eventNoFieldOut, fail12, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil) + + assert.Nil(eventNoFieldIn) + assert.Nil(eventNoFieldOut) + assert.NotNil(fail12) } func TestNewSpEnrichedFilterFunction_Error(t *testing.T) { assert := assert.New(t) - error := `Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]` + error := `invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]` filterFunc, err1 := NewSpEnrichedFilterFunction("") diff --git a/pkg/transform/snowplow_enriched_util.go b/pkg/transform/snowplow_enriched_util.go index 3f374d4d..d0d5cc6e 100644 --- a/pkg/transform/snowplow_enriched_util.go +++ b/pkg/transform/snowplow_enriched_util.go @@ -9,6 +9,7 @@ package transform import ( "github.com/snowplow-devops/stream-replicator/pkg/models" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" + "strconv" ) func intermediateAsSpEnrichedParsed(intermediateState interface{}, message *models.Message) (analytics.ParsedEvent, error) { @@ -23,3 +24,18 @@ func intermediateAsSpEnrichedParsed(intermediateState interface{}, message *mode } return parsedMessage, nil } + +// convertPathToInterfaces converts a slice of strings representing a path to a slice of interfaces to be used +// by the SDK Get() function +func convertPathToInterfaces(path []string) []interface{} { + var output []interface{} + for _, pathField := range path { + pathFieldInt, err := strconv.Atoi(pathField) + if err != nil { + output = append(output, pathField) + } else { + output = append(output, pathFieldInt) + } + } + return output +} diff --git a/pkg/transform/transform_test_variables.go b/pkg/transform/transform_test_variables.go index 57f67336..9302f063 100644 --- a/pkg/transform/transform_test_variables.go +++ b/pkg/transform/transform_test_variables.go @@ -14,14 +14,14 @@ import ( var snowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) var spTsv1Parsed, _ = analytics.ParseEvent(string(snowplowTsv1)) var snowplowJSON1 = []byte(`{"app_id":"test-data1","collector_tstamp":"2019-05-10T14:40:35.972Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:35.972Z","dvce_created_tstamp":"2019-05-10T14:40:35.551Z","dvce_sent_tstamp":"2019-05-10T14:40:35Z","etl_tstamp":"2019-05-10T14:40:37.436Z","event":"unstruct","event_format":"jsonschema","event_id":"e9234345-f042-46ad-b1aa-424464066a33","event_name":"add_to_cart","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"d26822f5-52cc-4292-8f77-14ef6b7a27e2","platform":"pc","unstruct_event_com_snowplowanalytics_snowplow_add_to_cart_1":{"currency":"GBP","quantity":2,"sku":"item41","unitPrice":32.4},"user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) - var snowplowTsv2 = []byte(`test-data2 pc 2019-05-10 14:40:32.392 2019-05-10 14:40:31.105 2019-05-10 14:40:30.218 transaction_item 5071169f-3050-473f-b03f-9748319b1ef2 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 68220ade-307b-4898-8e25-c4c8ac92f1d7 transaction item58 35.87 1 python-requests/2.21.0 2019-05-10 14:40:30.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:31.105 com.snowplowanalytics.snowplow transaction_item jsonschema 1-0-0 `) var spTsv2Parsed, _ = analytics.ParseEvent(string(snowplowTsv2)) var snowplowJSON2 = []byte(`{"app_id":"test-data2","collector_tstamp":"2019-05-10T14:40:31.105Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:31.105Z","dvce_created_tstamp":"2019-05-10T14:40:30.218Z","dvce_sent_tstamp":"2019-05-10T14:40:30Z","etl_tstamp":"2019-05-10T14:40:32.392Z","event":"transaction_item","event_format":"jsonschema","event_id":"5071169f-3050-473f-b03f-9748319b1ef2","event_name":"transaction_item","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"68220ade-307b-4898-8e25-c4c8ac92f1d7","platform":"pc","ti_orderid":"transaction\u003cbuilt-in function input\u003e","ti_price":35.87,"ti_quantity":1,"ti_sku":"item58","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) -var snowplowTsv3 = []byte(`test-data3 pc 2019-05-10 14:40:30.836 2019-05-10 14:40:29.576 2019-05-10 14:40:29.204 page_view e8aef68d-8533-45c6-a672-26a0f01be9bd py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 b66c4a12-8584-4c7a-9a5d-7c96f59e2556 www.demo-site.com/campaign-landing-page landing-page 80 www.demo-site.com/campaign-landing-page python-requests/2.21.0 2019-05-10 14:40:29.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:29.576 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 `) +var snowplowTsv3 = []byte(`test-data3 pc 2019-05-10 14:40:30.836 2019-05-10 14:40:29.576 2019-05-10 14:40:29.204 page_view e8aef68d-8533-45c6-a672-26a0f01be9bd py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 b66c4a12-8584-4c7a-9a5d-7c96f59e2556 www.demo-site.com/campaign-landing-page landing-page 80 www.demo-site.com/campaign-landing-page python-requests/2.21.0 2019-05-10 14:40:29.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??","test1":{"test2":[{"test3":"testValue"}]}}}]} 2019-05-10 14:40:29.576 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 `) var spTsv3Parsed, _ = analytics.ParseEvent(string(snowplowTsv3)) -var snowplowJSON3 = []byte(`{"app_id":"test-data3","collector_tstamp":"2019-05-10T14:40:29.576Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:29.576Z","dvce_created_tstamp":"2019-05-10T14:40:29.204Z","dvce_sent_tstamp":"2019-05-10T14:40:29Z","etl_tstamp":"2019-05-10T14:40:30.836Z","event":"page_view","event_format":"jsonschema","event_id":"e8aef68d-8533-45c6-a672-26a0f01be9bd","event_name":"page_view","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"b66c4a12-8584-4c7a-9a5d-7c96f59e2556","page_title":"landing-page","page_url":"www.demo-site.com/campaign-landing-page","page_urlpath":"www.demo-site.com/campaign-landing-page","page_urlport":80,"platform":"pc","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) +var snowplowJSON3 = []byte(`{"app_id":"test-data3","collector_tstamp":"2019-05-10T14:40:29.576Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??","test1":{"test2":[{"test3":"testValue"}]}}],"derived_tstamp":"2019-05-10T14:40:29.576Z","dvce_created_tstamp":"2019-05-10T14:40:29.204Z","dvce_sent_tstamp":"2019-05-10T14:40:29Z","etl_tstamp":"2019-05-10T14:40:30.836Z","event":"page_view","event_format":"jsonschema","event_id":"e8aef68d-8533-45c6-a672-26a0f01be9bd","event_name":"page_view","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"b66c4a12-8584-4c7a-9a5d-7c96f59e2556","page_title":"landing-page","page_url":"www.demo-site.com/campaign-landing-page","page_urlpath":"www.demo-site.com/campaign-landing-page","page_urlport":80,"platform":"pc","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) +var snowplowTsv4 = []byte(`test-data3 pc 2019-05-10 14:40:30.836 2019-05-10 14:40:29.576 2019-05-10 14:40:29.204 page_view e8aef68d-8533-45c6-a672-26a0f01be9bd py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 b66c4a12-8584-4c7a-9a5d-7c96f59e2556 www.demo-site.com/campaign-landing-page landing-page 80 www.demo-site.com/campaign-landing-page python-requests/2.21.0 2019-05-10 14:40:29.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??","test1":{"test2":[{"test3":1}]}}}]} 2019-05-10 14:40:29.576 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 `) var nonSnowplowString = []byte(`not a snowplow event`)