Skip to content

Commit

Permalink
initial set of native transformer functions (compose#312)
Browse files Browse the repository at this point in the history
these built-in functions can be used in place of having to write a JS transform function with the added benefits of performance and simplicity

fixes compose#299
  • Loading branch information
jipperinbham authored Mar 16, 2017
1 parent b5921dc commit feebbd9
Show file tree
Hide file tree
Showing 18 changed files with 833 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ before_script:
after_script:
- "./scripts/after_script.sh"
install: true
branches:
only:
- master
- /^v\d+\.\d+(\.\d+)?(-\S*)?$/
notifications:
slack:
secure: R8wvRnq0DcxiFNgUvJ3npnzY2LzU8uVyF8enqfxXNuSR3jRC2tqUosB5Qzb1CCiNicmpEwj3VTcwTozzCwcqckysFek3Pp2/oxYL8tRjqxks1zUeMHVv204l83Js8PAFVhODCQjIxZNQCdUM2fQ9q46MvdY7V8h/wGTbQKq1ZLE=
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ Each adaptor has its own README page with details on configuration and capabilit
* [rethinkdb](./adaptor/rethinkdb)
* [transformer](./adaptor/transformer)

Native Functions
----------------

Each native function can be used as part of a `Transform` step in the pipeline.

* [omit](./adaptor/function/omit)
* [pick](./adaptor/function/pick)
* [pretty](./adaptor/function/pretty)
* [skip](./adaptor/function/skip)

Commands
--------

Expand Down
1 change: 1 addition & 0 deletions adaptor/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
// Initialize all adapters by importing this package
_ "github.com/compose/transporter/adaptor/elasticsearch"
_ "github.com/compose/transporter/adaptor/file"
_ "github.com/compose/transporter/adaptor/function"
_ "github.com/compose/transporter/adaptor/mongodb"
_ "github.com/compose/transporter/adaptor/postgres"
_ "github.com/compose/transporter/adaptor/rabbitmq"
Expand Down
8 changes: 8 additions & 0 deletions adaptor/function/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package function

import (
_ "github.com/compose/transporter/adaptor/function/omit"
_ "github.com/compose/transporter/adaptor/function/pick"
_ "github.com/compose/transporter/adaptor/function/pretty"
_ "github.com/compose/transporter/adaptor/function/skip"
)
33 changes: 33 additions & 0 deletions adaptor/function/omit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# omit function

`omit()` will remove any fields specified from the message and then send down the pipeline. It currently only works for top level fields (i.e. `address.street` would not work).

### configuration

```javascript
omit({"fields": ["name"]})
```

### example

message in
```JSON
{
"_id": 0,
"name": "transporter",
"type": "function"
}
```

config
```javascript
omit({"fields":["type"]})
```

message out
```JSON
{
"_id": 0,
"name": "transporter"
}
```
43 changes: 43 additions & 0 deletions adaptor/function/omit/omitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package omit

import (
"sync"

"github.com/compose/transporter/adaptor"
"github.com/compose/transporter/client"
"github.com/compose/transporter/message"
)

func init() {
adaptor.Add(
"omit",
func() adaptor.Adaptor {
return &Omitter{}
},
)
}

type Omitter struct {
Fields []string `json:"fields"`
}

func (o *Omitter) Client() (client.Client, error) {
return &client.Mock{}, nil
}

func (o *Omitter) Reader() (client.Reader, error) {
return nil, adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"}
}

func (o *Omitter) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) {
return o, nil
}

func (o *Omitter) Write(msg message.Msg) func(client.Session) (message.Msg, error) {
return func(s client.Session) (message.Msg, error) {
for _, k := range o.Fields {
msg.Data().Delete(k)
}
return msg, nil
}
}
77 changes: 77 additions & 0 deletions adaptor/function/omit/omitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package omit

import (
"reflect"
"testing"

"github.com/compose/transporter/adaptor"
_ "github.com/compose/transporter/log"
"github.com/compose/transporter/message"
"github.com/compose/transporter/message/ops"
)

var initTests = []map[string]interface{}{
{"fields": []string{"test"}},
}

func TestInit(t *testing.T) {
for _, it := range initTests {
a, err := adaptor.GetAdaptor("omit", it)
if err != nil {
t.Fatalf("unexpected GetAdaptor() error, %s", err)
}
if _, err := a.Client(); err != nil {
t.Errorf("unexpected Client() error, %s", err)
}
rerr := adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"}
if _, err := a.Reader(); err != rerr {
t.Errorf("wrong Reader() error, expected %s, got %s", rerr, err)
}
if _, err := a.Writer(nil, nil); err != nil {
t.Errorf("unexpected Writer() error, %s", err)
}
}
}

var omitTests = []struct {
name string
fields []string
in map[string]interface{}
out map[string]interface{}
err error
}{
{
"single field",
[]string{"type"},
map[string]interface{}{"_id": "blah", "type": "good"},
map[string]interface{}{"_id": "blah"},
nil,
},
{
"multiple fields",
[]string{"type", "name"},
map[string]interface{}{"_id": "blah", "type": "good", "name": "hello"},
map[string]interface{}{"_id": "blah"},
nil,
},
{
"no matched fields",
[]string{"name"},
map[string]interface{}{"_id": "blah", "type": "good"},
map[string]interface{}{"_id": "blah", "type": "good"},
nil,
},
}

func TestOmit(t *testing.T) {
for _, ot := range omitTests {
omit := &Omitter{ot.fields}
msg, err := omit.Write(message.From(ops.Insert, "test", ot.in))(nil)
if !reflect.DeepEqual(err, ot.err) {
t.Errorf("[%s] error mismatch, expected %s, got %s", ot.name, ot.err, err)
}
if !reflect.DeepEqual(msg.Data().AsMap(), ot.out) {
t.Errorf("[%s] wrong message, expected %+v, got %+v", ot.name, ot.out, msg.Data().AsMap())
}
}
}
33 changes: 33 additions & 0 deletions adaptor/function/pick/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# pick function

`pick()` will only include the specified fields from the message when sending down the pipeline. It currently only works for top level fields (i.e. `address.street` would not work).

### configuration

```javascript
pick({"fields": ["name"]})
```

### example

message in
```JSON
{
"_id": 0,
"name": "transporter",
"type": "function"
}
```

config
```javascript
pick({"fields":["_id", "name"]})
```

message out
```JSON
{
"_id": 0,
"name": "transporter"
}
```
46 changes: 46 additions & 0 deletions adaptor/function/pick/picker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package pick

import (
"sync"

"github.com/compose/transporter/adaptor"
"github.com/compose/transporter/client"
"github.com/compose/transporter/message"
)

func init() {
adaptor.Add(
"pick",
func() adaptor.Adaptor {
return &Picker{}
},
)
}

type Picker struct {
Fields []string `json:"fields"`
}

func (p *Picker) Client() (client.Client, error) {
return &client.Mock{}, nil
}

func (p *Picker) Reader() (client.Reader, error) {
return nil, adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"}
}

func (p *Picker) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) {
return p, nil
}

func (p *Picker) Write(msg message.Msg) func(client.Session) (message.Msg, error) {
return func(s client.Session) (message.Msg, error) {
pluckedMsg := map[string]interface{}{}
for _, k := range p.Fields {
if v, ok := msg.Data().AsMap()[k]; ok {
pluckedMsg[k] = v
}
}
return message.From(msg.OP(), msg.Namespace(), pluckedMsg), nil
}
}
77 changes: 77 additions & 0 deletions adaptor/function/pick/picker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package pick

import (
"reflect"
"testing"

"github.com/compose/transporter/adaptor"
_ "github.com/compose/transporter/log"
"github.com/compose/transporter/message"
"github.com/compose/transporter/message/ops"
)

var initTests = []map[string]interface{}{
{"fields": []string{"test"}},
}

func TestInit(t *testing.T) {
for _, it := range initTests {
a, err := adaptor.GetAdaptor("pick", it)
if err != nil {
t.Fatalf("unexpected GetAdaptor() error, %s", err)
}
if _, err := a.Client(); err != nil {
t.Errorf("unexpected Client() error, %s", err)
}
rerr := adaptor.ErrFuncNotSupported{Name: "transformer", Func: "Reader()"}
if _, err := a.Reader(); err != rerr {
t.Errorf("wrong Reader() error, expected %s, got %s", rerr, err)
}
if _, err := a.Writer(nil, nil); err != nil {
t.Errorf("unexpected Writer() error, %s", err)
}
}
}

var pickTests = []struct {
name string
fields []string
in map[string]interface{}
out map[string]interface{}
err error
}{
{
"single field",
[]string{"type"},
map[string]interface{}{"_id": "blah", "type": "good"},
map[string]interface{}{"type": "good"},
nil,
},
{
"multiple fields",
[]string{"_id", "name"},
map[string]interface{}{"_id": "blah", "type": "good", "name": "hello"},
map[string]interface{}{"_id": "blah", "name": "hello"},
nil,
},
{
"no matched fields",
[]string{"name"},
map[string]interface{}{"_id": "blah", "type": "good"},
map[string]interface{}{},
nil,
},
}

func TestOmit(t *testing.T) {
for _, pt := range pickTests {
pick := &Picker{pt.fields}
msg, err := pick.Write(message.From(ops.Insert, "test", pt.in))(nil)
if !reflect.DeepEqual(err, pt.err) {
t.Errorf("[%s] error mismatch, expected %s, got %s", pt.name, pt.err, err)
}
if !reflect.DeepEqual(msg.Data().AsMap(), pt.out) {
t.Errorf("[%s] wrong message, expected %+v, got %+v", pt.name, pt.out, msg.Data().AsMap())
}
}
}
Loading

0 comments on commit feebbd9

Please sign in to comment.