Skip to content

Commit

Permalink
new function.Function interface to replace Transformer adaptor (compo…
Browse files Browse the repository at this point in the history
…se#316)

sink nodes will now have an array of Transforms which map to an implemented function.Function, this simplifies the message passing as well as makes it easier to add more native functions or additional language runtimes in the future
  • Loading branch information
jipperinbham authored Mar 21, 2017
1 parent feebbd9 commit 86d88b5
Show file tree
Hide file tree
Showing 102 changed files with 1,702 additions and 1,496 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ env:
- TESTDIR=adaptor/postgres/...
- TESTDIR=adaptor/rabbitmq/...
- TESTDIR=adaptor/rethinkdb/...
- TESTDIR="adaptor, adaptor/all, adaptor/file/..., adaptor/transformer/..., client/..., events/..., log/..., message/..., pipe/..., state/..., pipeline/..."
- TESTDIR="adaptor, adaptor/all, adaptor/file/..., client/..., events/..., function/..., log/..., message/..., pipe/..., state/..., pipeline/..."
- TESTDIR=integration_tests/mongo_to_mongo
- TESTDIR=integration_tests/mongo_to_es
- TESTDIR=integration_tests/mongo_to_rethink
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Transporter no longer requires a YAML file. All configuration is in the JS file
- NEW RabbitMQ adaptor [#298](https://github.com/compose/transporter/pull/298)
- MongoDB adaptor supports per collection query filter when needing to copy only a subset of data [#301](https://github.com/compose/transporter/pull/301)
- [goja](https://github.com/dop251/goja) added as an option for the JavaScript VM in transformers [#294](https://github.com/compose/transporter/pull/294)
- NEW [native functions](https://github.com/compose/transporter#native-functions)

### Bugfixes

Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ Each adaptor has its own README page with details on configuration and capabilit
* [postgresql](./adaptor/postgres)
* [rabbitmq](./adaptor/rabbitmq)
* [rethinkdb](./adaptor/rethinkdb)
* [transformer](./adaptor/transformer)

Native Functions
----------------

Each native function can be used as part of a `Transform` step in the pipeline.

* [goja](./adaptor/function/gojajs)
* [omit](./adaptor/function/omit)
* [otto](./adaptor/function/ottojs)
* [pick](./adaptor/function/pick)
* [pretty](./adaptor/function/pretty)
* [rename](./adaptor/function/rename)
* [skip](./adaptor/function/skip)

Commands
Expand Down Expand Up @@ -94,7 +96,6 @@ mongodb - a mongodb adaptor that functions as both a source and a sink
postgres - a postgres adaptor that functions as both a source and a sink
rabbitmq - an adaptor that handles publish/subscribe messaging with RabbitMQ
rethinkdb - a rethinkdb adaptor that functions as both a source and a sink
transformer - an adaptor that transforms documents using a javascript function
```

Giving the name of an adaptor produces more detail, such as the sample configuration.
Expand Down
5 changes: 2 additions & 3 deletions adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func CompileNamespace(ns string) (string, *regexp.Regexp, error) {

// BaseConfig is a standard typed config struct to use for as general purpose config for most databases.
type BaseConfig struct {
URI string `json:"uri"`
Namespace string `json:"namespace"`
Timeout string `json:"timeout"`
URI string `json:"uri"`
Timeout string `json:"timeout"`
}
2 changes: 0 additions & 2 deletions adaptor/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
// Initialize all adapters by importing this package
_ "github.com/compose/transporter/adaptor/elasticsearch"
_ "github.com/compose/transporter/adaptor/file"
_ "github.com/compose/transporter/adaptor/function"
_ "github.com/compose/transporter/adaptor/mongodb"
_ "github.com/compose/transporter/adaptor/postgres"
_ "github.com/compose/transporter/adaptor/rabbitmq"
_ "github.com/compose/transporter/adaptor/rethinkdb"
_ "github.com/compose/transporter/adaptor/transformer"
)
9 changes: 4 additions & 5 deletions adaptor/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ func (e *Elasticsearch) Reader() (client.Reader, error) {

// Writer determines the which underlying writer to used based on the cluster's version.
func (e *Elasticsearch) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error) {
index, _, _ := adaptor.CompileNamespace(e.Namespace)
return setupWriter(index, e)
return setupWriter(e)
}

func setupWriter(index string, conf *Elasticsearch) (client.Writer, error) {
func setupWriter(conf *Elasticsearch) (client.Writer, error) {
uri, err := url.Parse(conf.URI)
if err != nil {
return nil, client.InvalidURIError{URI: conf.URI, Err: err.Error()}
Expand All @@ -95,7 +94,7 @@ func setupWriter(index string, conf *Elasticsearch) (client.Writer, error) {

timeout, err := time.ParseDuration(conf.Timeout)
if err != nil {
log.Infof("failed to parse duration, %s, falling back to default timeout of 30s", conf.Timeout)
log.Debugf("failed to parse duration, %s, falling back to default timeout of 30s", conf.Timeout)
timeout = 30 * time.Second
}

Expand All @@ -114,7 +113,7 @@ func setupWriter(index string, conf *Elasticsearch) (client.Writer, error) {
URLs: urls,
UserInfo: uri.User,
HTTPClient: httpClient,
Index: index,
Index: uri.Path[1:],
}
versionedClient, _ := vc.Creator(opts)
return versionedClient, nil
Expand Down
24 changes: 12 additions & 12 deletions adaptor/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
authURI = func() string {
uri, _ := url.Parse(authedServer.URL)
uri.User = url.UserPassword(testUser, testPwd)
return uri.String()
return fmt.Sprintf("%s/test", uri.String())
}
)
var authedServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -73,48 +73,48 @@ var clientTests = []struct {
}{
{
"base config",
adaptor.Config{"uri": goodVersionServer.URL, "namespace": "test.test"},
adaptor.Config{"uri": fmt.Sprintf("%s/test", goodVersionServer.URL)},
nil,
},
{
"timeout config",
adaptor.Config{"uri": goodVersionServer.URL, "namespace": "test.test", "timeout": "60s"},
adaptor.Config{"uri": fmt.Sprintf("%s/test", goodVersionServer.URL), "timeout": "60s"},
nil,
},
{
"authed URI",
adaptor.Config{"uri": authURI(), "namespace": "test.test"},
adaptor.Config{"uri": authURI()},
nil,
},
{
"bad URI",
adaptor.Config{"uri": "%gh&%ij", "namespace": "test.test"},
adaptor.Config{"uri": "%gh&%ij"},
client.InvalidURIError{URI: "%gh&%ij", Err: `parse %gh&%ij: invalid URL escape "%gh"`},
},
{
"no connection",
adaptor.Config{"uri": "http://localhost:7200", "namespace": "test.test"},
adaptor.Config{"uri": "http://localhost:7200/test"},
client.ConnectError{Reason: "http://localhost:7200"},
},
{
"empty body",
adaptor.Config{"uri": emptyBodyServer.URL, "namespace": "test.test"},
adaptor.Config{"uri": fmt.Sprintf("%s/test", emptyBodyServer.URL)},
client.VersionError{URI: emptyBodyServer.URL, V: "", Err: "missing version: {}"},
},
{
"malformed JSON",
adaptor.Config{"uri": badJSONServer.URL, "namespace": "test.test"},
adaptor.Config{"uri": fmt.Sprintf("%s/test", badJSONServer.URL)},
client.VersionError{URI: badJSONServer.URL, V: "", Err: "malformed JSON: Hello, client"},
},
{
"bad version",
adaptor.Config{"uri": badVersionServer.URL, "namespace": "test.test"},
client.VersionError{URI: badVersionServer.URL, V: "not a version", Err: "Malformed version: not a version"},
adaptor.Config{"uri": fmt.Sprintf("%s/test", badVersionServer.URL)},
client.VersionError{URI: fmt.Sprintf("%s/test", badVersionServer.URL), V: "not a version", Err: "Malformed version: not a version"},
},
{
"unsupported version",
adaptor.Config{"uri": unsupportedVersionServer.URL, "namespace": "test.test"},
client.VersionError{URI: unsupportedVersionServer.URL, V: "0.9.2", Err: "unsupported client"},
adaptor.Config{"uri": fmt.Sprintf("%s/test", unsupportedVersionServer.URL)},
client.VersionError{URI: fmt.Sprintf("%s/test", unsupportedVersionServer.URL), V: "0.9.2", Err: "unsupported client"},
},
}

Expand Down
2 changes: 1 addition & 1 deletion adaptor/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSampleConfig(t *testing.T) {
}

var initTests = []map[string]interface{}{
{"uri": DefaultURI, "namespace": "test.test"},
{"uri": DefaultURI},
}

func TestInit(t *testing.T) {
Expand Down
8 changes: 0 additions & 8 deletions adaptor/function/all.go

This file was deleted.

43 changes: 0 additions & 43 deletions adaptor/function/omit/omitter.go

This file was deleted.

46 changes: 0 additions & 46 deletions adaptor/function/pick/picker.go

This file was deleted.

58 changes: 0 additions & 58 deletions adaptor/function/pretty/prettify.go

This file was deleted.

Loading

0 comments on commit 86d88b5

Please sign in to comment.