diff --git a/embedded/store/immustore_test.go b/embedded/store/immustore_test.go index 3b7b5b684f..64b5cfb31b 100644 --- a/embedded/store/immustore_test.go +++ b/embedded/store/immustore_test.go @@ -2293,11 +2293,11 @@ func TestUncommittedTxOverwriting(t *testing.T) { } func TestExportAndReplicateTx(t *testing.T) { - masterDir := t.TempDir() + primaryDir := t.TempDir() - masterStore, err := Open(masterDir, DefaultOptions()) + primaryStore, err := Open(primaryDir, DefaultOptions()) require.NoError(t, err) - defer immustoreClose(t, masterStore) + defer immustoreClose(t, primaryStore) replicaDir := t.TempDir() @@ -2305,7 +2305,7 @@ func TestExportAndReplicateTx(t *testing.T) { require.NoError(t, err) defer immustoreClose(t, replicaStore) - tx, err := masterStore.NewWriteOnlyTx() + tx, err := primaryStore.NewWriteOnlyTx() require.NoError(t, err) tx.WithMetadata(NewTxMetadata()) @@ -2317,9 +2317,9 @@ func TestExportAndReplicateTx(t *testing.T) { require.NoError(t, err) require.NotNil(t, hdr) - txholder := tempTxHolder(t, masterStore) + txholder := tempTxHolder(t, primaryStore) - etx, err := masterStore.ExportTx(1, false, txholder) + etx, err := primaryStore.ExportTx(1, false, txholder) require.NoError(t, err) rhdr, err := replicaStore.ReplicateTx(etx, false) @@ -2334,11 +2334,11 @@ func TestExportAndReplicateTx(t *testing.T) { } func TestExportAndReplicateTxCornerCases(t *testing.T) { - masterDir := t.TempDir() + primaryDir := t.TempDir() - masterStore, err := Open(masterDir, DefaultOptions()) + primaryStore, err := Open(primaryDir, DefaultOptions()) require.NoError(t, err) - defer immustoreClose(t, masterStore) + defer immustoreClose(t, primaryStore) replicaDir := t.TempDir() @@ -2346,7 +2346,7 @@ func TestExportAndReplicateTxCornerCases(t *testing.T) { require.NoError(t, err) defer immustoreClose(t, replicaStore) - tx, err := masterStore.NewWriteOnlyTx() + tx, err := primaryStore.NewWriteOnlyTx() require.NoError(t, err) tx.WithMetadata(NewTxMetadata()) @@ -2358,10 +2358,10 @@ func TestExportAndReplicateTxCornerCases(t *testing.T) { require.NoError(t, err) require.NotNil(t, hdr) - txholder := tempTxHolder(t, masterStore) + txholder := tempTxHolder(t, primaryStore) t.Run("prevent replicating broken data", func(t *testing.T) { - etx, err := masterStore.ExportTx(1, false, txholder) + etx, err := primaryStore.ExportTx(1, false, txholder) require.NoError(t, err) for i := range etx { @@ -2392,11 +2392,11 @@ func TestExportAndReplicateTxCornerCases(t *testing.T) { } func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) { - masterDir := t.TempDir() + primaryDir := t.TempDir() - masterStore, err := Open(masterDir, DefaultOptions()) + primaryStore, err := Open(primaryDir, DefaultOptions()) require.NoError(t, err) - defer immustoreClose(t, masterStore) + defer immustoreClose(t, primaryStore) replicaDir := t.TempDir() @@ -2409,7 +2409,7 @@ func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) { for i := 0; i < txCount; i++ { t.Run(fmt.Sprintf("tx: %d", i), func(t *testing.T) { - tx, err := masterStore.NewWriteOnlyTx() + tx, err := primaryStore.NewWriteOnlyTx() require.NoError(t, err) tx.WithMetadata(NewTxMetadata()) @@ -2422,7 +2422,7 @@ func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) { require.NotNil(t, hdr) txholder := tempTxHolder(t, replicaStore) - etx, err := masterStore.ExportTx(hdr.ID, false, txholder) + etx, err := primaryStore.ExportTx(hdr.ID, false, txholder) require.NoError(t, err) // Replicate the same transactions concurrently, only one must succeed @@ -2452,11 +2452,11 @@ func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) { } func TestExportAndReplicateTxDisorderedReplication(t *testing.T) { - masterDir := t.TempDir() + primaryDir := t.TempDir() - masterStore, err := Open(masterDir, DefaultOptions()) + primaryStore, err := Open(primaryDir, DefaultOptions()) require.NoError(t, err) - defer immustoreClose(t, masterStore) + defer immustoreClose(t, primaryStore) replicaDir := t.TempDir() @@ -2472,7 +2472,7 @@ func TestExportAndReplicateTxDisorderedReplication(t *testing.T) { txholder := tempTxHolder(t, replicaStore) for i := 0; i < txCount; i++ { - tx, err := masterStore.NewWriteOnlyTx() + tx, err := primaryStore.NewWriteOnlyTx() require.NoError(t, err) tx.WithMetadata(NewTxMetadata()) @@ -2484,7 +2484,7 @@ func TestExportAndReplicateTxDisorderedReplication(t *testing.T) { require.NoError(t, err) require.NotNil(t, hdr) - etx, err := masterStore.ExportTx(hdr.ID, false, txholder) + etx, err := primaryStore.ExportTx(hdr.ID, false, txholder) require.NoError(t, err) etxs <- etx diff --git a/pkg/integration/follower_replication_test.go b/pkg/integration/follower_replication_test.go index 489037037c..c82a1d420f 100644 --- a/pkg/integration/follower_replication_test.go +++ b/pkg/integration/follower_replication_test.go @@ -33,68 +33,68 @@ import ( ) func TestReplication(t *testing.T) { - //init master server - masterDir := t.TempDir() + //init primary server + primaryDir := t.TempDir() - masterServerOpts := server.DefaultOptions(). + primaryServerOpts := server.DefaultOptions(). WithMetricsServer(false). WithWebServer(false). WithPgsqlServer(false). WithPort(0). - WithDir(masterDir) + WithDir(primaryDir) - masterServer := server.DefaultServer().WithOptions(masterServerOpts).(*server.ImmuServer) + primaryServer := server.DefaultServer().WithOptions(primaryServerOpts).(*server.ImmuServer) - err := masterServer.Initialize() + err := primaryServer.Initialize() require.NoError(t, err) - //init follower server - followerDir := t.TempDir() + //init replica server + replicaDir := t.TempDir() - followerServerOpts := server.DefaultOptions(). + replicaServerOpts := server.DefaultOptions(). WithMetricsServer(false). WithWebServer(false). WithPgsqlServer(false). WithPort(0). - WithDir(followerDir) + WithDir(replicaDir) - followerServer := server.DefaultServer().WithOptions(followerServerOpts).(*server.ImmuServer) + replicaServer := server.DefaultServer().WithOptions(replicaServerOpts).(*server.ImmuServer) - err = followerServer.Initialize() + err = replicaServer.Initialize() require.NoError(t, err) go func() { - masterServer.Start() + primaryServer.Start() }() go func() { - followerServer.Start() + replicaServer.Start() }() time.Sleep(1 * time.Second) defer func() { - masterServer.Stop() + primaryServer.Stop() time.Sleep(1 * time.Second) - followerServer.Stop() + replicaServer.Stop() }() - // init master client - masterPort := masterServer.Listener.Addr().(*net.TCPAddr).Port - masterClient, err := ic.NewImmuClient(ic.DefaultOptions().WithPort(masterPort)) + // init primary client + primaryPort := primaryServer.Listener.Addr().(*net.TCPAddr).Port + primaryClient, err := ic.NewImmuClient(ic.DefaultOptions().WithPort(primaryPort)) require.NoError(t, err) - require.NotNil(t, masterClient) + require.NotNil(t, primaryClient) - mlr, err := masterClient.Login(context.TODO(), []byte(`immudb`), []byte(`immudb`)) + mlr, err := primaryClient.Login(context.TODO(), []byte(`immudb`), []byte(`immudb`)) require.NoError(t, err) mmd := metadata.Pairs("authorization", mlr.Token) - mctx := metadata.NewOutgoingContext(context.Background(), mmd) + pctx := metadata.NewOutgoingContext(context.Background(), mmd) - // create database as masterdb in master server - _, err = masterClient.CreateDatabaseV2(mctx, "masterdb", &schema.DatabaseNullableSettings{ + // create database as primarydb in primary server + _, err = primaryClient.CreateDatabaseV2(pctx, "primarydb", &schema.DatabaseNullableSettings{ ReplicationSettings: &schema.ReplicationNullableSettings{ SyncReplication: &schema.NullableBool{Value: true}, SyncAcks: &schema.NullableUint32{Value: 1}, @@ -102,40 +102,40 @@ func TestReplication(t *testing.T) { }) require.NoError(t, err) - mdb, err := masterClient.UseDatabase(mctx, &schema.Database{DatabaseName: "masterdb"}) + mdb, err := primaryClient.UseDatabase(pctx, &schema.Database{DatabaseName: "primarydb"}) require.NoError(t, err) require.NotNil(t, mdb) mmd = metadata.Pairs("authorization", mdb.Token) - mctx = metadata.NewOutgoingContext(context.Background(), mmd) + pctx = metadata.NewOutgoingContext(context.Background(), mmd) - err = masterClient.CreateUser(mctx, []byte("follower"), []byte("follower1Pwd!"), auth.PermissionAdmin, "masterdb") + err = primaryClient.CreateUser(pctx, []byte("replicator"), []byte("replicator1Pwd!"), auth.PermissionAdmin, "primarydb") require.NoError(t, err) - err = masterClient.SetActiveUser(mctx, &schema.SetActiveUserRequest{Active: true, Username: "follower"}) + err = primaryClient.SetActiveUser(pctx, &schema.SetActiveUserRequest{Active: true, Username: "replicator"}) require.NoError(t, err) - // init follower client - followerPort := followerServer.Listener.Addr().(*net.TCPAddr).Port - followerClient, err := ic.NewImmuClient(ic.DefaultOptions().WithPort(followerPort)) + // init replica client + replicaPort := replicaServer.Listener.Addr().(*net.TCPAddr).Port + replicaClient, err := ic.NewImmuClient(ic.DefaultOptions().WithPort(replicaPort)) require.NoError(t, err) - require.NotNil(t, followerClient) + require.NotNil(t, replicaClient) - flr, err := followerClient.Login(context.TODO(), []byte(`immudb`), []byte(`immudb`)) + flr, err := replicaClient.Login(context.TODO(), []byte(`immudb`), []byte(`immudb`)) require.NoError(t, err) fmd := metadata.Pairs("authorization", flr.Token) - fctx := metadata.NewOutgoingContext(context.Background(), fmd) + rctx := metadata.NewOutgoingContext(context.Background(), fmd) - // create database as replica in follower server - _, err = followerClient.CreateDatabaseV2(fctx, "replicadb", &schema.DatabaseNullableSettings{ + // create database as replica in replica server + _, err = replicaClient.CreateDatabaseV2(rctx, "replicadb", &schema.DatabaseNullableSettings{ ReplicationSettings: &schema.ReplicationNullableSettings{ Replica: &schema.NullableBool{Value: true}, SyncReplication: &schema.NullableBool{Value: true}, - PrimaryDatabase: &schema.NullableString{Value: "masterdb"}, + PrimaryDatabase: &schema.NullableString{Value: "primarydb"}, PrimaryHost: &schema.NullableString{Value: "127.0.0.1"}, - PrimaryPort: &schema.NullableUint32{Value: uint32(masterPort)}, - PrimaryUsername: &schema.NullableString{Value: "follower"}, + PrimaryPort: &schema.NullableUint32{Value: uint32(primaryPort)}, + PrimaryUsername: &schema.NullableString{Value: "replicator"}, PrimaryPassword: &schema.NullableString{Value: "wrongPassword"}, }, }) @@ -143,147 +143,147 @@ func TestReplication(t *testing.T) { time.Sleep(1 * time.Second) - _, err = followerClient.UpdateDatabaseV2(fctx, "replicadb", &schema.DatabaseNullableSettings{ + _, err = replicaClient.UpdateDatabaseV2(rctx, "replicadb", &schema.DatabaseNullableSettings{ ReplicationSettings: &schema.ReplicationNullableSettings{ - PrimaryPassword: &schema.NullableString{Value: "follower1Pwd!"}, + PrimaryPassword: &schema.NullableString{Value: "replicator1Pwd!"}, }, }) require.NoError(t, err) - fdb, err := followerClient.UseDatabase(fctx, &schema.Database{DatabaseName: "replicadb"}) + fdb, err := replicaClient.UseDatabase(rctx, &schema.Database{DatabaseName: "replicadb"}) require.NoError(t, err) require.NotNil(t, fdb) fmd = metadata.Pairs("authorization", fdb.Token) - fctx = metadata.NewOutgoingContext(context.Background(), fmd) + rctx = metadata.NewOutgoingContext(context.Background(), fmd) t.Run("key1 should not exist", func(t *testing.T) { - _, err = followerClient.Get(fctx, []byte("key1")) + _, err = replicaClient.Get(rctx, []byte("key1")) require.Error(t, err) require.Contains(t, err.Error(), "key not found") }) - _, err = masterClient.Set(mctx, []byte("key1"), []byte("value1")) + _, err = primaryClient.Set(pctx, []byte("key1"), []byte("value1")) require.NoError(t, err) - _, err = masterClient.Set(mctx, []byte("key2"), []byte("value2")) + _, err = primaryClient.Set(pctx, []byte("key2"), []byte("value2")) require.NoError(t, err) time.Sleep(1 * time.Second) - t.Run("key1 should exist in replicadb@follower", func(t *testing.T) { - _, err = followerClient.Get(fctx, []byte("key1")) + t.Run("key1 should exist in replicadb@replica", func(t *testing.T) { + _, err = replicaClient.Get(rctx, []byte("key1")) require.NoError(t, err) }) - fdb, err = followerClient.UseDatabase(fctx, &schema.Database{DatabaseName: "defaultdb"}) + fdb, err = replicaClient.UseDatabase(rctx, &schema.Database{DatabaseName: "defaultdb"}) require.NoError(t, err) require.NotNil(t, fdb) fmd = metadata.Pairs("authorization", fdb.Token) - fctx = metadata.NewOutgoingContext(context.Background(), fmd) + rctx = metadata.NewOutgoingContext(context.Background(), fmd) - t.Run("key1 should not exist in defaultdb@follower", func(t *testing.T) { - _, err = followerClient.Get(fctx, []byte("key1")) + t.Run("key1 should not exist in defaultdb@replica", func(t *testing.T) { + _, err = replicaClient.Get(rctx, []byte("key1")) require.Contains(t, err.Error(), "key not found") }) } func TestSystemDBAndDefaultDBReplication(t *testing.T) { - //init master server - masterDir, err := ioutil.TempDir("", "master-data") + // init primary server + primaryDir, err := ioutil.TempDir("", "primary-data") require.NoError(t, err) - defer os.RemoveAll(masterDir) + defer os.RemoveAll(primaryDir) - masterServerOpts := server.DefaultOptions(). + primaryServerOpts := server.DefaultOptions(). WithMetricsServer(false). WithWebServer(false). WithPgsqlServer(false). WithPort(0). - WithDir(masterDir) + WithDir(primaryDir) - masterServer := server.DefaultServer().WithOptions(masterServerOpts).(*server.ImmuServer) + primaryServer := server.DefaultServer().WithOptions(primaryServerOpts).(*server.ImmuServer) - err = masterServer.Initialize() + err = primaryServer.Initialize() require.NoError(t, err) go func() { - masterServer.Start() + primaryServer.Start() }() time.Sleep(1 * time.Second) - defer masterServer.Stop() + defer primaryServer.Stop() - // init master client - masterPort := masterServer.Listener.Addr().(*net.TCPAddr).Port - masterClient := ic.NewClient().WithOptions(ic.DefaultOptions().WithPort(masterPort)) - require.NotNil(t, masterClient) + // init primary client + primaryPort := primaryServer.Listener.Addr().(*net.TCPAddr).Port + primaryClient := ic.NewClient().WithOptions(ic.DefaultOptions().WithPort(primaryPort)) + require.NotNil(t, primaryClient) - err = masterClient.OpenSession(context.Background(), []byte(`immudb`), []byte(`immudb`), "defaultdb") + err = primaryClient.OpenSession(context.Background(), []byte(`immudb`), []byte(`immudb`), "defaultdb") require.NoError(t, err) - defer masterClient.CloseSession(context.Background()) + defer primaryClient.CloseSession(context.Background()) - //init follower server - followerDir, err := ioutil.TempDir("", "follower-data") + // init replica server + replicaDir, err := ioutil.TempDir("", "replica-data") require.NoError(t, err) - defer os.RemoveAll(followerDir) + defer os.RemoveAll(replicaDir) replicationOpts := &server.ReplicationOptions{ IsReplica: true, - MasterAddress: "127.0.0.1", - MasterPort: masterPort, - FollowerUsername: "immudb", - FollowerPassword: "immudb", + PrimaryHost: "127.0.0.1", + PrimaryPort: primaryPort, + PrimaryUsername: "immudb", + PrimaryPassword: "immudb", PrefetchTxBufferSize: 100, ReplicationCommitConcurrency: 1, } - followerServerOpts := server.DefaultOptions(). + replicaServerOpts := server.DefaultOptions(). WithMetricsServer(false). WithWebServer(false). WithPgsqlServer(false). WithPort(0). - WithDir(followerDir). + WithDir(replicaDir). WithReplicationOptions(replicationOpts) - followerServer := server.DefaultServer().WithOptions(followerServerOpts).(*server.ImmuServer) + replicaServer := server.DefaultServer().WithOptions(replicaServerOpts).(*server.ImmuServer) - err = followerServer.Initialize() + err = replicaServer.Initialize() require.NoError(t, err) go func() { - followerServer.Start() + replicaServer.Start() }() time.Sleep(1 * time.Second) - defer followerServer.Stop() + defer replicaServer.Stop() - // init follower client - followerPort := followerServer.Listener.Addr().(*net.TCPAddr).Port - followerClient := ic.NewClient().WithOptions(ic.DefaultOptions().WithPort(followerPort)) - require.NotNil(t, followerClient) + // init replica client + replicaPort := replicaServer.Listener.Addr().(*net.TCPAddr).Port + replicaClient := ic.NewClient().WithOptions(ic.DefaultOptions().WithPort(replicaPort)) + require.NotNil(t, replicaClient) - err = followerClient.OpenSession(context.Background(), []byte(`immudb`), []byte(`immudb`), "defaultdb") + err = replicaClient.OpenSession(context.Background(), []byte(`immudb`), []byte(`immudb`), "defaultdb") require.NoError(t, err) - defer followerClient.CloseSession(context.Background()) + defer replicaClient.CloseSession(context.Background()) t.Run("key1 should not exist", func(t *testing.T) { - _, err = followerClient.Get(context.Background(), []byte("key1")) + _, err = replicaClient.Get(context.Background(), []byte("key1")) require.Error(t, err) require.Contains(t, err.Error(), "key not found") }) - _, err = masterClient.Set(context.Background(), []byte("key1"), []byte("value1")) + _, err = primaryClient.Set(context.Background(), []byte("key1"), []byte("value1")) require.NoError(t, err) time.Sleep(1 * time.Second) - t.Run("key1 should exist in replicateddb@follower", func(t *testing.T) { - _, err = followerClient.Get(context.Background(), []byte("key1")) + t.Run("key1 should exist in replicateddb@replica", func(t *testing.T) { + _, err = replicaClient.Get(context.Background(), []byte("key1")) require.NoError(t, err) }) - _, err = followerClient.Set(context.Background(), []byte("key2"), []byte("value2")) + _, err = replicaClient.Set(context.Background(), []byte("key2"), []byte("value2")) require.Contains(t, err.Error(), "database is read-only because it's a replica") } diff --git a/pkg/integration/replication/suite.go b/pkg/integration/replication/suite.go index 087ee02cd3..3e3ad3ba2d 100644 --- a/pkg/integration/replication/suite.go +++ b/pkg/integration/replication/suite.go @@ -16,10 +16,10 @@ import ( ) const ( - masterDBName = "masterdb" + primaryDBName = "primarydb" replicaDBName = "replicadb" - replicaUsername = "follower" - replicaPassword = "follower1Pwd!" + primaryUsername = "replicator" + primaryPassword = "replicator1Pwd!" ) // TestServer is an abstract representation of a TestServer @@ -48,89 +48,89 @@ type baseReplicationTestSuite struct { mu sync.Mutex // server settings - master TestServer - masterDBName string - masterRunning bool + primary TestServer + primaryDBName string + primaryRunning bool - followers []TestServer - followersDBName []string - followersRunning []bool + replicas []TestServer + replicasDBName []string + replicasRunning []bool clientStateDir string } -func (suite *baseReplicationTestSuite) GetFollowersCount() int { +func (suite *baseReplicationTestSuite) GetReplicasCount() int { suite.mu.Lock() defer suite.mu.Unlock() - return len(suite.followers) + return len(suite.replicas) } -func (suite *baseReplicationTestSuite) AddFollower(sync bool) int { +func (suite *baseReplicationTestSuite) AddReplica(sync bool) int { suite.mu.Lock() defer suite.mu.Unlock() - follower := suite.srvProvider.AddServer(suite.T()) + replica := suite.srvProvider.AddServer(suite.T()) - followerNum := len(suite.followers) - suite.followers = append(suite.followers, follower) - suite.followersDBName = append(suite.followersDBName, replicaDBName) - suite.followersRunning = append(suite.followersRunning, true) + replicaNum := len(suite.replicas) + suite.replicas = append(suite.replicas, replica) + suite.replicasDBName = append(suite.replicasDBName, replicaDBName) + suite.replicasRunning = append(suite.replicasRunning, true) - fctx, followerClient, cleanup := suite.internalClientFor(follower, client.DefaultDB) + rctx, replicaClient, cleanup := suite.internalClientFor(replica, client.DefaultDB) defer cleanup() - masterHost, masterPort := suite.master.Address(suite.T()) + primaryHost, primaryPort := suite.primary.Address(suite.T()) settings := &schema.DatabaseNullableSettings{ ReplicationSettings: &schema.ReplicationNullableSettings{ Replica: &schema.NullableBool{Value: true}, SyncReplication: &schema.NullableBool{Value: sync}, - PrimaryDatabase: &schema.NullableString{Value: suite.masterDBName}, - PrimaryHost: &schema.NullableString{Value: masterHost}, - PrimaryPort: &schema.NullableUint32{Value: uint32(masterPort)}, - PrimaryUsername: &schema.NullableString{Value: replicaUsername}, - PrimaryPassword: &schema.NullableString{Value: replicaPassword}, + PrimaryDatabase: &schema.NullableString{Value: suite.primaryDBName}, + PrimaryHost: &schema.NullableString{Value: primaryHost}, + PrimaryPort: &schema.NullableUint32{Value: uint32(primaryPort)}, + PrimaryUsername: &schema.NullableString{Value: primaryUsername}, + PrimaryPassword: &schema.NullableString{Value: primaryPassword}, }, } - // init database on the follower to replicate - _, err := followerClient.CreateDatabaseV2(fctx, replicaDBName, settings) + // init database on the replica to replicate + _, err := replicaClient.CreateDatabaseV2(rctx, replicaDBName, settings) require.NoError(suite.T(), err) - return followerNum + return replicaNum } -func (suite *baseReplicationTestSuite) StopFollower(followerNum int) { +func (suite *baseReplicationTestSuite) StopReplica(replicaNum int) { suite.mu.Lock() defer suite.mu.Unlock() - f := suite.followers[followerNum] + f := suite.replicas[replicaNum] f.Shutdown(suite.T()) - suite.followersRunning[followerNum] = false + suite.replicasRunning[replicaNum] = false } -func (suite *baseReplicationTestSuite) StartFollower(followerNum int) { +func (suite *baseReplicationTestSuite) StartReplica(replicaNum int) { suite.mu.Lock() defer suite.mu.Unlock() - f := suite.followers[followerNum] + f := suite.replicas[replicaNum] f.Start(suite.T()) - suite.followersRunning[followerNum] = true + suite.replicasRunning[replicaNum] = true } -func (suite *baseReplicationTestSuite) PromoteFollower(followerNum, syncAcks int) { +func (suite *baseReplicationTestSuite) PromoteReplica(replicaNum, syncAcks int) { suite.mu.Lock() defer suite.mu.Unlock() - // set follower as new master and current master as follower - suite.master, suite.followers[followerNum] = suite.followers[followerNum], suite.master - suite.masterDBName, suite.followersDBName[followerNum] = suite.followersDBName[followerNum], suite.masterDBName + // set replica as new primary and current primary as replica + suite.primary, suite.replicas[replicaNum] = suite.replicas[replicaNum], suite.primary + suite.primaryDBName, suite.replicasDBName[replicaNum] = suite.replicasDBName[replicaNum], suite.primaryDBName - mctx, mClient, cleanup := suite.internalClientFor(suite.master, suite.masterDBName) + mctx, mClient, cleanup := suite.internalClientFor(suite.primary, suite.primaryDBName) defer cleanup() - _, err := mClient.UpdateDatabaseV2(mctx, suite.masterDBName, &schema.DatabaseNullableSettings{ + _, err := mClient.UpdateDatabaseV2(mctx, suite.primaryDBName, &schema.DatabaseNullableSettings{ ReplicationSettings: &schema.ReplicationNullableSettings{ Replica: &schema.NullableBool{Value: false}, SyncReplication: &schema.NullableBool{Value: syncAcks > 0}, @@ -139,45 +139,45 @@ func (suite *baseReplicationTestSuite) PromoteFollower(followerNum, syncAcks int }) require.NoError(suite.T(), err) - mdb, err := mClient.UseDatabase(mctx, &schema.Database{DatabaseName: suite.masterDBName}) + mdb, err := mClient.UseDatabase(mctx, &schema.Database{DatabaseName: suite.primaryDBName}) require.NoError(suite.T(), err) require.NotNil(suite.T(), mdb) - err = mClient.CreateUser(mctx, []byte(replicaUsername), []byte(replicaPassword), auth.PermissionAdmin, suite.masterDBName) + err = mClient.CreateUser(mctx, []byte(primaryUsername), []byte(primaryPassword), auth.PermissionAdmin, suite.primaryDBName) require.NoError(suite.T(), err) - err = mClient.SetActiveUser(mctx, &schema.SetActiveUserRequest{Active: true, Username: replicaUsername}) + err = mClient.SetActiveUser(mctx, &schema.SetActiveUserRequest{Active: true, Username: primaryUsername}) require.NoError(suite.T(), err) - host, port := suite.master.Address(suite.T()) + host, port := suite.primary.Address(suite.T()) - for i, _ := range suite.followers { - ctx, client, cleanup := suite.internalClientFor(suite.followers[i], suite.followersDBName[i]) + for i, _ := range suite.replicas { + ctx, client, cleanup := suite.internalClientFor(suite.replicas[i], suite.replicasDBName[i]) defer cleanup() - _, err = client.UpdateDatabaseV2(ctx, suite.followersDBName[i], &schema.DatabaseNullableSettings{ + _, err = client.UpdateDatabaseV2(ctx, suite.replicasDBName[i], &schema.DatabaseNullableSettings{ ReplicationSettings: &schema.ReplicationNullableSettings{ Replica: &schema.NullableBool{Value: true}, PrimaryHost: &schema.NullableString{Value: host}, PrimaryPort: &schema.NullableUint32{Value: uint32(port)}, - PrimaryDatabase: &schema.NullableString{Value: suite.masterDBName}, - PrimaryUsername: &schema.NullableString{Value: replicaUsername}, - PrimaryPassword: &schema.NullableString{Value: replicaPassword}, + PrimaryDatabase: &schema.NullableString{Value: suite.primaryDBName}, + PrimaryUsername: &schema.NullableString{Value: primaryUsername}, + PrimaryPassword: &schema.NullableString{Value: primaryPassword}, }, }) require.NoError(suite.T(), err) } } -func (suite *baseReplicationTestSuite) StartMaster(syncAcks int) { +func (suite *baseReplicationTestSuite) StartPrimary(syncAcks int) { suite.mu.Lock() defer suite.mu.Unlock() - require.Nil(suite.T(), suite.master) + require.Nil(suite.T(), suite.primary) srv := suite.srvProvider.AddServer(suite.T()) - suite.master = srv + suite.primary = srv mctx, client, cleanup := suite.internalClientFor(srv, client.DefaultDB) defer cleanup() @@ -191,40 +191,40 @@ func (suite *baseReplicationTestSuite) StartMaster(syncAcks int) { } } - _, err := client.CreateDatabaseV2(mctx, suite.masterDBName, settings) + _, err := client.CreateDatabaseV2(mctx, suite.primaryDBName, settings) require.NoError(suite.T(), err) - mdb, err := client.UseDatabase(mctx, &schema.Database{DatabaseName: suite.masterDBName}) + mdb, err := client.UseDatabase(mctx, &schema.Database{DatabaseName: suite.primaryDBName}) require.NoError(suite.T(), err) require.NotNil(suite.T(), mdb) - err = client.CreateUser(mctx, []byte(replicaUsername), []byte(replicaPassword), auth.PermissionAdmin, suite.masterDBName) + err = client.CreateUser(mctx, []byte(primaryUsername), []byte(primaryPassword), auth.PermissionAdmin, suite.primaryDBName) require.NoError(suite.T(), err) - err = client.SetActiveUser(mctx, &schema.SetActiveUserRequest{Active: true, Username: replicaUsername}) + err = client.SetActiveUser(mctx, &schema.SetActiveUserRequest{Active: true, Username: primaryUsername}) require.NoError(suite.T(), err) - suite.masterRunning = true + suite.primaryRunning = true } -func (suite *baseReplicationTestSuite) StopMaster() { +func (suite *baseReplicationTestSuite) StopPrimary() { suite.mu.Lock() defer suite.mu.Unlock() - require.NotNil(suite.T(), suite.master) + require.NotNil(suite.T(), suite.primary) - suite.master.Shutdown(suite.T()) - suite.masterRunning = false + suite.primary.Shutdown(suite.T()) + suite.primaryRunning = false } -func (suite *baseReplicationTestSuite) RestartMaster() { - suite.StopMaster() +func (suite *baseReplicationTestSuite) RestartPrimary() { + suite.StopPrimary() suite.mu.Lock() defer suite.mu.Unlock() - suite.master.Start(suite.T()) - suite.masterRunning = true + suite.primary.Start(suite.T()) + suite.primaryRunning = true } func (suite *baseReplicationTestSuite) internalClientFor(srv TestServer, dbName string) (context.Context, client.ImmuClient, func()) { @@ -249,18 +249,18 @@ func (suite *baseReplicationTestSuite) internalClientFor(srv TestServer, dbName return context.Background(), c, func() { c.CloseSession(context.Background()) } } -func (suite *baseReplicationTestSuite) ClientForMaster() (mctx context.Context, client client.ImmuClient, cleanup func()) { +func (suite *baseReplicationTestSuite) ClientForPrimary() (mctx context.Context, client client.ImmuClient, cleanup func()) { suite.mu.Lock() defer suite.mu.Unlock() - return suite.internalClientFor(suite.master, suite.masterDBName) + return suite.internalClientFor(suite.primary, suite.primaryDBName) } -func (suite *baseReplicationTestSuite) ClientForReplica(replicaNum int) (fctx context.Context, client client.ImmuClient, cleanup func()) { +func (suite *baseReplicationTestSuite) ClientForReplica(replicaNum int) (rctx context.Context, client client.ImmuClient, cleanup func()) { suite.mu.Lock() defer suite.mu.Unlock() - return suite.internalClientFor(suite.followers[replicaNum], suite.followersDBName[replicaNum]) + return suite.internalClientFor(suite.replicas[replicaNum], suite.replicasDBName[replicaNum]) } func (suite *baseReplicationTestSuite) WaitForCommittedTx( @@ -288,9 +288,9 @@ func (suite *baseReplicationTestSuite) WaitForCommittedTx( } func (suite *baseReplicationTestSuite) SetupCluster(syncReplicas, syncAcks, asyncReplicas int) { - suite.masterDBName = masterDBName + suite.primaryDBName = primaryDBName - suite.StartMaster(syncAcks) + suite.StartPrimary(syncAcks) wg := sync.WaitGroup{} @@ -298,7 +298,7 @@ func (suite *baseReplicationTestSuite) SetupCluster(syncReplicas, syncAcks, asyn wg.Add(1) go func() { defer wg.Done() - suite.AddFollower(true) + suite.AddReplica(true) }() } @@ -306,7 +306,7 @@ func (suite *baseReplicationTestSuite) SetupCluster(syncReplicas, syncAcks, asyn wg.Add(1) go func() { defer wg.Done() - suite.AddFollower(false) + suite.AddReplica(false) }() } @@ -314,11 +314,11 @@ func (suite *baseReplicationTestSuite) SetupCluster(syncReplicas, syncAcks, asyn } func (suite *baseReplicationTestSuite) ValidateClusterSetup() { - uuids := make(map[string]struct{}, 1+suite.GetFollowersCount()) + uuids := make(map[string]struct{}, 1+suite.GetReplicasCount()) - uuids[suite.master.UUID(suite.T()).String()] = struct{}{} + uuids[suite.primary.UUID(suite.T()).String()] = struct{}{} - for _, f := range suite.followers { + for _, f := range suite.replicas { uuid := f.UUID(suite.T()).String() if _, ok := uuids[uuid]; ok { @@ -346,18 +346,18 @@ func (suite *baseReplicationTestSuite) TearDownTest() { suite.mu.Lock() defer suite.mu.Unlock() - // stop followers - for i, srv := range suite.followers { - if suite.followersRunning[i] { + // stop replicas + for i, srv := range suite.replicas { + if suite.replicasRunning[i] { srv.Shutdown(suite.T()) } } - suite.followers = []TestServer{} + suite.replicas = []TestServer{} - // stop master - if suite.master != nil { - suite.master.Shutdown(suite.T()) - suite.master = nil + // stop primary + if suite.primary != nil { + suite.primary.Shutdown(suite.T()) + suite.primary = nil } - suite.master = nil + suite.primary = nil } diff --git a/pkg/integration/replication/synchronous_replication_test.go b/pkg/integration/replication/synchronous_replication_test.go index 771bd38a9f..46d8de2309 100644 --- a/pkg/integration/replication/synchronous_replication_test.go +++ b/pkg/integration/replication/synchronous_replication_test.go @@ -13,23 +13,23 @@ import ( "github.com/stretchr/testify/suite" ) -type SyncTestSuiteMasterToAllFollowers struct { +type SyncTestSuitePrimaryToAllReplicas struct { baseReplicationTestSuite } -func TestSyncTestSuiteMasterToAllFollowers(t *testing.T) { - suite.Run(t, &SyncTestSuiteMasterToAllFollowers{}) +func TestSyncTestSuitePrimaryToAllReplicas(t *testing.T) { + suite.Run(t, &SyncTestSuitePrimaryToAllReplicas{}) } // this function executes before the test suite begins execution -func (suite *SyncTestSuiteMasterToAllFollowers) SetupTest() { +func (suite *SyncTestSuitePrimaryToAllReplicas) SetupTest() { suite.baseReplicationTestSuite.SetupTest() suite.SetupCluster(2, 2, 0) suite.ValidateClusterSetup() } -func (suite *SyncTestSuiteMasterToAllFollowers) TestSyncFromMasterToAllFollowers() { - ctx, client, cleanup := suite.ClientForMaster() +func (suite *SyncTestSuitePrimaryToAllReplicas) TestSyncFromPrimaryToAllReplicas() { + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() tx1, err := client.Set(ctx, []byte("key1"), []byte("value1")) @@ -38,14 +38,14 @@ func (suite *SyncTestSuiteMasterToAllFollowers) TestSyncFromMasterToAllFollowers tx2, err := client.Set(ctx, []byte("key2"), []byte("value2")) require.NoError(suite.T(), err) - for i := 0; i < suite.GetFollowersCount(); i++ { + for i := 0; i < suite.GetReplicasCount(); i++ { suite.Run(fmt.Sprintf("test replica %d", i), func() { ctx, client, cleanup := suite.ClientForReplica(i) defer cleanup() // Tests are flaky because it takes time to commit the // precommitted TX, so this function just ensures the state - // is in sync between master and follower + // is in sync between primary and replica suite.WaitForCommittedTx(ctx, client, tx2.Id, time.Duration(3)*time.Second) val, err := client.GetAt(ctx, []byte("key1"), tx1.Id) @@ -59,26 +59,26 @@ func (suite *SyncTestSuiteMasterToAllFollowers) TestSyncFromMasterToAllFollowers } } -type SyncTestSuiteMasterRestart struct { +type SyncTestSuitePrimaryRestart struct { baseReplicationTestSuite } -func TestSyncTestSuiteMasterRestart(t *testing.T) { - suite.Run(t, &SyncTestSuiteMasterRestart{}) +func TestSyncTestSuitePrimaryRestart(t *testing.T) { + suite.Run(t, &SyncTestSuitePrimaryRestart{}) } // this function executes before the test suite begins execution -func (suite *SyncTestSuiteMasterRestart) SetupTest() { +func (suite *SyncTestSuitePrimaryRestart) SetupTest() { suite.baseReplicationTestSuite.SetupTest() suite.SetupCluster(2, 2, 0) suite.ValidateClusterSetup() } -func (suite *SyncTestSuiteMasterRestart) TestMasterRestart() { +func (suite *SyncTestSuitePrimaryRestart) TestPrimaryRestart() { var txBeforeRestart *schema.TxHeader suite.Run("commit before restarting primary", func() { - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() tx, err := client.Set(ctx, []byte("key-before-restart"), []byte("value-before-restart")) @@ -87,23 +87,23 @@ func (suite *SyncTestSuiteMasterRestart) TestMasterRestart() { txBeforeRestart = tx }) - suite.RestartMaster() + suite.RestartPrimary() - suite.Run("commit after restarting master", func() { - ctx, client, cleanup := suite.ClientForMaster() + suite.Run("commit after restarting primary", func() { + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() tx, err := client.Set(ctx, []byte("key3"), []byte("value3")) require.NoError(suite.T(), err) - for i := 0; i < suite.GetFollowersCount(); i++ { - suite.Run(fmt.Sprintf("check follower %d", i), func() { + for i := 0; i < suite.GetReplicasCount(); i++ { + suite.Run(fmt.Sprintf("check replica %d", i), func() { ctx, client, cleanup := suite.ClientForReplica(i) defer cleanup() // Tests are flaky because it takes time to commit the // precommitted TX, so this function just ensures the state - // is in sync between master and follower + // is in sync between primary and replica suite.WaitForCommittedTx(ctx, client, tx.Id, 30*time.Second) // Longer time since replica must reestablish connection to the primary val, err := client.GetAt(ctx, []byte("key3"), tx.Id) @@ -133,42 +133,42 @@ func (suite *SyncTestSuitePrecommitStateSync) SetupTest() { suite.ValidateClusterSetup() } -// TestPrecommitStateSync checks if the precommit state at master -// and its followers are in sync during synchronous replication +// TestPrecommitStateSync checks if the precommit state at primary +// and its replicas are in sync during synchronous replication func (suite *SyncTestSuitePrecommitStateSync) TestPrecommitStateSync() { var ( - masterState *schema.ImmutableState - err error - startCh = make(chan bool) + primaryState *schema.ImmutableState + err error + startCh = make(chan bool) ) - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() // Create goroutines for client waiting to query the state - // of the followers. This is initialised before to avoid - // spending time initialising the follower client for faster + // of the replicas. This is initialized before to avoid + // spending time initializing the replica client for faster // state access var wg sync.WaitGroup - for i := 0; i < suite.GetFollowersCount(); i++ { + for i := 0; i < suite.GetReplicasCount(); i++ { wg.Add(1) - go func(followerID int) { + go func(replicaID int) { defer wg.Done() - ctx, client, cleanup := suite.ClientForReplica(followerID) + ctx, client, cleanup := suite.ClientForReplica(replicaID) defer cleanup() <-startCh - suite.Run(fmt.Sprintf("test replica sync state %d", followerID), func() { + suite.Run(fmt.Sprintf("test replica sync state %d", replicaID), func() { state, err := client.CurrentState(ctx) require.NoError(suite.T(), err) - suite.Require().Equal(state.PrecommittedTxId, masterState.TxId) - suite.Require().Equal(state.PrecommittedTxHash, masterState.TxHash) + suite.Require().Equal(state.PrecommittedTxId, primaryState.TxId) + suite.Require().Equal(state.PrecommittedTxHash, primaryState.TxHash) }) }(i) } - // add multiple keys to make update the master's state quickly + // add multiple keys to make update the primary's state quickly for i := 10; i < 30; i++ { key := fmt.Sprintf("key%d", i) value := fmt.Sprintf("value%d", i) @@ -176,8 +176,8 @@ func (suite *SyncTestSuitePrecommitStateSync) TestPrecommitStateSync() { require.NoError(suite.T(), err) } - // get the current precommit txn id state of master - masterState, err = client.CurrentState(ctx) + // get the current precommit txn id state of primary + primaryState, err = client.CurrentState(ctx) require.NoError(suite.T(), err) // close will unblock all goroutines @@ -186,30 +186,30 @@ func (suite *SyncTestSuitePrecommitStateSync) TestPrecommitStateSync() { wg.Wait() } -type SyncTestMinimumFollowersSuite struct { +type SyncTestMinimumReplicasSuite struct { baseReplicationTestSuite } -func TestSyncTestMinimumFollowersSuite(t *testing.T) { - suite.Run(t, &SyncTestMinimumFollowersSuite{}) +func TestSyncTestMinimumReplicasSuite(t *testing.T) { + suite.Run(t, &SyncTestMinimumReplicasSuite{}) } // this function executes before the test suite begins execution -func (suite *SyncTestMinimumFollowersSuite) SetupTest() { +func (suite *SyncTestMinimumReplicasSuite) SetupTest() { suite.baseReplicationTestSuite.SetupTest() suite.SetupCluster(4, 2, 0) suite.ValidateClusterSetup() } -// TestMinimumFollowers ensures the primary can operate as long as the minimum +// TestMinimumReplicas ensures the primary can operate as long as the minimum // number of replicas send their confirmations -func (suite *SyncTestMinimumFollowersSuite) TestMinimumFollowers() { +func (suite *SyncTestMinimumReplicasSuite) TestMinimumReplicas() { - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() suite.Run("should commit successfully without one replica", func() { - suite.StopFollower(0) + suite.StopReplica(0) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -219,7 +219,7 @@ func (suite *SyncTestMinimumFollowersSuite) TestMinimumFollowers() { }) suite.Run("should commit successfully without two replicas", func() { - suite.StopFollower(1) + suite.StopReplica(1) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -229,7 +229,7 @@ func (suite *SyncTestMinimumFollowersSuite) TestMinimumFollowers() { }) suite.Run("should not commit without three replicas", func() { - suite.StopFollower(2) + suite.StopReplica(2) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -240,7 +240,7 @@ func (suite *SyncTestMinimumFollowersSuite) TestMinimumFollowers() { }) suite.Run("should commit again once first replica is back online", func() { - suite.StartFollower(0) + suite.StartReplica(0) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -250,11 +250,11 @@ func (suite *SyncTestMinimumFollowersSuite) TestMinimumFollowers() { }) suite.Run("should recover with all replicas replaced", func() { - suite.StopFollower(0) - suite.StopFollower(3) + suite.StopReplica(0) + suite.StopReplica(3) - suite.AddFollower(true) - suite.AddFollower(true) + suite.AddReplica(true) + suite.AddReplica(true) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -304,9 +304,9 @@ func (suite *SyncTestRecoverySpeedSuite) TestReplicaRecoverySpeed() { const parallelWriters = 30 const samplingTime = time.Second * 5 - // Stop the follower, we don't replicate any transactions to it now + // Stop the replica, we don't replicate any transactions to it now // but we can still commit using the second replica - suite.StopFollower(0) + suite.StopReplica(0) var txWritten uint64 @@ -323,7 +323,7 @@ func (suite *SyncTestRecoverySpeedSuite) TestReplicaRecoverySpeed() { go func(i int) { defer wgFinish.Done() - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() // Wait for the start signal @@ -365,10 +365,10 @@ func (suite *SyncTestRecoverySpeedSuite) TestReplicaRecoverySpeed() { suite.Run("Ensure replica can recover in reasonable amount of time", func() { - // Stop the second follower, now the DB is locked - suite.StopFollower(1) + // Stop the second replica, now the DB is locked + suite.StopReplica(1) - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() state, err := client.CurrentState(ctx) @@ -377,23 +377,23 @@ func (suite *SyncTestRecoverySpeedSuite) TestReplicaRecoverySpeed() { // Check if we can recover the cluster and perform write within the double the amount of time // that was needed for initial sampling. The replica that was initially stopped and now - // started has the same amount of transaction to grab from master as the other one + // started has the same amount of transaction to grab from primary as the other one // which should take the same amount of time as the initial write period or less // (since the primary is not persisting data this time). ctxTimeout, cancel := context.WithTimeout(ctx, samplingTime*2) defer cancel() - suite.StartFollower(0) // 1 down + suite.StartReplica(0) // 1 down tx, err = client.Set(ctxTimeout, []byte("key-after-recovery"), []byte("value-after-recovery")) suite.NoError(err) }) suite.Run("Ensure the data is readable from replicas", func() { - suite.StartFollower(1) + suite.StartReplica(1) suite.Run("primary", func() { - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() val, err := client.GetAt(ctx, []byte("key-after-recovery"), tx.Id) @@ -401,7 +401,7 @@ func (suite *SyncTestRecoverySpeedSuite) TestReplicaRecoverySpeed() { suite.Equal([]byte("value-after-recovery"), val.Value) }) - for i := 0; i < suite.GetFollowersCount(); i++ { + for i := 0; i < suite.GetReplicasCount(); i++ { suite.Run(fmt.Sprintf("replica %d", i), func() { ctx, client, cleanup := suite.ClientForReplica(i) defer cleanup() @@ -417,21 +417,21 @@ func (suite *SyncTestRecoverySpeedSuite) TestReplicaRecoverySpeed() { } -type SyncTestWithAsyncFollowersSuite struct { +type SyncTestWithAsyncReplicaSuite struct { baseReplicationTestSuite } -func TestSyncTestWithAsyncFollowersSuite(t *testing.T) { - suite.Run(t, &SyncTestWithAsyncFollowersSuite{}) +func TestSyncTestWithAsyncReplicaSuite(t *testing.T) { + suite.Run(t, &SyncTestWithAsyncReplicaSuite{}) } -func (suite *SyncTestWithAsyncFollowersSuite) SetupTest() { +func (suite *SyncTestWithAsyncReplicaSuite) SetupTest() { suite.baseReplicationTestSuite.SetupTest() suite.SetupCluster(2, 1, 1) suite.ValidateClusterSetup() } -func (suite *SyncTestWithAsyncFollowersSuite) TestSyncReplicationAlongWithAsyncFollowers() { +func (suite *SyncTestWithAsyncReplicaSuite) TestSyncReplicationAlongWithAsyncReplicas() { const parallelWriters = 30 const samplingTime = time.Second * 5 @@ -450,7 +450,7 @@ func (suite *SyncTestWithAsyncFollowersSuite) TestSyncReplicationAlongWithAsyncF go func(i int) { defer wgFinish.Done() - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() // Wait for the start signal @@ -488,14 +488,14 @@ func (suite *SyncTestWithAsyncFollowersSuite) TestSyncReplicationAlongWithAsyncF }) suite.Run("Ensure the data is available in all the replicas", func() { - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() state, err := client.CurrentState(ctx) suite.Require().NoError(err) suite.Require().Greater(state.TxId, txWritten, "Ensure enough TXs were written") - for i := 0; i < suite.GetFollowersCount(); i++ { + for i := 0; i < suite.GetReplicasCount(); i++ { suite.Run(fmt.Sprintf("replica %d", i), func() { ctx, client, cleanup := suite.ClientForReplica(i) defer cleanup() @@ -507,65 +507,65 @@ func (suite *SyncTestWithAsyncFollowersSuite) TestSyncReplicationAlongWithAsyncF } -type SyncTestChangingMasterSuite struct { +type SyncTestChangingPrimarySuite struct { baseReplicationTestSuite } -func TestSyncTestChangingMasterSuite(t *testing.T) { - suite.Run(t, &SyncTestChangingMasterSuite{}) +func TestSyncTestChangingPrimarySuite(t *testing.T) { + suite.Run(t, &SyncTestChangingPrimarySuite{}) } -func (suite *SyncTestChangingMasterSuite) SetupTest() { +func (suite *SyncTestChangingPrimarySuite) SetupTest() { suite.baseReplicationTestSuite.SetupTest() suite.SetupCluster(2, 2, 0) suite.ValidateClusterSetup() } -func (suite *SyncTestChangingMasterSuite) TestSyncTestChangingMasterSuite() { - var txBeforeChangingMaster *schema.TxHeader +func (suite *SyncTestChangingPrimarySuite) TestSyncTestChangingPrimarySuite() { + var txBeforeChangingPrimary *schema.TxHeader suite.Run("commit before changing primary", func() { - ctx, client, cleanup := suite.ClientForMaster() + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() - tx, err := client.Set(ctx, []byte("key-before-master-change"), []byte("value-before-master-change")) + tx, err := client.Set(ctx, []byte("key-before-primary-change"), []byte("value-before-primary-change")) require.NoError(suite.T(), err) - txBeforeChangingMaster = tx + txBeforeChangingPrimary = tx }) - // it's possible to promote any replica as new master because ack from all replicas is required by master - // ensure the replica to be promoted is up to date with master's commit state + // it's possible to promote any replica as new primary because ack from all replicas is required by primary + // ensure the replica to be promoted is up to date with primary's commit state ctx, client, cleanup := suite.ClientForReplica(1) - suite.WaitForCommittedTx(ctx, client, txBeforeChangingMaster.Id, 1*time.Second) + suite.WaitForCommittedTx(ctx, client, txBeforeChangingPrimary.Id, 1*time.Second) cleanup() - suite.PromoteFollower(1, 1) + suite.PromoteReplica(1, 1) - suite.Run("commit after changing master", func() { - ctx, client, cleanup := suite.ClientForMaster() + suite.Run("commit after changing primary", func() { + ctx, client, cleanup := suite.ClientForPrimary() defer cleanup() - tx, err := client.Set(ctx, []byte("key-after-master-change"), []byte("value-after-master-change")) + tx, err := client.Set(ctx, []byte("key-after-primary-change"), []byte("value-after-primary-change")) require.NoError(suite.T(), err) - for i := 0; i < suite.GetFollowersCount(); i++ { - suite.Run(fmt.Sprintf("check follower %d", i), func() { + for i := 0; i < suite.GetReplicasCount(); i++ { + suite.Run(fmt.Sprintf("check replica %d", i), func() { ctx, client, cleanup := suite.ClientForReplica(i) defer cleanup() // Tests are flaky because it takes time to commit the // precommitted TX, so this function just ensures the state - // is in sync between master and follower + // is in sync between primary and replica suite.WaitForCommittedTx(ctx, client, tx.Id, 30*time.Second) // Longer time since replica must reestablish connection to the primary - val, err := client.GetAt(ctx, []byte("key-before-master-change"), txBeforeChangingMaster.Id) + val, err := client.GetAt(ctx, []byte("key-before-primary-change"), txBeforeChangingPrimary.Id) require.NoError(suite.T(), err) - require.Equal(suite.T(), []byte("value-before-master-change"), val.Value) + require.Equal(suite.T(), []byte("value-before-primary-change"), val.Value) - val, err = client.GetAt(ctx, []byte("key-after-master-change"), tx.Id) + val, err = client.GetAt(ctx, []byte("key-after-primary-change"), tx.Id) require.NoError(suite.T(), err) - require.Equal(suite.T(), []byte("value-after-master-change"), val.Value) + require.Equal(suite.T(), []byte("value-after-primary-change"), val.Value) }) } }) diff --git a/pkg/server/db_options_test.go b/pkg/server/db_options_test.go index 8855ce02a5..2d71385f44 100644 --- a/pkg/server/db_options_test.go +++ b/pkg/server/db_options_test.go @@ -69,7 +69,7 @@ func TestReplicaOptions(t *testing.T) { require.ErrorIs(t, opts.Validate(), ErrIllegalArguments) } -func TestMasterOptions(t *testing.T) { +func TestPrimaryOptions(t *testing.T) { dir, err := ioutil.TempDir("", "server_test") require.NoError(t, err) defer os.RemoveAll(dir) @@ -100,10 +100,10 @@ func TestMasterOptions(t *testing.T) { opts.PrefetchTxBufferSize = 100 require.ErrorIs(t, opts.Validate(), ErrIllegalArguments) - opts.PrimaryPassword = "follower-pwd" + opts.PrimaryPassword = "primary-pwd" require.ErrorIs(t, opts.Validate(), ErrIllegalArguments) - opts.PrimaryUsername = "follower-username" + opts.PrimaryUsername = "primary-username" require.ErrorIs(t, opts.Validate(), ErrIllegalArguments) opts.PrimaryPort = 3323 @@ -112,7 +112,7 @@ func TestMasterOptions(t *testing.T) { opts.PrimaryHost = "localhost" require.ErrorIs(t, opts.Validate(), ErrIllegalArguments) - opts.PrimaryDatabase = "masterdb" + opts.PrimaryDatabase = "primarydb" require.ErrorIs(t, opts.Validate(), ErrIllegalArguments) opts.SyncAcks = -1 diff --git a/pkg/server/options_test.go b/pkg/server/options_test.go index cfb183f7fe..50d8602e3b 100644 --- a/pkg/server/options_test.go +++ b/pkg/server/options_test.go @@ -69,8 +69,8 @@ func TestReplicationOptions(t *testing.T) { WithSyncAcks(0). WithPrimaryHost("localhost"). WithPrimaryPort(3322). - WithPrimaryUsername("follower-user"). - WithPrimaryPassword("follower-pwd"). + WithPrimaryUsername("primary-user"). + WithPrimaryPassword("primary-pwd"). WithPrefetchTxBufferSize(100). WithReplicationCommitConcurrency(5). WithAllowTxDiscarding(true) @@ -80,13 +80,13 @@ func TestReplicationOptions(t *testing.T) { require.Zero(t, repOpts.SyncAcks) require.Equal(t, "localhost", repOpts.PrimaryHost) require.Equal(t, 3322, repOpts.PrimaryPort) - require.Equal(t, "follower-user", repOpts.PrimaryUsername) - require.Equal(t, "follower-pwd", repOpts.PrimaryPassword) + require.Equal(t, "primary-user", repOpts.PrimaryUsername) + require.Equal(t, "primary-pwd", repOpts.PrimaryPassword) require.Equal(t, 100, repOpts.PrefetchTxBufferSize) require.Equal(t, 5, repOpts.ReplicationCommitConcurrency) require.True(t, repOpts.AllowTxDiscarding) - // master-related settings + // primary-related settings repOpts. WithIsReplica(false). WithSyncReplication(true). diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 7a0d638640..faa0d9c769 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -387,7 +387,7 @@ func TestServerCreateDatabase(t *testing.T) { dbSettings := &schema.DatabaseSettings{ DatabaseName: "lisbon", Replica: false, - PrimaryDatabase: "masterdb", + PrimaryDatabase: "primarydb", } _, err = s.CreateDatabaseWith(ctx, dbSettings) require.ErrorIs(t, err, ErrIllegalArguments)