Data skipping for Spark SQL
Data skipping can significantly boost the performance of SQL queries by skipping over irrelevant data objects or files based on summary metadata associated with each object.
Data skipping uses the open source Xskipper library for creating, managing and deploying data skipping indexes with Apache Spark. See Xskipper - An Extensible Data Skipping Framework.
For more details on how to work with Xskipper, see the Xskipper documentation.
In addition to the open source features in Xskipper, the following features are also available:
- Geospatial data skipping
- Encrypting indexes
- Data skipping with joins (for Spark 3 only)
- Samples showing these features
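To make the idea of skipping objects based on summary metadata concrete, the following is a minimal sketch in plain Python. It does not use the Xskipper API, and the names `ObjectSummary` and `objects_to_scan`, as well as the object names and values, are hypothetical. It assumes each object has a recorded minimum and maximum for one numeric column and shows how a range predicate can rule out objects without reading them.

```python
from dataclasses import dataclass


@dataclass
class ObjectSummary:
    """Hypothetical per-object summary: min and max of one numeric column."""
    name: str
    min_value: float
    max_value: float


def objects_to_scan(summaries, lower, upper):
    """Return the names of objects that may hold rows with lower <= value <= upper.

    An object is skipped only when its [min, max] range cannot overlap
    the queried range, so no matching row is ever lost.
    """
    return [
        s.name
        for s in summaries
        if s.max_value >= lower and s.min_value <= upper
    ]


# Illustrative summaries for a column named temp (made-up values).
summaries = [
    ObjectSummary("part-0.parquet", 0.0, 15.0),
    ObjectSummary("part-1.parquet", 12.0, 30.0),
    ObjectSummary("part-2.parquet", 31.0, 45.0),
]

# Query predicate: temp BETWEEN 20 AND 25
selected = objects_to_scan(summaries, 20.0, 25.0)
print(selected)  # ['part-1.parquet']
```

The check is conservative: an object that is kept may still contain no matching rows, but an object that is skipped is guaranteed to contain none.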
Geospatial data skipping
You can also use data skipping when querying geospatial data sets using geospatial functions from the spatio-temporal library.
- To benefit from data skipping in data sets with latitude and longitude columns, you can collect the min/max indexes on the latitude and longitude columns.
- Data skipping can be used in data sets with a geometry column (a UDT column) by using a built-in Xskipper plugin.
The next sections show you how to work with the geospatial plugin.
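As a rough illustration of the first case above (min/max indexes on latitude and longitude columns), the following plain Python sketch shows how per-object minimum and maximum values could be compared with a rectangular query region. It is not the plugin's implementation. The names `LatLonSummary` and `may_overlap` and all coordinates are hypothetical, and the sketch ignores regions that wrap around the antimeridian.

```python
from dataclasses import dataclass


@dataclass
class LatLonSummary:
    """Hypothetical min/max summary of the latitude and longitude columns of one object."""
    name: str
    min_lat: float
    max_lat: float
    min_lon: float
    max_lon: float


def may_overlap(summary, query):
    """Return True if the object may contain points inside the query rectangle.

    query is a tuple (min_lat, max_lat, min_lon, max_lon).
    """
    q_min_lat, q_max_lat, q_min_lon, q_max_lon = query
    return (
        summary.max_lat >= q_min_lat
        and summary.min_lat <= q_max_lat
        and summary.max_lon >= q_min_lon
        and summary.min_lon <= q_max_lon
    )


# Made-up summaries for three objects.
summaries = [
    LatLonSummary("a.parquet", 40.4, 41.0, -74.3, -73.6),
    LatLonSummary("b.parquet", 51.2, 51.7, -0.6, 0.4),
    LatLonSummary("c.parquet", 48.7, 49.0, 2.2, 2.5),
]

# Query rectangle: latitude 48 to 52, longitude -1 to 3.
query = (48.0, 52.0, -1.0, 3.0)
candidates = [s.name for s in summaries if may_overlap(s, query)]
print(candidates)  # ['b.parquet', 'c.parquet']
```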
Setting up the geospatial plugin
To use the plugin, load the relevant implementations using the Registration module. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.
- For Scala:

    import com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory
    import com.ibm.xskipper.stmetaindex.index.STIndexFactory
    import com.ibm.xskipper.stmetaindex.translation.parquet.{STParquetMetaDataTranslator, STParquetMetadatastoreClauseTranslator}
    import io.xskipper._

    Registration.addIndexFactory(STIndexFactory)
    Registration.addMetadataFilterFactory(STMetaDataFilterFactory)
    Registration.addClauseTranslator(STParquetMetadatastoreClauseTranslator)
    Registration.addMetaDataTranslator(STParquetMetaDataTranslator)

- For Python:

    from xskipper import Xskipper
    from xskipper import Registration

    Registration.addMetadataFilterFactory(spark, 'com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory')
    Registration.addIndexFactory(spark, 'com.ibm.xskipper.stmetaindex.index.STIndexFactory')
    Registration.addMetaDataTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetaDataTranslator')
    Registration.addClauseTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetadatastoreClauseTranslator')
Index building
To build an index, you can use the addCustomIndex API. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.
- For Scala:

    import com.ibm.xskipper.stmetaindex.implicits._

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)

    xskipper
      .indexBuilder()
      // using the implicit method defined in the plugin implicits
      .addSTBoundingBoxLocationIndex("location")
      // equivalent
      //.addCustomIndex(STBoundingBoxLocationIndex("location"))
      .build(reader).show(false)

- For Python:

    xskipper = Xskipper(spark, dataset_path)

    # adding the index using the custom index API
    xskipper.indexBuilder() \
        .addCustomIndex("com.ibm.xskipper.stmetaindex.index.STBoundingBoxLocationIndex", ['location'], dict()) \
        .build(reader) \
        .show(10, False)
Supported functions
The list of supported geospatial functions includes the following:
- ST_Distance
- ST_Intersects
- ST_Contains
- ST_Equals
- ST_Crosses
- ST_Touches
- ST_Within
- ST_Overlaps
- ST_EnvelopesIntersect
- ST_IntersectsInterior
Encrypting indexes
If you use a Parquet metadata store, the metadata can optionally be encrypted using Parquet Modular Encryption (PME). This is achieved by storing the metadata itself as a Parquet data set, and thus PME can be used to encrypt it. This feature applies to all input formats, for example, a data set stored in CSV format can have its metadata encrypted using PME.
In the following section, unless specified otherwise, when referring to footers, columns, and so on, these are with respect to metadata objects, and not to objects in the indexed data set.
Index encryption is modular and granular in the following way:
- Each index can either be encrypted (with a per-index key granularity) or left in plain text
- Footer + object name column:
  - The footer of the metadata object, which is itself a Parquet file, contains, among other things:
    - The schema of the metadata object, which reveals the types, parameters and column names for all indexes collected. For example, you can learn that a BloomFilter is defined on column city with a false-positive probability of 0.1.
    - The full path to the original data set, or the table name in the case of a Hive metastore table.
  - The object name column stores the names of all indexed objects.
- The footer + object name column can either be:
  - Both encrypted using the same key. This is the default. In this case, the Parquet objects comprising the metadata are in encrypted footer mode, and the object name column is encrypted using the selected key.
  - Both in plain text. In this case, the Parquet objects comprising the metadata are in plain text footer mode, and the object name column is not encrypted.

  If at least one index is marked as encrypted, then a footer key must be configured regardless of whether plain text footer mode is enabled or not. If plain text footer is set, then the footer key is used only for tamper-proofing. Note that in that case the object name column is not tamper-proofed.

  If a footer key is configured, then at least one index must be encrypted.

Before using index encryption, you should check the documentation on PME and make sure you are familiar with the concepts.
Whenever a key is configured in any Xskipper API, it is always the key label, never the key itself.

To use index encryption:
- Follow all the steps to make sure PME is enabled. See PME.
- Perform all regular PME configurations, including Key Management configurations.
- Create encrypted metadata for a data set:
  - Follow the regular flow for creating metadata.
  - Configure a footer key. If you wish to set a plain text footer + object name column, set io.xskipper.parquet.encryption.plaintext.footer to true (see samples below).
  - In IndexBuilder, for each index you want to encrypt, add the label of the key to use for that index.

To use metadata during query time or to refresh existing metadata, no setup is necessary other than the regular PME setup required to make sure the keys are accessible (literally the same configuration needed to read an encrypted data set).
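The two consistency rules stated earlier in this section (an encrypted index requires a footer key, and a footer key requires at least one encrypted index) can be sketched as a small check in plain Python. This is only an illustration of the rules, not something Xskipper exposes. The function name `check_encryption_config` and the shape of its arguments are hypothetical. Only the parameter name `io.xskipper.parquet.encryption.footer.key` is taken from this page.

```python
FOOTER_KEY_PARAM = "io.xskipper.parquet.encryption.footer.key"


def check_encryption_config(conf, index_key_labels):
    """Return a list of rule violations (empty if the configuration is consistent).

    conf: dict of configuration parameters.
    index_key_labels: dict mapping an index name to its key label,
        or to None when the index is left in plain text.
    """
    footer_key = conf.get(FOOTER_KEY_PARAM)
    any_encrypted = any(label is not None for label in index_key_labels.values())

    problems = []
    if any_encrypted and footer_key is None:
        problems.append("an encrypted index requires a footer key")
    if footer_key is not None and not any_encrypted:
        problems.append("a footer key requires at least one encrypted index")
    return problems


# Footer key k1, MinMax on temp encrypted with k2, ValueList on city in plain text.
ok = check_encryption_config(
    {FOOTER_KEY_PARAM: "k1"},
    {"temp": "k2", "city": None},
)
print(ok)  # []

# An encrypted index without a footer key violates the first rule.
missing_footer = check_encryption_config({}, {"temp": "k2"})
print(missing_footer)
```

Note that the values passed here are key labels such as k1 and k2, in line with the rule that a key label, never the key itself, is configured.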
Samples
The following samples show metadata creation using a key named k1 as a footer + object name key, and a key named k2 as a key to encrypt a MinMax index for temp, while also creating a ValueList index for city, which is left in plain text. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.
- For Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)

    // Configuring the JVM wide parameters
    val jvmConf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)

    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1")
    xskipper.setConf(conf)

    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)

- For Python:

    xskipper = Xskipper(spark, dataset_path)

    # Add JVM wide configuration
    jvmConf = dict([
        ("io.xskipper.parquet.mdlocation", md_base_location),
        ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)

    # configure footer key
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1")])
    xskipper.setConf(conf)

    # adding the indexes: encrypted MinMax for temp (key k2), plain text ValueList for city
    xskipper.indexBuilder() \
        .addMinMaxIndex("temp", "k2") \
        .addValueListIndex("city") \
        .build(reader) \
        .show(10, False)
If you want the footer + object name to be left in plain text mode (as mentioned above), you need to add the configuration parameter:
- For Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)

    // Configuring the JVM wide parameters
    val jvmConf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)

    // set the footer key and leave the footer + object name column in plain text
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1",
      "io.xskipper.parquet.encryption.plaintext.footer" -> "true")
    xskipper.setConf(conf)

    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)

- For Python:

    xskipper = Xskipper(spark, dataset_path)

    # Add JVM wide configuration
    jvmConf = dict([
        ("io.xskipper.parquet.mdlocation", md_base_location),
        ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)

    # configure footer key and leave the footer + object name column in plain text
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1"),
                 ("io.xskipper.parquet.encryption.plaintext.footer", "true")])
    xskipper.setConf(conf)

    # adding the indexes: encrypted MinMax for temp (key k2), plain text ValueList for city
    xskipper.indexBuilder() \
        .addMinMaxIndex("temp", "k2") \
        .addValueListIndex("city") \
        .build(reader) \
        .show(10, False)
Data skipping with joins (for Spark 3 only)
With Spark 3, you can use data skipping in join queries such as:
SELECT *
FROM orders, lineitem
WHERE l_orderkey = o_orderkey and o_custkey = 800
This example shows a star schema based on the TPC-H benchmark schema (see TPC-H), where lineitem is a fact table that contains many records, while orders is a dimension table that has a relatively small number of records compared to the fact table.
The predicate in the above query is on the orders table. Because that table contains only a small number of records, a min/max index on it yields little benefit from data skipping.
Dynamic data skipping is a feature which enables queries such as the above to benefit from data skipping by first extracting the relevant l_orderkey values based on the condition on the orders table, and then using them to push down a predicate on l_orderkey that uses data skipping indexes to filter irrelevant objects.
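The two steps described above can be sketched in plain Python. This is a conceptual illustration only and not how Xskipper or Spark implement the optimization. The table contents, the object names, and the per-object min/max values for l_orderkey are made up for the example.

```python
# Small dimension table (orders) with made-up rows.
orders = [
    {"o_orderkey": 1, "o_custkey": 800},
    {"o_orderkey": 2, "o_custkey": 17},
    {"o_orderkey": 7, "o_custkey": 800},
    {"o_orderkey": 11, "o_custkey": 42},
]

# Hypothetical min/max index on l_orderkey, one entry per lineitem object.
lineitem_minmax = {
    "lineitem/part-0": (1, 3),
    "lineitem/part-1": (4, 6),
    "lineitem/part-2": (7, 9),
    "lineitem/part-3": (10, 12),
}

# Step 1: evaluate the predicate on the small table and collect the join keys.
keys = sorted({o["o_orderkey"] for o in orders if o["o_custkey"] == 800})

# Step 2: treat the keys as a predicate on l_orderkey and keep only the
# objects whose [min, max] range contains at least one of the keys.
to_scan = [
    name
    for name, (low, high) in lineitem_minmax.items()
    if any(low <= k <= high for k in keys)
]

print(keys)     # [1, 7]
print(to_scan)  # ['lineitem/part-0', 'lineitem/part-2']
```

In this toy data, two of the four lineitem objects can be skipped even though the original query has no direct predicate on lineitem.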
To use this feature, enable the following optimization rule. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.
- For Scala:

    import com.ibm.spark.implicits._

    spark.enableDynamicDataSkipping()

- For Python:

    from sparkextensions import SparkExtensions

    SparkExtensions.enableDynamicDataSkipping(spark)
Then use the Xskipper API as usual and your queries will benefit from using data skipping.
For example, in the above query, indexing l_orderkey using min/max will enable skipping over objects in the lineitem table and will improve query performance.
Support for older metadata
Xskipper supports older metadata created by the MetaIndexManager seamlessly. Older metadata can be used for skipping, as updates to the Xskipper metadata are carried out automatically by the next refresh operation.
If you see DEPRECATED_SUPPORTED in front of an index when listing indexes or running a describeIndex operation, the metadata version is deprecated but is still supported and skipping will work. The next refresh operation will update the metadata automatically.