Sharding
This wiki describes how MongoDB sharding works, its limitations in TokuMX, and alternative proposals.
THIS IS A WORK VERY MUCH IN PROGRESS.
A MongoDB sharded cluster is a distributed system with the following components:
- Shards: A shard is a single mongod or a replica set. There are many in a cluster. Shards store user data in collections.
- Config servers: Either one or three mongods. There is only one config server (or set of three) in the cluster. Stores sharding metadata:
  - chunk boundaries and shards
  - distributed locks
  - changelogs
  - etc.
- Routers: A mongos process. There are many in a cluster. Acts as a proxy for clients. Routes queries to the shard(s) that can service them. Also runs some background tasks, like migrations. Only one mongos will be in charge of this at a time (uses distributed locks).
- Clients: These are mongo shell processes, or driver clients. They connect to the routers and send normal operations there.
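For concreteness, here is a minimal, hedged sketch of how these pieces get wired together from the mongo shell (connected to a mongos router). The host names, replica set name, and database name are invented for the example.

```js
// Register shards with the cluster. A shard can be a single mongod or a replica set.
// (Hypothetical hosts and replica set name.)
sh.addShard("shard0000.example.com:27017");    // single mongod as a shard
sh.addShard("rs1/shard1a.example.com:27017");  // replica set as a shard

// Allow collections in this database to be sharded.
sh.enableSharding("mydb");

// The routers and shards read sharding metadata from the config servers,
// which each mongos was pointed at via its --configdb option.
```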
The following concepts apply to sharding:
- Shard key: Each sharded collection has an associated "shard key", which is an index key with certain restrictions. Each sharded collection also has a "shard index", whose key must have the shard key as a prefix. Usually it is exactly the shard key.
- Chunks: A sharded collection has associated metadata known as "chunks". The set of chunks is a disjoint partition of the shard key space into ranges. Each chunk has an associated shard, which is where any collection data within the chunk's range can be found.
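As a hedged illustration, the shell commands below shard a hypothetical collection and then look at the chunk documents the config servers keep for it. The namespace, shard key, and field values are invented; the chunk document shape is what the config database's chunks collection stores.

```js
// Shard a hypothetical collection on a hypothetical shard key (run via mongos).
sh.shardCollection("mydb.users", { user_id: 1 });

// The config servers keep one document per chunk in config.chunks.
// Each chunk is a range [min, max) of shard key values owned by exactly one shard.
var config = db.getSiblingDB("config");
config.chunks.find({ ns: "mydb.users" }).forEach(printjson);

// Illustrative shape of one chunk document:
// { ns: "mydb.users",
//   min: { user_id: { "$minKey": 1 } }, max: { user_id: 1000 },
//   shard: "shard0000",
//   lastmod: Timestamp(2, 1) }
```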
TODO: more?
TODO: describe shard versions, chunk managers, configs etc. etc.
The most important metadata is the chunk information (what chunks exist and which shard is responsible for each chunk). There is a `ShardChunkVersion` that tracks updates to this information. The `ShardChunkVersion` is incremented each time the chunk information is updated, and the new version accompanies the chunk info on the config server. Each shard maintains the `ShardChunkVersion` for the most recent change that affects it (the shard).

Each router caches a copy of the config in memory so it doesn't need to do frequent reads from the config servers. But when a router routes a query to a shard, it also provides its most recent `ShardChunkVersion`, so that the shard can reject the query if the router has a config that is too old. When this happens, the router queries the config servers for the most recent data and retries the query, possibly on a different shard if a migration just occurred.
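A hedged way to see this versioning from the shell: each chunk document carries a `lastmod` field (its `ShardChunkVersion`), so sorting by it shows the most recently changed chunk for a collection. The namespace below is invented.

```js
// Find the most recently modified chunk for a (hypothetical) sharded collection.
// lastmod is the ShardChunkVersion, stored as a Timestamp; roughly, migrations
// bump its major part and splits bump its minor part.
var config = db.getSiblingDB("config");
printjson(
  config.chunks.find({ ns: "mydb.users" })
               .sort({ lastmod: -1 })
               .limit(1)
               .next()
);
```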
There are two operations that are performed on chunks: split and migrate.
When a collection is initially sharded, it undergoes a "multi-split" which creates many chunks from the existing data. During normal operation, chunks are split by the router in `Chunk::splitIfShould`, which is called at the end of each insert or update operation. This does some cheap heuristic checks, and then, if necessary, does queries to find out whether a chunk needs to be split (has more than 64MB of data). If so, it first calculates the split point (`Chunk::singleSplit`) using the `splitVector` command on the shard, then calls the `splitChunk` command on the shard containing the original chunk. The shard then does some checking, takes a distributed lock, and updates the config servers with the new chunk information, replacing the one original chunk with two new chunks.

Next, there is a sequential-insertion optimization: if one of the resulting chunks contains only one document, the shard recommends that the router migrate that new chunk by returning `shouldMigrate` with the single-document chunk's boundaries. If the router receives `shouldMigrate`, it calls `moveChunk`, which induces a migration of the new chunk to a new shard. Since the chunk is small, this is cheap. Otherwise, the split is finished.
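The same machinery can be exercised by hand, which may help when reading the code. The sketch below calls `splitVector` directly on a shard's mongod to compute split points, and then asks mongos to split a chunk; in normal operation the router drives this itself via `splitChunk`, as described above. The namespace and key are invented, and the exact `splitVector` option names are as I recall them for 2.x-era servers, so treat this as a sketch.

```js
// 1) On the shard's mongod: ask splitVector for candidate split points within
//    a chunk's range (here, the whole key space).
var res = db.adminCommand({
  splitVector: "mydb.users",
  keyPattern: { user_id: 1 },
  min: { user_id: MinKey },
  max: { user_id: MaxKey },
  maxChunkSizeBytes: 64 * 1024 * 1024   // the 64MB default chunk size
});
printjson(res.splitKeys);

// 2) On mongos: split the chunk containing a given key at that key.
sh.splitAt("mydb.users", { user_id: 1000 });
```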
- If the heuristics in `Chunk::splitIfShould` don't filter out much work, we can end up calling `splitVector` many times without actually splitting anything. (If we call `splitVector` and do split a chunk, we can charge the cost of `splitVector` to all the insert/update work.) On a non-clustering shard key, `splitVector` is very expensive, because it basically needs to do a range query over the shard key, which is expensive for a non-clustering key. Therefore, we have strongly recommended that users use clustering shard keys (it helps migrations too); see the sketch after this list. But if this ends up being something users are unwilling to do, we may need to store better metadata or something to strengthen the heuristic filters.
- For some reason, the router can't update the config servers itself, and instead delegates that work to the shard. I think the reason is that the shard must always know about the most recent shard version that affects it, so that it can send `StaleConfigException`s back to routers.
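As a hedged sketch of the clustering recommendation above: TokuMX lets an index be declared clustering, so a range scan over the shard key (which is what `splitVector` and migrations do) reads documents in key order. The collection and key names are invented; consult the TokuMX docs for the exact options.

```js
// Sketch: give the collection a clustering index on the intended shard key,
// then shard on it. In TokuMX, a clustering index stores the documents in key
// order, making range scans over the shard key cheap.
db.getSiblingDB("mydb").users.ensureIndex({ user_id: 1 }, { clustering: true });
sh.shardCollection("mydb.users", { user_id: 1 });
```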
Every router has a background process called a `Balancer` that periodically wakes up and tries to migrate a chunk to balance things across shards. When the `Balancer` wakes up, it tries to take a distributed lock to make sure there is only one `Balancer` working in the cluster at a time. If it fails to acquire the lock, it sleeps and tries again later.

At a high level, the balancer looks at the two shards that have the most and the fewest chunks, respectively. If the number of chunks on each differs by more than the migration threshold, the balancer moves one chunk from the most to the least heavily loaded shard. The choice of which chunk to move is complected with tags and other policy decisions, but we can treat this choice as arbitrary for now. The policy code (src/mongo/s/balancer_policy.cpp) should be a quick read some day.
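One hedged way to see roughly what the balancer sees is to count chunks per shard from the config metadata; the loop below is illustrative and the namespace is invented.

```js
// Count chunks per shard for one collection: roughly the input to the
// balancer's most-loaded / least-loaded comparison.
var config = db.getSiblingDB("config");
config.shards.find().forEach(function (s) {
  var n = config.chunks.count({ ns: "mydb.users", shard: s._id });
  print(s._id + ": " + n + " chunks");
});
// The balancer's distributed lock lives in config.locks (its _id is "balancer").
```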
To migrate the chunk, the `Balancer` uses the `moveChunk` command on the donor shard. This is arguably the most complex part of sharding, implemented mostly in src/mongo/s/d_migrate.cpp. The process is mostly a back-and-forth between the donor ("from") and recipient ("to") shards using a mini-protocol made of commands like `_recvChunkStart`. There is some global-ish state on each shard, in the global variables `migrateFromStatus` (on the donor) and `migrateStatus` (on the recipient).
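A migration can also be triggered by hand with the same command the balancer uses; the sketch below goes through mongos, with an invented namespace, key, and recipient shard name.

```js
// Ask mongos to migrate the chunk containing {user_id: 1000} to another shard.
// Internally this becomes the moveChunk command on the donor shard, which then
// runs the _recvChunkStart mini-protocol described below.
printjson(db.adminCommand({
  moveChunk: "mydb.users",
  find: { user_id: 1000 },   // any key inside the chunk to move
  to: "shard0001"            // hypothetical recipient shard name
}));
// Equivalent shell helper: sh.moveChunk("mydb.users", { user_id: 1000 }, "shard0001")
```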
What follows is a description of the process through time, starting with the initial `moveChunk` command:
- [DONOR] In `MoveChunkCommand::run`, we:
  - Fail if we are already doing a migration.
  - Try to get a distributed lock, and fail if we cannot. (There can be only one migration at a time in the cluster.)
  - Log a `start` message in the config servers' changelog.
  - Read the most recently updated chunk from the config servers (`.sort({lastmod: -1})`), and grab its `ShardChunkVersion`. We'll use this to calculate the next version that will be used for the config server updates we do later.
  - Check a few things and bail out if something looks bad. If not, we decide to accept the move, and we start the migration process by creating a (confusingly named) `MigrateStatusHolder` that will manage the lifetime of the information in the global variable `migrateFromStatus`. This calls `MigrateFromStatus::start`, where we:
    - Zero out a bunch of state in `migrateFromStatus`.
    - [TokuMX ONLY] Take write locks in order to drop and re-create `local.migratelog.sh` and `local.migratelogref.sh`.
  - Call `_recvChunkStart` on the recipient shard.
- [RECIPIENT] In `RecvChunkStartCommand::run`, we:
TODO: more
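As a hedged aside, the `start` changelog entry mentioned above (and the rest of a migration's progress) can be observed on the config servers; the query below is illustrative, and the exact `what` values may vary by server version.

```js
// Recent changelog entries for chunk migrations on this cluster.
// "what" values include strings like "moveChunk.start" and "moveChunk.commit".
var config = db.getSiblingDB("config");
config.changelog.find({ what: /^moveChunk/ })
                .sort({ time: -1 })
                .limit(5)
                .forEach(printjson);
```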