Note: This is being written as of early December of 2015 and currently assumes Spark 1.5.2 API. The data sources API has been out for a few versions now but it’s still stabilizing so some of this might change to be out of date.
I’m writing this guide as part of my own exploration on how to write a data source using the Spark SQL Data Sources API. We’ll start with exploration of the interfaces and features, then dive into 2 examples: Parquet and Spark JDBC. Finally we will cover an end to end example of writing your own simple data source.
Data Sources API
What is it and what does it get me?
The Data Sources API at a high level is an API for turning existing data from various underlying sources into Spark DataFrames and, if you’d like, writing Spark DataFrames to data stores. It requires very little from the underlying datastore as you’ll soon see, but provides many interfaces to allow varying degrees of flexiblity with how you load the data and what you allow.
At a slightly lower level, any Data Source implementation implements:
- BaseRelation – Represents a relation or table in an underlying data source from which a DataFrame can be generated. Defines how to generate the schema for that relation.
- TableScan – Represents a way to scan a given relation, generating a RDD[Row] which then turns into the DataFrame.
- RelationProvider – Takes in a list of parameters and returns a base relation. This is basically a factory for creating relations (DataFrames) from user defined parameters.
Usually BaseRelation and TableScan will be implemented together. You can think of this as defining the structure and interpretation of a given dataset together.
By implementing these things you get all kinds of cool stuff:
- You can load RDDs/DataFrames from pretty much wherever you want, specifying how to partition the loading of data across executors.
- You can perform filter pushdown and column pruning which result in huge performance gains for columnar stores, partitioned data sources or places when you have metadata available. (eg. certain SQL databases, parquet, etc)
- You can shift work on to the underlying data store if it can do certain portions of the query more efficiently (see CatalystScan).
Tell me about the code:
To start with the API itself, we can ground ourselves in the following file:
This is the file where most of the data source interfaces are defined and it also contains the abstract classes for HDFS data sources. As we mentioned above, the key interfaces you’d need to implement here are: BaseRelation, RelationProvider, and TableScan but below is a list of interfaces with short descriptions. Using each of those interfaces, you can layer roughly as much functionality as you would like into your data source, ranging from read only, to filter push down to insert support.
- Basic Interfaces:
- DataSourceRegister – A trait for registering a short name for a data source.
- RelationProvider – Exactly what it sounds like.
- BaseRelation – A simple relation which specifies a table, dataset, etc. in an underlying data source.
- TableScan – Specifies a basic full scan for a relation, basically how to return an unfiltered RDD from a relation.
- SchemaRelationProvider – A provider trait which lets the user specify the schema when a relation is requested.
- CreatableRelationProvider – A provider trait which specifies a method for creating new relations from a data frame.
- PrunedScan – Specifies a scan method which can preemptively remove unused columns which are passed in via catalyst.
- PrunedFilteredScan – Specifies a scan which can not only prune, but also push down filters into the underlying store.
- CatalystScan – Defines a scan pattern which makes use of the Catalyst query plan as opposed to parts of it.
- InsertableRelation – Trait for supporting insertaion of data frames into an existing relation.
- HadoopFsRelation – An abstract relation for data sources which live on the Hadoop filesystem. You can think of this as a dataset which can be represented as files on disk. Specific file formats extend this.
- Output Interfaces (OutputWriterFactory, OutputWriter) – These are primarly used when you extend HadoopFsRelation
These interfaces are all available to you but I would strongly recommend reading the docs for you in the source code which describe how each of these interfaces are used: ./sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
Using a Custom Datasource
Thanks to Java’s classpath magic, loading a custom data source is a farily trivial operation. Just make sure you compile your data source against the right version of spark that you’re currently using, in my case 1.5.1, and add it to your spark-submit/spark-shell as a jar and you should be able to reference it as follows:
This runs through ResolvedDataSource using the class name provided and passing the provided path as the “path” variable.
One of the most frequently used data sources when I was working at Palantir Technologies was Apache’s Parquet. Due to our workflows with structured, tabular data it afforded us a lot of fantastic performance benefits. It’s a columnar file format which is stored on Hadoop file systems which stores plenty of valuable metadata about the data contained and uses different encodings to efficiently store data.
Diving into the code we can find some of the details of how this is enabled. All of the Parquet data source code can be found here: ./sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet but the most relevant file is that of ParquetRelation.scala.
Within this file you can see the implementation for Parquet via the DefaultSource class and the ParquetRelation class. Looking at DefaultSource we see that parquet is sourced from a HadoopFsRelationProvider. We also see looking at the ParquetRelation class that
If you’re thinking of connecting to a JDBC compatible SQL database, Spark has a pretty good built in connector for this that is DataSource compatible. The code can be found in:
As an example, we’ll take the situation where you have a dataset of keyed data laid out on disk with filenames set by the contained keys. For example, your directory could have: A1.csv, B2.csv, K5.csv which represent all of the data for each of those keys. By taking advantage of the keys/dates in file names, we can make queries for specific keys/time ranges significantly more efficient by reading only the files for those keys. This gives us the performance of a HashMap rather than a List for lookups.
We’ll make a few assumptions here which can be trivially relaxed by parameterizing or extending our code.
- Our data source has a strict schema of Tag, Timestamp, Value – This can be relaxed by inferring the schema or extending SchemaProvider rather than BaseProvider to all the schema to be passed in.
- We don’t need write support. This can be relaxed by extending CreateTableProvider or InsertableRelation.
- Our files live on HDFS and Spark is configured to talk to that HDFS already.
Below is the implementation of our data source to do this, as you can see, the code is actually fairly simple.
As you can see we have two classes: the DefaultSource class and the PartitionedTripleRelation class.
The DefaultSource is our factory for the relations, this class, with this specific name is required in order for the data source to be used.
The ParititionedTripleRelation is our BaseRelation which specifies the schema for our tables as well as our table scan class. As you can see we chose to implemented PrunedFilteredScan rather than the plain old TableScan. This is because we wanted to be able to take advantage of the filter push down since we know our data is partitioned on disk.
Testing The Code
To actually test the code we can test a few of it’s functions to verify it behaves as we would expect. I’ve written the following code to test a few of the functions: https://github.com/hkothari/blogcode/blob/master/partitioned-triple-source/scripts/test_datasource.scala
This results in the following (with some default spark logging tuned out):
Count over whole dataset 15/12/04 00:25:43 INFO PartitionedTripleRelation: Building scan for path: /user/hkothari/tmp/partitioned-files 15/12/04 00:25:43 INFO PartitionedTripleRelation: Found 234 tags from path. 15/12/04 00:25:43 INFO PartitionedTripleRelation: Will read 234 files based on filters provided. res1: Long = 234000 Select column distinct count 15/12/04 00:25:54 INFO PartitionedTripleRelation: Building scan for path: /user/hkothari/tmp/partitioned-files 15/12/04 00:25:54 INFO PartitionedTripleRelation: Found 234 tags from path. 15/12/04 00:25:54 INFO PartitionedTripleRelation: Will read 234 files based on filters provided. res3: Long = 234 Select count one tag using sql where 15/12/04 00:26:08 INFO PartitionedTripleRelation: Building scan for path: /user/hkothari/tmp/partitioned-files 15/12/04 00:26:08 INFO PartitionedTripleRelation: Found 234 tags from path. 15/12/04 00:26:08 INFO PartitionedTripleRelation: Will read 1 files based on filters provided. res5: Long = 1000 Select count three tags using sql where in 15/12/04 00:26:09 INFO PartitionedTripleRelation: Building scan for path: /user/hkothari/tmp/partitioned-files 15/12/04 00:26:09 INFO PartitionedTripleRelation: Found 234 tags from path. 15/12/04 00:26:09 INFO PartitionedTripleRelation: Will read 3 files based on filters provided. res7: Long = 3000 Count using rdd filter 15/12/04 00:26:09 INFO PartitionedTripleRelation: Building scan for path: /user/hkothari/tmp/partitioned-files 15/12/04 00:26:09 INFO PartitionedTripleRelation: Found 234 tags from path. 15/12/04 00:26:09 INFO PartitionedTripleRelation: Will read 234 files based on filters provided. res9: Long = 1000
Looking at these results we can see some important characteristics of our data source and we have verified that it’s working. First and foremost you can see that things are working and are queries aren’t failing.
Next we also see that the filter pushdown is working as intended as evidenced by the first two SQL where clauses and the accompanied logging. As you see we get constant time lookup from our filters because it only needs to read one of the files. This results in read speeds that are 10x faster in this example but it’s really a comparison between O(n) and O(1).
Extending this example
As mentioned earlier, we mad a few assumptions when writing this data source. If we were to go off and extend this example to make it more generally useful you could take several steps:
- We could add write/insertion support by implementing the CreateTableProvider and InsertableRelation traits in our existing provider/relation. The following stackoverflow can provide help on how to write out partitioned files: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job.
That all being said, by implementing this data source, we are roughly re-implementing the partitioning support that is built into the HadoopFsRelation so we’d probably be better off extending that guy to take advantage of all the other benefits that are built into it.