Observing Spark Aggregates: Cheap Metrics from Datasets

6 February 2024
  • Open Source Software

Data pipelines always have assumptions about the data they process: there may be columns with unique identifiers, non-null values, numerical values in certain ranges (e.g. [0..1]), or categorical values with a set of allowed values (e.g. "yes" and "no" only). If these assumptions are not met by input data, the produced output cannot be reliably used by downstream systems. This makes it essential to monitor these dataset characteristics before the data is processed, to avoid unexpected results.

Checking whether input or output data meets such assumptions is usually done on scalable Big Data platforms, in addition to the actual data processing. This adds extra computation and delay to time-critical pipelines. Spark provides a way to extract certain metrics from datasets while executing the main pipeline transformations: Spark observation metrics.

This article explores the niche where observation metrics are preferable, discusses their limitations, and highlights situations where these metrics are imprecise and the user has to be careful.

Actions, transformations, and caching

Before we dive straight into Spark observations, we first need to understand the difference between actions and transformations[1]. If you are familiar with actions and transformations, as well as caching, you can skip this section and jump right into the Computing metrics as observations section.

A dataset can be thought of as a table of data, consisting of rows and columns. Such tables can be modified in shape (number of columns or rows) or content (the actual values) by applying some transformation, which produces a new dataset. The original dataset is unmodified and continues to exist. The new dataset only exists as a declaration consisting of the original dataset and the transformation. In general, declaring such a transformation does not process any data, so this is very quick and does not depend on the size of the table.

On the other hand, executing an action on a dataset executes all transformations and processes data in a scalable and distributed way.

Here is an example:

// create a dataset by reading a csv file
val ds = spark.read.option("header", true).csv("users.csv")
// id  user    created     deleted
// 1   Alice   2023-07-01  null
// 2   Bob     2023-07-08  2023-08-01
// 3   Charly  2023-07-15  null

// add column `exists` with "yes" and "no" values, reflecting whether the user still exists (no `deleted` value)
val ds2 = ds.withColumn("exists", when($"deleted".isNull, lit("yes")).otherwise(lit("no")))
// id  user    created     deleted     exists
// 1   Alice   2023-07-01  null        yes
// 2   Bob     2023-07-08  2023-08-01  no
// 3   Charly  2023-07-15  null        yes

val ds3 = ds2.where($"exists" === "yes")
// id  user    created     deleted  exists
// 1   Alice   2023-07-01  null     yes
// 3   Charly  2023-07-15  null     yes

val ds4 = ds3.select($"id", $"user")
// id  user
// 1   Alice
// 3   Charly

These four lines of code represent transformations. The tables are not materialized or printed by that code; they only visualize the effect each transformation will have. Even reading the CSV file does not actually read the entire file, but only declares the filename and schema.

Calling an action on the final dataset ds4 executes all four transformations: reading the CSV file, adding a column, filtering rows and selecting columns. Only actions[2] use the distributed computation capabilities of Spark.

Examples of actions are show, write, and collect:

ds4.show()
ds4.write.csv("existing-users.csv")
ds4.collect()

Executing multiple actions executes all transformations multiple times: each of the above example actions executes all four transformations. In other words, results of the first action are not reused by subsequent actions, unless caching is enabled.

Caching datasets

The result of transformations can be reused by subsequent actions by calling cache first.

// mark ds4 to be cached
ds4.cache()

// executes four transformations and caches result
ds4.count()

// reuses result of the four transformations
ds4.write.csv("existing-users.csv")
ds4.collect()

The disadvantage of caching is that it occupies memory or disk storage to store the computed dataset ds4.

This storage is freed by calling unpersist:

// frees storage occupied by ds4
ds4.unpersist()

Computing metrics as transformations

Metrics of a dataset, like the number of rows, the average of a column, or the number of null values in a column, can be declared through transformations. But they have to be executed by an action (e.g. show, write, or collect) in order to be retrieved, and this executes all other transformations of the initial dataset as well.

Imagine a data processing job that reads a dataset and declares dozens of expensive transformations, which might take hours to process. Declaring and retrieving a metric on the result dataset through an action, before or after actually storing the result dataset, executes all transformations a second time. Retrieving the metrics then takes about as long as processing the dataset itself, effectively multiplying the processing time.

// read input data
val raw = spark.read.csv("raw-data.csv")

// process data
val clean = prepare_dataset(raw)

// write clean data
clean.write.csv("clean-data.csv")

// retrieve metrics about our clean data
val rows = clean.count()
val average = clean.select(avg($"value")).as[Double].collect().head
val nullValues = clean.select(count(when($"value".isNull, lit(1)))).as[Long].collect().head

The three metrics execute all transformations of the dataset clean three more times, in addition to the write, making your data processing job take roughly four times as long.

The result dataset clean could be cached, which lets the metrics reuse the transformation results, but this may require a vast amount of storage, which might not be available.
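
As a minimal sketch of this caching variant (reusing the clean dataset and the metric expressions from above):

// mark the result to be cached, so the transformations run only once
clean.cache()

// this action materializes clean and populates the cache
clean.write.csv("clean-data.csv")

// these metrics reuse the cached result instead of re-running the pipeline
val rows = clean.count()
val average = clean.select(avg($"value")).as[Double].collect().head
val nullValues = clean.select(count(when($"value".isNull, lit(1)))).as[Long].collect().head

// free the cached storage once the metrics have been retrieved
clean.unpersist()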

The least expensive alternative is usually to read the written data and compute the metrics from that dataset:

val written = spark.read.csv("clean-data.csv")
val rows = written.count()
val average = written.select(avg($"value")).as[Double].collect().head
val nullValues = written.select(count(when($"value".isNull, lit(1)))).as[Long].collect().head

All of these approaches require either extra cache storage or extra processing time. This is where Spark observation metrics come into play: they extract metrics while an action is executed, no caching is required, and the additional computation is reduced to a minimum.

Computing metrics as observations

Spark allows declaring metrics through the transformation observe, where metrics are declared as aggregations. The difference from metrics as transformations is that they are not retrieved through an action return value (e.g. val rows = clean.count()), but retrieved through an Observation instance (e.g. observation.get("rows")), while executing another action (e.g. cleanWithMetrics.write.csv("clean-data.csv")):

import org.apache.spark.sql.Observation

// read input data
val raw = spark.read.option("header", true).csv("raw-data.csv")

// process data, this adds loads of expensive transformations
val clean = prepare_dataset(raw)

// define observations on data
val observation = Observation()
val cleanWithMetrics = clean.observe(
  observation,
count("*").as("rows"),
  avg($"value").as("average"),
  count(when($"value".isNull, lit(1))).as("null values")
)

// write clean data
cleanWithMetrics.write.csv("clean-data.csv")

// retrieve metrics about our clean data
val rows = observation.get("rows")
val average = observation.get("average")
val nullValues = observation.get("null values")

After executing an action on the dataset returned by observe (here cleanWithMetrics), the declared metrics can be retrieved via the Observation instance. They are computed while the action executes and become available once the action terminates.

Limitations of observation metrics

Observation metrics are restricted to aggregate functions. These are functions that return a single aggregated value for the entire dataset like sum, avg, or collect_set. Further, observation metrics aim at providing metrics with low additional computational effort. This prohibits any aggregations that require a shuffle stage (repartitioning the dataset) like count_distinct (while approx_count_distinct is allowed) or any window function (e.g. sum.over). We look into some ways to work around this limitation below.
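
As a small, hedged illustration of this restriction, reusing the ds dataset from above (the exact failure mode is an assumption here; the point is that shuffle-requiring aggregates are not accepted as observation metrics):

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{approx_count_distinct, count_distinct}

// allowed: a plain aggregate function that does not require a shuffle
val distinctIds = Observation()
val observedDs = ds.observe(distinctIds, approx_count_distinct($"id").as("approx distinct ids"))

// not allowed: a distinct aggregate requires a shuffle and is rejected by Spark
// ds.observe(Observation(), count_distinct($"id").as("distinct ids"))   // fails during analysis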

Another restriction is that executing an action on the observed dataset a second time will not change the observed metrics. Metrics are collected only from the first invocation of an action. Consequently, an Observation instance can only be used for one Dataset.observe call.
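
A minimal sketch of this one-shot behaviour (spark-shell style, using a simple range dataset):

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.count

val rowCount = Observation()
val observed = spark.range(0, 1000).observe(rowCount, count("*").as("rows"))

// first action: the metric is collected
observed.collect()
rowCount.get("rows")                     // 1000

// second action on the observed dataset: the metric is not updated
observed.where($"id" < 100).collect()
rowCount.get("rows")                     // still 1000

// the instance is already bound to a dataset, so it cannot be used for another observe call
// spark.range(0, 10).observe(rowCount, count("*").as("rows"))   // fails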

Over-counting observation metrics

There are situations where stages that contain observations are executed multiple times. This executes the aggregations multiple times as well, producing over-counted metrics.

A common situation where this happens is when an observed dataset (i.e. the dataset returned by observe) is sorted:

// define observations on data
val observation = Observation()
val cleanWithMetrics = clean.observe(
  observation,
  count("*").as("rows"),
  avg($"value").as("average"),
  count(when($"value".isNull, lit(1))).as("null values")
).sort($"value")

Now, the sort first evaluates the observation once while sampling the observed dataset to determine the range partition boundaries, and then evaluates it a second time to actually sort the observed dataset. The metric "rows" will therefore be twice as large as expected.

With a skewed dataset, some partitions are even evaluated three times and all others twice, so the observed values become unpredictable. Let’s look at this reproducible example:

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{count, log2}
import org.apache.spark.sql.expressions.Window

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", value = false)

val ids = spark.range(0, 1000000, 1, 100)

val once = Observation()
val twice = Observation()
val skewed = Observation()

// an observation run once
ids.observe(once, count("*").as("rows")).collect

// an observation run twice
ids.observe(twice, count("*").as("rows")).sort($"id".desc).collect

// a skewed dataset runs some partitions three times, all others twice
ids.withColumn("bits", log2($"id").cast("int"))
  // this repartitions the dataset by column `bits`, which is highly skewed
  .withColumn("bits-card", count("*").over(Window.partitionBy($"bits")))
  .observe(skewed, count("*").as("rows"))
  // sorting this skewed dataset evaluates some partitions three times, others twice
  .sort($"id")
  .collect
(once.get("rows"), twice.get("rows"), skewed.get("rows"))
// (1000000,2000000,2984192)

This example shows how sensitive observations are to subsequent transformations. Observations can still be used in those situations if the observed dataset is cached, but as discussed above, the niche advantage of observation metrics is avoiding caching the entire dataset in the first place.
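
As a minimal sketch of that caching workaround (reusing the ids dataset from above, and paying the storage cost discussed in the caching section):

// caching the observed dataset prevents the sort from re-evaluating the observation
val cachedRows = Observation()
val observedIds = ids.observe(cachedRows, count("*").as("rows")).cache()
observedIds.sort($"id".desc).collect
cachedRows.get("rows")   // 1000000, since the sort reuses the cached result
observedIds.unpersist()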

The conclusion here is to use observations only in situations where no over-counting occurs, or only with metrics that are robust against over-counting like the minimum, maximum, or the existence of a property. Alternatively, handle the result with the required care.

Counting null values where none are expected, for instance, will produce a count of 0 if no nulls exist, so a 0 can be trusted even if over-counting occurs. A count larger than 0 does not tell you the precise number of null values, but it tells you that null values exist and gives you an upper bound on their number.
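
A minimal sketch of observing only such over-count-robust metrics on a dataset that is subsequently sorted (reusing the clean dataset and its value column from above):

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{count, lit, max, min, when}

val robust = Observation()
val observed = clean.observe(
  robust,
  min($"value").as("min value"),                           // unaffected by repeated evaluation
  max($"value").as("max value"),                           // unaffected by repeated evaluation
  count(when($"value".isNull, lit(1))).as("null values")   // 0 is exact, larger values are an upper bound
).sort($"value")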

Observation metrics with shuffles

Metrics that involve distinct or window functions require a shuffle stage. Such a shuffle renders the metrics expensive, which defeats the purpose of cheap observation metrics. However, such metrics can still be computed cheaply in situations where the shuffle stage is already required by a subsequent transformation. In that situation, we can observe these metrics without incurring extra computational effort.

We have seen in Limitations of observation metrics that observation metrics have to be aggregate functions (returning a single value), while window functions do not aggregate any rows. Using window functions therefore requires an additional aggregation. We can observe such aggregations by first materializing the window function as a column and then observing an aggregation of that column.

Let’s look at the following example. We want to get the maximum cardinality of the values in column "id", i.e. the largest number of rows that share the same value in that column (assume ds now contains a second row for id 3). Such an aggregation can be expressed by a window function, aggregated by max:

import org.apache.spark.sql.expressions.Window

val dsWithCardinality = ds.withColumn("cardinality", count("*").over(Window.partitionBy($"id")))
// id  user    created     deleted     exists  cardinality
// 1   Alice   2023-07-01  null        yes     1
// 2   Bob     2023-07-08  2023-08-01  no      1
// 3   Charly  2023-07-15  null        yes     2
// 3   Charly  2023-08-01  null        yes     2

val maxCardinality = dsWithCardinality.select(max($"cardinality")).as[Long].head
// val maxCardinality: Long = 2

The shuffle stage (repartitioning) required by the window function can be seen via explain:

dsWithCardinality.explain()
== Physical Plan ==
Window [count(1) windowspecdefinition(id, …) AS cardinality], [id]
+- *(1) Sort [id ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id, 200), ENSURE_REQUIREMENTS
      +- FileScan csv [id,user,created,deleted] …

The query plan[3] repartitions the dataset by column id (see step Exchange hashpartitioning(id, 200), ENSURE_REQUIREMENTS), which is an expensive operation. If our data pipeline contains a transformation that requires such a repartitioning (e.g. join, groupBy, or a window function with partitionBy), then we can compute our observation right before that transformation for free.

Here is an example for a join by column "id" transformation, e.g. ds.join(logins, "id"):

val logins = spark.read.option("header", true).csv("user_logins.csv")
ds.join(logins, "id").explain
== Physical Plan ==
*(5) Project [id, user, created, deleted, exists, login]
+- *(5) SortMergeJoin [id], [id], Inner
   :- *(2) Sort [id ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id, 200), ENSURE_REQUIREMENTS
   :     +- *(1) Filter isnotnull(id)
   :        +- FileScan csv [id,user,created,deleted,exists] …
   +- *(4) Sort [id ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id, 200), ENSURE_REQUIREMENTS
         +- *(3) Filter isnotnull(id)
            +- FileScan csv [id,login] …

The query plan shows that the join requires a repartitioning by column "id". We can reuse that repartitioning to materialize our window function and then observe the max aggregation for free.

To be more precise: the window function repartitions the dataset, the observed metric aggregates the window function result, and the subsequent join reuses the repartitioned dataset:

// a fresh Observation instance, since each instance can only be used once
val observation = Observation()

val dsWithMetrics = dsWithCardinality
  .observe(observation, max($"cardinality"))
  .drop("cardinality")
  .join(logins, "id")

dsWithMetrics.explain()
== Physical Plan ==
*(5) Project [id, user, created, deleted, exists, login]
+- *(5) SortMergeJoin [id], [id], Inner
   :- *(2) Filter isnotnull(id)
   :  +- CollectMetrics …, [max(cardinality) AS max(cardinality)]
   :     +- Window [count(1) windowspecdefinition(id, …) AS cardinality], [id]
   :        +- *(1) Sort [id ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(id, 200), ENSURE_REQUIREMENTS
   :              +- FileScan csv [id,user,created,deleted,exists] …
   +- *(4) Sort [id ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id, 200), ENSURE_REQUIREMENTS
         +- *(3) Filter isnotnull(id)
            +- FileScan csv [id,login] …

We can see in this query plan that there is no extra shuffle involved.

Example metrics

Here is a small collection of common dataset metrics and how they can be computed as observations.

metric                                  action                                          observation
number of rows                          df.count                                        count("*")
number of non-null values in column c   df.where($"c".isNotNull).count                  count($"c")
number of null values in column c       df.where($"c".isNull).count                     count(when($"c".isNull, lit(1)))
distinct values of column c             df.select($"c").distinct.collect                collect_set($"c")
distinct count (approximation)          df.select(count_distinct($"c")).as[Long].head   approx_count_distinct($"c")

Note that collect_set($"c") should not be used when a large collection of distinct values is expected.

The spark-extension package provides an aggregate function count_null($"c"), which is equivalent to the above metric count(when($"c".isNull, lit(1))).
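
Several of these metrics can be observed in a single observe call; a minimal sketch, assuming a DataFrame df with a column c:

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{approx_count_distinct, count, lit, when}

val metrics = Observation()
val dfWithMetrics = df.observe(
  metrics,
  count("*").as("rows"),
  count($"c").as("non-null values"),
  count(when($"c".isNull, lit(1))).as("null values"),
  approx_count_distinct($"c").as("approx distinct count")
)
// the metrics become available after the first action on dfWithMetrics,
// e.g. dfWithMetrics.write.csv("output.csv")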

Conclusion

We have seen that Spark Observations are great for computing simple aggregate metrics on Spark Datasets while executing actions. This allows the reuse of intermediate transformation results to compute the metrics without the need for caching or recomputing parts of the Dataset.

However, significant drawbacks remain. Observations can only compute aggregate functions, and they cannot compute functions that rely on shuffle operations such as distinct functions or window functions.

In some situations, observations are executed multiple times, which produces inaccurate results. For instance, the count aggregate will then over-count the actual number of rows. Such results must therefore be treated with care.

[1] Actions and transformations are described in the Spark Dataset API.

[2] Transformations may use distributed computation capabilities, and run Spark jobs to collect information needed to declare the transformation, but they never process the entire dataset.

[3] Query plans are generated with Spark 3.4.1 and options spark.sql.adaptive.enabled = false and spark.sql.autoBroadcastJoinThreshold = -1.
