Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/replication naming convention #1448

Merged
merged 4 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions cmd/immuadmin/command/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,33 @@ func addDbUpdateFlags(c *cobra.Command) {
c.Flags().Bool("replication-is-replica", false, "set database as a replica")
c.Flags().Bool("replication-sync-enabled", false, "enable synchronous replication")
c.Flags().Uint32("replication-sync-acks", 0, "set a minimum number of replica acknowledgements required before transactions can be committed")
c.Flags().String("replication-master-database", "", "set master database to be replicated")
c.Flags().String("replication-master-address", "", "set master address")
c.Flags().Uint32("replication-master-port", 0, "set master port")
c.Flags().String("replication-follower-username", "", "set username used for replication")
c.Flags().String("replication-follower-password", "", "set password used for replication")
c.Flags().String("replication-primary-database", "", "set primary database to be replicated")
c.Flags().String("replication-primary-host", "", "set primary database host")
c.Flags().Uint32("replication-primary-port", 0, "set primary database port")
c.Flags().String("replication-primary-username", "", "set username used for replication to connect to the primary database")
c.Flags().String("replication-primary-password", "", "set password used for replication to connect to the primary database")
c.Flags().Uint32("replication-prefetch-tx-buffer-size", uint32(replication.DefaultPrefetchTxBufferSize), "maximum number of prefeched transactions")
c.Flags().Uint32("replication-commit-concurrency", uint32(replication.DefaultReplicationCommitConcurrency), "number of concurrent replications")
c.Flags().Bool("replication-allow-tx-discarding", replication.DefaultAllowTxDiscarding, "allow precommitted transactions to be discarded if the follower diverges from the master")
c.Flags().Bool("replication-allow-tx-discarding", replication.DefaultAllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary")
c.Flags().Uint32("write-tx-header-version", 1, "set write tx header version (use 0 for compatibility with immudb 1.1, 1 for immudb 1.2+)")
c.Flags().Uint32("max-commit-concurrency", store.DefaultMaxConcurrency, "set the maximum commit concurrency")
c.Flags().Duration("sync-frequency", store.DefaultSyncFrequency, "set the fsync frequency during commit process")
c.Flags().Uint32("write-buffer-size", store.DefaultWriteBufferSize, "set the size of in-memory buffers for file abstractions")
c.Flags().Uint32("read-tx-pool-size", database.DefaultReadTxPoolSize, "set transaction read pool size (used for reading transaction objects)")
c.Flags().Bool("autoload", true, "enable database autoloading")

flagNameMapping := map[string]string{
"replication-enabled": "replication-is-replica",
"replication-follower-username": "replication-primary-username",
"replication-follower-password": "replication-primary-password",
"replication-master-database": "replication-primary-database",
"replication-master-address": "replication-primary-host",
"replication-master-port": "replication-primary-port",
}

c.Flags().SetNormalizeFunc(func(f *pflag.FlagSet, name string) pflag.NormalizedName {
if name == "replication-enabled" {
name = "replication-is-replica"
if newName, ok := flagNameMapping[name]; ok {
name = newName
}
return pflag.NormalizedName(name)
})
Expand Down Expand Up @@ -390,27 +399,27 @@ func prepareDatabaseNullableSettings(flags *pflag.FlagSet) (*schema.DatabaseNull
return nil, err
}

ret.ReplicationSettings.MasterDatabase, err = condString("replication-master-database")
ret.ReplicationSettings.PrimaryDatabase, err = condString("replication-primary-database")
if err != nil {
return nil, err
}

ret.ReplicationSettings.MasterAddress, err = condString("replication-master-address")
ret.ReplicationSettings.PrimaryHost, err = condString("replication-primary-host")
if err != nil {
return nil, err
}

ret.ReplicationSettings.MasterPort, err = condUInt32("replication-master-port")
ret.ReplicationSettings.PrimaryPort, err = condUInt32("replication-primary-port")
if err != nil {
return nil, err
}

ret.ReplicationSettings.FollowerUsername, err = condString("replication-follower-username")
ret.ReplicationSettings.PrimaryUsername, err = condString("replication-primary-username")
if err != nil {
return nil, err
}

ret.ReplicationSettings.FollowerPassword, err = condString("replication-follower-password")
ret.ReplicationSettings.PrimaryPassword, err = condString("replication-primary-password")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/immuadmin/command/hot_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (cl *commandlineHotBck) useDb(name string, replica bool) (uint64, []byte, e
}

func (cl *commandlineHotBck) createDb(name string) error {
err := cl.immuClient.CreateDatabase(cl.context, &schema.DatabaseSettings{DatabaseName: name, Replica: true, MasterDatabase: "dummy"})
err := cl.immuClient.CreateDatabase(cl.context, &schema.DatabaseSettings{DatabaseName: name, Replica: true, PrimaryDatabase: "dummy"})
if err != nil {
return err
}
Expand Down
23 changes: 16 additions & 7 deletions cmd/immudb/command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
cmd.Flags().Bool("replication-is-replica", false, "set systemdb and defaultdb as replica")
cmd.Flags().Bool("replication-sync-enabled", false, "enable synchronous replication")
cmd.Flags().Int("replication-sync-acks", 0, "set a minimum number of replica acknowledgements required before transactions can be committed")
cmd.Flags().String("replication-master-address", "", "master address (if replica=true)")
cmd.Flags().Int("replication-master-port", 3322, "master port (if replica=true)")
cmd.Flags().String("replication-follower-username", "", "username used for replication of systemdb and defaultdb")
cmd.Flags().String("replication-follower-password", "", "password used for replication of systemdb and defaultdb")
cmd.Flags().String("replication-primary-host", "", "primary database host (if replica=true)")
cmd.Flags().Int("replication-primary-port", 3322, "primary database port (if replica=true)")
cmd.Flags().String("replication-primary-username", "", "username in the primary database used for replication of systemdb and defaultdb")
cmd.Flags().String("replication-primary-password", "", "password in the primary database used for replication of systemdb and defaultdb")
cmd.Flags().Int("replication-prefetch-tx-buffer-size", options.ReplicationOptions.PrefetchTxBufferSize, "maximum number of prefeched transactions")
cmd.Flags().Int("replication-commit-concurrency", options.ReplicationOptions.ReplicationCommitConcurrency, "number of concurrent replications")
cmd.Flags().Bool("replication-allow-tx-discarding", replication.DefaultAllowTxDiscarding, "allow precommitted transactions to be discarded if the follower diverges from the master")
cmd.Flags().Bool("replication-allow-tx-discarding", replication.DefaultAllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary")

cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)")
cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. /var/run/immudb.pid")
Expand Down Expand Up @@ -84,9 +84,18 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
cmd.Flags().Duration("sessions-guard-check-interval", 1*time.Minute, "sessions guard check interval")
cmd.Flags().MarkHidden("sessions-guard-check-interval")

flagNameMapping := map[string]string{
"replication-enabled": "replication-is-replica",
"replication-follower-username": "replication-primary-username",
"replication-follower-password": "replication-primary-password",
"replication-master-database": "replication-primary-database",
"replication-master-address": "replication-primary-host",
"replication-master-port": "replication-primary-port",
}

cmd.Flags().SetNormalizeFunc(func(f *pflag.FlagSet, name string) pflag.NormalizedName {
if name == "replication-enabled" {
name = "replication-is-replica"
if newName, ok := flagNameMapping[name]; ok {
name = newName
}
return pflag.NormalizedName(name)
})
Expand Down
8 changes: 4 additions & 4 deletions cmd/immudb/command/parse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func parseOptions() (options *server.Options, err error) {

if replicationOptions.IsReplica {
replicationOptions.
WithMasterAddress(viper.GetString("replication-master-address")).
WithMasterPort(viper.GetInt("replication-master-port")).
WithFollowerUsername(viper.GetString("replication-follower-username")).
WithFollowerPassword(viper.GetString("replication-follower-password")).
WithPrimaryHost(viper.GetString("replication-primary-host")).
WithPrimaryPort(viper.GetInt("replication-primary-port")).
WithPrimaryUsername(viper.GetString("replication-primary-username")).
WithPrimaryPassword(viper.GetString("replication-primary-password")).
WithPrefetchTxBufferSize(viper.GetInt("replication-prefetch-tx-buffer-size")).
WithReplicationCommitConcurrency(viper.GetInt("replication-commit-concurrency")).
WithAllowTxDiscarding(viper.GetBool("replication-allow-tx-discarding"))
Expand Down
44 changes: 22 additions & 22 deletions embedded/store/immustore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2293,19 +2293,19 @@ func TestUncommittedTxOverwriting(t *testing.T) {
}

func TestExportAndReplicateTx(t *testing.T) {
masterDir := t.TempDir()
primaryDir := t.TempDir()

masterStore, err := Open(masterDir, DefaultOptions())
primaryStore, err := Open(primaryDir, DefaultOptions())
require.NoError(t, err)
defer immustoreClose(t, masterStore)
defer immustoreClose(t, primaryStore)

replicaDir := t.TempDir()

replicaStore, err := Open(replicaDir, DefaultOptions())
require.NoError(t, err)
defer immustoreClose(t, replicaStore)

tx, err := masterStore.NewWriteOnlyTx()
tx, err := primaryStore.NewWriteOnlyTx()
require.NoError(t, err)

tx.WithMetadata(NewTxMetadata())
Expand All @@ -2317,9 +2317,9 @@ func TestExportAndReplicateTx(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, hdr)

txholder := tempTxHolder(t, masterStore)
txholder := tempTxHolder(t, primaryStore)

etx, err := masterStore.ExportTx(1, false, txholder)
etx, err := primaryStore.ExportTx(1, false, txholder)
require.NoError(t, err)

rhdr, err := replicaStore.ReplicateTx(etx, false)
Expand All @@ -2334,19 +2334,19 @@ func TestExportAndReplicateTx(t *testing.T) {
}

func TestExportAndReplicateTxCornerCases(t *testing.T) {
masterDir := t.TempDir()
primaryDir := t.TempDir()

masterStore, err := Open(masterDir, DefaultOptions())
primaryStore, err := Open(primaryDir, DefaultOptions())
require.NoError(t, err)
defer immustoreClose(t, masterStore)
defer immustoreClose(t, primaryStore)

replicaDir := t.TempDir()

replicaStore, err := Open(replicaDir, DefaultOptions().WithMaxActiveTransactions(1))
require.NoError(t, err)
defer immustoreClose(t, replicaStore)

tx, err := masterStore.NewWriteOnlyTx()
tx, err := primaryStore.NewWriteOnlyTx()
require.NoError(t, err)

tx.WithMetadata(NewTxMetadata())
Expand All @@ -2358,10 +2358,10 @@ func TestExportAndReplicateTxCornerCases(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, hdr)

txholder := tempTxHolder(t, masterStore)
txholder := tempTxHolder(t, primaryStore)

t.Run("prevent replicating broken data", func(t *testing.T) {
etx, err := masterStore.ExportTx(1, false, txholder)
etx, err := primaryStore.ExportTx(1, false, txholder)
require.NoError(t, err)

for i := range etx {
Expand Down Expand Up @@ -2392,11 +2392,11 @@ func TestExportAndReplicateTxCornerCases(t *testing.T) {
}

func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) {
masterDir := t.TempDir()
primaryDir := t.TempDir()

masterStore, err := Open(masterDir, DefaultOptions())
primaryStore, err := Open(primaryDir, DefaultOptions())
require.NoError(t, err)
defer immustoreClose(t, masterStore)
defer immustoreClose(t, primaryStore)

replicaDir := t.TempDir()

Expand All @@ -2409,7 +2409,7 @@ func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) {

for i := 0; i < txCount; i++ {
t.Run(fmt.Sprintf("tx: %d", i), func(t *testing.T) {
tx, err := masterStore.NewWriteOnlyTx()
tx, err := primaryStore.NewWriteOnlyTx()
require.NoError(t, err)

tx.WithMetadata(NewTxMetadata())
Expand All @@ -2422,7 +2422,7 @@ func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) {
require.NotNil(t, hdr)

txholder := tempTxHolder(t, replicaStore)
etx, err := masterStore.ExportTx(hdr.ID, false, txholder)
etx, err := primaryStore.ExportTx(hdr.ID, false, txholder)
require.NoError(t, err)

// Replicate the same transactions concurrently, only one must succeed
Expand Down Expand Up @@ -2452,11 +2452,11 @@ func TestExportAndReplicateTxSimultaneousWriters(t *testing.T) {
}

func TestExportAndReplicateTxDisorderedReplication(t *testing.T) {
masterDir := t.TempDir()
primaryDir := t.TempDir()

masterStore, err := Open(masterDir, DefaultOptions())
primaryStore, err := Open(primaryDir, DefaultOptions())
require.NoError(t, err)
defer immustoreClose(t, masterStore)
defer immustoreClose(t, primaryStore)

replicaDir := t.TempDir()

Expand All @@ -2472,7 +2472,7 @@ func TestExportAndReplicateTxDisorderedReplication(t *testing.T) {
txholder := tempTxHolder(t, replicaStore)

for i := 0; i < txCount; i++ {
tx, err := masterStore.NewWriteOnlyTx()
tx, err := primaryStore.NewWriteOnlyTx()
require.NoError(t, err)

tx.WithMetadata(NewTxMetadata())
Expand All @@ -2484,7 +2484,7 @@ func TestExportAndReplicateTxDisorderedReplication(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, hdr)

etx, err := masterStore.ExportTx(hdr.ID, false, txholder)
etx, err := primaryStore.ExportTx(hdr.ID, false, txholder)
require.NoError(t, err)

etxs <- etx
Expand Down
64 changes: 32 additions & 32 deletions pkg/api/schema/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
- [ExportTxRequest](#immudb.schema.ExportTxRequest)
- [FlushIndexRequest](#immudb.schema.FlushIndexRequest)
- [FlushIndexResponse](#immudb.schema.FlushIndexResponse)
- [FollowerState](#immudb.schema.FollowerState)
- [HealthResponse](#immudb.schema.HealthResponse)
- [HistoryRequest](#immudb.schema.HistoryRequest)
- [ImmutableState](#immudb.schema.ImmutableState)
Expand Down Expand Up @@ -79,6 +78,7 @@
- [Precondition.KeyNotModifiedAfterTXPrecondition](#immudb.schema.Precondition.KeyNotModifiedAfterTXPrecondition)
- [Reference](#immudb.schema.Reference)
- [ReferenceRequest](#immudb.schema.ReferenceRequest)
- [ReplicaState](#immudb.schema.ReplicaState)
- [ReplicationNullableSettings](#immudb.schema.ReplicationNullableSettings)
- [RetryInfo](#immudb.schema.RetryInfo)
- [Row](#immudb.schema.Row)
Expand Down Expand Up @@ -461,11 +461,11 @@ DEPRECATED
| ----- | ---- | ----- | ----------- |
| databaseName | [string](#string) | | Name of the database |
| replica | [bool](#bool) | | If set to true, this database is replicating another database |
| masterDatabase | [string](#string) | | Name of the database to replicate |
| masterAddress | [string](#string) | | Hostname of the immudb instance with database to replicate |
| masterPort | [uint32](#uint32) | | Port of the immudb instance with database to replicate |
| followerUsername | [string](#string) | | Username of the user with read access of the database to replicate |
| followerPassword | [string](#string) | | Password of the user with read access of the database to replicate |
| primaryDatabase | [string](#string) | | Name of the database to replicate |
| primaryHost | [string](#string) | | Hostname of the immudb instance with database to replicate |
| primaryPort | [uint32](#uint32) | | Port of the immudb instance with database to replicate |
| primaryUsername | [string](#string) | | Username of the user with read access of the database to replicate |
| primaryPassword | [string](#string) | | Password of the user with read access of the database to replicate |
| fileSize | [uint32](#uint32) | | Size of files stored on disk |
| maxKeyLen | [uint32](#uint32) | | Maximum length of keys |
| maxValueLen | [uint32](#uint32) | | Maximum length of values |
Expand Down Expand Up @@ -744,7 +744,7 @@ DualProof contains inclusion and consistency proofs for dual Merkle-Tree &#43; L
| ----- | ---- | ----- | ----------- |
| tx | [uint64](#uint64) | | Id of transaction to export |
| allowPreCommitted | [bool](#bool) | | If set to true, non-committed transactions can be exported |
| followerState | [FollowerState](#immudb.schema.FollowerState) | | Used on synchronous replication to notify the master about follower state |
| replicaState | [ReplicaState](#immudb.schema.ReplicaState) | | Used on synchronous replication to notify the primary about replica state |



Expand Down Expand Up @@ -782,25 +782,6 @@ DualProof contains inclusion and consistency proofs for dual Merkle-Tree &#43; L



<a name="immudb.schema.FollowerState"></a>

### FollowerState



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| UUID | [string](#string) | | |
| committedTxID | [uint64](#uint64) | | |
| committedAlh | [bytes](#bytes) | | |
| precommittedTxID | [uint64](#uint64) | | |
| precommittedAlh | [bytes](#bytes) | | |






<a name="immudb.schema.HealthResponse"></a>

### HealthResponse
Expand Down Expand Up @@ -1399,6 +1380,25 @@ Only succeed if given key was not modified after given transaction



<a name="immudb.schema.ReplicaState"></a>

### ReplicaState



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| UUID | [string](#string) | | |
| committedTxID | [uint64](#uint64) | | |
| committedAlh | [bytes](#bytes) | | |
| precommittedTxID | [uint64](#uint64) | | |
| precommittedAlh | [bytes](#bytes) | | |






<a name="immudb.schema.ReplicationNullableSettings"></a>

### ReplicationNullableSettings
Expand All @@ -1408,16 +1408,16 @@ Only succeed if given key was not modified after given transaction
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| replica | [NullableBool](#immudb.schema.NullableBool) | | If set to true, this database is replicating another database |
| masterDatabase | [NullableString](#immudb.schema.NullableString) | | Name of the database to replicate |
| masterAddress | [NullableString](#immudb.schema.NullableString) | | Hostname of the immudb instance with database to replicate |
| masterPort | [NullableUint32](#immudb.schema.NullableUint32) | | Port of the immudb instance with database to replicate |
| followerUsername | [NullableString](#immudb.schema.NullableString) | | Username of the user with read access of the database to replicate |
| followerPassword | [NullableString](#immudb.schema.NullableString) | | Password of the user with read access of the database to replicate |
| primaryDatabase | [NullableString](#immudb.schema.NullableString) | | Name of the database to replicate |
| primaryHost | [NullableString](#immudb.schema.NullableString) | | Hostname of the immudb instance with database to replicate |
| primaryPort | [NullableUint32](#immudb.schema.NullableUint32) | | Port of the immudb instance with database to replicate |
| primaryUsername | [NullableString](#immudb.schema.NullableString) | | Username of the user with read access of the database to replicate |
| primaryPassword | [NullableString](#immudb.schema.NullableString) | | Password of the user with read access of the database to replicate |
| syncReplication | [NullableBool](#immudb.schema.NullableBool) | | Enable synchronous replication |
| syncAcks | [NullableUint32](#immudb.schema.NullableUint32) | | Number of confirmations from synchronous replicas required to commit a transaction |
| prefetchTxBufferSize | [NullableUint32](#immudb.schema.NullableUint32) | | Maximum number of prefetched transactions |
| replicationCommitConcurrency | [NullableUint32](#immudb.schema.NullableUint32) | | Number of concurrent replications |
| AllowTxDiscarding | [NullableBool](#immudb.schema.NullableBool) | | Allow precommitted transactions to be discarded if the follower diverges from the master |
| AllowTxDiscarding | [NullableBool](#immudb.schema.NullableBool) | | Allow precommitted transactions to be discarded if the replica diverges from the primary |



Expand Down
Loading