Improving performance of appengine mapreduce using batching

At Khan Academy, we run most of our software on Google's appengine and store our data in Google's cloud datastore (a non-relational key/value-like store). If we need to go back and add a new field to one of our datastore models, we usually run a map[reduce] job (no reduce step required) over the objects in the datastore using the appengine-mapreduce library, adding the new field to each object in the process.
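To make that concrete, here's a minimal sketch of the kind of backfill mapper involved, assuming the setup described below in which the input reader hands the mapper one datastore key at a time. The field name and the placeholder value being written are made up for illustration; this isn't our actual code.

```python
from google.appengine.ext import db


def backfill_one(problem_log_key):
    """Mapper called once per ProblemLog key by the mapreduce framework."""
    problem_log = db.get(problem_log_key)       # one RPC to fetch the entity
    if problem_log is not None and getattr(problem_log, 'new_field', None) is None:
        # 'new_field' stands in for whatever property we're backfilling.
        problem_log.new_field = True
        db.put(problem_log)                     # one RPC to write it back
```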

I recently found myself needing to add a new field to ProblemLog, our datastore model recording every problem ever done on Khan Academy. We have well over 3 billion of them in the datastore. I started to run the mapreduce job and estimated that in the best case, it would take a little over 6 weeks. In practice, because of uneven splitting of the objects among different processes, and transient datastore errors that cause partial restarts of the job, it might take much longer (or never finish at all).

One potential source of slowness is that each object is processed one at a time. A single process makes an RPC to fetch the object from the datastore, applies the change that adds the field, and then makes another RPC to write the updated version back. If those RPCs are the rate-limiting step, we could potentially speed things up a lot by batching: fetch N objects in each process in parallel, then write them back in parallel too. In the best case, this speeds things up by a factor of N.
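For a sense of what batching buys us in code: db.get() and db.put() both accept lists, so a batch of N keys can be fetched with one bulk RPC and the updated entities written back with another, instead of 2N single round trips. This is a sketch along the same lines as the one above, not the actual mapper.

```python
from google.appengine.ext import db


def backfill_batch(problem_log_keys):
    """Process a whole batch of keys with two bulk RPCs instead of 2 * N single ones."""
    problem_logs = db.get(problem_log_keys)     # one batched RPC fetches every entity
    updated = []
    for problem_log in problem_logs:
        if problem_log is not None and getattr(problem_log, 'new_field', None) is None:
            problem_log.new_field = True        # same placeholder backfill as above
            updated.append(problem_log)
    if updated:
        db.put(updated)                         # one batched RPC writes them all back
```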

I went ahead and implemented this via a new mapreduce InputReader. (In appengine's mapreduce library, a hierarchy of InputReader classes controls fetching the data and passing it to the processes doing the mapping.) The standard InputReader I had been using passed datastore keys to the mapper processes one at a time. I modified it to instead pass a lazy Python generator that yields up to N keys per mapper invocation. (Doing this lazily ensured that if the mapper was interrupted partway through, we wouldn't miss any keys. I'm not actually sure this was necessary, but better safe than sorry.) With that in place, I could modify the mapper to perform the RPCs for fetching and storing the N objects in parallel.
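The real input reader (linked below) also has to handle shard state serialization and the rest of the InputReader interface, but the core trick, wrapping an underlying key iterator in lazy generators of up to N keys so keys are only consumed as the mapper actually processes them, looks roughly like this sketch:

```python
import itertools


def batches_of_keys(key_iter, batch_size):
    """Wrap an iterator of datastore keys into lazy generators of up to
    batch_size keys each.  Because each inner generator pulls keys from
    key_iter only as the mapper consumes them, a mapper interrupted
    partway through a batch hasn't silently used up keys it never saw.
    """
    key_iter = iter(key_iter)
    for first_key in key_iter:          # pull one key to start the next batch
        def one_batch(first_key=first_key):
            yield first_key
            # Lazily hand out at most batch_size - 1 more keys.
            for key in itertools.islice(key_iter, batch_size - 1):
                yield key
        yield one_batch()
```

A mapper like backfill_batch above can then consume each yielded generator (e.g. backfill_batch(list(key_batch))), turning N single-entity round trips into a couple of batched ones.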

The full code for the batching input reader is freely available. If I get my act together, I may submit a patch to the upstream library at some point.

After deploying the new input reader, I did a bit of casual performance testing. I started mapreduces over our ProblemLogs with the old input reader and with the new one at several different batch sizes. I then let each job run for at least 10 minutes and read off the number of objects processed per second. (The rate stabilized after about 5 minutes, so the timing didn't need to be exact, just longer than that.)

This is a plot of objects processed per second vs. batch size. The new batching input reader is shown as a solid red line; the baseline from the old non-batching reader is shown as a dashed blue line.
[Plot: objects processed per second vs. batch size]

The processing rate plateaued in this test at around 100 datastore keys per batch. Unfortunately, that represents only a relatively modest ~3.75x speedup: far less than the maximum possible factor-of-N speedup for a batch size of N, but still a big improvement in practice. Of course, the position of the plateau and the relative speedup will vary considerably depending on what the mapper is actually doing, so this may look different for other mapreduce jobs.

In the end, I didn't actually run the complete mapreduce over ProblemLog (it would have been extremely costly, and we're pursuing alternate approaches). The new input reader has stuck around, though, and it's a nice little speedup for RPC-heavy mapreduces of any size.

(Postscript: here's the script that generated the plot.)