Skip to content

Commit

Permalink
fix: Change replication-related terms in codebase
Browse files Browse the repository at this point in the history
Signed-off-by: Bartłomiej Święcki <[email protected]>
  • Loading branch information
Bartłomiej Święcki committed Nov 9, 2022
1 parent f8a6c4a commit 326363b
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 227 deletions.
8 changes: 4 additions & 4 deletions cmd/immudb/command/parse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func parseOptions() (options *server.Options, err error) {

if replicationOptions.IsReplica {
replicationOptions.
WithMasterAddress(viper.GetString("replication-primary-host")).
WithMasterPort(viper.GetInt("replication-primary-port")).
WithFollowerUsername(viper.GetString("replication-primary-username")).
WithFollowerPassword(viper.GetString("replication-primary-password")).
WithPrimaryHost(viper.GetString("replication-primary-host")).
WithPrimaryPort(viper.GetInt("replication-primary-port")).
WithPrimaryUsername(viper.GetString("replication-primary-username")).
WithPrimaryPassword(viper.GetString("replication-primary-password")).
WithPrefetchTxBufferSize(viper.GetInt("replication-prefetch-tx-buffer-size")).
WithReplicationCommitConcurrency(viper.GetInt("replication-commit-concurrency")).
WithAllowTxDiscarding(viper.GetBool("replication-allow-tx-discarding"))
Expand Down
162 changes: 81 additions & 81 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var ErrIllegalArguments = store.ErrIllegalArguments
var ErrIllegalState = store.ErrIllegalState
var ErrIsReplica = errors.New("database is read-only because it's a replica")
var ErrNotReplica = errors.New("database is NOT a replica")
var ErrFollowerDivergedFromMaster = errors.New("follower diverged from master")
var ErrReplicaDivergedFromPrimary = errors.New("replica diverged from primary")
var ErrInvalidRevision = errors.New("invalid key revision number")

type DB interface {
Expand Down Expand Up @@ -141,7 +141,7 @@ type DB interface {

type uuid = string

type followerState struct {
type replicaState struct {
precommittedTxID uint64
precommittedAlh [sha256.Size]byte
}
Expand All @@ -166,8 +166,8 @@ type db struct {

txPool store.TxPool

followerStates map[uuid]*followerState
followerStatesMutex sync.Mutex
replicaStates map[uuid]*replicaState
replicaStatesMutex sync.Mutex
}

// OpenDB Opens an existing Database from disk
Expand All @@ -178,19 +178,19 @@ func OpenDB(dbName string, multidbHandler sql.MultiDBHandler, op *Options, log l

log.Infof("Opening database '%s' {replica = %v}...", dbName, op.replica)

var followerStates map[uuid]*followerState
// follower states are only managed in master with synchronous replication
var replicaStates map[uuid]*replicaState
// replica states are only managed in primary with synchronous replication
if !op.replica && op.syncAcks > 0 {
followerStates = make(map[uuid]*followerState, op.syncAcks)
replicaStates = make(map[uuid]*replicaState, op.syncAcks)
}

dbi := &db{
Logger: log,
options: op,
name: dbName,
followerStates: followerStates,
maxResultSize: MaxKeyScanLimit,
mutex: &instrumentedRWMutex{},
Logger: log,
options: op,
name: dbName,
replicaStates: replicaStates,
maxResultSize: MaxKeyScanLimit,
mutex: &instrumentedRWMutex{},
}

dbDir := dbi.Path()
Expand Down Expand Up @@ -288,19 +288,19 @@ func NewDB(dbName string, multidbHandler sql.MultiDBHandler, op *Options, log lo

log.Infof("Creating database '%s' {replica = %v}...", dbName, op.replica)

var followerStates map[uuid]*followerState
// follower states are only managed in master with synchronous replication
var replicaStates map[uuid]*replicaState
// replica states are only managed in primary with synchronous replication
if !op.replica && op.syncAcks > 0 {
followerStates = make(map[uuid]*followerState, op.syncAcks)
replicaStates = make(map[uuid]*replicaState, op.syncAcks)
}

dbi := &db{
Logger: log,
options: op,
name: dbName,
followerStates: followerStates,
maxResultSize: MaxKeyScanLimit,
mutex: &instrumentedRWMutex{},
Logger: log,
options: op,
name: dbName,
replicaStates: replicaStates,
maxResultSize: MaxKeyScanLimit,
mutex: &instrumentedRWMutex{},
}

dbDir := filepath.Join(op.GetDBRootPath(), dbName)
Expand Down Expand Up @@ -1171,57 +1171,57 @@ func (d *db) serializeTx(tx *store.Tx, spec *schema.EntriesSpec, snap *store.Sna
return stx, nil
}

func (d *db) mayUpdateFollowerState(committedTxID uint64, newFollowerState *schema.ReplicaState) error {
d.followerStatesMutex.Lock()
defer d.followerStatesMutex.Unlock()
func (d *db) mayUpdateReplicaState(committedTxID uint64, newReplicaState *schema.ReplicaState) error {
d.replicaStatesMutex.Lock()
defer d.replicaStatesMutex.Unlock()

// clean up followerStates
// it's safe to remove up to latest tx committed in master
for uuid, st := range d.followerStates {
// clean up replicaStates
// it's safe to remove up to latest tx committed in primary
for uuid, st := range d.replicaStates {
if st.precommittedTxID <= committedTxID {
delete(d.followerStates, uuid)
delete(d.replicaStates, uuid)
}
}

if newFollowerState.PrecommittedTxID <= committedTxID {
// as far as the master is concerned, nothing really new has happened
if newReplicaState.PrecommittedTxID <= committedTxID {
// as far as the primary is concerned, nothing really new has happened
return nil
}

newFollowerAlh := schema.DigestFromProto(newFollowerState.PrecommittedAlh)
newReplicaAlh := schema.DigestFromProto(newReplicaState.PrecommittedAlh)

followerSt, ok := d.followerStates[newFollowerState.UUID]
replicaSt, ok := d.replicaStates[newReplicaState.UUID]
if ok {
if newFollowerState.PrecommittedTxID < followerSt.precommittedTxID {
return fmt.Errorf("%w: the newly informed follower state lags behind the previously informed one", ErrIllegalArguments)
if newReplicaState.PrecommittedTxID < replicaSt.precommittedTxID {
return fmt.Errorf("%w: the newly informed replica state lags behind the previously informed one", ErrIllegalArguments)
}

if newFollowerState.PrecommittedTxID == followerSt.precommittedTxID {
// as of the last informed follower status update, nothing has changed
if newReplicaState.PrecommittedTxID == replicaSt.precommittedTxID {
// as of the last informed replica status update, nothing has changed
return nil
}

// actual replication progress is informed by the follower
followerSt.precommittedTxID = newFollowerState.PrecommittedTxID
followerSt.precommittedAlh = newFollowerAlh
// actual replication progress is informed by the replica
replicaSt.precommittedTxID = newReplicaState.PrecommittedTxID
replicaSt.precommittedAlh = newReplicaAlh
} else {
// follower informs first replication state
d.followerStates[newFollowerState.UUID] = &followerState{
precommittedTxID: newFollowerState.PrecommittedTxID,
precommittedAlh: newFollowerAlh,
// replica informs first replication state
d.replicaStates[newReplicaState.UUID] = &replicaState{
precommittedTxID: newReplicaState.PrecommittedTxID,
precommittedAlh: newReplicaAlh,
}
}

// check up to which tx enough followers ack replication and it's safe to commit
// check up to which tx enough replicas ack replication and it's safe to commit
mayCommitUpToTxID := uint64(0)
if len(d.followerStates) > 0 {
if len(d.replicaStates) > 0 {
mayCommitUpToTxID = math.MaxUint64
}

allowances := 0

// we may clean up followerStates from those who are lagging behind commit
for _, st := range d.followerStates {
// we may clean up replicaStates from those who are lagging behind commit
for _, st := range d.replicaStates {
if st.precommittedTxID < mayCommitUpToTxID {
mayCommitUpToTxID = st.precommittedTxID
}
Expand All @@ -1244,8 +1244,8 @@ func (d *db) ExportTxByID(req *schema.ExportTxRequest) (txbs []byte, mayCommitUp
return nil, 0, mayCommitUpToAlh, ErrIllegalArguments
}

if d.followerStates == nil && req.ReplicaState != nil {
return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: follower state was NOT expected", ErrIllegalState)
if d.replicaStates == nil && req.ReplicaState != nil {
return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: replica state was NOT expected", ErrIllegalState)
}

tx, err := d.allocTx()
Expand All @@ -1259,68 +1259,68 @@ func (d *db) ExportTxByID(req *schema.ExportTxRequest) (txbs []byte, mayCommitUp

if req.ReplicaState != nil {
if req.ReplicaState.CommittedTxID > 0 {
// validate follower commit state
// validate replica commit state
if req.ReplicaState.CommittedTxID > committedTxID {
return nil, committedTxID, committedAlh,
fmt.Errorf("%w: follower commit state diverged from master's", ErrFollowerDivergedFromMaster)
fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
}

expectedFollowerCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false)
expectedReplicaCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false)
if err != nil {
return nil, committedTxID, committedAlh, err
}

followerCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)
replicaCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)

if expectedFollowerCommitHdr.Alh() != followerCommittedAlh {
return nil, expectedFollowerCommitHdr.ID, expectedFollowerCommitHdr.Alh(),
fmt.Errorf("%w: follower commit state diverged from master's", ErrFollowerDivergedFromMaster)
if expectedReplicaCommitHdr.Alh() != replicaCommittedAlh {
return nil, expectedReplicaCommitHdr.ID, expectedReplicaCommitHdr.Alh(),
fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
}
}

if req.ReplicaState.PrecommittedTxID > 0 {
// validate follower precommit state
// validate replica precommit state
if req.ReplicaState.PrecommittedTxID > preCommittedTxID {
return nil, committedTxID, committedAlh,
fmt.Errorf("%w: follower precommit state diverged from master's", ErrFollowerDivergedFromMaster)
fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
}

expectedFollowerPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true)
expectedReplicaPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true)
if err != nil {
return nil, committedTxID, committedAlh, err
}

followerPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)
replicaPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)

if expectedFollowerPrecommitHdr.Alh() != followerPreCommittedAlh {
return nil, expectedFollowerPrecommitHdr.ID, expectedFollowerPrecommitHdr.Alh(),
fmt.Errorf("%w: follower precommit state diverged from master's", ErrFollowerDivergedFromMaster)
if expectedReplicaPrecommitHdr.Alh() != replicaPreCommittedAlh {
return nil, expectedReplicaPrecommitHdr.ID, expectedReplicaPrecommitHdr.Alh(),
fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
}

// master will provide commit state to the follower so it can commit pre-committed transactions
// primary will provide commit state to the replica so it can commit pre-committed transactions
if req.ReplicaState.PrecommittedTxID < committedTxID {
// if follower is behind current commit state in master
// return the alh up to the point known by the follower.
// That way the follower is able to validate is following the right master.
// if replica is behind current commit state in primary
// return the alh up to the point known by the replica.
// That way the replica is able to validate is following the right primary.
mayCommitUpToTxID = req.ReplicaState.PrecommittedTxID
mayCommitUpToAlh = followerPreCommittedAlh
mayCommitUpToAlh = replicaPreCommittedAlh
} else {
mayCommitUpToTxID = committedTxID
mayCommitUpToAlh = committedAlh
}
}

err = d.mayUpdateFollowerState(committedTxID, req.ReplicaState)
err = d.mayUpdateReplicaState(committedTxID, req.ReplicaState)
if err != nil {
return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
}
}

// it might be the case master will commit some txs (even there could be inmem-precommitted txs)
// it might be the case primary will commit some txs (even there could be inmem-precommitted txs)
// current timeout it's not a special value but at least a relative one
// note: master might also be waiting ack from any follower (even this follower may do progress)
// note: primary might also be waiting ack from any replica (even this primary may do progress)

// TODO: under some circumstances, follower might not be able to do further progress until master
// TODO: under some circumstances, replica might not be able to do further progress until primary
// has made changes, such wait doesn't need to have a timeout, reducing networking and CPU utilization
ctx, cancel := context.WithTimeout(context.Background(), d.options.storeOpts.SyncFrequency*4)
defer cancel()
Expand Down Expand Up @@ -1357,7 +1357,7 @@ func (d *db) ReplicateTx(exportedTx []byte) (*schema.TxHeader, error) {
return schema.TxHeaderToProto(hdr), nil
}

// AllowCommitUpto is used by followers to commit transactions once committed in master
// AllowCommitUpto is used by replicas to commit transactions once committed in primary
func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
d.mutex.RLock()
defer d.mutex.RUnlock()
Expand All @@ -1366,13 +1366,13 @@ func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
return ErrNotReplica
}

// follower pre-committed state must be consistent with master
// replica pre-committed state must be consistent with primary

committedTxID, committedAlh := d.st.CommittedAlh()
// handling a particular case in an optimized manner
if committedTxID == txID {
if committedAlh != alh {
return fmt.Errorf("%w: follower commit state diverged from master's", ErrIllegalState)
return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
}
return nil
}
Expand All @@ -1383,7 +1383,7 @@ func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
}

if hdr.Alh() != alh {
return fmt.Errorf("%w: follower commit state diverged from master's", ErrIllegalState)
return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
}

return d.st.AllowCommitUpto(txID)
Expand Down Expand Up @@ -1668,17 +1668,17 @@ func (d *db) AsReplica(asReplica, syncReplication bool, syncAcks int) {
d.mutex.Lock()
defer d.mutex.Unlock()

d.followerStatesMutex.Lock()
defer d.followerStatesMutex.Unlock()
d.replicaStatesMutex.Lock()
defer d.replicaStatesMutex.Unlock()

d.options.replica = asReplica
d.options.syncAcks = syncAcks
d.options.syncReplication = syncReplication

if asReplica {
d.followerStates = nil
d.replicaStates = nil
} else if syncAcks > 0 {
d.followerStates = make(map[uuid]*followerState, syncAcks)
d.replicaStates = make(map[uuid]*replicaState, syncAcks)
}

d.st.SetExternalCommitAllowance(syncReplication)
Expand Down
Loading

0 comments on commit 326363b

Please sign in to comment.