Skip to content

Commit

Permalink
Rename stream start to run
Browse files Browse the repository at this point in the history
  • Loading branch information
eminano committed Jul 4, 2024
1 parent b2a4f7f commit 28bebc6
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ test:

.PHONY: integration-test
integration-test:
PGSTREAM_INTEGRATION_TESTS=true go test -timeout 90s github.com/xataio/pgstream/pkg/stream/integration
@PGSTREAM_INTEGRATION_TESTS=true go test -timeout 90s github.com/xataio/pgstream/pkg/stream/integration

.PHONY: license-check
license-check:
Expand Down
2 changes: 1 addition & 1 deletion cmd/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Execute() error {
// register subcommands
rootCmd.AddCommand(initCmd)
rootCmd.AddCommand(tearDownCmd)
rootCmd.AddCommand(startCmd)
rootCmd.AddCommand(runCmd)

return rootCmd.Execute()
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/start_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
"github.com/xataio/pgstream/pkg/stream"
)

var startCmd = &cobra.Command{
Use: "start",
Short: "Starts the configured pgstream modules",
RunE: withSignalWatcher(start),
var runCmd = &cobra.Command{
Use: "run",
Short: "Run starts all the configured pgstream modules",
RunE: withSignalWatcher(run),
}

func start(ctx context.Context) error {
func run(ctx context.Context) error {
logger := zerolog.NewLogger(&zerolog.Config{
LogLevel: viper.GetString("PGSTREAM_LOG_LEVEL"),
})
zerolog.SetGlobalLogger(logger)
return stream.Start(ctx, zerolog.NewStdLogger(logger), parseStreamConfig(), nil)
return stream.Run(ctx, zerolog.NewStdLogger(logger), parseStreamConfig(), nil)
}
8 changes: 8 additions & 0 deletions pkg/stream/integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ func (m *mockWebhookServer) close() {
m.Server.Close()
}

func runStream(t *testing.T, ctx context.Context, cfg *stream.Config) {
// start the configured stream listener/processor
go func() {
err := stream.Run(ctx, testLogger(), cfg, nil)
require.NoError(t, err)
}()
}

func execQuery(t *testing.T, ctx context.Context, query string) {
conn, err := pglib.NewConn(ctx, pgurl)
require.NoError(t, err)
Expand Down
6 changes: 1 addition & 5 deletions pkg/stream/integration/pg_kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ func Test_PostgresToKafka(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start the configured stream listener/processor
go func() {
err := stream.Start(ctx, testLogger(), cfg, nil)
require.NoError(t, err)
}()
runStream(t, ctx, cfg)

// use a mock processor and a kafka reader to validate the kafka messages
// are properly sent to the topic
Expand Down
6 changes: 1 addition & 5 deletions pkg/stream/integration/pg_opensearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ func Test_PostgresToOpensearch(t *testing.T) {
testSchema := "pg2os_integration_test"
execQuery(t, ctx, fmt.Sprintf("create schema %s", testSchema))

// start the configured stream listener/processor
go func() {
err := stream.Start(ctx, testLogger(), cfg, nil)
require.NoError(t, err)
}()
runStream(t, ctx, cfg)

client, err := es.NewClient(searchURL)
require.NoError(t, err)
Expand Down
6 changes: 1 addition & 5 deletions pkg/stream/integration/pg_webhook_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ func Test_PostgresToWebhook(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start the configured stream listener/processor
go func() {
err := stream.Start(ctx, testLogger(), cfg, nil)
require.NoError(t, err)
}()
runStream(t, ctx, cfg)

mockWebhookServer := newMockWebhookServer()
defer mockWebhookServer.close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/stream_start.go → pkg/stream/stream_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"golang.org/x/sync/errgroup"
)

// Start will start the configured pgstream processes. This call is blocking.
func Start(ctx context.Context, logger loglib.Logger, config *Config, meter metric.Meter) error {
// Run will run the configured pgstream processes. This call is blocking.
func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric.Meter) error {
if err := config.IsValid(); err != nil {
return fmt.Errorf("incompatible configuration: %w", err)
}
Expand Down

0 comments on commit 28bebc6

Please sign in to comment.