Written by Enrico Minack, Open Source Software Contributor
Apache Spark is a very popular Big Data platform for processing tabular data of arbitrary size. At G-Research, Spark is used by many teams, in numerous setups, at various levels of complexity, and with any possible (and even impossible) data size. Even a mature system such as Apache Spark reveals bugs, incorrect behaviour and missing features in such a demanding environment.
Investing in and contributing back to the Open Source community of Apache Spark is natural for G-Research, given the gains that have been realised from using the platform in the first place.
With its latest release, version 3.4.0, Spark introduces many new features and even more bug fixes. The following features and fixes were contributed over the course of 2022 by the author of this article. This work has generously been enabled by G-Research, its incredible team of engineers and its extraordinary range of world-class use cases.
Overview
The latest Spark version 3.4.0 ships, among many others, the following new features:
- Un-pivoting in Scala/Java, Python and SQL APIs
- Processing sorted grouped dataframes
- Spark tables can report their in-partition ordering
These bug fixes have been contributed:
- Guaranteeing in-partition order for partitioned-writing
- Asserting identical key size when co-grouping groups in PySpark
- Correcting suggestion of backticks use in error messages
- Fixing aggregation optimisation for anti-join and semi-join with ambiguous predicates
Un-pivoting in Scala/Java, Python and SQL
Spark has supported pivoting grouped dataframes since the ancient version 1.6.0, but the reverse operation, also well known as melt, has only now been added. The new operation outperforms existing implementations and significantly reduces the complexity of user code.
The community has also made this new operation available in SparkR.
Processing sorted grouped dataframes
Common operations on grouped dataframes are flatMapGroups and cogroup. The new version of Spark provides variants of those transformations that support sorted groups: flatMapSortedGroups and cogroupSorted, respectively.
Spark tables can report their in-partition ordering
The Spark Table interface allows users to integrate reading from any data source into Spark. Examples of such data sources are files, databases or web services.
A new interface has been introduced that allows those sources to report the ordering of the data they read. Subsequent transformations that require an ordering, such as window functions, can now be satisfied by the source's ordering, so Spark does not sort the data again. In earlier versions, Spark would have sorted the data even though it was already in the right order; the new interface allows data source implementers to easily report that fact.
Guaranteeing in-partition order for partitioned-writing
We have discussed earlier how Spark, in version 3.0.0, lost the guarantee that sorted partitioned files are written in the expected order. This guarantee is critical for the correctness of results and for downstream systems that consume the written data. Spark version 3.4.0 restores this guarantee and adds tests, so that this feature cannot get lost unnoticed again.
Asserting identical key size when co-grouping groups in PySpark
Calling DataFrame.groupBy(...).cogroup(...) in PySpark does not prevent co-grouping dataframes that are grouped by different numbers of group keys, even though this is not supported. The resulting error message used to be anything but specific about this fact. Improving this message helps users to iterate on their code and fix such an issue quickly.
Correcting suggestion of backticks use in error messages
Spark requires the use of backticks when column names contain dots (.), because dots have a special meaning when referencing columns. Error messages should help the user spot the problem quickly, and ideally suggest a fix. However, the existing suggestions were incorrect and added to the confusion about how to use backticks correctly.
Fixing aggregation optimisation for anti-join and semi-join with ambiguous predicates
Some very specific types of joins produced incorrect results. Such correctness bugs are particularly dangerous as they do not raise an error but silently return wrong data. In a long sequence of operations, it is especially hard to find the one operation that injected the incorrectness. Getting those issues ironed out of Spark is to the benefit of all Spark users.
Deep Dive
Un-pivoting in Scala/Java, Python and SQL
Un-pivoting a wide table like:
+---+---+---+----+
| id|int|lng| dbl|
+---+---+---+----+
|  1| 11| 12|13.0|
|  2| 21| 22|23.0|
+---+---+---+----+
into a long table:
+---+------+-----+
| id|column|value|
+---+------+-----+
|  1|   int| 11.0|
|  1|   lng| 12.0|
|  1|   dbl| 13.0|
|  2|   int| 21.0|
|  2|   lng| 22.0|
|  2|   dbl| 23.0|
+---+------+-----+
can now be done with a single unpivot:
// Scala
val df = Seq((1, 11, 12L, 13.0), (2, 21, 22L, 23.0)).toDF("id", "int", "lng", "dbl")

df.unpivot(Array($"id"), "column", "value").show()
# Python
df = spark.createDataFrame([(1, 11, 12, 13.0), (2, 21, 22, 23.0)], ["id", "int", "lng", "dbl"])

df.unpivot("id", ["int", "lng", "dbl"], "column", "value").show()
-- SQL
CREATE TABLE values (id INT, int INT, lng LONG, dbl DOUBLE);
INSERT INTO values VALUES (1, 11, 12, 13.0), (2, 21, 22, 23.0);

SELECT * FROM values UNPIVOT (value FOR column IN (int, lng, dbl));
There are three approaches to perform unpivot with existing Spark operations. At their heart, they are based on the explode, stack and flatMap transformations. These produce long and hard-to-read code. Making this a first-class Spark operation simplifies user code, and improves readability and speed.
The unpivot operation is up to 10 times faster than some other implementations, and usually among the best-performing approaches across all data size magnitudes (see the benchmark linked in the references below).
References:
- Existing approaches for unpivot in Spark: apache/spark#36150 (comment)
- A simple benchmark for unpivot: apache/spark#36150 (comment)
- Pull requests: #36150, #37304, #37407
- Spark issues: SPARK-38864, SPARK-39877, SPARK-39876
Processing sorted grouped dataframes
Grouping a dataframe by some grouping columns (done with groupByKey in the Scala example below) allows further processing of the individual groups. A group consists of all rows that have the same values for those grouping columns; those values are often referred to as the group key. Transformations like flatMapGroups and cogroup then call a given method (processGroup in the example below) for each group, with the group key (id: Int) and an iterator (it: Iterator[Row]) over the group's rows.
import org.apache.spark.sql.Row

val df = Seq((1, 1, 11), (1, 3, 13), (1, 2, 12), (2, 1, 21), (2, 2, 22)).toDF("id", "seq", "value")

def processGroup(id: Int, it: Iterator[Row]): Iterator[(Int, Int)] =
  it.zipWithIndex.map { case (row: Row, idx: Int) => (id, row.getInt(2) * idx) }

df.groupByKey(_.getInt(0))
  .flatMapGroups(processGroup)
  .toDF("id", "idx * value")
  .show()
This produces something similar to:
+---+-----------+
| id|idx * value|
+---+-----------+
|  1|          0|
|  1|         13|
|  1|         24|
|  2|          0|
|  2|         22|
+---+-----------+
Unfortunately, we cannot make any assumption about the order of the iterator given to the method processGroup. Since the values of column "idx * value" in our example depend on that order, an ordering guarantee is essential for deterministic results.
If we require the processing of the iterator in a particular order, we would have to sort it first. This, however, requires all data to be loaded into memory.
A nice feature about flatMapGroups and cogroup is that groups can be arbitrarily large. In other words, the iterator can provide more rows than would fit into memory. Spark is robust against such massive data sizes, but is your implementation of processGroup? Sorting the iterator inside processGroup is definitely going to hit the memory limitation.
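For illustration only, such a naive in-memory sort could look like this hypothetical helper (reusing processGroup from above); it works for small groups but fails once a group no longer fits into memory:

// hypothetical workaround (not recommended): buffer the entire group in memory and
// sort it by column "seq" (index 1) before handing it to processGroup
def processGroupSortedNaively(id: Int, it: Iterator[Row]): Iterator[(Int, Int)] =
  processGroup(id, it.toVector.sortBy(_.getInt(1)).iterator)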
If your processGroup method requires a sorted iterator, you can now use flatMapSortedGroups and cogroupSorted:
df.groupByKey(_.getInt(0))
  .flatMapSortedGroups($"seq")(processGroup)
  .toDF("id", "idx * value")
  .show()
This always produces the following, as it guarantees that group rows are sorted by column "seq":
+---+-----------+
| id|idx * value|
+---+-----------+
|  1|          0|
|  1|         12|
|  1|         26|
|  2|          0|
|  2|         22|
+---+-----------+
Similarly, you can co-group sorted groups as easily as this:
left.cogroupSorted(right)($"seq")($"seq")(processCoGroup)
  .toDF("id", "left idx * value", "right idx * value")
  .show()
+---+----------------+-----------------+
| id|left idx * value|right idx * value|
+---+----------------+-----------------+
|  1|              38|               12|
|  2|              22|               22|
+---+----------------+-----------------+
Here are the definitions of the example processCoGroup, left and right:
def processCoGroup(id: Int, left: Iterator[Row], right: Iterator[Row]): Iterator[(Int, Int, Int)] =
  Iterator((
    id,
    left.zipWithIndex.map { case (row: Row, idx: Int) => row.getInt(2) * idx }.sum,
    right.zipWithIndex.map { case (row: Row, idx: Int) => row.getInt(2) * idx }.sum
  ))

val df2 = Seq((1, 1, 11), (1, 2, 12), (2, 1, 21), (2, 2, 22)).toDF("id", "seq", "value")

val left = df.groupByKey(_.getInt(0))
val right = df2.groupByKey(_.getInt(0))
References:
- Pull request #39640
- Spark issue SPARK-38591
Spark tables can report their in-partition ordering
The Spark Table interface allows users or third parties to integrate any source of data into Spark. When reading from such a source, it can report to Spark various information about the data, like its existing partitioning or size statistics.
A data source can now also report its in-partition ordering by implementing the SupportsReportOrdering interface. This requires implementing only one simple method:
override def outputOrdering(): Array[SortOrder] = ...
Imagine a database table with a primary index. Usually, that index imposes an order on the data of a table. A source that reads from such a table can inform Spark about the ordering provided by the primary index.
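As a rough sketch (the class, schema and column names here are hypothetical, and only the ordering-related part of the Scan is shown, assuming the Expressions.sort helper), such a source could report the order imposed by its primary index like this:

import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.read.{Scan, SupportsReportOrdering}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// hypothetical scan over a table whose primary index orders each partition by "id", then "seq";
// a real implementation would also provide toBatch() and the rest of the data source machinery
class MyIndexedTableScan extends Scan with SupportsReportOrdering {

  override def readSchema(): StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("seq", IntegerType),
    StructField("value", IntegerType)))

  // report the in-partition ordering guaranteed by the primary index
  override def outputOrdering(): Array[SortOrder] = Array(
    Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING),
    Expressions.sort(Expressions.column("seq"), SortDirection.ASCENDING))
}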
Reporting the in-partition order is particularly useful in combination with reporting the partitioning of the data. Imagine our source provides a dataframe that is partitioned by column "id" and ordered inside each partition by columns "id" and "seq":
// reading from "my.custom.datasource" is of course only pseudo code
val df = spark.read.format("my.custom.datasource").load("path")
df.show()
+---+---+-----+
| id|seq|value|
+---+---+-----+
|  1|  1|   11|
|  1|  2|   12|
|  1|  3|   13|
|  2|  1|   21|
|  2|  2|   22|
+---+---+-----+
Reporting this partitioning and ordering information to Spark allows it to evaluate the following window function without shuffling and sorting the loaded data:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val window = Window.partitionBy($"id").orderBy($"seq")

df.select($"id", $"seq", sum($"value").over(window).as("rolling sum")).show()
+---+---+-----------+
| id|seq|rolling sum|
+---+---+-----------+
|  1|  1|         11|
|  1|  2|         23|
|  1|  3|         36|
|  2|  1|         21|
|  2|  2|         43|
+---+---+-----------+
The query plan shows that no shuffle or sort phase will be executed:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#26, seq#27, rolling sum#134L]
   +- Window [... AS rolling sum#134L], [id#26], [seq#27 ASC NULLS FIRST]
      +- BatchScan (our example data source)
Without this new feature, Spark would shuffle and sort the entire dataset, which is expensive in Big Data land:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#26, seq#27, rolling sum#134L]
   +- Window [... AS rolling sum#134L], [id#26], [seq#27 ASC NULLS FIRST]
      +- Sort [id#26 ASC NULLS FIRST, seq#27 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#26, 200), ENSURE_REQUIREMENTS, [plan_id=191]
            +- BatchScan (our example data source)
References:
- Pull request #35965
- Spark issue SPARK-38647
Guaranteeing in-partition order for partitioned-writing
We have discussed in an earlier blog post (linked in the references below) how Spark, in version 3.0.0, lost the guarantee that sorted partitioned files are written in the expected order. In some situations, running the following does not produce files that are sorted as requested by the sortWithinPartitions transformation:
ds.repartition($"day") // repartition the data, write-partition columns and more .sortWithinPartitions($"day", $"id") // in-partition order, repartition columns and more .write .partitionBy("day") // write-partition columns .csv("data.csv")
The essence of the bug is that in the event of insufficient memory, a Spark executor will spill partition data to disk. That spilling will destroy the order of your written files. You will get files like:
0
524288
1
524289
2
...
999999
...
524286
524287
where the sorted columns should rather read:
0
1
2
...
524286
524287
524288
524289
...
999999
With Spark version 3.4.0, sorted partitioned writing is fixed again.
This fix has been backported to versions 3.2.3 and 3.3.2.
References:
- In-depth blog article on guaranteeing in partition order: https://www.gresearch.com/blog/article/guaranteeing-in-partition-order-for-partitioned-writing-in-apache-spark/
- Pull requests #39431, #38358
- Spark issues SPARK-41914, SPARK-40588
Asserting identical key size when co-grouping groups in PySpark
Co-grouping two grouped dataframes in PySpark requires them to be grouped by the same number of grouping keys:
left.groupBy("id", "key") .cogroup(right.groupBy("id", "key"))
If the numbers of grouping keys differ, the error message used to be not particularly useful:
left.groupBy("id", "key") .cogroup(right.groupBy("id"))
py4j.protocol.Py4JJavaError: An error occurred while calling o726.collectToPython.
: java.lang.IndexOutOfBoundsException: 1
With Spark version 3.4.0, the error message now states:
requirement failed: Cogroup keys must have same size: 2 != 1
This fix has not been backported.
References:
- Pull request #38036
- Spark issue SPARK-40601
Correcting suggestion of backticks use in error message
Spark requires the use of backticks when column names contain dots (.), because dots have a special meaning when referencing columns.
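As a small illustration (the dataframe here is hypothetical and not part of the fix), referencing such a column only works when the name is wrapped in backticks:

val names = Seq((1, "a")).toDF("the.id", "value")

// without backticks, Spark interprets the dot as accessing a field "id" of a column "the", which fails:
// names.select("the.id")

// with backticks, the whole string is treated as a single column name:
names.select("`the.id`").show()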
For instance, selecting column "the.id" without backticks would have raised the following error, suggesting the exact same column name:
... Did you mean one of the following? [the.id]
Getting the backticks right has always been challenging when column names contain dots or fields are nested, and such incorrect suggestions did not help to improve the situation.
Spark version 3.4.0 now correctly suggests the right use of backticks:
... Did you mean one of the following? [`the.id`]
This fix has not been backported.
References:
- Pull request #38256
- Spark issue SPARK-39783
Fixing aggregation optimisation for anti-join and semi-join with ambiguous predicates
A very peculiar anti-join turned out to produce a wrong result. Such correctness bugs are very hard to spot as they do not raise an error but happily return some result, which is just wrong. Working around those correctness bugs in a reliable way requires us to fully understand the cause. For this, we have to dig through the Spark internal code, find the bug and understand affected code paths. While we are there, we can also fix the bug for everyone.
Joining a dataframe with itself, while one side has an aggregation (e.g. a distinct), used to produce an incorrect result for left-anti and left-semi join types:
val ids = Seq(1, 2, 3).toDF("id").distinct()

ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").show()
You used to get:
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+
where the anti-join should have eliminated all rows except id = 4. The cause is visible in the buggy query plan, from which the join has been optimised away entirely:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#4], functions=[])
   +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [id=#42]
      +- HashAggregate(keys=[id#4], functions=[])
         +- LocalTableScan [id#4]
The fix produces the expected query plan:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [id#6], [id#4], LeftAnti, BuildRight, false                 <══ ✅
   :- HashAggregate(keys=[id#4], functions=[])
   :  +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [plan_id=26]
   :     +- HashAggregate(keys=[id#4], functions=[])
   :        +- LocalTableScan [id#4]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=31]
      +- HashAggregate(keys=[id#4], functions=[])
         +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [plan_id=28]
            +- HashAggregate(keys=[id#4], functions=[])
               +- LocalTableScan [id#4]
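With the join preserved, the query now returns only the row without a match in ids, as expected:

+---+
| id|
+---+
|  4|
+---+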
This fix has been backported to versions 3.2.4 and 3.3.2.
References:
- Pull request #39131
- Spark issue SPARK-41162