Guaranteeing in-partition order for partitioned-writing in Apache Spark
Written by Enrico Minack, Open Source Software Contributor.
Apache Spark has been gaining popularity for certain big data applications since its initial release in 2014.
Initially, coders glommed onto Apache Spark for its ability to deal with the iterative algorithms popular in machine learning. Since then, coders have been finding more and more ways to use the open source analytics engine.
Today, a common use case for Spark jobs is to write their result data into multiple sub-directories (partitions), where each contains files sorted in some way. This allows consumers of these files to make use of the partitioning and order to improve performance.
Before Spark 3.0, the result was always as expected. Since Spark 3.0[1], however, the sorting of written files is no longer guaranteed, which can break the assumptions and correctness of your downstream systems. Knowing when files are still guaranteed to be sorted, and understanding when your code suffers from this bug, can save you a lot of precious engineering time.
Example
val ids = 1000000
val days = 2
val parts = 2

spark.conf.set("spark.sql.adaptive.enabled", true)

val ds = (
  spark.range(0, days, 1, parts)
    .withColumnRenamed("id", "day")
    .join(spark.range(0, ids, 1, parts))
)
Given a dataframe ds with columns day and id (as above), we can partition the dataframe by the values of column day, while sorting the written files by column id. We will get one file for each value of day, and each of these files is sorted by id:
ds.repartition($"day") // repartition the data, write-partition columns and more .sortWithinPartitions($"day", $"id") // in-partition order, repartition columns and more .write .partitionBy("day") // write-partition columns .csv("data.csv")
It is important that the columns given in .sortWithinPartitions start with the columns given in .repartition. Further, the columns given in .repartition must start with the columns given in .partitionBy. The order of the columns is also significant.
Note: The behaviour described here (preserving in-partition order) holds for .repartition as well as for .repartitionByRange.
Running the above example with one executor and 512 MB memory will produce files that are not sorted correctly:
spark-shell --driver-memory 512m --master "local[1]"
Note that the memory is key here. Increasing the memory avoids the bug, but does not fix it. Increasing the number of cores per executor or the value for ids will eventually bring it back.
The created directory data.csv contains the following result files:
data.csv/day=0/part-00005-dfc4c5c2-f4dc-4328-ac12-5e3436dd5ba2.c000.csv
data.csv/day=1/part-00069-dfc4c5c2-f4dc-4328-ac12-5e3436dd5ba2.c000.csv
The files above look like:
0
524288
1
524289
2
...
999999
...
524286
524287
where they should read:
0
1
2
...
524286
524287
524288
524289
...
999999
This is clearly bad, especially because your files may all come out sorted during development, while the larger amounts of data in production are more likely to hit this bug unnoticed.
So, when is the in-partition order NOT preserved in the written files?
The in-partition order is NOT preserved in the written files when writing with Spark 3.0, 3.1, 3.2 (before 3.2.3), or 3.3 (before 3.3.2), when Adaptive Query Execution (AQE) is enabled. As soon as spilling occurs, the in-partition order is lost.
Spilling occurs when the memory (RAM, not disk) available to each executor is insufficient to store an entire partition. Increasing the memory reduces the likelihood of spilling, but it cannot guarantee that spilling does not happen. Hence, in-partition order cannot be guaranteed to be preserved.
To put this differently, it is safe to use Spark 3.2 from 3.2.3 on, 3.3 from 3.3.2 on, and Spark 3.4. Writing without partitioning (without .partitionBy) also preserves in-partition order.
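Whether your own output is affected is easy to check. As a minimal sanity check (a sketch, not part of the original example), one of the written partition files can be read back and tested for being non-decreasing; the file name is specific to each run:

// A minimal sanity check (sketch): read one written partition file back with plain Scala
// and verify that its values are non-decreasing. The file name below is from the run
// shown above and will differ for your run.
import scala.io.Source

val partFile = "data.csv/day=0/part-00005-dfc4c5c2-f4dc-4328-ac12-5e3436dd5ba2.c000.csv"
val values = Source.fromFile(partFile).getLines().map(_.toLong).toList
val isSorted = values.zip(values.drop(1)).forall { case (a, b) => a <= b }
println(s"$partFile is sorted: $isSorted")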
Fortunately, there are a few workarounds for the affected versions.
Workarounds
There are workarounds that you can use to ensure that your files always come out sorted when written. Each of the workarounds discussed below applies when the in-partition order starts with the write-partition columns; it would not make sense to use these measures in any other case.
The workarounds are either disabling AQE entirely, caching your DataFrame before writing, or misusing bucketing (without actually bucketing your data).
Workaround I: Disable AQE
AQE can be enabled via the config option spark.sql.adaptive.enabled. Setting this option to false disables AQE and the bug disappears.
spark.conf.set("spark.sql.adaptive.enabled", false) ds.repartition($"day") .sortWithinPartitions($"day", $"id") .write .partitionBy("day") .csv("data.csv") spark.conf.set("spark.sql.adaptive.enabled", true)
This disables AQE for your entire Spark session. All Spark jobs that run while AQE is disabled will not benefit from the performance improvements that AQE offers, including constructing the DataFrame to be written. If you see a severe performance penalty for your Spark application due to disabling AQE, see Workarounds II and III.
Workaround II: Cache DataFrame before writing (partially disabling AQE)
Caching the DataFrame just before calling .write will guarantee in-partition order. But this consumes memory and disk on your executors. It also disables AQE, but only for the transformations of your DataFrame that occur before calling .cache. Any other job in your Spark session will still benefit from AQE.
ds.repartition($"day") .sortWithinPartitions($"day", $"id") .cache .write .partitionBy("day") .csv("data.csv")
The cached DataFrame has to be unpersisted after writing has completed. This frees the memory and disk consumed by .cache. Otherwise, they will only be freed when your Spark application terminates.
// remember to unpersist the .cache above
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .unpersist
Note: Setting the Spark option spark.sql.optimizer.canChangeCachedPlanOutputPartitioning to true enables AQE for cached DataFrames. Unfortunately, this does not cache the final plan, but the AdaptiveSparkPlan, which causes this bug in the first place. See the Investigation section for details.
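For reference, this is how that option would be set (shown only for completeness, since it does not work around the bug):

// shown for reference only: enables AQE for cached DataFrames, but does not avoid the bug
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)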
If you really require AQE for constructing your DataFrame to be written, see Workaround III.
Workaround III: Write intermediate DataFrame before caching (with AQE enabled)
If we really need AQE for our partition-written DataFrame, we can use the following strategy:
We first write the DataFrame as is, without repartitioning, without in-partition order, and without partitioned-writing. The actual file format is not relevant here. It can be different from the final file format.
ds.write.csv("intermediate.csv")
Imagine ds is a long and complex sequence of transformations and joins. Writing the intermediate DataFrame first allows us to employ AQE to generate the DataFrame.
Once written, we can read and cache the intermediate files and perform our sorted partitioned-write with AQE disabled. This reduces the number of transformations for which we must disable AQE to the absolute minimum required to avoid the bug.
spark.read.schema(ds.schema).csv("intermediate.csv")
  .repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .cache
  .write
  .partitionBy("day")
  .csv("data.csv")
Only repartition and sortWithinPartitions will run while AQE is disabled.
As with Workaround II, remember to unpersist the cached DataFrame:
// remember to unpersist the .cache above
spark.read.schema(ds.schema).csv("intermediate.csv")
  .repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .unpersist
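The intermediate files are no longer needed once the final write has completed. A possible cleanup step (a sketch using the Hadoop FileSystem API; adjust the path to wherever you wrote the intermediate data):

import org.apache.hadoop.fs.{FileSystem, Path}

// delete the intermediate files recursively once the final write has completed
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path("intermediate.csv"), true)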
Workaround IV: Use bucketing (with AQE enabled)
The advantage of this workaround is that AQE is enabled all the time. The downside is that a redundant column has to be added and stored (bucket), and that the files are written to a table in the Spark Warehouse instead of to an arbitrary path.
Note that .sortBy substitutes for .sortWithinPartitions:
import org.apache.spark.sql.functions.lit

ds.repartition($"day")
  .select($"day", $"id", lit(0).as("bucket"))
  .write
  .bucketBy(1, "bucket")
  .partitionBy("day")
  .sortBy("id")
  .format("csv")
  .saveAsTable("data")
The resulting files appear in the Spark Warehouse (path configured via spark.sql.warehouse.dir):
$ ls -1 spark-warehouse/data/*/part-*
'spark-warehouse/data/day=0/part-00005-b980af55-263d-4a33-b75f-4966409dce8b_00000.c000.csv'
'spark-warehouse/data/day=1/part-00069-b980af55-263d-4a33-b75f-4966409dce8b_00000.c000.csv'
instead of:
$ ls -1 data.csv/*/part-*
'data.csv/day=0/part-00005-a2ed7b4c-cb62-4144-b587-9fa9c79ec428.c000.csv'
'data.csv/day=1/part-00069-a2ed7b4c-cb62-4144-b587-9fa9c79ec428.c000.csv'
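Consumers then read the result back via the table name rather than a path. A minimal sketch (the auxiliary bucket column can simply be dropped):

// read the bucketed table back and drop the auxiliary bucket column
val readBack = spark.table("data").drop("bucket")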
Investigation
Spark has to read an entire partition into memory in order to sort it. When the available memory is not sufficient, the part that fits into memory is sorted and written to disk. Such a sorted part of the partition is called a spill.
The remainder of the partition is then read into memory and sorted in the same way, until the partition is fully processed. Once the entire partition is split into spills, all the spills are merged into a single sorted partition. This is called a merge sort.
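To illustrate the idea (a conceptual sketch only, not Spark's actual implementation), merging two sorted spills works like this:

// conceptual sketch of merging two sorted spills into one sorted sequence
def mergeSpills(a: List[Long], b: List[Long]): List[Long] = (a, b) match {
  case (Nil, ys) => ys
  case (xs, Nil) => xs
  case (x :: xs, y :: ys) =>
    if (x <= y) x :: mergeSpills(xs, y :: ys)
    else y :: mergeSpills(x :: xs, ys)
}

// mergeSpills(List(0L, 2L, 4L), List(1L, 3L, 5L)) == List(0L, 1L, 2L, 3L, 4L, 5L)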
But why does AQE break in-partition order only when spills occur?
The FileFormatWriter imposes an in-partition ordering on the partitions (see FileFormatWriter.scala, lines 188-189):
val requiredOrdering =
  partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
Here, partitionColumns corresponds to df.write.partitionBy(...), writerBucketSpec corresponds to df.write.bucketBy(...), and sortColumns refers to df.write.sortBy(...). Note that sortBy can only be used together with bucketBy and is not related to df.sortWithinPartitions.
This shows that FileFormatWriter has no knowledge of the user-defined in-partition order (df.sortWithinPartitions). When the existing user-defined in-partition ordering is not compatible with the imposed ordering, the former gets discarded: Spark simply ignores your order when it does not match the required one.
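For illustration (a hypothetical example, not taken from the setup above; the path unsorted.csv is made up): an in-partition order that does not start with the write-partition column is not compatible with the imposed ordering, so FileFormatWriter inserts its own Sort and the user-defined order is not preserved:

ds.repartition($"day")
  .sortWithinPartitions($"id")   // does not start with the write-partition column day
  .write
  .partitionBy("day")
  .csv("unsorted.csv")           // hypothetical path, for illustration only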
In our example, both orderings are compatible. With AQE enabled (and Spark between 3.0 and 3.3), the query plan given to FileFormatWriter contains an AdaptiveSparkPlan node:
AdaptiveSparkPlan isFinalPlan=false
+- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
      +- BroadcastNestedLoopJoin BuildLeft, Inner
         :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
         :  +- Project [id#0L AS day#2L]
         :     +- Range (0, 2, step=1, splits=2)
         +- Range (0, 1000000, step=1, splits=2)
The AdaptiveSparkPlan contains the user-defined sort, but it hides this information from FileFormatWriter, because AdaptiveSparkPlan pretends there is no order. Therefore, FileFormatWriter adds its own Sort node:
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 1000000, step=1, splits=2)
The outer Sort [input[0, bigint, false] ASC NULLS FIRST] sorts by day only, which is the partition column in our example.
Sorting an already-sorted partition
Logically, given a partition sorted by day and id, sorting it again by day should not change it, because it is already sorted by day. Note that the query optimiser cannot remove the outer Sort given the inner Sort, because the intermediate AdaptiveSparkPlan disallows this.
So logically, there is no problem in adding the redundant Sort for day. But practically, the following happens: the partition sorted by day and id is read into memory and spilled (sorted by day) to disk. This preserves the existing order. The spills are then merged based on day order, so all rows with the same day are considered equal. All spills contain rows of the same day, so the spills are read round-robin (see UnsafeSorterSpillMerger, lines 83-92):
@Override
public void loadNext() throws IOException {
  if (spillReader != null) {
    if (spillReader.hasNext()) {
      spillReader.loadNext();
      priorityQueue.add(spillReader);
    }
  }
  spillReader = priorityQueue.remove();
}
The current spillReader is added at the end of the priority queue, as it is “equal” to all other spillReaders. The next row is read from the root of the priority queue.
This fully explains why we see files like this:
0
524288
1
524289
2
...
475711
999999
475712
...
524286
524287
There were two spills, each properly sorted:
spill 1 | spill 2
0       | 524288
1       | 524289
2       | 524290
...     | ...
475711  | 999999
475712  |
...     |
524287  |
The spills are merged by round-robin writing their values into the file: 0, 524288, 1, 524289, …
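A toy reproduction of this interleaving (a sketch, not the actual merger code): when every row of two spills compares equal on day, the readers simply alternate:

// two day-equal spills, each sorted by id
val spill1 = Seq(0L, 1L, 2L)
val spill2 = Seq(524288L, 524289L, 524290L)

// round-robin interleaving, as performed by the spill merger when all rows compare equal
val interleaved = spill1.zip(spill2).flatMap { case (a, b) => Seq(a, b) }
// interleaved: Seq(0, 524288, 1, 524289, 2, 524290)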
Workaround query plans
Let’s have a look at the query plans of the provided workarounds (all for Spark 3.3.2). We can clearly see that the outer Sort is either not added, or it represents the desired in-partition order (Workaround IV).
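One way to inspect such plans yourself (a sketch): call .explain() on the DataFrame just before writing. Note that the Sort added by FileFormatWriter does not appear in this output; it is only part of the plan handed to the writer.

// print the physical plan of the DataFrame that is about to be written
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .explain()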
With Workaround I
Spark does not add the outer Sort that breaks the existing order when spilling occurs.
*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=123]
   +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
      :- BroadcastExchange IdentityBroadcastMode, [plan_id=119]
      :  +- *(1) Project [id#0L AS day#2L]
      :     +- *(1) Range (0, 2, step=1, splits=2)
      +- *(2) Range (0, 1000000, step=1, splits=2)
With Workaround II
The DataFrame is clearly cached, which is represented by the InMemoryTableScan node.
InMemoryTableScan [day#2L, id#4L]
   +- InMemoryRelation [day#2L, id#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=171]
               +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
                  :- BroadcastExchange IdentityBroadcastMode, [plan_id=167]
                  :  +- *(1) Project [id#0L AS day#2L]
                  :     +- *(1) Range (0, 2, step=1, splits=2)
                  +- *(2) Range (0, 1000000, step=1, splits=2)
With Workaround III
Writing the DataFrame to an intermediate file uses AQE to construct the DataFrame.
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildLeft, Inner
   :- BroadcastExchange IdentityBroadcastMode, [plan_id=345]
   :  +- Project [id#104L AS day#106L]
   :     +- Range (0, 5, step=1, splits=5)
   +- Range (0, 5000000, step=1, splits=5)
The DataFrame read back from the intermediate file has to be cached.
InMemoryTableScan [day#88L, id#89L]
   +- InMemoryRelation [day#88L, id#89L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Sort [day#88L ASC NULLS FIRST, id#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(day#88L, 200), REPARTITION_BY_COL, [plan_id=265]
               +- FileScan csv [day#88L,id#89L] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/enrico/Work/git/spark-GR-2/intermediate.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<day:bigint,id:bigint>
With Workaround IV
Workaround IV makes FileFormatWriter add a Sort node that is compatible with our user-defined ordering, so we do not need the sortWithinPartitions.
Sort [input[2, bigint, false] ASC NULLS FIRST, pmod(hash(input[1, int, false], 42), 1) ASC NULLS FIRST, input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Project [id#4L, 0 AS bucket#147, day#2L]
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=311]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=309]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 1000000, step=1, splits=2)
Conclusion
We have seen that some situations exist where Spark does not guarantee that in-partition order is persisted when writing partitioned files. Users can assess if their individual use case is affected by this regression bug.
Four workarounds with varying complexity and impact on Adaptive Query Execution (AQE) have been presented.
The code path that causes the erroneous behaviour has been clearly identified, which allows us to understand why the workarounds guarantee the in-partition order.
[1] Spark versions 3.0.x, 3.1.x, 3.2.0, 3.2.1, 3.2.2, 3.3.0, and 3.3.1 are affected by the issue described in this article. This author's fix to this issue is scheduled to be released as Spark 3.2.3 and 3.3.2.