Un-pivot, sorted groups and many bug fixes: Celebrating the first Spark 3.4 release

21 March 2023
  • Open Source Software

Written by Enrico Minack, Open Source Software Contributor

Apache Spark is a very popular Big Data platform for processing tabular data of arbitrary size. At G-Research, Spark is used by many teams, in numerous setups, at various levels of complexity, and with any possible (and even impossible) data size. In such a demanding environment, even a mature system like Apache Spark reveals bugs, incorrect results and missing features.

Investing and contributing back to the Open Source community of Apache Spark is natural for G-Research, given the gains that have been realised from using the platform in the first place.

With its latest release, version 3.4.0, Spark introduces many new features and even more bug fixes. The following features and fixes were contributed over the course of 2022 by the author of this article. This work has generously been enabled by G-Research, its incredible team of engineers and its extraordinary range of world-class use cases.

Overview

The latest Spark version 3.4.0 ships, among many others, the following new features:

  • Un-pivoting in Scala/Java, Python and SQL APIs
  • Processing sorted grouped dataframes
  • Spark tables can report their in-partition ordering

These bug fixes have been contributed:

  • Guaranteeing in-partition order for partitioned-writing
  • Asserting identical key size when co-grouping groups in PySpark
  • Correcting suggestion of backticks use in error messages
  • Fixing aggregation optimisation for anti-join and semi-join with ambiguous predicates

Un-pivoting in Scala/Java, Python and SQL

Spark has supported pivoting grouped dataframes since the ancient version 1.6.0, but the reverse operation, also well known as melt, has only now been added. The new operation outperforms existing workarounds and reduces user code complexity significantly.

The community has also made this new operation available to Spark R.

Read more details below.

Processing sorted grouped dataframes

Common operations on grouped dataframes are flatMapGroups and cogroup. The new version of Spark provides variants for those transformations that support sorted groups: flatMapSortedGroups and cogroupSorted, respectively.

Read more details below.

Spark tables can report their in-partition ordering

The Spark Table interface allows users to integrate reading any data source into Spark. Examples of such data sources are files, databases or web services.

A new interface has been introduced that allows those sources to report the ordering of the read data. Subsequent transformations that require an ordering of the data, such as window functions, can now be satisfied by the source, so Spark does not need to sort the data again. In earlier versions, Spark would have sorted the data even though it was already in the right order. The new interface allows data source implementers to easily report that fact.

Read more details below.

Guaranteeing in-partition order for partitioned-writing

We have discussed earlier how, in version 3.0.0, Spark lost the guarantee that sorted partitioned files are written in the expected order. This guarantee is critical for the correctness of results and for downstream systems that consume the written data. Spark version 3.4.0 restores this guarantee and adds tests, so that it cannot silently be lost again.

Read more details below.

Asserting identical key size when co-grouping groups in PySpark

Calling DataFrame.groupBy(...).cogroup(...) in PySpark lets you attempt to co-group dataframes that are grouped by different numbers of group keys, even though this is not supported. The resulting error message used to be anything but specific about this fact. The improved message helps users to iterate over their code and fix such an issue quickly.

Read more details below.

Correcting suggestion of backticks use in error messages

Spark requires the use of backticks when column names contain dots (.), because dots have a special meaning when referencing columns. Error messages should help the user spot the problem quickly, and ideally suggest a fix. However, the existing suggestions were incorrect and only added to the confusion about how to use backticks correctly.

Read more details below.

Fixing aggregation optimisation for anti-join and semi-join with ambiguous predicates

Some very specific types of joins produced incorrect results. Such correctness bugs are particularly dangerous as they do not raise an error but silently return wrong data. In a long sequence of operations, it is especially hard to find the one operation that injected the incorrectness. Getting those issues ironed out of Spark is to the benefit of all Spark users.

Read more details below.

Deep Dive

Un-pivoting in Scala/Java, Python and SQL

Un-pivoting a wide table like:

+---+---+---+----+
| id|int|lng| dbl|
+---+---+---+----+
|  1| 11| 12|13.0|
|  2| 21| 22|23.0|
+---+---+---+----+

into a long table:

+---+------+-----+
| id|column|value|
+---+------+-----+
|  1|   int| 11.0|
|  1|   lng| 12.0|
|  1|   dbl| 13.0|
|  2|   int| 21.0|
|  2|   lng| 22.0|
|  2|   dbl| 23.0|
+---+------+-----+

can now be done with a single unpivot:

// Scala
val df = Seq((1, 11, 12L, 13.0), (2, 21, 22L, 23.0)).toDF("id", "int", "lng", "dbl")
df.unpivot(Array($"id"), "column", "value").show()

# Python
df = spark.createDataFrame([(1, 11, 12, 13.0), (2, 21, 22, 23.0)], ["id", "int", "lng", "dbl"])
df.unpivot("id", ["int", "lng", "dbl"], "column", "value").show()

-- SQL
CREATE TABLE values (id INT, int INT, lng LONG, dbl DOUBLE);
INSERT INTO values VALUES (1, 11, 12, 13.0), (2, 21, 22, 23.0);
SELECT * FROM values UNPIVOT (value FOR column IN (int, lng, dbl));

There are three approaches to performing an un-pivot with existing Spark operations. At their heart, they are based on the explode, stack and flatMap transformations. All of them produce long and hard-to-read code. Making un-pivot a first-class Spark operation simplifies user code and improves readability and performance.
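
For comparison, here is a sketch of one possible stack-based workaround for the example table above; the casts are only needed because stack requires all value expressions to share a single type:

// pre-3.4 workaround (sketch): every value column has to be listed and cast by hand
df.selectExpr(
    "id",
    "stack(3, 'int', CAST(int AS DOUBLE), 'lng', CAST(lng AS DOUBLE), 'dbl', dbl) AS (column, value)"
  ).show()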

The unpivot operation is up to 10 times faster than some other implementations, and is usually among the best performing approaches across all data size magnitudes.

Processing sorted grouped dataframes

Grouping a dataframe by some grouping columns or a key (DataFrame.groupBy(...) or Dataset.groupByKey(...)) allows further processing of the individual groups. A group consists of all rows that have the same values for those grouping columns; those values are often referred to as the group key. Transformations like flatMapGroups and cogroup then call a given method (processGroup in the example below) for each group, with the group key (id: Int) and an iterator over the group's rows (it: Iterator[Row]).

import org.apache.spark.sql.Row
val df = Seq((1, 1, 11), (1, 3, 13), (1, 2, 12), (2, 1, 21), (2, 2, 22)).toDF("id", "seq", "value")

def processGroup(id: Int, it: Iterator[Row]): Iterator[(Int, Int)] =
  it.zipWithIndex.map { case (row: Row, idx: Int) => (id, row.getInt(2) * idx) }

df.groupByKey(_.getInt(0))
  .flatMapGroups(processGroup)
  .toDF("id", "idx * value")
  .show()

This produces something similar to:

+---+-----------+
| id|idx * value|
+---+-----------+
|  1|          0|
|  1|         13|
|  1|         24|
|  2|          0|
|  2|         22|
+---+-----------+

Unfortunately, we cannot make any assumptions about the order of the iterator given to processGroup. Since the values of column "idx * value" in our example are sensitive to that order, an ordering guarantee is paramount.

If we require the processing of the iterator in a particular order, we would have to sort it first. This, however, requires all data to be loaded into memory.

A nice feature about flatMapGroups and cogroup is that groups can be arbitrarily large. In other words, the iterator can provide more rows than would fit into memory. Spark is robust against such massive data sizes, but is your implementation of processGroup? Sorting the iterator inside processGroup is definitely going to hit the memory limitation.
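
Such an in-memory workaround might look like the following sketch, which reuses processGroup from above and sorts each group by its "seq" column (this helper is purely illustrative):

// pre-3.4 workaround (sketch): materialise and sort the whole group in memory
def processGroupSortedInMemory(id: Int, it: Iterator[Row]): Iterator[(Int, Int)] = {
  val sorted = it.toSeq.sortBy(_.getInt(1))  // loads the entire group into memory
  processGroup(id, sorted.iterator)
}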

If your processGroup method requires a sorted iterator you can now use flatMapSortedGroups and cogroupSorted:

df.groupByKey(_.getInt(0))
  .flatMapSortedGroups($"seq")(processGroup)
  .toDF("id", "idx * value")
  .show()

This always produces the following, as it guarantees that group rows are sorted by column "seq":

+---+-----------+
| id|idx * value|
+---+-----------+
|  1|          0|
|  1|         12|
|  1|         26|
|  2|          0|
|  2|         22|
+---+-----------+

Similarly, you can co-group sorted groups as easily as this:

left.cogroupSorted(right)($"seq")($"seq")(processCoGroup)
  .toDF("id", "left idx * value", "right idx * value")
  .show()

+---+----------------+-----------------+
| id|left idx * value|right idx * value|
+---+----------------+-----------------+
|  1|              38|               12|
|  2|              22|               22|
+---+----------------+-----------------+

Here is the definition of the example processCoGroup, left and right:

def processCoGroup(id: Int,
                   left: Iterator[Row],
                   right: Iterator[Row]): Iterator[(Int, Int, Int)] =
  Iterator(
    (
      id,
      left.zipWithIndex.map { case (row: Row, idx: Int) =>
        row.getInt(2) * idx
      }.sum,
      right.zipWithIndex.map { case (row: Row, idx: Int) =>
        row.getInt(2) * idx
      }.sum
    )
  )

val df2 = Seq((1, 1, 11), (1, 2, 12), (2, 1, 21), (2, 2, 22))
  .toDF("id", "seq", "value")
val left = df.groupByKey(_.getInt(0))
val right = df2.groupByKey(_.getInt(0))

Spark tables can report their in-partition ordering

The Spark Table interface allows users or third parties to integrate any source of data into Spark. When Spark reads from such a source, the source can report various information about the data, like its existing partitioning or size statistics.

A data source can now also report its in-partition ordering by implementing the SupportsReportOrdering interface. This requires implementing only one simple method:

override def outputOrdering(): Array[SortOrder] = ...

Imagine a database table with a primary index. Usually, that index imposes an order on the data of a table. A source that reads from such a table can inform Spark about the ordering provided by the primary index.
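
For illustration, here is a minimal sketch of only the Scan part of such a connector; the class name and the schema and batch it receives are hypothetical, and the rest of the DataSource V2 implementation is omitted:

import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.read.{Batch, Scan, SupportsReportOrdering}
import org.apache.spark.sql.types.StructType

// hypothetical Scan whose partitions are ordered by "id" and "seq"
class MyOrderedScan(schema: StructType, batch: Batch) extends Scan with SupportsReportOrdering {
  override def readSchema(): StructType = schema
  override def toBatch(): Batch = batch

  // report the in-partition ordering to Spark
  override def outputOrdering(): Array[SortOrder] = Array(
    Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING),
    Expressions.sort(Expressions.column("seq"), SortDirection.ASCENDING)
  )
}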

Reporting the in-partition order is particularly useful in combination with reporting the partitioning of the data. Imagine our source reads the following dataframe:

// the first line is obviously only pseudo code
val df = spark.read.format("my.custom.datasource").load("path")
df.show()
+---+---+-----+
| id|seq|value|
+---+---+-----+
|  1|  1|   11|
|  1|  2|   12|
|  1|  3|   13|
|  2|  1|   21|
|  2|  2|   22|
+---+---+-----+

Assume the example "my.custom.datasource" data source provides this dataframe partitioned by column "id", with each partition ordered by columns "id" and "seq". Reporting this partitioning and ordering information to Spark allows it to evaluate the following window function without shuffling or sorting the loaded data:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val window = Window.partitionBy($"id").orderBy($"seq")
df.select($"id", $"seq", sum($"value").over(window).as("rolling sum")).show()
+---+---+-----------+
| id|seq|rolling sum|
+---+---+-----------+
|  1|  1|         11|
|  1|  2|         23|
|  1|  3|         36|
|  2|  1|         21|
|  2|  2|         43|
+---+---+-----------+

The query plan shows that no shuffle or sort phase will be executed:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#26, seq#27, rolling sum#134L]
   +- Window [... AS rolling sum#134L], [id#26], [seq#27 ASC NULLS FIRST]
      +- BatchScan (our example data source)

Without this new feature, Spark would shuffle and sort the entire dataset, which is expensive in Big Data land:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#26, seq#27, rolling sum#134L]
   +- Window [... AS rolling sum#134L], [id#26], [seq#27 ASC NULLS FIRST]
      +- Sort [id#26 ASC NULLS FIRST, seq#27 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#26, 200), ENSURE_REQUIREMENTS, [plan_id=191]
            +- BatchScan (our example data source)

Guaranteeing in-partition order for partitioned-writing

We have discussed earlier how, in version 3.0.0, Spark lost the guarantee that sorted partitioned files are written in the expected order. In some situations, running the following does not produce files that are sorted as requested by the sortWithinPartitions transformation:

ds.repartition($"day")                  // repartition the data, write-partition columns and more
  .sortWithinPartitions($"day", $"id")  // in-partition order, repartition columns and more
  .write
  .partitionBy("day")                   // write-partition columns
  .csv("data.csv")

The essence of the bug is that, when memory runs low, a Spark executor spills partition data to disk. That spilling destroys the order of the written files. The sorted column in a written file would read like:

0
524288
1
524289
2
...
999999
...
524286
524287

whereas the sorted column should read:

0
1
2
...
524286
524287
524288
524289
...
999999

With Spark version 3.4.0, sorted partitioned writing is fixed again.
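
One way to double-check the written output is to read each part file back and verify that its rows are in order. A minimal sketch, assuming the written dataset only contains the columns day and id (so each CSV line holds a single id) and was written without a header:

import java.io.File
import scala.io.Source

// recursively collect all CSV part files under the output directory
def partFiles(dir: File): Seq[File] =
  Option(dir.listFiles).toSeq.flatten.flatMap { f =>
    if (f.isDirectory) partFiles(f)
    else if (f.getName.endsWith(".csv")) Seq(f)
    else Seq.empty
  }

// check that the ids inside each part file are non-decreasing
partFiles(new File("data.csv")).foreach { file =>
  val ids = Source.fromFile(file).getLines().map(_.toLong).toSeq
  val inOrder = ids.isEmpty || ids.zip(ids.tail).forall { case (a, b) => a <= b }
  println(s"${file.getPath} in order: $inOrder")
}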

This fix has been backported to versions 3.2.3 and 3.3.2.

Asserting identical key size when co-grouping groups in PySpark

Co-grouping two grouped dataframes in PySpark requires them to be grouped by the same number of grouping keys:

left.groupBy("id", "key")
    .cogroup(right.groupBy("id", "key"))

If the number of grouping keys differ, the error message has not been really useful:

left.groupBy("id", "key")
    .cogroup(right.groupBy("id"))
py4j.protocol.Py4JJavaError: An error occurred while calling o726.collectToPython.
: java.lang.IndexOutOfBoundsException: 1

With Spark version 3.4.0, the error message now states:

requirement failed: Cogroup keys must have same size: 2 != 1

This fix has not been backported.

Correcting suggestion of backticks use in error messages

Spark requires the use of backticks when column names contain dots (.), because dots have a special meaning when referencing columns.

For instance, selecting column "the.id" would have raised the following error, suggesting the exact same column name:

... Did you mean one of the following? [the.id]

Getting the backticks right has always been challenging when using dots in column names and nested fields, and suggestions like the above did not help to improve the situation.

Spark version 3.4.0 now correctly suggests the right use of backticks:

... Did you mean one of the following? [`the.id`]
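
Following that suggestion, quoting the whole column name in backticks resolves the column as intended. A minimal sketch (the column names are purely illustrative):

val df = Seq((1, "a")).toDF("the.id", "value")

df.select($"the.id")    // fails: the dot is interpreted as a nested-field access
df.select($"`the.id`")  // works: backticks quote the whole column name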

This fix has not been backported.

Fixing aggregation optimisation for anti-join and semi-join with ambiguous predicates

A very peculiar anti-join turned out to produce a wrong result. Such correctness bugs are very hard to spot as they do not raise an error but happily return some result, which is just wrong. Working around those correctness bugs in a reliable way requires us to fully understand the cause. For this, we have to dig through the Spark internal code, find the bug and understand affected code paths. While we are there, we can also fix the bug for everyone.

Joining a dataframe with itself, where one side has an aggregation (e.g. a distinct), used to produce an incorrect result for left-anti and left-semi join types:

val ids = Seq(1, 2, 3).toDF("id").distinct()
ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").show()

You used to get:

+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+

where the anti-join should have eliminated all rows except id 4. The query plan reveals the cause: the join has been optimised away entirely, leaving only the aggregation:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#4], functions=[])
   +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [id=#42]
      +- HashAggregate(keys=[id#4], functions=[])
         +- LocalTableScan [id#4]

The fix produces the expected query plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [id#6], [id#4], LeftAnti, BuildRight, false               <══ ✅
   :- HashAggregate(keys=[id#4], functions=[])
   :  +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [plan_id=26]
   :     +- HashAggregate(keys=[id#4], functions=[])
   :        +- LocalTableScan [id#4]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=31]
      +- HashAggregate(keys=[id#4], functions=[])
         +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [plan_id=28]
            +- HashAggregate(keys=[id#4], functions=[])
               +- LocalTableScan [id#4]

This fix has been backported to versions 3.2.4 and 3.3.2.
