MongoDB Pre-Splitting for Faster Data Loading and Importing

One nice thing about playing with Webdis is that I can watch the import rate of my multi-billion document MongoDB import in nearly real-time.

The downside is that I quickly found that once I got a few hundred million documents loaded, performance not only dropped off quite a bit (expected) but also became highly variable (less expected). In fact, it got so bad that I started to worry how many months (not weeks or days) the whole process might take.


After some poking around and reading on the mailing list, I realized that The Balancer was the culprit. To really understand what’s going on, you first need to read about:

  1. Sharding Administration
  2. Splitting Chunks

If you’re lazy, I can explain without you having to click at all.

You see, MongoDB’s sharding infrastructure groups documents into logical “chunks” which are capped at 200MB by default. Chunks are identified by a range of shard keys. Each chunk is assigned to one shard. So if you’re sharding by a key like PostingID (in our case), you end up with a bunch of chunks that look logically like this:

chunk01 (PostingID 10024 -> 20098) lives on shard01
chunk02 (PostingID 20099 -> 30031) lives on shard02
chunk03 (PostingID 30032 -> 30085) lives on shard03

And so on.
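To make the mapping concrete, here is a minimal Python sketch of a range-to-shard lookup using the example chunks above. It is a simplification and not how mongos is implemented: the table, the function name shard_for, and the choice to treat the last chunk as open-ended are all assumptions made for illustration.

```python
from bisect import bisect_right

# Hypothetical chunk table mirroring the example above: each entry is
# (lowest PostingID in the chunk, shard that owns the chunk).
# Simplification: the last chunk is treated as open-ended.
CHUNKS = [
    (10024, "shard01"),
    (20099, "shard02"),
    (30032, "shard03"),
]
LOWER_BOUNDS = [low for low, _ in CHUNKS]


def shard_for(posting_id):
    """Return the shard whose chunk range contains posting_id."""
    # bisect_right finds the first lower bound greater than posting_id,
    # so the chunk just before that position is the one covering the key.
    index = bisect_right(LOWER_BOUNDS, posting_id) - 1
    if index < 0:
        raise KeyError(f"no chunk covers PostingID {posting_id}")
    return CHUNKS[index][1]


print(shard_for(20098))  # last key of chunk01
print(shard_for(20099))  # first key of chunk02
```

The point is only that routing a write means finding the one chunk whose range covers the shard key, and that chunk lives on exactly one shard.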

Again, MongoDB groups documents into chunks of roughly equal size and maps those chunks to shards. As time goes on, it tries to notice imbalances such as a single shard having more chunks than the others. It then tries to correct these imbalances by moving chunks from the more populated shard(s) to the less populated shard(s). That is the job of the balancer.
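As a rough illustration of that idea, the toy Python model below moves one chunk at a time from the most populated shard to the least populated one until the gap falls under a threshold. This is not MongoDB's actual balancer algorithm; the function name rebalance, the threshold parameter, and the chunk counts are assumptions chosen for the example.

```python
def rebalance(chunk_counts, threshold):
    """Toy balancer: returns (moves, final_counts).

    Repeatedly moves one chunk from the fullest shard to the emptiest
    shard until their chunk counts differ by less than threshold.
    """
    counts = dict(chunk_counts)  # copy so the caller's dict is untouched
    moves = []
    while True:
        fullest = max(counts, key=counts.get)
        emptiest = min(counts, key=counts.get)
        if counts[fullest] - counts[emptiest] < threshold:
            return moves, counts
        counts[fullest] -= 1
        counts[emptiest] += 1
        moves.append((fullest, emptiest))


# Hypothetical starting point: every chunk sits on one shard.
moves, final = rebalance({"shard01": 30, "shard02": 0, "shard03": 0}, threshold=2)
print(len(moves), final)
```

Every entry in moves stands for a chunk that has to be read on one shard, copied over the network, and deleted from the source.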

In normal operation, this all works pretty well–especially if your data set fits mostly in RAM. However, our case is different in two important ways.

First off, this is a one-time data import. Our goal is to get it done as quickly as possible. We want to write, write, and write more. There are no reads going on.

Secondly, the size of data is expected to far exceed the available RAM on each of the shard servers. Once the migration is done, this is going to be a lightly used deployment most of the time, so that’s OK.

Since MongoDB uses memory-mapped files for all of its data and indexes, what you expect to see is that all available RAM eventually gets used up and the kernel needs to start writing dirty pages out to disk so that RAM can be freed up for new data. (This all assumes you’re not pre-emptively making fsync requests.)

If that’s all that happened, you might expect performance to take a temporary hit when these flushes happen and then recover back to some baseline value. But we weren’t just seeing degraded performance: throughput took a nose dive, never really recovered, and varied quite a bit over time (by a factor of 10 or more).

The Problem

After reading on the MongoDB mailing list, double-checking some docs, and looking more closely at logs, I started to see what was happening. This thread was particularly useful.

All writes were initially going to one of the three shards. As the data grew, it was automatically split into chunks. And the aforementioned balancer would eventually sense the need for balancing and start to move chunks from one shard to another.

Many of the chunks the balancer decided to move from the busy shard to a less busy shard contained “older” data that had already been flushed to disk to make room in memory for newer data. That means that the process of migrating those chunks was especially painful, since loading in that older data meant pushing newer data from memory, flushing it to disk, and then reading back the older data only to hand it to another shard and ultimately delete it. All the while newer data is streaming in and adding to the pressure.

That extra I/O and flushing eventually manifest themselves as lower throughput. A lot lower. Needless to say, the situation was not sustainable. At all.

The Solution

The better way to do this involves pre-splitting the key range, which is described in Splitting Chunks, followed by using the moveChunk command as described in Moving Chunks to assign the various chunks to different shards. (For bonus points, you also disable the balancer after this is done.)

Once that is done, you can load data at a fairly rapid rate without fear of chunk migrations happening between shards. And instead of all writes going to a single shard initially, they can be fairly evenly distributed among all shards. That leads to stable performance and greater sustained throughput overall.

Doing this well assumes that you have a good handle on what your data looks like, including the distribution of shard keys (if they’re not unique), average document size, and so on.

Fortunately, we’re using a unique shard key and I have a really good sense of our average document size from earlier testing with MongoDB. That allowed me to build a Perl script that would produce the necessary JavaScript code I could feed to the MongoDB shell (when talking to one of our mongos routing servers) to perform the splitting of chunks and assigning them to different shards in our cluster.
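The original script was written in Perl and is not reproduced here. As a hedged sketch of the same idea in Python, the generator below emits split and moveChunk admin commands for the MongoDB shell. The namespace "mydb.postings", the shard names, the round-robin assignment, and the assumption that the first shard in the list is the primary shard holding every chunk after splitting are all illustrative choices, not details from the actual import.

```python
import json


def presplit_commands(namespace, key, min_key, max_key, docs_per_chunk, shards):
    """Yield mongo shell commands that pre-split a key range and spread
    the resulting chunks across shards."""
    split_points = range(min_key + docs_per_chunk, max_key, docs_per_chunk)

    # Create every chunk boundary first, before moving anything.
    for point in split_points:
        cmd = {"split": namespace, "middle": {key: point}}
        yield f"db.adminCommand({json.dumps(cmd)});"

    # Assumption: chunks are dealt out round-robin, and shards[0] is the
    # primary shard that already holds everything, so chunks assigned to
    # it (including the first chunk) need no move.
    for i, point in enumerate(split_points, start=1):
        target = shards[i % len(shards)]
        if target == shards[0]:
            continue
        cmd = {"moveChunk": namespace, "find": {key: point}, "to": target}
        yield f"db.adminCommand({json.dumps(cmd)});"


# Tiny hypothetical range so the output is easy to read.
for line in presplit_commands(
    "mydb.postings", "PostingID", 1, 41, 10,
    ["shard0000", "shard0001", "shard0002"],
):
    print(line)
```

The printed lines can be saved to a file and fed to the shell while connected to a mongos. Running it against a scratch cluster first is a good idea, since the right assignment strategy depends on how the import orders its keys.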

These are the inputs we started with:

  • min shard key: 1
  • max shard key: 2^31 + 20%
  • average doc size: 2,200 bytes
  • default chunk size: 200MB
  • num shards: 3

The resulting output was:

  • 95,325 documents per chunk
  • 27,033 total chunks
  • 9,011 chunks per shard
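For anyone who wants to redo this arithmetic with their own numbers, here is one reading of the inputs that reproduces those figures. It assumes "200MB" means 200 × 1024 × 1024 bytes and that every division rounds down; the variable names are made up for this example.

```python
MIN_KEY = 1
MAX_KEY = 2**31 * 12 // 10          # 2^31 plus 20% headroom
AVG_DOC_BYTES = 2200
CHUNK_BYTES = 200 * 1024 * 1024     # assumes "200MB" means 200 MiB
NUM_SHARDS = 3

# How many average-sized documents fit in one chunk.
docs_per_chunk = CHUNK_BYTES // AVG_DOC_BYTES

# How many chunks are needed to cover the whole key range
# (keys are unique, so one key is one document).
total_chunks = (MAX_KEY - MIN_KEY + 1) // docs_per_chunk

# An even share of those chunks for each shard.
chunks_per_shard = total_chunks // NUM_SHARDS

print(docs_per_chunk, total_chunks, chunks_per_shard)
```

If your average document size is off, the chunks will end up proportionally larger or smaller than the target, so it is worth measuring on a sample load first.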

It took probably 6 hours to do the pre-splitting and chunk moving, but the resulting performance has been very good so far. We’re easily able to sustain a higher write rate with far less variance than we saw before.

This image shows what I see now:

The occasional dips are the result of a worker process finishing its current file and stopping to decompress the next one before it can continue adding documents to MongoDB.

Future Enhancements

10gen CTO Eliot tells me that they’re working to implement shard key hashing so that this can happen automatically (if you’re willing to sacrifice range queries based on your shard key). When that is done, loaded documents will be sprayed uniformly across the shards. That should greatly reduce the need for constant balancing to happen during this process.
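To see why hashing helps, here is a toy Python model that hashes sequential PostingIDs and counts where they land. It is only an illustration: the feature described above did not exist yet, so the MD5 hash, the modulo placement, and the shard names are all assumptions rather than a description of how MongoDB does it.

```python
import hashlib
from collections import Counter

SHARDS = ["shard01", "shard02", "shard03"]  # hypothetical shard names


def shard_for_hashed_key(posting_id):
    """Pick a shard from a hash of the key instead of the key itself."""
    digest = hashlib.md5(str(posting_id).encode()).digest()
    # Use the first 8 bytes of the digest as an integer, then place by modulo.
    return SHARDS[int.from_bytes(digest[:8], "big") % len(SHARDS)]


def distribution(first_id, count):
    """Count how many of count sequential IDs land on each shard."""
    ids = range(first_id, first_id + count)
    return Counter(shard_for_hashed_key(i) for i in ids)


print(distribution(1, 9000))
```

Even though the IDs arrive in order, each shard ends up with roughly a third of them. The cost is that neighboring IDs no longer live together, which is why range queries on the shard key suffer.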

Since we’re not anticipating range queries that involve our shard key, this would eliminate the need to pre-split and move chunks before importing this type of data in the future. The only question in my mind is what happens when a new shard or two is added. Presumably new documents will start to land on those new shards and the balancer will move chunks to them over time as well.

About Jeremy Zawodny

I'm a software engineer and pilot. I work at craigslist by day, hacking on various bits of back-end software and data systems. As a pilot, I fly Glastar N97BM, Just AirCraft SuperSTOL N119AM, Bonanza N200TE, and high performance gliders in the northern California and Nevada area. I'm also the original author of "High Performance MySQL" published by O'Reilly Media. I still speak at conferences and user groups on occasion.

24 Responses to MongoDB Pre-Splitting for Faster Data Loading and Importing

  1. Glad you found thread helpful 🙂

  2. Vladimir Rodionov says:

    I suppose you are going to use MongoDB's map-reduce engine for your analytical tasks. I do not think it's mature and fast enough to compete with the Hadoop + Hive + Mahout combination.

  7. How would you recommend to calculate the average object size?

    Here is some stats() data I think I can use from a previous load of data into mongodb:
    setone: “count” : 2922737, “size” : 638057384, “avgObjSize” : 218.30817620606985, “storageSize” : 891371520

    Do I take the avgObjSize? +-220 bytes
    Do I take the storage / count? 891371520 / 2922737 = +-305 bytes

    I think this figure is pretty crucial for shards ending up close to the default 200MB

