diff --git a/.travis.yml b/.travis.yml index 0eb41af2b..00cac622c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,6 +39,10 @@ before_script: after_script: - "./scripts/after_script.sh" install: true +branches: + only: + - master + - /^v\d+\.\d+(\.\d+)?(-\S*)?$/ notifications: slack: secure: R8wvRnq0DcxiFNgUvJ3npnzY2LzU8uVyF8enqfxXNuSR3jRC2tqUosB5Qzb1CCiNicmpEwj3VTcwTozzCwcqckysFek3Pp2/oxYL8tRjqxks1zUeMHVv204l83Js8PAFVhODCQjIxZNQCdUM2fQ9q46MvdY7V8h/wGTbQKq1ZLE= diff --git a/README.md b/README.md index fe599f4d9..b62575591 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,16 @@ Each adaptor has its own README page with details on configuration and capabilit * [rethinkdb](./adaptor/rethinkdb) * [transformer](./adaptor/transformer) +Native Functions +---------------- + +Each native function can be used as part of a `Transform` step in the pipeline. + +* [omit](./adaptor/function/omit) +* [pick](./adaptor/function/pick) +* [pretty](./adaptor/function/pretty) +* [skip](./adaptor/function/skip) + Commands -------- diff --git a/adaptor/all/all.go b/adaptor/all/all.go index 80bcd222c..dbc0ad166 100644 --- a/adaptor/all/all.go +++ b/adaptor/all/all.go @@ -4,6 +4,7 @@ import ( // Initialize all adapters by importing this package _ "github.com/compose/transporter/adaptor/elasticsearch" _ "github.com/compose/transporter/adaptor/file" + _ "github.com/compose/transporter/adaptor/function" _ "github.com/compose/transporter/adaptor/mongodb" _ "github.com/compose/transporter/adaptor/postgres" _ "github.com/compose/transporter/adaptor/rabbitmq" diff --git a/adaptor/function/all.go b/adaptor/function/all.go new file mode 100644 index 000000000..6fe69fade --- /dev/null +++ b/adaptor/function/all.go @@ -0,0 +1,8 @@ +package function + +import ( + _ "github.com/compose/transporter/adaptor/function/omit" + _ "github.com/compose/transporter/adaptor/function/pick" + _ "github.com/compose/transporter/adaptor/function/pretty" + _ "github.com/compose/transporter/adaptor/function/skip" +) diff --git a/adaptor/function/omit/README.md b/adaptor/function/omit/README.md new file mode 100644 index 000000000..2e20a1388 --- /dev/null +++ b/adaptor/function/omit/README.md @@ -0,0 +1,33 @@ +# omit function + +`omit()` will remove any fields specified from the message and then send down the pipeline. It currently only works for top level fields (i.e. `address.street` would not work). + +### configuration + +```javascript +omit({"fields": ["name"]}) +``` + +### example + +message in +```JSON +{ + "_id": 0, + "name": "transporter", + "type": "function" +} +``` + +config +```javascript +omit({"fields":["type"]}) +``` + +message out +```JSON +{ + "_id": 0, + "name": "transporter" +} +``` \ No newline at end of file diff --git a/adaptor/function/omit/omitter.go b/adaptor/function/omit/omitter.go new file mode 100644 index 000000000..4cd2559b5 --- /dev/null +++ b/adaptor/function/omit/omitter.go @@ -0,0 +1,43 @@ +package omit + +import ( + "sync" + + "github.com/compose/transporter/adaptor" + "github.com/compose/transporter/client" + "github.com/compose/transporter/message" +) + +func init() { + adaptor.Add( + "omit", + func() adaptor.Adaptor { + return &Omitter{} + }, + ) +} + +type Omitter struct { + Fields []string `json:"fields"` +} + +func (o *Omitter) Client() (client.Client, error) { + return &client.Mock{}, nil +} + +func (o *Omitter) Reader() (client.Reader, error) { + return nil, adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} +} + +func (o *Omitter) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) { + return o, nil +} + +func (o *Omitter) Write(msg message.Msg) func(client.Session) (message.Msg, error) { + return func(s client.Session) (message.Msg, error) { + for _, k := range o.Fields { + msg.Data().Delete(k) + } + return msg, nil + } +} diff --git a/adaptor/function/omit/omitter_test.go b/adaptor/function/omit/omitter_test.go new file mode 100644 index 000000000..a1ca0e2f9 --- /dev/null +++ b/adaptor/function/omit/omitter_test.go @@ -0,0 +1,77 @@ +package omit + +import ( + "reflect" + "testing" + + "github.com/compose/transporter/adaptor" + _ "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/ops" +) + +var initTests = []map[string]interface{}{ + {"fields": []string{"test"}}, +} + +func TestInit(t *testing.T) { + for _, it := range initTests { + a, err := adaptor.GetAdaptor("omit", it) + if err != nil { + t.Fatalf("unexpected GetAdaptor() error, %s", err) + } + if _, err := a.Client(); err != nil { + t.Errorf("unexpected Client() error, %s", err) + } + rerr := adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} + if _, err := a.Reader(); err != rerr { + t.Errorf("wrong Reader() error, expected %s, got %s", rerr, err) + } + if _, err := a.Writer(nil, nil); err != nil { + t.Errorf("unexpected Writer() error, %s", err) + } + } +} + +var omitTests = []struct { + name string + fields []string + in map[string]interface{} + out map[string]interface{} + err error +}{ + { + "single field", + []string{"type"}, + map[string]interface{}{"_id": "blah", "type": "good"}, + map[string]interface{}{"_id": "blah"}, + nil, + }, + { + "multiple fields", + []string{"type", "name"}, + map[string]interface{}{"_id": "blah", "type": "good", "name": "hello"}, + map[string]interface{}{"_id": "blah"}, + nil, + }, + { + "no matched fields", + []string{"name"}, + map[string]interface{}{"_id": "blah", "type": "good"}, + map[string]interface{}{"_id": "blah", "type": "good"}, + nil, + }, +} + +func TestOmit(t *testing.T) { + for _, ot := range omitTests { + omit := &Omitter{ot.fields} + msg, err := omit.Write(message.From(ops.Insert, "test", ot.in))(nil) + if !reflect.DeepEqual(err, ot.err) { + t.Errorf("[%s] error mismatch, expected %s, got %s", ot.name, ot.err, err) + } + if !reflect.DeepEqual(msg.Data().AsMap(), ot.out) { + t.Errorf("[%s] wrong message, expected %+v, got %+v", ot.name, ot.out, msg.Data().AsMap()) + } + } +} diff --git a/adaptor/function/pick/README.md b/adaptor/function/pick/README.md new file mode 100644 index 000000000..dd879ec73 --- /dev/null +++ b/adaptor/function/pick/README.md @@ -0,0 +1,33 @@ +# pick function + +`pick()` will only include the specified fields from the message when sending down the pipeline. It currently only works for top level fields (i.e. `address.street` would not work). + +### configuration + +```javascript +pick({"fields": ["name"]}) +``` + +### example + +message in +```JSON +{ + "_id": 0, + "name": "transporter", + "type": "function" +} +``` + +config +```javascript +pick({"fields":["_id", "name"]}) +``` + +message out +```JSON +{ + "_id": 0, + "name": "transporter" +} +``` \ No newline at end of file diff --git a/adaptor/function/pick/picker.go b/adaptor/function/pick/picker.go new file mode 100644 index 000000000..d5c8068ea --- /dev/null +++ b/adaptor/function/pick/picker.go @@ -0,0 +1,46 @@ +package pick + +import ( + "sync" + + "github.com/compose/transporter/adaptor" + "github.com/compose/transporter/client" + "github.com/compose/transporter/message" +) + +func init() { + adaptor.Add( + "pick", + func() adaptor.Adaptor { + return &Picker{} + }, + ) +} + +type Picker struct { + Fields []string `json:"fields"` +} + +func (p *Picker) Client() (client.Client, error) { + return &client.Mock{}, nil +} + +func (p *Picker) Reader() (client.Reader, error) { + return nil, adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} +} + +func (p *Picker) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) { + return p, nil +} + +func (p *Picker) Write(msg message.Msg) func(client.Session) (message.Msg, error) { + return func(s client.Session) (message.Msg, error) { + pluckedMsg := map[string]interface{}{} + for _, k := range p.Fields { + if v, ok := msg.Data().AsMap()[k]; ok { + pluckedMsg[k] = v + } + } + return message.From(msg.OP(), msg.Namespace(), pluckedMsg), nil + } +} diff --git a/adaptor/function/pick/picker_test.go b/adaptor/function/pick/picker_test.go new file mode 100644 index 000000000..32d8c2a13 --- /dev/null +++ b/adaptor/function/pick/picker_test.go @@ -0,0 +1,77 @@ +package pick + +import ( + "reflect" + "testing" + + "github.com/compose/transporter/adaptor" + _ "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/ops" +) + +var initTests = []map[string]interface{}{ + {"fields": []string{"test"}}, +} + +func TestInit(t *testing.T) { + for _, it := range initTests { + a, err := adaptor.GetAdaptor("pick", it) + if err != nil { + t.Fatalf("unexpected GetAdaptor() error, %s", err) + } + if _, err := a.Client(); err != nil { + t.Errorf("unexpected Client() error, %s", err) + } + rerr := adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} + if _, err := a.Reader(); err != rerr { + t.Errorf("wrong Reader() error, expected %s, got %s", rerr, err) + } + if _, err := a.Writer(nil, nil); err != nil { + t.Errorf("unexpected Writer() error, %s", err) + } + } +} + +var pickTests = []struct { + name string + fields []string + in map[string]interface{} + out map[string]interface{} + err error +}{ + { + "single field", + []string{"type"}, + map[string]interface{}{"_id": "blah", "type": "good"}, + map[string]interface{}{"type": "good"}, + nil, + }, + { + "multiple fields", + []string{"_id", "name"}, + map[string]interface{}{"_id": "blah", "type": "good", "name": "hello"}, + map[string]interface{}{"_id": "blah", "name": "hello"}, + nil, + }, + { + "no matched fields", + []string{"name"}, + map[string]interface{}{"_id": "blah", "type": "good"}, + map[string]interface{}{}, + nil, + }, +} + +func TestOmit(t *testing.T) { + for _, pt := range pickTests { + pick := &Picker{pt.fields} + msg, err := pick.Write(message.From(ops.Insert, "test", pt.in))(nil) + if !reflect.DeepEqual(err, pt.err) { + t.Errorf("[%s] error mismatch, expected %s, got %s", pt.name, pt.err, err) + } + if !reflect.DeepEqual(msg.Data().AsMap(), pt.out) { + t.Errorf("[%s] wrong message, expected %+v, got %+v", pt.name, pt.out, msg.Data().AsMap()) + } + } +} diff --git a/adaptor/function/pretty/README.md b/adaptor/function/pretty/README.md new file mode 100644 index 000000000..5a9e9ed4e --- /dev/null +++ b/adaptor/function/pretty/README.md @@ -0,0 +1,46 @@ +# pretty function + +`pretty()` will marshal the data to JSON and then log it at the `INFO` level. The default indention setting is `2` spaces and if set to `0`, it will print on a single line. + +### configuration + +```javascript +pretty({"spaces": 2}) +``` + +### example + +message in +```JSON +{ + "_id": 0, + "name": "transporter", + "type": "function" +} +``` + +config +```javascript +pretty({"spaces":0}) +``` + +log line +```shell +INFO[0000] +{"_id":0,"name":"transporter","type":"function"} +``` + +config +```javascript +pretty({"spaces":2}) +``` + +log line +```shell +INFO[0000] +{ + "_id":0, + "name":"transporter", + "type":"function" +} +``` \ No newline at end of file diff --git a/adaptor/function/pretty/prettify.go b/adaptor/function/pretty/prettify.go new file mode 100644 index 000000000..db2d09529 --- /dev/null +++ b/adaptor/function/pretty/prettify.go @@ -0,0 +1,58 @@ +package pretty + +import ( + "encoding/json" + "strings" + "sync" + + "github.com/compose/mejson" + "github.com/compose/transporter/adaptor" + "github.com/compose/transporter/client" + "github.com/compose/transporter/log" + "github.com/compose/transporter/message" +) + +const ( + DefaultIndent = 2 +) + +var ( + DefaultPrettifier = &Prettify{Spaces: DefaultIndent} +) + +func init() { + adaptor.Add( + "pretty", + func() adaptor.Adaptor { + return DefaultPrettifier + }, + ) +} + +type Prettify struct { + Spaces int `json:"spaces"` +} + +func (p *Prettify) Client() (client.Client, error) { + return &client.Mock{}, nil +} + +func (p *Prettify) Reader() (client.Reader, error) { + return nil, adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} +} + +func (p *Prettify) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) { + return p, nil +} + +func (p *Prettify) Write(msg message.Msg) func(client.Session) (message.Msg, error) { + return func(s client.Session) (message.Msg, error) { + d, _ := mejson.Unmarshal(msg.Data()) + b, _ := json.Marshal(d) + if p.Spaces > 0 { + b, _ = json.MarshalIndent(d, "", strings.Repeat(" ", p.Spaces)) + } + log.Infof("\n%s", string(b)) + return msg, nil + } +} diff --git a/adaptor/function/pretty/prettify_test.go b/adaptor/function/pretty/prettify_test.go new file mode 100644 index 000000000..d36c58ac2 --- /dev/null +++ b/adaptor/function/pretty/prettify_test.go @@ -0,0 +1,65 @@ +package pretty + +import ( + "reflect" + "testing" + "time" + + "github.com/compose/transporter/adaptor" + _ "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/ops" + + bson "gopkg.in/mgo.v2/bson" +) + +func TestInit(t *testing.T) { + a, err := adaptor.GetAdaptor("pretty", map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected GetAdaptor() error, %s", err) + } + if _, err := a.Client(); err != nil { + t.Errorf("unexpected Client() error, %s", err) + } + rerr := adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} + if _, err := a.Reader(); err != rerr { + t.Errorf("wrong Reader() error, expected %s, got %s", rerr, err) + } + if _, err := a.Writer(nil, nil); err != nil { + t.Errorf("unexpected Writer() error, %s", err) + } +} + +var prettyTests = []struct { + p *Prettify + data map[string]interface{} +}{ + { + DefaultPrettifier, + map[string]interface{}{"_id": "blah", "type": "good"}, + }, + { + DefaultPrettifier, + map[string]interface{}{"_id": "blah", "type": "good", "name": "hello"}, + }, + { + DefaultPrettifier, + map[string]interface{}{"_id": bson.NewObjectId(), "hello": "world", "ts": bson.MongoTimestamp(time.Now().Unix() << 32)}, + }, + { + &Prettify{Spaces: 0}, + map[string]interface{}{"_id": bson.NewObjectId(), "hello": "world", "ts": bson.MongoTimestamp(time.Now().Unix() << 32)}, + }, +} + +func TestPretty(t *testing.T) { + for _, pt := range prettyTests { + msg, err := pt.p.Write(message.From(ops.Insert, "test", pt.data))(nil) + if err != nil { + t.Errorf("unexpected error, got %s", err) + } + if !reflect.DeepEqual(msg.Data().AsMap(), pt.data) { + t.Errorf("wrong message, expected %+v, got %+v", pt.data, msg.Data().AsMap()) + } + } +} diff --git a/adaptor/function/skip/README.md b/adaptor/function/skip/README.md new file mode 100644 index 000000000..4833c8536 --- /dev/null +++ b/adaptor/function/skip/README.md @@ -0,0 +1,43 @@ +# skip function + +`skip()` will evalute the data based on the criteria configured and determine whether the message should continue down the pipeline or be skipped. When evaluating the data, `true` will result in the message being sent down the pipeline and `false` will result in the message being skipped. Take a look at the [tests](skipper_test.go) for all currently supported configurations. It currently only works for top level fields (i.e. `address.street` would not work). + +### configuration + +```javascript +skip({"field": "test", "operator": "==", "match": 10}) +``` + +### example + +message in +```JSON +{ + "_id": 0, + "name": "transporter", + "type": "function", + "count": 10 +} +``` + +config +```javascript +skip({"field": "count", "operator": "==", "match": 10}) +``` + +message out +```JSON +{ + "_id": 0, + "name": "transporter", + "type": "function", + "count": 10 +} +``` + +config +```javascript +skip({"field": "count", "operator": ">", "match": 20}) +``` + +message would be skipped \ No newline at end of file diff --git a/adaptor/function/skip/skipper.go b/adaptor/function/skip/skipper.go new file mode 100644 index 000000000..cd6357ee5 --- /dev/null +++ b/adaptor/function/skip/skipper.go @@ -0,0 +1,127 @@ +package skip + +import ( + "fmt" + "math" + "reflect" + "regexp" + "strconv" + "sync" + + "github.com/compose/transporter/adaptor" + "github.com/compose/transporter/client" + "github.com/compose/transporter/message" +) + +type UnknownOperatorError struct { + Op string +} + +func (e UnknownOperatorError) Error() string { + return fmt.Sprintf("unkown operator, %s", e.Op) +} + +type WrongTypeError struct { + Wanted string + Got string +} + +func (e WrongTypeError) Error() string { + return fmt.Sprintf("value is of incompatible type, wanted %s, got %s", e.Wanted, e.Got) +} + +func init() { + adaptor.Add( + "skip", + func() adaptor.Adaptor { + return &Skip{} + }, + ) +} + +type Skip struct { + Field string `json:"field"` + Operator string `json:"operator"` + Match interface{} `json:"match"` +} + +func (s *Skip) Client() (client.Client, error) { + return &client.Mock{}, nil +} + +func (s *Skip) Reader() (client.Reader, error) { + return nil, adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} +} + +func (s *Skip) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) { + return s, nil +} + +func (s *Skip) Write(msg message.Msg) func(client.Session) (message.Msg, error) { + return func(client.Session) (message.Msg, error) { + val := msg.Data().Get(s.Field) + switch s.Operator { + case "==", "eq", "$eq": + if reflect.DeepEqual(val, s.Match) { + return msg, nil + } + case "=~": + if ok, err := regexp.MatchString(s.Match.(string), val.(string)); err != nil || ok { + return msg, err + } + case ">", "gt", "$gt": + v, m, err := convertForComparison(val, s.Match) + if err == nil && v > m { + return msg, err + } + return nil, err + case ">=", "gte", "$gte": + v, m, err := convertForComparison(val, s.Match) + if err == nil && v >= m { + return msg, err + } + return nil, err + case "<", "lt", "$lt": + v, m, err := convertForComparison(val, s.Match) + if err == nil && v < m { + return msg, err + } + return nil, err + case "<=", "lte", "$lte": + v, m, err := convertForComparison(val, s.Match) + if err == nil && v <= m { + return msg, err + } + return nil, err + default: + return nil, UnknownOperatorError{s.Operator} + } + return nil, nil + } +} + +func convertForComparison(in1, in2 interface{}) (float64, float64, error) { + float1, err := convertToFloat(in1) + if err != nil { + return math.NaN(), math.NaN(), err + } + float2, err := convertToFloat(in2) + if err != nil { + return math.NaN(), math.NaN(), err + } + return float1, float2, nil +} + +func convertToFloat(in interface{}) (float64, error) { + switch i := in.(type) { + case float64: + return i, nil + case int: + return float64(i), nil + case string: + return strconv.ParseFloat(i, 0) + default: + return math.NaN(), WrongTypeError{"float64 or int", fmt.Sprintf("%T", i)} + } + +} diff --git a/adaptor/function/skip/skipper_test.go b/adaptor/function/skip/skipper_test.go new file mode 100644 index 000000000..e782b9eeb --- /dev/null +++ b/adaptor/function/skip/skipper_test.go @@ -0,0 +1,161 @@ +package skip + +import ( + "reflect" + "strconv" + "testing" + + "github.com/compose/transporter/adaptor" + _ "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/ops" +) + +var errorTests = []struct { + name string + expected string + e error +}{ + { + "WrongTypeError", + "value is of incompatible type, wanted blah, got blah", + WrongTypeError{"blah", "blah"}, + }, + { + "UnknownOperatorError", + "unkown operator, dosomething", + UnknownOperatorError{"dosomething"}, + }, +} + +func TestErrors(t *testing.T) { + for _, et := range errorTests { + if et.e.Error() != et.expected { + t.Errorf("[%s] wrong Error(), expected %s, got %s", et.name, et.expected, et.e.Error()) + } + } +} + +var initTests = []map[string]interface{}{ + {"field": "test", "operator": "==", "match": 10}, +} + +func TestInit(t *testing.T) { + for _, it := range initTests { + a, err := adaptor.GetAdaptor("skip", it) + if err != nil { + t.Fatalf("unexpected GetAdaptor() error, %s", err) + } + if _, err := a.Client(); err != nil { + t.Errorf("unexpected Client() error, %s", err) + } + rerr := adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"} + if _, err := a.Reader(); err != rerr { + t.Errorf("wrong Reader() error, expected %s, got %s", rerr, err) + } + if _, err := a.Writer(nil, nil); err != nil { + t.Errorf("unexpected Writer() error, %s", err) + } + } +} + +var skipTests = []struct { + name string + field string + operators []string + match interface{} + data map[string]interface{} + err error + skipped bool +}{ + { + "unknown operator", "type", []string{"="}, "good", map[string]interface{}{"_id": "blah", "type": "good"}, UnknownOperatorError{"="}, true, + }, + { + "match", "type", []string{"==", "eq", "$eq"}, "good", map[string]interface{}{"_id": "blah", "type": "good"}, nil, false, + }, + { + "skipped", "type", []string{"==", "eq", "$eq"}, "bad", map[string]interface{}{"_id": "blah", "type": "good"}, nil, true, + }, + { + "match", "type", []string{"=~"}, "good", map[string]interface{}{"_id": "blah", "type": "goodnight"}, nil, false, + }, + { + "skipped", "type", []string{"=~"}, "^good", map[string]interface{}{"_id": "blah", "type": "very good"}, nil, true, + }, + { + "match", "count", []string{">", "gt", "$gt"}, 10, map[string]interface{}{"_id": "blah", "count": 11}, nil, false, + }, + { + "skipped", "count", []string{">", "gt", "$gt"}, 10, map[string]interface{}{"_id": "blah", "count": 10}, nil, true, + }, + { + "match", "count", []string{">", "gt", "$gt"}, 10.5, map[string]interface{}{"_id": "blah", "count": 11}, nil, false, + }, + { + "skipped", "count", []string{">", "gt", "$gt"}, 10.5, map[string]interface{}{"_id": "blah", "count": 10}, nil, true, + }, + { + "match", "count", []string{">=", "gte", "$gte"}, 10, map[string]interface{}{"_id": "blah", "count": 10}, nil, false, + }, + { + "match", "count", []string{">=", "gte", "$gte"}, 10, map[string]interface{}{"_id": "blah", "count": 10.5}, nil, false, + }, + { + "skipped", "count", []string{">=", "gte", "$gte"}, 10, map[string]interface{}{"_id": "blah", "count": 9.5}, nil, true, + }, + { + "match", "count", []string{"<", "lt", "$lt"}, 10, map[string]interface{}{"_id": "blah", "count": 9}, nil, false, + }, + { + "skipped", "count", []string{"<", "lt", "$lt"}, 10, map[string]interface{}{"_id": "blah", "count": 10}, nil, true, + }, + { + "match", "count", []string{"<=", "lte", "$lte"}, 10, map[string]interface{}{"_id": "blah", "count": 9.9}, nil, false, + }, + { + "match", "count", []string{"<=", "lte", "$lte"}, 10, map[string]interface{}{"_id": "blah", "count": 10}, nil, false, + }, + { + "skipped", "count", []string{"<=", "lte", "$lte"}, 10, map[string]interface{}{"_id": "blah", "count": 10.1}, nil, true, + }, + { + "match", "count", []string{"lte"}, "10", map[string]interface{}{"_id": "blah", "count": 10}, nil, false, + }, + { + "match", "count", []string{"lte"}, 10, map[string]interface{}{"_id": "blah", "count": "10"}, nil, false, + }, + { + "wrong type", "count", []string{"<="}, "10", map[string]interface{}{"_id": "blah", "count": 10.1}, nil, true, + }, + { + "wrong type", "count", []string{"<="}, 10, map[string]interface{}{"_id": "blah", "count": "10.1"}, nil, true, + }, + { + "uncovertable string", "count", []string{"<="}, "ten", map[string]interface{}{"_id": "blah", "count": 10.1}, &strconv.NumError{"ParseFloat", "ten", strconv.ErrSyntax}, true, + }, + { + "uncovertable string", "count", []string{"<="}, 10, map[string]interface{}{"_id": "blah", "count": "ten"}, &strconv.NumError{"ParseFloat", "ten", strconv.ErrSyntax}, true, + }, + { + "wrong type", "count", []string{"<="}, true, map[string]interface{}{"_id": "blah", "count": 10.1}, WrongTypeError{"float64 or int", "bool"}, true, + }, + { + "wrong type", "count", []string{"<="}, 10, map[string]interface{}{"_id": "blah", "count": false}, WrongTypeError{"float64 or int", "bool"}, true, + }, +} + +func TestSkip(t *testing.T) { + for _, st := range skipTests { + for _, op := range st.operators { + skip := &Skip{st.field, op, st.match} + msg, err := skip.Write(message.From(ops.Insert, "test", st.data))(nil) + if !reflect.DeepEqual(err, st.err) { + t.Errorf("[%s %s] error mismatch, expected %s, got %s", op, st.name, st.err, err) + } + if (msg == nil) != st.skipped { + t.Errorf("[%s %s] skip mismatch, expected %v, got %v", op, st.name, st.skipped, (msg == nil)) + } + } + } +} diff --git a/pipe/pipe.go b/pipe/pipe.go index 09830de5e..045d6891b 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -111,9 +111,6 @@ func (m *Pipe) Stop() { // we only worry about the stop channel if we're in a listening loop if m.listening { - if len(m.Out) > 0 { - close(m.Err) - } c := make(chan bool) m.chStop <- c <-c diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index e2e85667c..a1522ee58 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -107,6 +107,7 @@ func (pipeline *Pipeline) Stop() { pipeline.emitMetrics() pipeline.source.pipe.Event <- events.NewExitEvent(time.Now().UnixNano(), pipeline.version, endpoints) pipeline.emitter.Stop() + close(pipeline.source.pipe.Err) } // Run the pipeline