BloomJoin: BloomFilter + CoGroup

We recently open-sourced a number of internal tools we've built to help our engineers write high-performance Cascading code, as the cascading_ext project. Today I'm going to talk about a tool we use to improve the performance of asymmetric joins: joins where one data set contains significantly more records than the other, or where many of the records in the larger set don't share a common key with the smaller set.

Asymmetric Joins

A common MapReduce use case for us is joining a large, global set of records against a much smaller one. For example, we have a store with billions of transactions keyed by user ID, and we want to find all transactions by users who were seen within the last 24 hours. The standard way to execute this join with MapReduce is to sort each store by user ID, and then use a reduce function which skips transactions unless the user ID is present on both the left- and right-hand sides of the join (here the global transaction store is the left-hand side and the set of recently seen user IDs is the right-hand side).

When writing a Cascading job, this is just a CoGroup with an InnerJoin.
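In Cascading 2.x, that looks roughly like the following. This is a minimal sketch: the pipe names and field names are hypothetical, and the source/sink taps are omitted.

```java
import cascading.pipe.CoGroup;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;

// Left-hand side: the full transaction store, keyed by user ID.
Pipe transactions = new Pipe("transactions");
// Right-hand side: the user IDs seen in the last 24 hours.
Pipe recentUsers = new Pipe("recent-users");

// An inner join keeps only tuples whose key appears on both sides.
Pipe joined = new CoGroup(
    transactions, new Fields("user_id"),
    recentUsers, new Fields("seen_user_id"),
    new InnerJoin());
```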

This solution works perfectly well, but from a performance standpoint there's a big problem: we pointlessly sorted the entire store of billions of transactions, even though only a few million of them made it into the output! What if we could get rid of the irrelevant transactions up front, so we only bother sorting the ones which will end up in the output?

It turns out we can, using a clever and widely used data structure: the bloom filter.

Bloom Filters

A bloom filter is a compact, probabilistic representation of a set of objects. Given an arbitrary object, it is cheap to test whether that object is a member of the original set. The key property of a bloom filter is that while an object that belongs to the set is never falsely rejected, an object that does not belong may occasionally be accepted anyway. That is to say, the filter never produces false negatives, but can produce false positives.

For illustration, say we built a bloom filter out of records {A, E}, and want to test values {A, B, C, D} for membership:

[Figure: testing values {A, B, C, D} against a bloom filter built from {A, E}]

Suppose values C and D were correctly rejected; these are true negatives. Key A will always be accepted since it was in the original set, so we call it a true positive. Key B, however, is a false positive: it was accepted even though it was not in the original set of keys.

A bloom filter works by hashing each key inserted into the filter several times and marking the corresponding slots in a bit array. When testing an object for membership, the same hash functions are applied to it and the corresponding bits are checked in the array.

As in the illustration above, there are three possibilities when those bits are checked:

  • The item was actually in the original set and is accepted (object A)
  • The item was not in the original set, the corresponding bits are not set in the array, and it is rejected (object C)
  • The item was not in the original set, but the corresponding bits were set anyway, and it is accepted (object B)
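To make those mechanics concrete, here is a toy bloom filter in Java. It is purely illustrative and is not the cascading_ext implementation (which, as discussed in the comments below, uses 64-bit murmur hashing); this sketch fakes multiple hash functions by seeding Object.hashCode().

```java
import java.util.BitSet;

// Toy bloom filter: a bit array plus k cheap "hash functions" derived
// by seeding the object's hashCode. Illustrative only.
class ToyBloomFilter {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  ToyBloomFilter(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  private int index(Object key, int seed) {
    // A stand-in for k independent hashes (e.g. murmur with k seeds).
    int h = key.hashCode() ^ (seed * 0x9E3779B9);
    return Math.abs(h % numBits);
  }

  void add(Object key) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(index(key, i));  // mark each slot the key hashes to
    }
  }

  boolean mightContain(Object key) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(index(key, i))) {
        return false;  // a clear bit proves the key was never added
      }
    }
    return true;  // all bits set: either a true positive or a false positive
  }
}
```

Building this from {A, E} and testing {A, B, C, D} reproduces the scenario above: A always passes, C and D almost certainly fail, and B may sneak through as a false positive.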

Without going deep into the math behind bloom filters, there are four variables to consider when building one: the number of keys, the false positive rate, the size of the filter, and the number of hash functions used. For our MapReduce use case, the size of the filter is a fixed quantity; we're going to build a bloom filter as large as we can fit in the memory of a map task. We also have a fixed number of keys to insert into the filter (for a particular join, we're building a bloom filter out of all keys on one side of the join).

Given a fixed number of keys and bits, we can choose a number of hash functions which minimizes our false positive rate. For more details, check out the Wikipedia article.
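Concretely, the standard results from the bloom filter literature (not anything specific to cascading_ext), for a filter of m bits holding n keys with k hash functions, are:

```latex
k_{\mathrm{opt}} = \frac{m}{n}\,\ln 2
\qquad
p \approx \left(1 - e^{-kn/m}\right)^{k}
```

At the optimal k this simplifies to p ≈ 0.6185^(m/n); for example, at 8 bits per key the optimal number of hashes is about 6 and the false positive rate is roughly 2%.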

Putting it together

So how can a bloom filter improve the performance of our MapReduce job? When joining the two stores, before performing the reduce itself, we can use a bloom filter built from the keys on one side of the join to discard most of the records on the other side which would never have matched anything.

Unlike before, when we had to sort all the left-hand-side records before joining them with the right side, we now only have to sort the true positives {A} and the false positives {B}.
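Reusing the ToyBloomFilter sketch from above on the {A, E} / {A, B, C, D} example makes the pre-reduce filtering step concrete (illustrative only; real record handling is elided):

```java
import java.util.Arrays;

// Build the filter from the small (right-hand) side, then use it to
// discard large-side records before they ever reach the sort.
public class MapSideFilterDemo {
  public static void main(String[] args) {
    ToyBloomFilter filter = new ToyBloomFilter(64, 3);
    for (String rhsKey : Arrays.asList("A", "E")) {
      filter.add(rhsKey);
    }
    for (String lhsKey : Arrays.asList("A", "B", "C", "D")) {
      if (filter.mightContain(lhsKey)) {
        // Only these keys are sorted and shipped to the reducers:
        // the true positive A, plus any false positives (e.g. B).
        System.out.println("kept: " + lhsKey);
      }
    }
  }
}
```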

BloomJoin

We've put all of this together in our open-source cascading_ext library on GitHub, as the sub-assemblies BloomJoin and BloomFilter. The BloomJoin assembly closely mimics the CoGroup interface, but behind the scenes we first use a bloom filter to filter the left-hand-side pipe before performing a CoGroup. If you use the CascadingHelper utility in the cascading_ext library, the assembly will work on its own:
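As a sketch (the pipe and field names are hypothetical, and the constructor is assumed to mirror CoGroup's as described above; BloomJoinExample in the repo is the canonical reference):

```java
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import com.liveramp.cascading_ext.assembly.BloomJoin;

// Large side first: BloomJoin builds a bloom filter from the right-hand
// side's keys and uses it to prune the left-hand pipe before the CoGroup.
Pipe transactions = new Pipe("transactions");
Pipe recentUsers = new Pipe("recent-users");

Pipe joined = new BloomJoin(
    transactions, new Fields("user_id"),
    recentUsers, new Fields("seen_user_id"));

// With CascadingHelper, connecting and running the flow is enough; the
// helper wires in the bloom filter steps automatically. (`sources` and
// `sink` are your Taps; see BloomJoinExample for the exact imports.)
CascadingHelper.get().getFlowConnector()
    .connect(sources, sink, joined)
    .complete();
```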

If you are using a raw HadoopFlowConnector instead, a FlowStepStrategy and some properties need to be attached to the flow:
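A sketch of that wiring, following the repo's BloomJoinExampleWithoutCascadingUtil; the class and method names below are reconstructed from that example, so treat them as assumptions and consult the example for the exact API:

```java
import java.util.HashMap;
import java.util.Map;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import com.liveramp.cascading_ext.bloom.BloomProps;  // assumed location

// Hand the bloom filter properties to the connector, then attach the
// step strategy that schedules the filter-building job before the join.
// `sources` and `sink` are your Taps; `joined` is the BloomJoin pipe.
Map<Object, Object> properties = new HashMap<Object, Object>();
properties.putAll(BloomProps.getDefaultProperties());

Flow flow = new HadoopFlowConnector(properties).connect(sources, sink, joined);
flow.setFlowStepStrategy(new BloomAssemblyStrategy());  // assumed class name
flow.complete();
```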

The classes BloomJoinExample and BloomJoinExampleWithoutCascadingUtil have some more examples which should work on your cluster out-of-the-box.

Performance

We measured the performance benefits of BloomJoin by running a benchmark job on our own data. For a sample dataset we took all request logs for one of our services, where each log entry contains a unique user ID. A user ID may have many log entries in a single day, and is not necessarily seen every day. The job took all logs from a particular day (2012/12/26) and found all log entries from the previous two months associated with each of the user IDs from that day:

| Dataset | Log entries |
|---|---|
| Full dataset (2 months of logs) | 5,531,090,779 |
| Logs from 12/26 | 134,035,584 |
| Logs from the full dataset matching users from 12/26 | 1,206,247,339 (21.8%) |

We implemented this join two ways: first with a standard CoGroup using an InnerJoin, and second with our BloomJoin assembly:

| Metric | CoGroup | BloomJoin | % of original cost |
|---|---|---|---|
| Map output records | 5,531,090,779 | 1,554,714,936 | 28.1 |
| Map output bytes | 3,145,258,140,017 | 788,744,906,949 | 25.1 |
| Reduce shuffle bytes | 806,870,037,807 | 177,418,598,374 | 22.0 |
| Total time spent by all maps in occupied slots (ms) | 620,127,159 | 288,005,428 | 46.4 |
| Total time spent by all reduces in occupied slots (ms) | 351,864,211 | 218,623,410 | 62.1 |
| CPU time spent (ms) | 1,069,814,490 | 527,423,620 | 49.3 |

We can see that filtering the map output for relevant records cuts our map output by roughly 72% by record count and 75% by bytes (for this input, even a perfect filter would only cut the record count by 78%), and correspondingly cuts shuffle bytes and CPU time. Note that reduce time was only cut by 38%, since copying data from mappers to reducers has some fixed costs per reduce task, and both jobs ran with the same number of reduce tasks in this test.

Overall, using a bloom filter cut the cost of the job on our Hadoop cluster by around 50%, a huge win.

Parameters to Tweak

There are a number of parameters available to tweak, depending on the kind of data being processed and the machines doing the processing. The defaults should generally work fine, except for NUM_BLOOM_BITS, which will vary depending on the resources available to TaskTrackers on your cluster. A sketch of overriding these properties follows the list:

  • BloomProps.NUM_BLOOM_BITS: The size of the bloom filter in memory. This defaults to 300MB, but can be adjusted up or down depending on the memory available to your map tasks.
  • BloomProps.NUM_SPLITS:  The number of parts to split the bloom filter into.   This determines the number of reduce tasks used when constructing the filter.
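For example, overriding these through the job properties might look like this (a hypothetical sketch; check BloomProps in the repo for the exact key names and expected value types):

```java
import java.util.HashMap;
import java.util.Map;

import com.liveramp.cascading_ext.bloom.BloomProps;  // assumed location

// Hypothetical overrides: a smaller filter for memory-constrained map
// slots, split into more parts to parallelize filter construction.
Map<Object, Object> properties = new HashMap<Object, Object>();
properties.putAll(BloomProps.getDefaultProperties());
properties.put(BloomProps.NUM_BLOOM_BITS, Long.toString(150L * 1024 * 1024 * 8)); // ~150MB
properties.put(BloomProps.NUM_SPLITS, "20");
// pass `properties` to the HadoopFlowConnector as shown above
```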

Feedback Welcome!

Any feature requests, suggestions, or bug reports are very welcome; we'd love for these tools to be as accessible as possible. Drop us a comment or open a pull request on the project page!


11 Responses to “BloomJoin: BloomFilter + CoGroup”

  1. jaideep Apr 3, 2013 at 6:56 pm #

    Very interesting. Just a clarification: is the join still done on the reduce side?
    This would be very useful in self joining with fact tables.

    • Ben Podgursky Apr 3, 2013 at 7:20 pm #

      Yep, the final join is still done on the reduce side, just with (hopefully) a lot less data from the side we are filtering.

      Interesting, I'm not very familiar with fact tables, I'll have to look into that...

  2. AN Apr 4, 2013 at 12:20 am #

    This is awesome! Question though. How does this compare to HashJoins or Map side join techniques since I think reducer side joins are a performance bottleneck? Thanks!

    • Ben Podgursky Apr 4, 2013 at 9:59 am #

      Yeah, the use-case for this is when there's no alternative to a reduce-side join--when your key set is too big for a HashJoin, and you are joining on a key which your store isn't sorted by (for us, this is basically any case when we were forced to do a CoGroup in Cascading, so we built this as a drop-in replacement.)

      If you're able to structure your data so you can do a join on the map side instead, that's definitely preferable in general.

  3. Sam Bessalah Apr 4, 2013 at 12:41 am #

    Great post, and excellent use of Bloom Filters.
    Just wondering though, what hash functions are you using, and how many of them for a default Bloom filter.
    Thanks.

    • Ben Podgursky Apr 4, 2013 at 10:11 am #

      We use the murmur 64 hash function.

      We calculate the optimal number of hash functions given the key cardinality and number of bits in memory, but by default cap the number of functions at 4. We do this for performance reasons--when you do the math, it turns out that for most normal data sets, the small false-positive rate decrease isn't worth the extra CPU cost--for example, on one data set, dropping the number of hashes from the optimal 15 to 4 only increases the false-positive rate from .000042 to .0009, but cuts hashing cost by 75%.

      Of course, results could differ a bit on your data, so the caps are tweakable via the BloomProps.MAX_BLOOM_HASHES and BloomProps.MIN_BLOOM_HASHES params.

  4. Jesse Anderson Apr 4, 2013 at 4:44 pm #

    I love Blooms and I think they can be used more often than we think. My GitHub (https://github.com/eljefe6a) has several examples where I used Blooms to speed up the algorithm.

  5. Hikmat Dhamee Jun 2, 2013 at 9:32 pm #

    Actually, I am joining two files of 1GB each using BloomJoin, but the performance is much slower than CoGroup. I am using the default settings for the BloomJoin parameters and running Hadoop on a single computer.
    What would be the appropriate configuration for BloomJoin?

    • Ben Podgursky Jun 4, 2013 at 9:02 am #

      Joining two files of 1GB each on a single computer isn't really the right use-case for BloomJoin for several reasons--

      First, with 1GB of data you'd probably be best off using HashJoin, since you can probably fit one or both sides of the join into memory, which will outperform either CoGroup or BloomJoin.

      Second, BloomJoin is designed for asymmetrical joins where there isn't a large intersection between the large set and the smaller set. If most of the large set matches the smaller set anyway, the overhead of building a bloom filter will outweigh the benefits.

      Third, part of the performance benefits of doing map-side filtering is that the data doesn't need to be transferred to the reducers before being discarded. If your data is all on a single machine, you aren't going to be doing network traffic whether it's a CoGroup or BloomJoin, so you won't see some of the benefits until you scale to more machines.

      Hope that helps clarify things a bit.

      • Hikmat Dhamee Nov 11, 2013 at 8:52 am #

        I agree with you. It helped me understand BloomJoin more clearly.
        Thanks for the great idea.

  6. Hikmat Dhamee Jun 2, 2013 at 9:41 pm #

    What should the values of the BloomProps properties be for joining a large data set?
