In the Code: Spark SQL Query Planning and Execution

If you’ve dug around in the Spark SQL code, whether in catalyst or the core code, you’ve probably seen tons of references to logical and physical plans. These are the core units of query planning, a common concept in database programming. In this post we’re going to go over what these are at a high level and then how they’re represented and used in Spark. The query planning code is especially important because it determines how your Spark SQL query gets turned into actual RDD primitive calls, and it has large implications for the performance of your applications.

This document is based on the Spark master branch as of early February. It assumes that you have a cursory understanding of the Spark RDD API and some of the lower level concepts such as partitioning and shuffling (what it is, and when it takes place).

For higher level coverage of what we’re going over, I’d recommend the Spark SQL whitepaper that was published last year. It covers similar content, but here we’ll be diving a little deeper, with examples from the code and more explanation of how execution actually takes place rather than focusing on features.

Introduction to Query Planning

There are two main types of plans in Spark: logical plans and physical plans. These are a staple of database programming and you’ll find them in nearly every database you look at.

[Figure: A logical plan, perhaps from df1.join("df1.col1 == df2.col2", df2.where("col3 == 5"))]

A logical plan represents a query as a series of algebraic or language constructs. In SQL, these could be things like select, group by, union, etc. Usually the plan is represented as a tree, where each leaf provides a data item, and the query can be evaluated by a post-order traversal of that tree.

[Figure: A physical plan generated from the above logical plan. Note how these operations all correspond to specific RDD operations/workflows.]

The physical plan is another representation of the same query, but now expressed in terms of lower level constructs. In general you can think of it as the actual instructions for executing part of that plan. Depending on the database, these can be implemented as primitives for file/network reads, computations, or other concrete evaluation logic. For one individual LogicalPlan there can be multiple corresponding physical plans, so to determine which one to actually use, the query planner will generally estimate the “cost” of each plan from metrics that can be measured ahead of time and choose the one with the lowest cost. There’s a rich set of metrics that can be used here, for example statistics collected beforehand on a table (eg. table statistics, indexes, etc). Later on we’ll get into an example of where this can be useful.
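As a quick preview of what that looks like in Spark: if the planner estimates one side of a join to be smaller than spark.sql.autoBroadcastJoinThreshold, it will pick a broadcast join instead of a shuffle-based one. A minimal sketch (the "small" and "large" table names here are hypothetical, assumed to be registered already):

// Nudge the planner's size-based join selection; the threshold is in bytes.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

val joined = sqlContext.table("large").join(sqlContext.table("small"), "key")
joined.explain()  // look for BroadcastHashJoin vs. a shuffle-based join in the physical plan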

To start looking at the Spark code for plans, we can ground ourselves in the QueryPlan.scala file in sql/catalyst. This is the highest level interface used to specify our query plans (both logical and physical). Direct descendants of this class are LogicalPlan, which specifies our logical plans, and SparkPlan, which is Spark’s implementation of a physical plan.

Just to recap, we have the following ideas, which we will be referencing through the rest of this guide:

  • Query Plan – A set of operations which specify the query. Usually can be represented as a Tree/DAG.
    • Logical Plan – A high level plan specified in terms of the query language constructs.
    • Physical Plan – An equivalent lower level plan specified in terms of low level execution primitives.
  • Query Planner – Engine which creates a Physical Plan from a Logical Plan.
  • Strategy – A method of finding equivalent PhysicalPlan(s) given a LogicalPlan. These can be ranked by “cost” of execution.

The workflow of running a query end to end looks something like the following:

  1. Query Parsing – Creating the LogicalPlan from the provided DataFrame or SQL string.
  2. Query Planning – Creating the PhysicalPlan from the provided LogicalPlan using the QueryPlanner.
    • Code Generation – Generating Java bytecode in some cases, if the operations are generic enough.
  3. Query Execution – Executing the PhysicalPlan

All of the above details are fairly general to query languages and most databases, not just Spark. Now we’re going to go into detail about how this is implemented in Spark and the specific nuances of that implementation.
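Before diving in, it’s handy to know that you can watch all of these stages for any query with explain(true). A quick sketch (df1, df2 and the column names are hypothetical):

// explain(true) prints the parsed, analyzed and optimized logical plans,
// followed by the physical plan that will actually be executed.
val result = df1.join(df2, df1("col1") === df2("col2")).filter(df2("col3") === 5)
result.explain(true)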

Logical Plans

Looking at the LogicalPlan class and its children, we have the basic things that make up a query language: binary, unary and leaf nodes. Leaf nodes are the ones that provide our data, and thus we find the implementations are various types of relations, aka tables or other queries (eg. LogicalRelation/MetastoreRelation, ComplexPlan). Unary nodes specify transformations on top of a single node, so you can think of these as projections (“select”), filters, limits, etc. Finally, binary nodes specify operations on top of two other nodes, such as unions or joins. Of course these three classes do not solely represent the components of a logical plan (you could imagine a ternary node, for example), so there are also a few other direct descendants of LogicalPlan which perform other, more complex operations (eg. DescribeFunction, InsertIntoTable, etc).

An important thing to note if you look at the implementation of these classes is that there isn’t any computation logic contained within them. This is because, as we mentioned, the LogicalPlan is a tree representation of a query, which means that all of these structures are just used to decompose our query into more consumable, atomic components which can be handled by our actual execution engine. Because of this, these classes aren’t terribly exciting, but looking through them gives you a catalog of what kinds of features are offered from a querying perspective. Here’s a subset of them that might help cement this idea for you:
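A rough sketch, simplified from catalyst's logical operators (the real definitions live in org.apache.spark.sql.catalyst.plans.logical and carry a bit more detail, so treat the exact fields as approximate):

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// A filter just remembers its predicate and its child, and exposes the child's output columns.
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

// A sort just remembers the requested ordering (and whether it is global); no execution logic here either.
case class Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}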

I’d strongly recommend taking a look at a few of these classes. They’re all fairly lightweight because, as we’ve mentioned before, they just represent the query itself; none of the logic for computing the query lives within a LogicalPlan. Even an operation like Aggregate is represented with less than 20 lines of code. RunnableCommands do have their computation logic within them, even though they’re LogicalPlans, but they’re more of a special case, so I wouldn’t place much weight on them.

Physical Plans

A physical plan is an execution oriented plan, usually expressed in terms of lower level primitives. The implementation of a physical plan in Spark is the SparkPlan, and upon examining it, it should be no surprise to you that the lower level primitives used are those of the RDD. Using RDD primitives allows Spark to reuse a lot of the lower level infrastructure that was already implemented for RDDs, and allows the QueryPlanner to focus on optimizing a query plan by only changing the order/parameters of the Spark primitives, which are already heavily optimized for their individual functions.

With this in mind, we can conduct a similar exercise to the one we did with the LogicalPlan, and we’ll see that there are once again our friends LeafNode, UnaryNode and BinaryNode extending SparkPlan. These have similar children: LeafNode -> PhysicalRDD, InMemoryColumnarTableScan, etc; UnaryNode -> Project, various limits, etc; BinaryNode -> BroadcastHashJoin, CoGroup, etc. I won’t enumerate all of these, but you can find most of the classes within the org.apache.spark.sql.execution package.
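If you want to poke at these stages on a real DataFrame, each one hangs off df.queryExecution (df here is any DataFrame; accessor names are from memory, so double check QueryExecution.scala if they have shifted):

// Each DataFrame carries a QueryExecution that exposes every stage of the pipeline.
val qe = df.queryExecution
qe.logical        // the LogicalPlan as written
qe.optimizedPlan  // the LogicalPlan after the optimizer's rules
qe.sparkPlan      // the SparkPlan chosen by the planner
qe.executedPlan   // the SparkPlan after execution-time rules (Exchange, codegen)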

The notable abstract methods of a SparkPlan are as follows:

  • doPrepare(): Unit – Allows an operation to run setup code before the query is actually executed. An example of this is BroadcastHashJoin, where you may want to broadcast the result of one side of the join before performing the actual computation of the join.
  • doExecute(): RDD[InternalRow] – Produces this operator's result as an RDD of InternalRows; this is where the actual RDD-level execution logic for the node lives.

We’ll see later on that although all of these operations use the RDD primitives (via the doExecute method), an alternative is also becoming available via code generation (a.k.a. Tungsten).
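To make doExecute() concrete, here is roughly what a simple unary operator looks like: a filter-style sketch in the spirit of the operators in org.apache.spark.sql.execution, not a verbatim copy of the Spark source (and written as if it lived inside that package):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Evaluate a predicate over the child's output RDD, partition by partition,
// using the RDD primitives directly.
case class SimpleFilter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitions { iter =>
      val predicate = newPredicate(condition, child.output)  // helper provided by SparkPlan
      iter.filter(predicate)
    }
  }
}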

Query Planning (a.k.a LogicalPlan -> PhysicalPlan)

Now that we’re familiar with both logical and physical plans within Spark, the next question that’s likely to come up is how one converts from a LogicalPlan to a PhysicalPlan. This is the job of the query planner. In Spark, QueryPlanner is an abstract class with one defined method, plan(LogicalPlan): Iterator[PhysicalPlan]. The way it performs this planning is by applying a series of Strategies provided by the implementing class. A Strategy takes in a LogicalPlan and returns a list of PhysicalPlans, any of which would be a valid execution of the logical plan. In theory this allows you to perform cost estimation of all possible plans and choose the lowest cost one before executing it. In practice, we’ll see with query execution that only the first one is currently used.
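Strategies are also exposed as an experimental extension point, which makes their shape easy to see. A skeletal, do-nothing example, pluggable via sqlContext.experimental.extraStrategies (treat the exact hook as an assumption if you are on a different version):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A Strategy pattern-matches on LogicalPlan nodes it knows how to plan and returns
// candidate SparkPlans; returning Nil means "this strategy doesn't apply here".
object DoNothingStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case _ => Nil
  }
}

// sqlContext.experimental.extraStrategies = Seq(DoNothingStrategy)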

The implementations of QueryPlanner are as follows: QueryPlanner -> SparkStrategies -> SparkPlanner. The SparkStrategies class is mostly just a container for all of the strategies used by SparkPlanner. SparkPlanner is the actual implementation of QueryPlanner which converts LogicalPlans into SparkPlans.

It’s also worth noting that HiveContext contains an additional query planner which extends SparkPlanner and adds HiveTableScan among other Hive specific strategies. This is how HiveContext is able to perform things like partition pruning using statistics stored in the metastore (obtained from Hive analyze table commands), and how it’s able to handle Hive’s DDL (Data Definition Language) syntax for creating tables in the metastore, etc.

Query Execution

After query planning takes place, there is the final stage of query execution. Once again, another level of rules is applied here to the planned query (now an instance of SparkPlan instead of LogicalPlan). These differ from Strategies because their job is not to change a LogicalPlan to a PhysicalPlan, but instead to alter a SparkPlan to ensure that it can be executed both successfully and optimally.

The two rules that you’ll see applied to SparkPlans within the standard SQL context are:

  1. Add Exchange (see: EnsureRequirements.class)
  2. WholeStageCodeGen (see: CollapseCodeGenStages.class)

Exchange

The first step, “adding Exchange”, is basically how catalyst figures out when to shuffle data during the execution of your plan. With ordinary RDD calls, some steps are explicitly shuffle inducing (eg. groupByKey). In Spark SQL, to figure out how much code generation can be done, as well as to identify any opportunities where shuffling can be avoided, we have to explicitly keep track of the places where shuffles are required and what kind of key distribution those operations expect post-shuffle. For an example of a possible shuffle optimization, consider joining a partitioned data source with an unpartitioned one: if you’re joining on the partitioning key, you can greatly increase performance by shuffling only the second dataset to match the first, rather than choosing a fresh partitioning scheme and shuffling them both.

The rule takes each transformation in the SparkPlan and checks whether it has a required input distribution for its partitions. The most lax is UnspecifiedDistribution, which basically says that the operation can be performed anywhere and no data movement is required; at the other extreme, AllTuples requires every row to end up in a single partition. In between are distributions such as ClusteredDistribution, which expects rows sharing a cluster (eg. a key) to be grouped together, and OrderedDistribution, which expects rows to be sorted. In situations where a child's output partitioning doesn't satisfy the required distribution, an Exchange is inserted, and during execution a shuffle is performed.
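A quick way to see this rule at work: a grouped aggregation requires all rows with the same key to land in the same partition, so an Exchange node shows up in the physical plan (exact operator names in the explain output vary a bit between versions):

// groupBy forces a hash-partitioned shuffle; explain() should show an Exchange
// (hashpartitioning) node sitting between the partial and final aggregations.
val counts = sqlContext.range(0, 1000).groupBy("id").count()
counts.explain()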

This is arguably the most complex part of the Spark SQL code, and I’m unfortunately not going to get too deep into the weeds here, but if you’re interested you can dig into Exchange.scala and ExchangeCoordinator.scala.

Code Generation (a.k.a Tungsten)

The next step involves one of the most promising performance features coming in Spark 2.0: whole-stage code generation (WholeStageCodeGen). Earlier we mentioned that our SparkPlan is built from Spark RDD primitives. Composing these primitives lets us evaluate each operation row by row, shuffling intermittently as required. One of the downsides of this is that there is a lot of additional overhead (method calls, memory allocations/de-allocations from creating new rows). You can think of each RDD call as composing an additional iterator on top of the previous RDD row iterator. Consider the following code:

rdd.filter(x => x(0) == 1 || x(1) == 2).filter(x => x(3) < 2).count()

The two filters are iterators composed on top of one another, on top of the underlying RDD’s iterator, and the count adds yet another iterator (plus the actual aggregation) on top of those. This results in several method calls to each of the iterators for every single row’s computation. This is what’s known as the Volcano model of query execution, and it’s been around since the mid 90’s. The problem is that if you have billions of rows, this can end up being a consequential amount of time wasted on these very simple operations because of all the method calls. On top of this, we create new row objects for each successive iterator, so tons of memory and time are wasted garbage collecting these short lived intermediate objects.

To deal with this, we can instead rewrite the example query above as a single fused loop that does the following:

// Conceptual sketch of the fused code: both filters and the count are evaluated
// inline in one pass, with no per-row iterator method calls.
int count = 0;
for (Row r : rows) {
    if ((r.getInt(0) == 1 || r.getInt(1) == 2) && r.getInt(3) < 2) {
        count++;
    }
}
return new Row(count);  // pseudocode: emit a single result row containing the count

The above code only creates one row object for the final output, and doesn’t make any extraneous method calls. This allows compiler and processor optimizations to kick in and make this even faster through things like branch prediction and other techniques which processors can take advantage of.

The process of converting the SparkPlan to generated Java code rather than RDD primitives is known as code generation, and turning all of the operations between two shuffles into one generated code block is known as WholeStageCodeGen, which is coming in Spark 2.0.

The way this works is as follows:

  1. CollapseCodeGenStages.apply() is invoked on a processed SparkPlan
  2. We check to see if the SparkPlan supports CodeGen. If it does, we transform the plan into a WholeStageCodeGen object; otherwise we try to break the plan up recursively by checking whether any of its children support CodeGen.
  3. Return the collapsed SparkPlan, which is now either a WholeStageCodeGen or the same plan with some of its children recursively collapsed into WholeStageCodeGen plans.
  4. When doExecute() is called on the WholeStageCodeGen object, Spark generates Java code for all of the operations that were collapsed into it (as we did above), compiles that code using Janino, and then calls mapPartitions() with the generated class so the compiled bytecode executes the operations.
  5. These WholeStageCodeGen plans can sit within our tree as a normal stage would; they’re just a much denser set of operations, so execution otherwise proceeds as usual.

This ends up being an order of magnitude or so faster than the typical iterator model.
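If you’re on a build that has whole-stage codegen, you can see the collapsed stages directly in the explain output, where fused operators appear under a single WholeStageCodegen node. A small sketch (the config key below is my recollection of the flag and may differ in your version):

// Assumption: "spark.sql.codegen.wholeStage" toggles whole-stage code generation.
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")

// The filter and aggregation should be fused into one generated stage, visible as a
// WholeStageCodegen node, with the shuffle's Exchange breaking the stages apart.
val agg = sqlContext.range(0, 1000).filter("id > 5").groupBy().count()
agg.explain()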

Conclusion

We’ve just walked through pretty much all the steps of query planning and query execution that take place in Spark SQL as of the current code base. If you’re interested in this subject, there are likely going to be some big changes to the SQL infrastructure to support the Datasets API as well as a streaming DataFrame API, so you can keep watching the codebase, JIRA and the dev lists to see more of this.

If you found what you read here valuable, please come back and check out the other guides I’ll be posting. Next up, I’ll be exploring how Spark uses on and off heap memory for things like cached tables, temporary storage during execution, shuffle, etc.

References:

  1. Spark SQL Paper: https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf
  2. Spark Source Code: http://github.com/apache/spark (The current state of master when I wrote this can be found here: https://github.com/apache/spark/tree/892b2dd6dd00d2088c75ab3c8443e8b3e44e5803)
  3. Hyunjung Park’s Query Planning Notes: http://infolab.stanford.edu/~hyunjung/cs346/qpnotes.html
  4. Volcano – An Extensible and Parallel Query Evaluation System: http://paperhub.s3.amazonaws.com/dace52a42c07f7f8348b08dc2b186061.pdf
