The downside of that is that I quickly found that once I got a few hundred million documents loaded, the performance not only dropped off quite a bit (expected) but also became highly variable (less expected). In fact, it got so bad that I started to worry how many months (not weeks or days) the whole process might take.
After some poking around and reading on the mailing list, I realized that The Balancer was the culprit. To really understand what’s going on, you first need to read the MongoDB documentation on sharding and the balancer. If you’re lazy, I can explain without you having to click at all.
You see, MongoDB’s sharding infrastructure groups documents into logical “chunks” which are capped at 200MB by default. Chunks are identified by a range of shard keys. Each chunk is assigned to one shard. So if you’re sharding by a key like PostingID (in our case), you end up with a bunch of chunks that look logically like this:
chunk01 (PostingID 10024 -> 20098) lives on shard01
chunk02 (PostingID 20099 -> 30031) lives on shard02
chunk03 (PostingID 30032 -> 30085) lives on shard03
And so on.
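To make that concrete, here is roughly what setting up such a collection looks like from a client. This is only a sketch using pymongo; the host, database, and collection names are placeholders I made up for illustration, with PostingID as the shard key:

```python
from pymongo import MongoClient

# Connect through a mongos router (host and port are placeholders).
client = MongoClient("mongos.example.com", 27017)

# Hypothetical database/collection names; PostingID is the shard key.
client.admin.command("enableSharding", "listings")
client.admin.command("shardCollection", "listings.postings", key={"PostingID": 1})
```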
Again, MongoDB groups documents into chunks of roughly equal size and maps those chunks to shards. As time goes on, it tries to notice imbalances such as a single shard having more chunks than the others. It then tries to correct these imbalances by moving chunks from the more populated shard(s) to the less populated shard(s). That is the job of the balancer.
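If you want to see the kind of imbalance the balancer reacts to, the cluster’s config database tracks every chunk and the shard it currently lives on. A quick sketch (again with a hypothetical namespace) that counts chunks per shard:

```python
from collections import Counter
from pymongo import MongoClient

client = MongoClient("mongos.example.com", 27017)

# config.chunks has one document per chunk, including the owning shard,
# so counting by shard shows how evenly (or not) chunks are spread.
counts = Counter(
    chunk["shard"]
    for chunk in client.config.chunks.find({"ns": "listings.postings"})
)
for shard, num_chunks in sorted(counts.items()):
    print(shard, num_chunks)
```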
In normal operation, this all works pretty well, especially if your data set fits mostly in RAM. However, our case is different in two important ways.
First off, this is a one-time data import. Our goal is to get it done as quickly as possible. We want to write, write, and write more. There are no reads going on.
Secondly, the size of the data is expected to far exceed the available RAM on each of the shard servers. Once the migration is done, this is going to be a lightly used deployment most of the time, so that’s OK.
Since MongoDB uses memory-mapped files for all of its data and indexes, what you expect to see is that all available RAM eventually gets used up and the kernel has to start writing dirty pages out to disk so that RAM can be freed up for new data. (This all assumes you’re not pre-emptively making fsync requests.)
If that’s all that happened, you might expect performance to take a temporary hit when these flushes happen and then recover back to some baseline value. Instead, we saw performance take a nose dive, never really recover, and vary quite a bit over time (by a factor of 10 or more).
All writes were initially going to one of the three shards. As the data grew, it was automatically split into chunks. And the aforementioned balancer would eventually sense the need for balancing and start to move chunks from one shard to another.
Many of the chunks the balancer decided to move from the busy shard to a less busy shard contained “older” data that had already been flushed to disk to make room in memory for newer data. That means that the process of migrating those chunks was especially painful, since loading in that older data meant pushing newer data from memory, flushing it to disk, and then reading back the older data only to hand it to another shard and ultimately delete it. All the while newer data is streaming in and adding to the pressure.
That extra I/O and flushing eventually manifested themselves as lower throughput. A lot lower. Needless to say, the situation was not sustainable. At all.
The better way to do this involves pre-splitting the key range, as described in Splitting Chunks, and then using the moveChunk command, as described in Moving Chunks, to assign the various chunks to different shards. (For bonus points, you also disable the balancer after this is done.)
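Here’s roughly what that sequence looks like from a client, sketched with pymongo. The namespace, host, and shard names are placeholders, and it assumes PostingID values are roughly dense, so splitting every N keys yields chunks of about N documents:

```python
from pymongo import MongoClient

client = MongoClient("mongos.example.com", 27017)     # a mongos router

NS = "listings.postings"                               # hypothetical namespace
DOCS_PER_CHUNK = 95_325                                # see the chunk math below
MAX_KEY = int(2**31 * 1.2)                             # top of the PostingID range
SHARDS = ["shard0000", "shard0001", "shard0002"]       # placeholder shard names

# Pre-split the key range into empty chunks, handing each newly created
# chunk to a shard round-robin so that incoming writes are spread evenly.
for i, split_at in enumerate(range(DOCS_PER_CHUNK, MAX_KEY, DOCS_PER_CHUNK)):
    client.admin.command("split", NS, middle={"PostingID": split_at})
    client.admin.command("moveChunk", NS,
                         find={"PostingID": split_at - 1},
                         to=SHARDS[i % len(SHARDS)])

# For bonus points: one classic way to switch the balancer off so it
# can't undo the layout mid-import.
client.config.settings.update_one(
    {"_id": "balancer"}, {"$set": {"stopped": True}}, upsert=True)
```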
Once that is done, you can load data at a fairly rapid rate without fear of chunk migrations happening between shards. And instead of all writes going to a single shard initially, they can be fairly evenly distributed among all shards. That leads to stable performance and greater sustained throughput overall.
Doing this well assumes that you have a good handle on what your data looks like, including the distribution of shard keys (if they’re not unique), average document size, and so on.
These are the inputs we started with:
- min shard key: 1
- max shard key: 2^31 + 20%
- average doc size: 2,200 bytes
- default chunk size: 200MB
- num shards: 3
The resulting output (with the arithmetic sketched below) was:
- 95,325 documents per chunk
- 27,033 total chunks
- 9,011 chunks per shard
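Those three numbers fall straight out of the inputs, taking the 200MB chunk size to mean 200 * 2^20 bytes. A quick sketch of the arithmetic:

```python
chunk_bytes = 200 * 2**20        # default chunk size: 200MB
avg_doc_bytes = 2200             # average doc size
max_key = int(2**31 * 1.2)       # max shard key: 2^31 + 20% (min is 1)
num_shards = 3

docs_per_chunk = chunk_bytes // avg_doc_bytes     # 95,325 documents per chunk
total_chunks = max_key // docs_per_chunk          # 27,033 total chunks
chunks_per_shard = total_chunks // num_shards     # 9,011 chunks per shard
```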
It took probably 6 hours to do the pre-splitting and chunk moving, but the resulting performance has been very good so far. We’re easily able to sustain a higher write rate with far less variance than we saw before.
This image shows what I see now:
The occasional dips are the result of a worker process finishing its current file and stopping to decompress the next one before it can continue adding documents to MongoDB.
10gen CTO Eliot tells me that they’re working to implement shard key hashing so that this can happen automatically (if you’re willing to sacrifice range queries based on your shard key). When that is done, loaded documents will be sprayed uniformly across the shards. That should greatly reduce the need for constant balancing to happen during this process.
Since we’re not anticipating range queries that involve our shard key, this would eliminate the need to pre-split and move chunks before importing this type of data in the future. The only question in my mind is what happens when a new shard or two is added. Presumably new documents will start to land on those new shards and the balancer will move chunks to them over time as well.