Shuffle Free Joins in Spark SQL

As I mentioned in my previous post on shuffles, shuffles in Spark can be a source of huge slowdowns, but for a lot of operations, such as joins, they’re necessary to do the computation. Or are they?

Yes, they are. But you can exercise more control over your queries and ensure the shuffle only happens once if you know you’re going to be performing the same shuffle/join over and over again. We’ll briefly explore how the Catalyst query planner takes advantage of knowledge of data distributions to ensure shuffles are only performed when necessary. As you’ll see below, we can actually improve performance in these cases to the point that the join itself can be done in linear time!

Old-school vanilla Join

Consider the following example:

// Assumes the spark-shell (Spark 1.5/1.6), where sc and the implicits needed
// for .toDF are already in scope; in a standalone application you would need
// import sqlContext.implicits._
case class Data1(key: String, value1: Int)
case class Data2(key2: String, value2: Int)

val data1s = Range(0, 1000000).map(x => Data1(s"key${x}", x))
val data2s = Range(0, 1000000).map(x => Data2(s"key${x}", x))

val df1 = sc.parallelize(data1s).toDF.cache()
val df2 = sc.parallelize(data2s).toDF.cache()
df1.count
df2.count
val joined = df1.join(df2, df1("key") === df2("key2"))
joined.count
joined.explain

It’s a fairly naive example, but what we’re doing is creating two DataFrames with 1,000,000 rows each and joining them on a string key. In order to ensure that all matching keys reside on the same partition, Spark can take one of a few approaches:

  • If one or both of the tables are small enough, the smaller DataFrame can be broadcast so that every executor holds a full copy, and the other DataFrame’s partitions can then be scanned in parallel, checking each row for matches in the broadcast copy (a minimal sketch of requesting this explicitly follows this list).
  • If both are too large, a shuffle will occur to ensure matching keys are co-located, and then the corresponding partitions are joined, resulting in a new, joined DataFrame.
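If you know one side is small, you don’t have to leave this choice to the planner: the broadcast hint in org.apache.spark.sql.functions (available since Spark 1.5) lets you request the broadcast path explicitly. A minimal sketch, assuming df2 is small enough to ship to every executor:

// Minimal sketch: explicitly hinting the broadcast path (Spark 1.5+).
// Assumes df2 is small enough to hold a full copy on every executor.
import org.apache.spark.sql.functions.broadcast

val broadcastJoined = df1.join(broadcast(df2), df1("key") === df2("key2"))
broadcastJoined.explain  // should show a BroadcastHashJoin rather than a SortMergeJoin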

Looking at the physical plan for the original join above, we see the following:

== Physical Plan ==
SortMergeJoin [key#0], [key2#2]
:- Sort [key#0 ASC], false, 0
:  +- TungstenExchange hashpartitioning(key#0,20), None
:     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
+- Sort [key2#2 ASC], false, 0
   +- TungstenExchange hashpartitioning(key2#2,20), None
      +- InMemoryColumnarTableScan [key2#2,value2#3], InMemoryRelation [key2#2,value2#3], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None

This means our DataFrames were too large to broadcast, so Catalyst chose to perform a SortMergeJoin.

For the SortMergeJoin to work, the two DataFrames must be partitioned so that matching keys land in the same partition, and each partition must be sorted by the join key so that the merge can be done in linear time (see the sketch below). To enforce this, the physical plan contains a TungstenExchange (the shuffle) followed by a Sort within partitions (the second argument, false, tells us the sort is per-partition as opposed to global) for each DataFrame before the join.
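To make the linear-time claim concrete, here is a small, hypothetical sketch of the merge step over two inputs that are already sorted by key. It is not Spark’s implementation, just the idea: each side is walked once.

// Hypothetical sketch of the merge step of a sort-merge (inner) join over
// two inputs already sorted by key; not Spark's code, just the idea.
// Each side is walked once, so the work is linear in the input sizes
// (ignoring the fan-out when a key repeats on both sides).
def mergeJoin(left: Seq[(String, Int)], right: Seq[(String, Int)]): Seq[(String, Int, Int)] = {
  val out = scala.collection.mutable.ArrayBuffer[(String, Int, Int)]()
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val cmp = left(i)._1.compareTo(right(j)._1)
    if (cmp < 0) i += 1          // left key is behind, advance left
    else if (cmp > 0) j += 1     // right key is behind, advance right
    else {
      // keys match: emit the current left row against every right row with this key
      var k = j
      while (k < right.length && right(k)._1 == left(i)._1) {
        out += ((left(i)._1, left(i)._2, right(k)._2))
        k += 1
      }
      i += 1
    }
  }
  out
}

// e.g. mergeJoin(Seq(("a", 1), ("b", 2)), Seq(("a", 10), ("c", 30)))
// returns ArrayBuffer(("a", 1, 10))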

Shuffles Be Gone!

Now suppose we take this knowledge and move the shuffle and the per-partition sort so they happen before our cache call, using “repartition” and “sortWithinPartitions”:

val partitioned1 = df1.repartition(df1("key")).sortWithinPartitions(df1("key")).cache()
val partitioned2 = df2.repartition(df2("key2")).sortWithinPartitions(df2("key2")).cache()
partitioned1.count
partitioned2.count

val ps_joined = partitioned1.join(partitioned2, partitioned1("key") === partitioned2("key2"))
ps_joined.count

The query plan would look something like the following:

== Physical Plan ==
SortMergeJoin [key#0], [key2#2]
:- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
+- InMemoryColumnarTableScan [key2#2,value2#3], InMemoryRelation [key2#2,value2#3], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key2#2 ASC], false, 0, None

Awesome! Now our join is about as computationally complex as a map operation. We have to pay the cost of the shuffle up front when we cache the DataFrames, but we can then apply the join as many times as we like at a much lower cost than the original query.
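For example, once partitioned1 and partitioned2 are cached, a hypothetical follow-up query joining on the same keys keeps reusing that layout:

// Hypothetical follow-up query over the already-partitioned, cached DataFrames;
// the join should again run without a TungstenExchange in front of it.
val joinedAgain = partitioned1
  .join(partitioned2, partitioned1("key") === partitioned2("key2"))
  .select(partitioned1("key"), (partitioned1("value1") + partitioned2("value2")).as("total"))
joinedAgain.count
joinedAgain.explain  // still no TungstenExchange feeding the SortMergeJoin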

Caveats

Following the workflow above, we first confirmed that our join was being executed as a SortMergeJoin under the hood, and only then optimized for that path. In practice, your joins may not actually be using a SortMergeJoin, and pre-shuffling your data to suit one won’t make Spark automatically choose that plan for you.

There are a variety of physical joins and the one that gets chosen is subject to a series of strategies within Catalyst to determine which is the best for your query plan (for more details on how strategies work in general, check out our Query Planning post). Before optimizing your joins, check out the physical plan that gets generated by running “explain” on your joined DataFrame.
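One related knob, as a hypothetical aside: if “explain” shows a BroadcastHashJoin but you want the sort-merge path because you plan to reuse the pre-shuffled data, the automatic broadcast threshold can be lowered or disabled.

// Check which physical join Catalyst actually picked before optimizing for it.
joined.explain

// If it picked a broadcast join and you'd rather force the sort-merge path
// (for example because you plan to reuse the shuffle), the automatic
// broadcast threshold can be disabled; -1 turns it off entirely.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")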

 

Comments

    hkothari (post author), replying to a reader question about whether the two pre-partitioned DataFrames end up co-partitioned and co-located:

      That’s a good question.

      They’re definitely co-partitioned. If you repartition by the columns you’re joining on, Spark creates a HashPartitioner with “spark.sql.shuffle.partitions” as the number of partitions, and both DataFrames get hashed into matching distributions. The same holds if you explicitly specify the number of partitions, as long as you use the same count in the repartition call for both DataFrames.

      Regarding co-location: I spent some time looking into the code and it’s not immediately clear whether that is the case. I don’t think Spark explicitly guarantees that they will be co-located. That being said, it appears that the real benefit here is that it doesn’t need to insert an Exchange before the join because the partitioning is already done so the partition dependencies for the join are known beforehand and very straightforward.
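A hypothetical sketch of the explicit partition count variant mentioned in the reply above, using the repartition(numPartitions, partitionExprs*) overload available from Spark 1.6; the count of 200 is arbitrary, what matters is that it matches on both sides:

// Hypothetical: pin both sides to the same explicit number of partitions so
// their hash partitioning lines up (200 is an arbitrary choice).
val p1 = df1.repartition(200, df1("key")).sortWithinPartitions(df1("key")).cache()
val p2 = df2.repartition(200, df2("key2")).sortWithinPartitions(df2("key2")).cache()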

