Skip to content

Commit

Permalink
Extend filtering to use custom data (closes #176)
Browse files Browse the repository at this point in the history
  • Loading branch information
TiganeteaRobert authored and colmsnowplow committed Jul 22, 2022
1 parent 1858c7a commit eb8d677
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 37 deletions.
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,18 @@ func (c *Config) GetTransformations() (transform.TransformationApplyFunction, er
return nil, err
}
funcs = append(funcs, filterFunc)
case "spEnrichedFilterContext":
filterFunc, err := transform.NewSpEnrichedFilterFunctionContext(funcOpts[1])
if err != nil {
return nil, err
}
funcs = append(funcs, filterFunc)
case "spEnrichedFilterUnstructEvent":
filterFunc, err := transform.NewSpEnrichedFilterFunctionUnstructEvent(funcOpts[1])
if err != nil {
return nil, err
}
funcs = append(funcs, filterFunc)
case "none":
default:
return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Data.Transformation))
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestNewConfig_FilterFailure(t *testing.T) {
transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal(`Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error())
assert.Equal(`invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error())
}

func TestNewConfig_InvalidTarget(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ require (
github.com/smira/go-statsd v1.3.2
github.com/snowplow-devops/go-retry v0.0.0-20210106090855-8989bbdbae1c
github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a
github.com/snowplow/snowplow-golang-analytics-sdk v0.1.0
github.com/stretchr/testify v1.7.0
github.com/twinj/uuid v1.0.0
github.com/twitchscience/kinsumer v0.0.0-20210611163023-da24975e2c91
Expand All @@ -54,6 +53,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1
github.com/hashicorp/hcl/v2 v2.11.1
github.com/snowplow/snowplow-golang-analytics-sdk v0.2.2
github.com/zclconf/go-cty v1.10.0
)

Expand Down
11 changes: 5 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ github.com/aws/aws-sdk-go v1.25.19/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpi
github.com/aws/aws-sdk-go v1.40.22 h1:iit4tJ1hjL2GlNCrbE4aJza6jTmvEE2pDTnShct/yyY=
github.com/aws/aws-sdk-go v1.40.22/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/caarlos0/env/v6 v6.6.2 h1:BypLXDWQTA32rS4UM7pBz+/0BOuvs6C7LSeQAxMwyvI=
github.com/caarlos0/env/v6 v6.6.2/go.mod h1:P0BVSgU9zfkxfSpFUs6KsO3uWR4k3Ac0P66ibAGTybM=
github.com/caarlos0/env/v6 v6.9.1 h1:zOkkjM0F6ltnQ5eBX6IPI41UP/KDGEK7rRPwGCNos8k=
github.com/caarlos0/env/v6 v6.9.1/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc=
github.com/cactus/go-statsd-client/statsd v0.0.0-20190922113730-52b467de415c/go.mod h1:D4RDtP0MffJ3+R36OkGul0LwJLIN8nRb0Ac6jZmJCmo=
Expand Down Expand Up @@ -208,6 +206,7 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68=
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
Expand Down Expand Up @@ -379,12 +378,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE=
github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
Expand Down Expand Up @@ -441,6 +439,7 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand All @@ -457,8 +456,8 @@ github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a h1:9
github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a/go.mod h1:7/jMxl0yrvgiUlv5L37fw6pql71aNh55sKQc4kBFj5s=
github.com/snowplow-devops/kinsumer v1.3.0 h1:uN8PPG8EffKjcfTcDqsHWnnsTFvYGMU39XlDPULIQcA=
github.com/snowplow-devops/kinsumer v1.3.0/go.mod h1:SebvcasLweQnOygk9SOFkM/JjBtXFviUxoAq19CwrHQ=
github.com/snowplow/snowplow-golang-analytics-sdk v0.1.0 h1:FA8xHSHzoshF3fJDK9tqUDnuBmyqTiGPRLvIaRQMk2I=
github.com/snowplow/snowplow-golang-analytics-sdk v0.1.0/go.mod h1:Z8ZW805JGCYhnq1wnHe2PIiamUnvoNtAtXPWNyS0mV8=
github.com/snowplow/snowplow-golang-analytics-sdk v0.2.2 h1:ehPNYJ4tOq+n4Lj8jtentKS4UzzvRv5iQ8vlESQj8qw=
github.com/snowplow/snowplow-golang-analytics-sdk v0.2.2/go.mod h1:Z8ZW805JGCYhnq1wnHe2PIiamUnvoNtAtXPWNyS0mV8=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
Expand Down
143 changes: 118 additions & 25 deletions pkg/transform/snowplow_enriched_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,67 @@ import (
"strings"

"github.com/snowplow-devops/stream-replicator/pkg/models"
"github.com/snowplow/snowplow-golang-analytics-sdk/analytics"
)

// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
func findSpEnrichedFilterValue(queriedField, parsedEventName, eventVer, field string, parsedMessage analytics.ParsedEvent, path []interface{}) ([]interface{}, error) {
var vf interface{}
var valueFound []interface{}
var err error

switch {
case strings.HasPrefix(queriedField, `contexts_`):
vf, err = parsedMessage.GetContextValue(queriedField, path...)
valueFound = append(valueFound, vf.([]interface{})...)
case strings.HasPrefix(queriedField, `unstruct_event`):
eventNameFull := `unstruct_event_` + parsedEventName
if queriedField == eventNameFull || queriedField == eventNameFull+`_`+eventVer {
vf, err = parsedMessage.GetUnstructEventValue(path...)
valueFound = append(valueFound, vf)
}
default:
vf, err = parsedMessage.GetValue(field)
valueFound = append(valueFound, vf)
}
if err != nil {
// GetValue returns an error if the field requested is empty. Check for that particular error before returning error
if err.Error() == analytics.EmptyFieldErr {
return nil, nil
}
return nil, err
}
return valueFound, nil
}

func evaluateSpEnrichedFilter(valuesToMatch string, valuesFound []interface{}, isNegationFilter, shouldKeepMessage *bool) {
for _, valueToMatch := range strings.Split(valuesToMatch, "|") {
for _, v := range valuesFound {
if fmt.Sprintf("%v", v) == valueToMatch {
// Once config value is matched once, change shouldKeepMessage, and stop looking for matches
if *isNegationFilter {
*shouldKeepMessage = false
} else {
*shouldKeepMessage = true
}
return

}
}
}
}

// createSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
// The filterconfig should describe the conditions for including a message.
// For example "aid=abc|def" includes all events with app IDs of abc or def, and filters out the rest.
// aid!=abc|def includes all events whose app IDs do not match abc or def, and filters out the rest.
func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, error) {

func createSpEnrichedFilterFunction(filterConfig string, isUnstructEvent bool, isContext bool) (TransformationFunction, error) {
// This regex prevents whitespace characters in the value provided
regex := `\S+(!=|==)[^\s\|]+((?:\|[^\s|]+)*)$`
re := regexp.MustCompile(regex)

if !(re.MatchString(filterConfig)) {
// If invalid, return an error which will be returned by the main function
return nil, errors.New("Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]")
return nil, errors.New("invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]")
}

// Check for a negation condition first
Expand All @@ -48,43 +94,90 @@ func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, e
// Start by resetting shouldKeepMessage to isNegationFilter
shouldKeepMessage := isNegationFilter

// Evalute intermediateState to parsedEvent
// Evaluate intermediateState to parsedEvent
parsedMessage, parseErr := intermediateAsSpEnrichedParsed(intermediateState, message)
if parseErr != nil {
message.SetError(parseErr)
return nil, nil, message, nil
}

valueFound, err := parsedMessage.GetValue(keyValues[0])
// This regex retrieves the path fields
// (e.g. field1.field2[0].field3 -> [field1, field2, 0, field3])
regex = `\w+`
re = regexp.MustCompile(regex)

// GetValue returns an error if the field requested is empty. Check for that particular error before failing the message.
if err != nil && err.Error() == fmt.Sprintf("Field %s is empty", keyValues[0]) {
valueFound = nil
} else if err != nil {
message.SetError(err)
return nil, nil, message, nil
// separate the path string into words using regex
path := re.FindAllString(keyValues[0], -1)
separatedPath := make([]string, len(path)-1)
for idx, pathField := range path[1:] {
separatedPath[idx] = pathField
}

evaluation:
for _, valueToMatch := range strings.Split(keyValues[1], "|") {
if valueToMatch == fmt.Sprintf("%v", valueFound) { // coerce to string as valueFound may be any type found in a Snowplow event
if isNegationFilter {
shouldKeepMessage = false
} else {
shouldKeepMessage = true
}
break evaluation
// Once config value is matched once, change shouldKeepMessage, and stop looking for matches
var parsedEventName string
var eventMajorVer string
var err error

// only call SDK functions if an unstruct_event is being filtered
if isUnstructEvent {
// get event name
eventName, err := parsedMessage.GetValue(`event_name`)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
parsedEventName = eventName.(string)
// get event version
fullEventVer, err := parsedMessage.GetValue(`event_version`)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
// get the major event version
eventMajorVer = strings.Split(fullEventVer.(string), `-`)[0]
if eventMajorVer == `` {
message.SetError(fmt.Errorf(`invalid schema version format: %s`, fullEventVer))
return nil, nil, message, nil
}
}

// If message is not to be kept, return it as a filtered message to be acked in the main function
if !shouldKeepMessage {
// find the value in the event
valueFound, err := findSpEnrichedFilterValue(
path[0],
parsedEventName,
eventMajorVer,
keyValues[0],
parsedMessage,
convertPathToInterfaces(separatedPath),
)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}

// evaluate whether the found value passes the filter, determining if the message should be kept
evaluateSpEnrichedFilter(keyValues[1], valueFound, &isNegationFilter, &shouldKeepMessage)

// if message is not to be kept, return it as a filtered message to be acked in the main function
if !shouldKeepMessage {
return nil, message, nil, nil
}

// Otherwise, return the message and intermediateState for further processing.
// otherwise, return the message and intermediateState for further processing.
return message, nil, nil, parsedMessage
}, nil
}

// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(filterConfig, false, false)
}

// NewSpEnrichedFilterFunctionContext returns a TransformationFunction for filtering a context
func NewSpEnrichedFilterFunctionContext(filterConfig string) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(filterConfig, false, true)
}

// NewSpEnrichedFilterFunctionUnstructEvent returns a TransformationFunction for filtering an unstruct_event
func NewSpEnrichedFilterFunctionUnstructEvent(filterConfig string) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(filterConfig, true, false)
}
75 changes: 74 additions & 1 deletion pkg/transform/snowplow_enriched_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) {
PartitionKey: "some-key",
}

var messageGoodInt = models.Message{
Data: snowplowTsv4,
PartitionKey: "some-key",
}

var messageWithUnstructEvent = models.Message{
Data: snowplowTsv1,
PartitionKey: "some-key",
}

// Single value cases
aidFilterFuncKeep, _ := NewSpEnrichedFilterFunction("app_id==test-data3")

Expand Down Expand Up @@ -116,11 +126,74 @@ func TestNewSpEnrichedFilterFunction(t *testing.T) {
assert.Equal(snowplowTsv3, nilNegationIn.Data)
assert.Nil(nilNegationOut)
assert.Nil(fail8)

// context filter success
contextFuncKeep, _ := NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1.test1.test2[0].test3==testValue")

contextKeepIn, contextKeepOut, fail9, _ := contextFuncKeep(&messageGood, nil)

assert.Equal(snowplowTsv3, contextKeepIn.Data)
assert.Nil(contextKeepOut)
assert.Nil(fail9)

// context filter success (integer value)
contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_1.test1.test2[0].test3==1")

contextKeepIn, contextKeepOut, fail9, _ = contextFuncKeep(&messageGoodInt, nil)

assert.Equal(snowplowTsv4, contextKeepIn.Data)
assert.Nil(contextKeepOut)
assert.Nil(fail9)

// context filter failure
contextFuncKeep, _ = NewSpEnrichedFilterFunctionContext("contexts_nl_basjes_yauaa_context_2.test1.test2[0].test3==testValue")

contextKeepIn, contextKeepOut, fail9, _ = contextFuncKeep(&messageGood, nil)

assert.Nil(contextKeepIn)
assert.Equal(snowplowTsv3, contextKeepOut.Data)
assert.Nil(fail9)

// event filter success, filtered event name
eventFilterFunCkeep, _ := NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart_1.sku==item41")

eventKeepIn, eventKeepOut, fail10, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil)

assert.Equal(snowplowTsv1, eventKeepIn.Data)
assert.Nil(eventKeepOut)
assert.Nil(fail10)

// event filter success, filtered event name, no event ver
eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart.sku==item41")

eventKeepIn, eventKeepOut, fail10, _ = eventFilterFunCkeep(&messageWithUnstructEvent, nil)

assert.Equal(snowplowTsv1, eventKeepIn.Data)
assert.Nil(eventKeepOut)
assert.Nil(fail10)

// event filter failure, wrong event name
eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_wrong_name.sku==item41")

eventKeepIn, eventKeepOut, fail11, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil)

assert.Nil(eventKeepIn)
assert.Equal(snowplowTsv1, eventKeepOut.Data)
assert.Nil(fail11)

// event filter failure, field not found
eventFilterFunCkeep, _ = NewSpEnrichedFilterFunctionUnstructEvent("unstruct_event_add_to_cart.ska==item41")

eventNoFieldIn, eventNoFieldOut, fail12, _ := eventFilterFunCkeep(&messageWithUnstructEvent, nil)

assert.Nil(eventNoFieldIn)
assert.Nil(eventNoFieldOut)
assert.NotNil(fail12)
}

func TestNewSpEnrichedFilterFunction_Error(t *testing.T) {
assert := assert.New(t)
error := `Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`
error := `invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`

filterFunc, err1 := NewSpEnrichedFilterFunction("")

Expand Down
16 changes: 16 additions & 0 deletions pkg/transform/snowplow_enriched_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package transform
import (
"github.com/snowplow-devops/stream-replicator/pkg/models"
"github.com/snowplow/snowplow-golang-analytics-sdk/analytics"
"strconv"
)

func intermediateAsSpEnrichedParsed(intermediateState interface{}, message *models.Message) (analytics.ParsedEvent, error) {
Expand All @@ -23,3 +24,18 @@ func intermediateAsSpEnrichedParsed(intermediateState interface{}, message *mode
}
return parsedMessage, nil
}

// convertPathToInterfaces converts a slice of strings representing a path to a slice of interfaces to be used
// by the SDK Get() function
func convertPathToInterfaces(path []string) []interface{} {
var output []interface{}
for _, pathField := range path {
pathFieldInt, err := strconv.Atoi(pathField)
if err != nil {
output = append(output, pathField)
} else {
output = append(output, pathFieldInt)
}
}
return output
}
Loading

0 comments on commit eb8d677

Please sign in to comment.