Skip to main content
We're enhancing our site and your experience, so please keep checking back as we evolve.
Back to News
Guaranteeing in-partition order for partitioned-writing in Apache Spark

Guaranteeing in-partition order for partitioned-writing in Apache Spark

5 December 2022
  • Open Source Software

Written by Enrico Minack, Open Source Software Contributor.

Apache Spark has been gaining popularity for certain big data applications since its initial release in 2014.

Initially, coders glommed onto Apache Spark for its ability to deal with the iterative algorithms popular in machine learning. Since then, coders have been finding more and more ways to use the open source analytics engine.

Today, a common use case for Spark jobs is to write their result data into multiple sub-directories (partitions), where each contains files sorted in some way. This allows consumers of these files to make use of the partitioning and order to improve performance.

Before Spark 3.0, the result was always as expected. Since Spark 3.0[1], however, the sorting of written files is no longer a guarantee, which breaks assumptions and correctness of your downstream systems. So knowing when files are still guaranteed to be sorted, and understanding when your code suffers from this bug, can save you a lot of precious engineering time.

Example

val ids = 1000000
val days = 2
val parts = 2

spark.conf.set("spark.sql.adaptive.enabled", true)
val ds = (
  spark.range(0, days, 1, parts)
       .withColumnRenamed("id", "day")
       .join(spark.range(0, ids, 1, parts)
)

Given a dataframe ds with columns day and id (as above), we can partition the dataframe by values of column day, while sorting written files by column id. We will get one file for each value of day, while these files are sorted by id:

ds.repartition($"day")                  // repartition the data, write-partition columns and more
  .sortWithinPartitions($"day", $"id")  // in-partition order, repartition columns and more
  .write
  .partitionBy("day")                   // write-partition columns
  .csv("data.csv")

It is important that the columns given in .sortWithinPartitions start with columns given in .repartition. Further, columns given in .repartition start with columns .partitionBy. The order of columns is also significant.

Note: The behaviour described here preserving in-partition order holds for .repartition as well as for .repartitionByRange.

Running the above example with one executor and 512 MB memory will produce files that are not sorted correctly:

spark-shell --driver-memory 512m --master "local[1]"

Note that the memory is key here. Increasing the memory avoids the bug, but does not fix it. Increasing the number of cores per executor or the value for ids will bring it back, eventually.

The created directory data.csv contains the following result files:

data.csv/day=0/part-00005-dfc4c5c2-f4dc-4328-ac12-5e3436dd5ba2.c000.csv
data.csv/day=1/part-00069-dfc4c5c2-f4dc-4328-ac12-5e3436dd5ba2.c000.csv

The files above look like:

0
524288
1
524289
2
...
999999
...
524286
524287

where they should read:

0
1
2
...
524286
524287
524288
524289
...
999999

This is clearly bad, especially when your files are all sorted during development, but in production, larger amounts of data are more likely to hit this bug unnoticed.

So, when is the in-partition order NOT preserved in the written files?

The in-partition order is NOT preserved in the written files when writing with Spark 3.0, 3.1, 3.2 (before 3.2.3), or 3.3 (before 3.3.2), when Adaptive Query Execution (AQE) is enabled. As soon as spilling occurs, the in-partition order is lost.

Spilling occurs when memory (RAM, not disk) available to each executor is insufficient to store an entire partition. Increasing the memory reduces the likelihood of spilling, but it cannot guarantee it is not happening. Hence, in-partition order cannot be guaranteed to be preserved.

To put this differently, it is safe to use Spark 3.2 from 3.2.3 on, 3.3 from 3.3.2 on, and Spark 3.4. Writing without partitioning (without .partitionBy) also preserves in-partition order.

Fortunately, there are a few workarounds for the affected versions.

Workarounds

There are workarounds that you can use to ensure that your files always come back sorted after going through the writing operation. Each of the workarounds discussed below applies when in-partition order starts with the write-partition columns. It wouldn’t make sense to use these measures for any of the other cases.

The workarounds are either disabling AQE entirely, caching your DataFrame before writing, or by misusing bucketing (without bucketing your data).

Workaround I: Disable AQE

AQE can be enabled via config option spark.sql.adaptive.enabled. Setting this option to false will disable AQE and the bug disappears.

spark.conf.set("spark.sql.adaptive.enabled", false)

ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("data.csv")

spark.conf.set("spark.sql.adaptive.enabled", true)

This disables AQE for your entire Spark session. All Spark jobs that run while AQE is disabled will not benefit from the performance improvements that AQE offers, including constructing the DataFrame to be written. If you see a severe performance penalty for your Spark application due to disabling AQE, see Workarounds II and III.

Workaround II: Cache DataFrame before writing (partially disabling AQE)

Caching the DataFrame just before calling .write will guarantee in-partition order. But this will consume the memory and the disk of your executors. This also disables AQE, but only for the transformations that occur in your DataFrame before calling .cache. Any other job in your Spark session will benefit from enabled AQE.

ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .cache
  .write
  .partitionBy("day")
  .csv("data.csv")

The cached DataFrame has to be unpersisted after writing is completed. This frees memory and disk consumed by .cache. Otherwise, this will be freed eventually when your Spark application terminates.

// remember to unpersist the .cache above
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .unpersist

Note: Setting Spark option spark.sql.optimizer.canChangeCachedPlanOutputPartitioning to true enables AQE for cached DataFrames. Unfortunately, this does not cache the final plan, but the AdaptiveSparkPlan, which causes this bug in the first place. See the Investigation section for details.

If you really require AQE for constructing your DataFrame to be written, see Workaround III.

Workaround III: Write intermediate DataFrame before caching (with AQE enabled)

If we really need AQE for our partition-written DataFrame, we can use the following strategy:

We first write the DataFrame as is, without repartitioning, without in-partition order, and without partitioned-writing. The actual file format is not relevant here. It can be different from the final file format.

ds.write.csv("intermediate.csv")

Imagine ds is a long and complex sequence of transformations and joins. Writing the intermediate DataFrame first allows us to employ AQE to generate the DataFrame.

Once written, we can read and cache the intermediate files and perform our sorted partitioned-write with AQE disabled. This reduces the number of transformations for which we must disable AQE to the absolute minimum required to avoid the bug.

spark.read.schema(ds.schema).csv("intermediate.csv")
  .repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .cache
  .write
  .partitionBy("day")
  .csv("data.csv")

Only repartition and sortWithinPartitions will run while AQE is disabled.

As with Workaround II, remember to unpersist the cached DataFrame:

// remember to unpersist the .cache above
spark.read.schema(ds.schema).csv("intermediate.csv")
  .repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .unpersist

Workaround IV: Use bucketing (with AQE enabled)

The advantage of this workaround is that AQE is enabled all the time. The downside is that a redundant column has to be added and stored (bucket), and that files are written to a table in the Spark Warehouse instead of to an arbitrary path.

Note that .sortBy substitutes for .sortWithinPartitions:

import org.apache.spark.sql.functions.lit

ds.repartition($"day")
  .select($"day", $"id", lit(0).as("bucket"))
  .write
  .bucketBy(1, "bucket")
  .partitionBy("day")
  .sortBy("id")
  .format("csv")
  .saveAsTable("data")

The resulting files appear in the Spark Warehouse (path configured via spark.sql.warehouse.dir):

$ ls -1 spark-warehouse/data/*/part-*
'spark-warehouse/data/day=0/part-00005-b980af55-263d-4a33-b75f-4966409dce8b_00000.c000.csv'
'spark-warehouse/data/day=1/part-00069-b980af55-263d-4a33-b75f-4966409dce8b_00000.c000.csv'

instead of:

$ ls -1 data.csv/*/part-*
'data.csv/day=0/part-00005-a2ed7b4c-cb62-4144-b587-9fa9c79ec428.c000.csv'
'data.csv/day=1/part-00069-a2ed7b4c-cb62-4144-b587-9fa9c79ec428.c000.csv'

Investigation

Spark has to read an entire partition into memory in order to sort it. When the available memory is not sufficient, the part that fits into memory is sorted and written to disk. The sorted part of the partition is called a spill.

The remainder of the partition is read into memory and sorted until fully processed. Once the entire partition is split into spills, all the spills can be merged into a single sorted partition. This is called Merge sort.

But why does AQE break in-partition order only when spills occur?

The FileFormatWriter imposes an in-partition ordering on the partitions (see FileFormatWriter.scala, lines 188-189):

val requiredOrdering = partitionColumns ++ 
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns

Here, partitionColumns corresponds to df.write.partitionBy(...), writerBucketSpec corresponds to df.write.bucketBy(...), and sortColumns refers to df.write.sortBy(...). Note that sortBy can only be used together with bucketBy and is not related to df.sortWithinPartitions.

This shows that FileFormatWriter has no knowledge of user-defined in-partition order (df.sortWithinPartitions). When the existing user-defined in-partition ordering is not compatible with the imposed ordering, the former gets discarded. It’s like Spark deliberately ignores your order when it doesn’t like it.

In our example, both orderings are compatible. With AQE enabled (and Spark between 3.0 and 3.3), the query plan given to FileFormatWriter contains an AdaptiveSparkPlan node:

AdaptiveSparkPlan isFinalPlan=false
+- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
      +- BroadcastNestedLoopJoin BuildLeft, Inner
         :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
         :  +- Project [id#0L AS day#2L]
         :     +- Range (0, 2, step=1, splits=2)
         +- Range (0, 1000000, step=1, splits=2)

The AdaptiveSparkPlan contains the user-defined sort, but it actually hides this information from FileFormatWriter because AdaptiveSparkPlan pretends there is no order. Therefore, FileFormatWriter adds its own Sort node:

Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 1000000, step=1, splits=2)

The outer Sort [input[0, bigint, false] ASC NULLS FIRST] sorts for day only, which is the partition column in our example.

Sorting an already-sorted partition

Logically, given a partition sorted by day and id, sorting it again for day should not change it, because it is sorted for day already. Note that the query optimiser cannot remove the outer Sort given the inner Sort, because the intermediate AdaptiveSparkPlan disallows this.

So logically, there is no problem in adding the redundant Sort for day. But practically, the following happens: the partition sorted by day and id is read into memory and spilled (sorted by day) to disk. This preserves the existing order. The spills are merged based on day order, so all rows with the same day are considered equal. All spills contain rows of the same day, so the spills are read round-robin (see UnsafeSorterSpillMerger, lines 83-92):

@Override
public void loadNext() throws IOException {
  if (spillReader != null) {
    if (spillReader.hasNext()) {
      spillReader.loadNext();
      priorityQueue.add(spillReader);
    }
  }
  spillReader = priorityQueue.remove();
}

The current spillReader is added at the end of the priority queue, as it is “equal” to all other spillReaders. The next row is read from the root of the priority queue.

This fully explains why we see files like this:

0
524288
1
524289
2
...
475711
999999
475712
...
524286
524287

There were two spills, each properly sorted:

spill 1

spill 2

0

524288

1

524289

2

524290

...

...

475711

999999

475712

...

524287

The spills are merged by round-robin writing their values into the file: 0, 524288, 1, 524289, …

Workaround query plans

Let’s have a look at the query plans of the provided workarounds (all for Spark 3.3.2). We can clearly see that the outer Sort is either not added, or it represents the desired in-partition order (Workaround IV).

With Workaround I

Spark does not add the outer Sort that breaks existing order due to spilling.

*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=123]
   +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
      :- BroadcastExchange IdentityBroadcastMode, [plan_id=119]
      :  +- *(1) Project [id#0L AS day#2L]
      :     +- *(1) Range (0, 2, step=1, splits=2)
      +- *(2) Range (0, 1000000, step=1, splits=2)

With Workaround II

The DataFrame is clearly cached, which is represented by the InMemoryTableScan node.

InMemoryTableScan [day#2L, id#4L]
   +- InMemoryRelation [day#2L, id#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=171]
               +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
                  :- BroadcastExchange IdentityBroadcastMode, [plan_id=167]
                  :  +- *(1) Project [id#0L AS day#2L]
                  :     +- *(1) Range (0, 2, step=1, splits=2)
                  +- *(2) Range (0, 1000000, step=1, splits=2)

With Workaround III

Writing the DataFrame to an intermediate file uses AQE to construct the DataFrame.

AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildLeft, Inner
   :- BroadcastExchange IdentityBroadcastMode, [plan_id=345]
   :  +- Project [id#104L AS day#106L]
   :     +- Range (0, 5, step=1, splits=5)
   +- Range (0, 5000000, step=1, splits=5)

Reading the intermediate file has to be cached.

InMemoryTableScan [day#88L, id#89L]
   +- InMemoryRelation [day#88L, id#89L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Sort [day#88L ASC NULLS FIRST, id#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(day#88L, 200), REPARTITION_BY_COL, [plan_id=265]
               +- FileScan csv [day#88L,id#89L] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/enrico/Work/git/spark-GR-2/intermediate.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<day:bigint,id:bigint>

With Workaround IV

Workaround IV makes FileFormatWriter add a Sort node that is compatible with our user-defined ordering, so we do not need the sortWithinPartitions.

Sort [input[2, bigint, false] ASC NULLS FIRST, pmod(hash(input[1, int, false], 42), 1) ASC NULLS FIRST, input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Project [id#4L, 0 AS bucket#147, day#2L]
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=311]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=309]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 1000000, step=1, splits=2)

Conclusion

We have seen that some situations exist where Spark does not guarantee that in-partition order is persisted when writing partitioned files. Users can assess if their individual use case is affected by this regression bug.

Four workarounds with varying complexity and impact on Adaptive Query Execution (AQE) have been presented.

The code path that causes the erroneous behaviour could clearly be identified, which allows us to understand that the workarounds guarantee the in-partition order.

[1]Spark versions 3.0.x, 3.1.x, 3.2.0, 3.2.1, 3.2.2, 3.3.0, and 3.3.1 are affected by the issue described in this article. This author’s fix to this issue is scheduled to be released as Spark 3.2.3 and 3.3.2.

Spark’s groupByKey should be avoided – and here’s why
  • 05 Dec 2022

Apache Spark is very popular when it comes to processing tabular data of arbitrary size. One common operation is to group the data by some columns to further process those grouped data. Spark has two ways of grouping data groupBy and groupByKey, while the latter works, it may cause performance issues in some cases.

Read more

Stay up to date with
G-Research