TokuMX Migrations
This wiki describes how migrations work in a sharded TokuMX system.
First, some background. A migration in MongoDB is a multi-stage process:
- The balancer asks the donor to migrate a chunk to the recipient. I am not sure how the balancer decides this, but the donor receives a `moveChunk` command. There can only be one migration taking place from a donor shard at once. There is some global state (`migrateStatus`) and a single migration thread on the donor that controls this; if there is already a migration taking place, you can't start another. Also, there can be only one migration to a recipient at one time. I'm not sure if there can be multiple migrations in flight in the same sharded cluster at once; my guess is yes. On the other hand, since there can be only one balancer running at a time, I think that means there can be only one migration happening anywhere in the cluster at one time.
- A distributed lock called `"migrate-" + <min>` is taken on the config server. With the lock held, the donor then checks that the information on the config server is correct.
- The donor sets up `migrateFromStatus` (using `MigrateStatusHolder`), which tracks all modifications to the chunk being migrated via calls to `logOpForSharding` (which goes down to `MigrateFromStatus::logOp`). It is also used to calculate and store all the DiskLocs (for TokuMX, PKs) in the chunk in sorted order, so that during the transfer we can read them off disk in order. We then run a command on the recipient, `_recvChunkStart`, which asks the recipient to start reading out of the `migrateFromStatus`. The donor then sleeps and runs `_recvChunkStatus` in a loop until the recipient says its state is `STEADY`. We also watch how much memory is being used by `migrateFromStatus`, and if it exceeds 500MB we abort the migration.
- On the recipient, when `_recvChunkStart` is called, we set up `migrateStatus` and then kick off a `migrateThread`. This does the following (a sketch of this loop appears just after this list):
  - Create the collection and any indexes, if necessary, on the recipient, by querying `<dbname>.system.namespaces` and `<dbname>.system.indexes`.
  - Delete any data in the chunk's range that was accidentally there (maybe because of an earlier failed migration).
  - Run the `_migrateClone` command on the donor, which returns a list of objects (via `migrateFromStatus::clone()`) to upsert. Then upsert all these objects. Repeat `_migrateClone` until no objects are returned.
  - Now, start running `_transferMods` repeatedly. This returns lists of deleted and updated objects (logged by `logOpForSharding` into `migrateFromStatus` as described above) to be applied. The "deleted" list is just ids to delete, and the "reload" list contains objects to insert (well, upsert): new documents and the post-images of updates. We apply these in the batches returned by `_transferMods`, wait for a majority of slaves to catch up on replication to that batch, then start the next batch, until `_transferMods` returns nothing. This brings us into state `STEADY`.
  - We continue running `_transferMods` and waiting for replication to catch up until we see that the donor has told us to enter the `COMMIT_START` state, so we'll return to see what the donor does when it sees that we're in the `STEADY` state.
- When the donor sees that the recipient is in the `STEADY` state, it commits the migration (a sketch of this sequence also follows the list):
  - It first enters a critical section in the `migrateFromStatus`, which has something to do with `setShardVersion`. TODO: figure this out.
  - We take a `DBWrite` lock and "forget" that we own the chunk with `shardingState.donateChunk()` (but we haven't told the config server yet; this will cause any further modifications from a mongos to get writebacks, which may keep hitting us for a while until we actually update the config server). Remember that the recipient is still polling us with `_transferMods`, so after we forget about the chunk, the next `_transferMods` command that returns nothing means that we and the recipient are in sync.
  - We call `_recvChunkCommit` on the recipient. This brings us to the `COMMIT_START` state and waits up to 30 seconds for the commit to complete before returning. The migrate thread on the recipient sees this, finishes running `_transferMods` until it returns nothing (which means we're in sync), does one final wait for replication to a majority followed by a journal commit, then sets its state to `DONE`. This allows `_recvChunkCommit` to return successfully to the donor. (If it fails, we undo the chunk donation, to allow the writebacks we've sent to come back and complete on the donor, and abort the migration.)
  - Finally, the donor updates the config servers in a transaction to bump the version number and update the shard where the chunk is located. There's some amount of extra waiting and querying that can happen to make sure the config server update goes through; we won't need it. If any two config servers have information that isn't in sync, the mongos will fail to start and you have to get them back in sync manually (https://jira.mongodb.org/browse/SERVER-5414). So we probably don't have to do anything special with 2PC here.
  - Now we exit the critical section. Again, TODO: figure this out.
- Finally, we wait for all the cursors within the chunk to exit, and then delete all the data from the donor within the bounds of the chunk, maybe on a background thread. There's a lot of sleeping, waiting for replication, and doing things in small batches going on here that we can probably avoid in TokuMX. We can also just go ahead and do the deletions, since any cursors should be using snapshot or serializable transactions anyway.
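
To make the recipient's side concrete, here is a minimal, self-contained sketch of the migrate thread described above. It treats the donor's `_migrateClone` and `_transferMods` commands as simple batch-returning calls; `DonorConn`, `Batch`, `applyBatch`, and the replication helper are illustrative stand-ins for the real TokuMX/MongoDB machinery, not its actual API.

```cpp
// Illustrative model of the recipient's migrate thread (not the real code).
#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct Batch {
    std::vector<std::string> reload;   // docs to upsert: inserts and update post-images
    std::vector<std::string> deleted;  // _ids to delete
    bool empty() const { return reload.empty() && deleted.empty(); }
};

// Pretend donor: serves canned _migrateClone and _transferMods batches.
struct DonorConn {
    std::deque<Batch> cloneBatches, modBatches;
    bool commitRequested = false;                      // set by _recvChunkCommit
    Batch migrateClone() { return pop(cloneBatches); } // wraps _migrateClone
    Batch transferMods() { return pop(modBatches); }   // wraps _transferMods
private:
    static Batch pop(std::deque<Batch>& q) {
        if (q.empty()) return Batch{};
        Batch b = q.front(); q.pop_front(); return b;
    }
};

void applyBatch(const Batch& b) {
    for (const auto& id : b.deleted) std::cout << "delete " << id << "\n";
    for (const auto& doc : b.reload) std::cout << "upsert " << doc << "\n";
}
void waitForMajorityReplication() { /* block until a majority of slaves catch up */ }

void migrateThread(DonorConn& donor) {
    // 1. Bulk clone: repeat _migrateClone until it returns no objects.
    for (Batch b = donor.migrateClone(); !b.empty(); b = donor.migrateClone())
        applyBatch(b);

    // 2. Catch-up: apply _transferMods batches, waiting for replication after
    //    each, until one comes back empty; that puts us in STEADY.
    for (Batch b = donor.transferMods(); !b.empty(); b = donor.transferMods()) {
        applyBatch(b);
        waitForMajorityReplication();
    }
    std::cout << "state: STEADY\n";

    // 3. Keep draining mods until the donor calls _recvChunkCommit...
    while (!donor.commitRequested) {
        applyBatch(donor.transferMods());
        waitForMajorityReplication();
    }

    // 4. ...then COMMIT_START: final drain, one last majority wait (plus a
    //    journal commit in the real code), and report DONE.
    for (Batch b = donor.transferMods(); !b.empty(); b = donor.transferMods())
        applyBatch(b);
    waitForMajorityReplication();
    std::cout << "state: DONE\n";
}

int main() {
    DonorConn donor;
    donor.cloneBatches.push_back({{"{_id:1}", "{_id:2}"}, {}});
    donor.modBatches.push_back({{"{_id:2,x:1}"}, {"3"}});
    donor.commitRequested = true;  // pretend the donor commits immediately
    migrateThread(donor);
}
```

The important shape is the two empty-batch checks: the first empty `_transferMods` batch is what moves the recipient into `STEADY`, and the final empty batch after `COMMIT_START` is what lets it report `DONE`.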
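The donor's side of the commit, correspondingly, looks roughly like the sketch below. Everything in it (`dbLock`, `donateChunk`, `recvChunkCommit`, `updateConfigServers`, the critical-section prints) is a simplified placeholder for the steps in the list, not the real function names.

```cpp
// Illustrative model of the donor's commit sequence once the recipient is STEADY.
#include <iostream>
#include <mutex>
#include <stdexcept>

std::mutex dbLock;                                // stand-in for the DBWrite lock
bool chunkOwned = true;                           // stand-in for shardingState

void donateChunk()     { chunkOwned = false; }    // later mongos writes get writebacks
void undoDonateChunk() { chunkOwned = true; }     // let pending writebacks complete here
bool recvChunkCommit() { return true; }           // _recvChunkCommit: waits up to 30s for DONE
void updateConfigServers() { std::cout << "config: bump version, move chunk\n"; }

void commitMigration() {
    // Critical section: related to setShardVersion (still a TODO above).
    std::cout << "enter critical section\n";
    {
        // With the DBWrite lock held there is a point in time when nobody else
        // is writing; after donateChunk(), further mongos writes get writebacks.
        std::lock_guard<std::mutex> lk(dbLock);
        donateChunk();
    }
    if (!recvChunkCommit()) {
        // The recipient never reached DONE: re-own the chunk so the writebacks
        // we already issued can come back and complete on the donor, then abort.
        undoDonateChunk();
        std::cout << "exit critical section\n";
        throw std::runtime_error("migration aborted");
    }
    // Bump the chunk version and reassign the chunk on the config servers.
    updateConfigServers();
    std::cout << "exit critical section\n";
}

int main() { commitMigration(); }
```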
The process for TokuMX is mostly the same. The major differences are that our logging works differently because of transactions, and that we're faster and more concurrent, so we shouldn't need much of their "sleep a lot and wait a lot" machinery to lower the impact of migrations.
First, assume we don't support multi-statement transactions on a sharded collection. It may be easier for now to assume we don't support multi-statement transactions through a mongos at all, so let's just do that. This gives us the property that all modifications are done within the scope of a transaction which is entirely protected by at least a `DBRead` lock.
Since we take a `DBWrite` lock in order to call `donateChunk`, this gives us a point in time when nobody else is writing to the collection (and after this point, anyone that tries to will get a writeback). As long as we only call `logOpForSharding` on committed data, anything that's going to commit must do so before `donateChunk` and nothing can happen after, so this is a consistent snapshot.
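
Here is a small illustration of that argument, under the assumption that every write commits (and calls `logOpForSharding`) while holding at least the shared `DBRead` lock, and that `donateChunk` runs under the exclusive `DBWrite` lock. The names below are stand-ins, not real TokuMX code.

```cpp
// An illustration of the "consistent snapshot" argument: writes commit (and
// log for sharding) under a shared DBRead lock, donateChunk runs under the
// exclusive DBWrite lock. All names here are stand-ins, not real code.
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <vector>

std::shared_mutex dbLock;              // DBRead = shared, DBWrite = exclusive
bool chunkOwned = true;                // flipped by donateChunk
std::vector<std::string> transferLog;  // what logOpForSharding would capture

// A write either commits-and-logs before the donation, or sees the chunk as
// donated and gets a writeback; it can never straddle the donation point.
bool tryWrite(const std::string& doc) {
    std::shared_lock<std::shared_mutex> readLock(dbLock);  // "DBRead"
    if (!chunkOwned)
        return false;                  // writeback: retried against the new owner
    transferLog.push_back(doc);        // logOpForSharding sees only committed data
    return true;
}

void donateChunk() {
    std::unique_lock<std::shared_mutex> writeLock(dbLock); // "DBWrite"
    chunkOwned = false;                // everything logged so far is a consistent snapshot
}

int main() {
    tryWrite("{_id: 1}");              // committed and logged before the donation
    donateChunk();
    std::cout << (tryWrite("{_id: 2}") ? "applied\n" : "writeback\n");
    std::cout << "ops to transfer: " << transferLog.size() << "\n";
}
```

Because `tryWrite` holds the shared lock across both the write and the log call, a write can never straddle the donation point: it is either fully logged before `donateChunk` acquires the exclusive lock, or it is turned into a writeback afterwards.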
So to handle `logOpForSharding` and `_transferMods`, all we need to do is hook up the logging framework that logs on root transaction commit to `migrateFromStatus::logOp()` and make sure it logs things in the right format, and also get `migrateStatus::apply()` in sync with those changes.
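
A sketch of what that hookup might look like, using a hypothetical root transaction that buffers its ops and only forwards them to the migration state on commit. `Op`, `RootTransaction`, the opcode letters, and this `MigrateFromStatus` are simplified assumptions standing in for the real logging framework and the `migrateFromStatus::logOp()` / `migrateStatus::apply()` formats.

```cpp
// Illustrative sketch of logging migration ops on root transaction commit.
#include <iostream>
#include <string>
#include <vector>

struct Op {
    char opcode;           // assumed: 'i' insert, 'u' update (post-image), 'd' delete
    std::string id;        // _id, used for the "deleted" list
    std::string postImage; // full document, used for the "reload" list
};

// Donor side: what logOpForSharding feeds and _transferMods drains.
struct MigrateFromStatus {
    std::vector<std::string> deleted, reload;
    void logOp(const Op& op) {
        if (op.opcode == 'd') deleted.push_back(op.id);
        else                  reload.push_back(op.postImage); // upserted on apply
    }
};

// A root transaction buffers its ops and only forwards them on commit, so
// logOpForSharding never sees uncommitted data (matching the argument above).
struct RootTransaction {
    std::vector<Op> ops;
    void logOpForSharding(const Op& op) { ops.push_back(op); }
    void commit(MigrateFromStatus& mfs) {
        for (const Op& op : ops) mfs.logOp(op);
        ops.clear();
    }
    void abort() { ops.clear(); }      // aborted work never reaches the migration
};

int main() {
    MigrateFromStatus mfs;
    RootTransaction txn;
    txn.logOpForSharding({'u', "1", "{_id: 1, x: 2}"});
    txn.logOpForSharding({'d', "2", ""});
    txn.commit(mfs);                   // only now does the migration see the ops
    std::cout << "reload: " << mfs.reload.size()
              << ", deleted: " << mfs.deleted.size() << "\n";
}
```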