PySpark apps with dependencies: Managing Python dependencies in code

24 January 2024
  • Open Source Software

Running a Python Spark application on a cluster such as YARN or Kubernetes requires every Python package the application uses to be installed on the driver and on all cluster nodes. Several established approaches to installing dependencies exist. Their common drawbacks are that the dependencies have to be prepared outside your Python code, or that the footprint of the dependency installation is much larger than needed. This article presents an alternative approach that installs Python dependencies at runtime, with a small footprint, by reusing existing PIP infrastructure, all from a few lines of Python code.

Existing approaches and drawbacks

The established approaches (Conda, packed virtual environments, PEX, and cluster-pack) all require the packages to be installed locally first. The resulting virtual Python environment (venv) is then wrapped into an archive file and shipped to the cluster nodes. Installing the packages and preparing the archive is usually done outside the PySpark application, either manually in a terminal or by running a shell script (Conda, venv-pack, PEX). Running such commands can be difficult for users, especially in a notebook environment like Jupyter.

For some approaches, the archive contains an entire Python installation rather than just the packages required by the PySpark application, and the cluster nodes may not need all of it. Shipping just the minimal set of required packages can improve start-up speed.

These archived environments accompany the PySpark application. Consecutive versions of the app very likely depend on the same package versions, so the archives replicate identical Python environments over and over again. Installing the packages from a package repository at runtime instead significantly reduces the footprint of a deployed PySpark application.

Install dependencies from Python code

An alternative approach is provided by the spark-extension package, which has the following objectives:

  • Small footprint: Only required packages are shipped to the cluster nodes, not the entire environment that starts the PySpark application. Dependencies are downloaded at runtime to keep the profile of the application low.
  • Isolation: Other PySpark apps must not see the packages.
  • Native code: Support packages with native code (compiled libraries).
  • Interactive: Support package installation without restarting an active PySpark session.
  • Based on PIP: Utilize PIP features like fetching from PyPI, caching files, building wheels, or version ranges.

The spark-extension package allows the user to install PIP packages and Poetry projects from within an active Spark session. This makes it especially useful for working with interactive notebooks like Jupyter. There is no need to restart the Spark session to install more packages.

These three steps are needed to install packages:

1. Add spark-extension to your PySpark application, either by installing the pyspark-extension PyPI package or by adding the spark-extension Maven package as a dependency (note the difference in the package names):

1.1. If you install the pyspark-extension PyPI package (on the driver only):

pip install pyspark-extension==2.11.1.3.5

1.2. If you add the spark-extension Maven package as a dependency:

In Python code:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5") \
    .getOrCreate()

With spark-submit:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5 [script.py]

When starting the pyspark shell:

pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5

2. Import the gresearch.spark package

# noinspection PyUnresolvedReferences
from gresearch.spark import *

3. Install PIP packages or Poetry projects

spark.install_pip_package("pandas", "pyarrow")
spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")

The install_pip_package function supports all pip install arguments:

# install packages with version specs
spark.install_pip_package("pandas==1.4.3", "pyarrow~=8.0.0")

# install packages from package sources (e.g. git clone https://github.com/pandas-dev/pandas.git)
spark.install_pip_package("./pandas/")

# install packages from git repo
spark.install_pip_package("git+https://github.com/pandas-dev/pandas.git@main")

# use a pip cache directory to cache downloaded and built whl files
spark.install_pip_package("pandas", "pyarrow", "--cache-dir", "/home/user/.cache/pip")

# use an alternative index url (other than https://pypi.org/simple)
spark.install_pip_package("pandas", "pyarrow", "--index-url", "https://artifacts.company.com/pypi/simple")

# install pip packages quietly (only disables output of PIP)
spark.install_pip_package("pandas", "pyarrow", "--quiet")
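The options shown above can also be gathered in one place so that an application declares its requirements once. The following is a minimal sketch under stated assumptions: `install_requirements` is a hypothetical helper name that is not part of spark-extension, and it only forwards the arguments already shown above to `spark.install_pip_package`.

```python
def install_requirements(spark, requirements, index_url=None, cache_dir=None, quiet=True):
    """Forward requirement strings plus common pip options to the Spark session.

    Hypothetical helper for illustration; `spark` must be a session on which
    `install_pip_package` is available (i.e. after `from gresearch.spark import *`).
    """
    args = list(requirements)
    if index_url:
        # Alternative package index, other than https://pypi.org/simple
        args += ["--index-url", index_url]
    if cache_dir:
        # Directory where pip caches downloaded and built wheel files
        args += ["--cache-dir", cache_dir]
    if quiet:
        # Only disables the output of pip
        args.append("--quiet")
    spark.install_pip_package(*args)
    return args
```

With an active session, a call might look like `install_requirements(spark, ["pandas==1.4.3", "pyarrow~=8.0.0"])`. The returned list is simply the argument list that was passed on, which can be handy for logging.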

Example

The following example uses install_pip_package in a standalone Spark cluster.

First, check out the example code:

git clone https://github.com/G-Research/spark-extension.git
cd spark-extension/examples/python-deps

Build a Docker image based on the official Spark release:

docker build -t spark-extension-example-docker .

Start the example Spark standalone cluster consisting of a Spark master and one worker:

docker compose -f docker-compose.yml up -d

Run the example.py Spark application on the example cluster:

docker exec spark-master spark-submit --master spark://master:7077 --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5 /example/example.py

The --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5 argument tells spark-submit to add the spark-extension Maven package to the Spark job.

Alternatively, install the pyspark-extension PyPI package via pip install and remove the --packages argument from spark-submit:

docker exec spark-master pip install --user pyspark_extension==2.11.1.3.5
docker exec spark-master spark-submit --master spark://master:7077 /example/example.py

This output proves that PySpark could call into the function func, which only works when Pandas and PyArrow are installed:

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

Test that spark.install_pip_package("pandas", "pyarrow") is really required by this example by removing the line marked with - from example.py

 from pyspark.sql import SparkSession

 def main():
     spark = SparkSession.builder.appName("spark_app").getOrCreate()

     def func(df):
         return df

     from gresearch.spark import install_pip_package

-    spark.install_pip_package("pandas", "pyarrow")
     spark.range(0, 3, 1, 5).mapInPandas(func, "id long").show()

 if __name__ == "__main__":
     main()

… and running the spark-submit command again. The example does not work any more, because the Pandas and PyArrow packages are missing from the driver:

Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/utils.py", line 27, in require_minimum_pandas_version
ModuleNotFoundError: No module named 'pandas'

Finally, shut down the example cluster:

docker compose -f docker-compose.yml down

How it works

By calling spark.install_pip_package("pandas", "pyarrow"), the driver installs the packages into a local temporary directory using pip install --target DIR:

Collecting pandas
  Downloading pandas-2.0.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.4 MB)
Collecting pyarrow
  Downloading pyarrow-14.0.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (38.2 MB)
Collecting pytz>=2020.1
  Downloading pytz-2023.3.post1-py2.py3-none-any.whl (502 kB)
Collecting numpy>=1.20.3; python_version < "3.10"
  Downloading numpy-1.24.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
Collecting python-dateutil>=2.8.2
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Collecting tzdata>=2022.1
  Downloading tzdata-2023.4-py2.py3-none-any.whl (346 kB)
Collecting six>=1.5
  Downloading six-1.16.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: pytz, numpy, six, python-dateutil, tzdata, pandas, pyarrow
Successfully installed numpy-1.24.4 pandas-2.0.3 pyarrow-14.0.2 python-dateutil-2.8.2 pytz-2023.3.post1 six-1.16.0 tzdata-2023.4
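The driver-side installation step can be pictured with the standard library alone. The sketch below is an illustration under assumptions, not the spark-extension implementation: `build_pip_command` and `install_into_target` are hypothetical names, and only the `pip install --target DIR` invocation is taken from the description above.

```python
import subprocess
import sys
import tempfile


def build_pip_command(target_dir, *pip_args):
    """Return the argv for `pip install --target DIR <args...>`."""
    # Run pip through the current interpreter so wheels match its Python version.
    return [sys.executable, "-m", "pip", "install", "--target", target_dir, *pip_args]


def install_into_target(*pip_args):
    """Install into a fresh temporary directory and return its path.

    Needs network access (or a populated pip cache) when actually called.
    """
    target_dir = tempfile.mkdtemp(prefix="pip-pkgs-")
    subprocess.check_call(build_pip_command(target_dir, *pip_args))
    return target_dir


if __name__ == "__main__":
    # Only prints the command; call install_into_target(...) to really run pip.
    print(build_pip_command("/tmp/pip-pkgs", "pandas", "pyarrow", "--quiet"))
```

Installing into a dedicated target directory, rather than into the interpreter's site-packages, is what keeps the packages separate from other applications on the same machine.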

This directory is compressed into a zip file, which is then added to the spark session:

24/01/04 15:05:22 INFO SparkContext: Added archive /tmp/spark-31bc3ae3-501c-4a04-b0a1-71b251a86e90/userFiles-c8bf3f38-9cfc-473b-bb67-9ac9f693feb3/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip#spark-extension-pip-pkgs-1704380649.417889 at spark://c9d4bba1eb87:42597/files/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip with timestamp 1704380722891
24/01/04 15:05:22 INFO Utils: Copying /tmp/spark-31bc3ae3-501c-4a04-b0a1-71b251a86e90/userFiles-c8bf3f38-9cfc-473b-bb67-9ac9f693feb3/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip to /tmp/spark-8258016d-073e-4a06-9135-6374e55e7693/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip
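The compression step might look roughly like the following sketch, which uses only the standard library. `zip_directory` is a hypothetical helper name, and the closing comment about `SparkContext.addArchive` is an assumption about how such an archive could be registered (that method exists in PySpark 3.3 and later); the source does not say which call spark-extension uses internally.

```python
import os
import shutil
import tempfile
import zipfile


def zip_directory(source_dir, archive_base):
    """Compress source_dir into archive_base + '.zip' and return the zip path."""
    # root_dir makes the paths inside the archive relative to source_dir.
    return shutil.make_archive(archive_base, "zip", root_dir=source_dir)


if __name__ == "__main__":
    work = tempfile.mkdtemp()
    pkgs = os.path.join(work, "pkgs")
    # Stand-in for a directory filled by `pip install --target`.
    os.makedirs(os.path.join(pkgs, "demo_pkg"))
    with open(os.path.join(pkgs, "demo_pkg", "__init__.py"), "w") as f:
        f.write("VALUE = 42\n")

    archive = zip_directory(pkgs, os.path.join(work, "pip-pkgs"))
    with zipfile.ZipFile(archive) as zf:
        print(zf.namelist())
    # Assumption: with an active session, the archive could then be shipped via
    # spark.sparkContext.addArchive(archive)
```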

Spark then decompresses that file and adds all contained files to the driver …:

24/01/04 15:05:22 INFO SparkContext: Unpacking an archive /tmp/spark-31bc3ae3-501c-4a04-b0a1-71b251a86e90/userFiles-c8bf3f38-9cfc-473b-bb67-9ac9f693feb3/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip#spark-extension-pip-pkgs-1704380649.417889 from /tmp/spark-8258016d-073e-4a06-9135-6374e55e7693/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip to /tmp/spark-31bc3ae3-501c-4a04-b0a1-71b251a86e90/userFiles-c8bf3f38-9cfc-473b-bb67-9ac9f693feb3/spark-extension-pip-pkgs-1704380649.417889

… and all executors:

24/01/04 15:05:27 INFO Executor: Fetching spark://c9d4bba1eb87:42597/files/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip#spark-extension-pip-pkgs-1704380649.417889 with timestamp 1704380722891
24/01/04 15:05:27 INFO Utils: Fetching spark://c9d4bba1eb87:42597/files/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip to /tmp/spark-960f2258-c1a5-4359-bf7f-181ae39038b0/executor-e15f2a63-ffa9-4967-b492-8a01dfd0a218/spark-d84b99ff-1262-432b-9a0d-6c436afc805e/fetchFileTemp12930731486451374887.tmp
24/01/04 15:05:28 INFO Utils: Copying /tmp/spark-960f2258-c1a5-4359-bf7f-181ae39038b0/executor-e15f2a63-ffa9-4967-b492-8a01dfd0a218/spark-d84b99ff-1262-432b-9a0d-6c436afc805e/-9994355801704380722891_cache to /tmp/spark-552a0dc1-5a28-4ca1-9bbb-734347dbd571/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip
24/01/04 15:05:28 INFO Executor: Unpacking an archive spark://c9d4bba1eb87:42597/files/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip#spark-extension-pip-pkgs-1704380649.417889 from /tmp/spark-552a0dc1-5a28-4ca1-9bbb-734347dbd571/spark-extension-pip-pkgs-1704380649.417889-9148709612831554122.zip to /opt/spark/work/app-20240104150408-0000/1/./spark-extension-pip-pkgs-1704380649.417889

The contained packages become available to the Python code on the driver and the executors. Temporary files and archives are removed on Spark session termination.
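Why an unpacked directory makes packages "available" can be shown in plain Python: a module becomes importable once its directory is on the module search path. This is a hedged illustration of that general mechanism only; `make_importable` and `demo_dependency` are hypothetical names, and the exact way spark-extension and Spark wire up the path on the driver and executors is not described in this article.

```python
import importlib
import os
import sys
import tempfile


def make_importable(unpacked_dir):
    """Put an unpacked package directory at the front of the module search path."""
    if unpacked_dir not in sys.path:
        sys.path.insert(0, unpacked_dir)
    # Drop cached directory listings so newly written files are found.
    importlib.invalidate_caches()


if __name__ == "__main__":
    # Stand-in for a directory that Spark unpacked from the shipped archive.
    unpacked = tempfile.mkdtemp(prefix="unpacked-pkgs-")
    with open(os.path.join(unpacked, "demo_dependency.py"), "w") as f:
        f.write("ANSWER = 42\n")

    make_importable(unpacked)
    demo_dependency = importlib.import_module("demo_dependency")
    print(demo_dependency.ANSWER)  # prints 42
```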

Without calling spark.install_pip_package("pandas", "pyarrow"), we get the following error, proving that Pandas is not installed (on the driver):

Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/utils.py", line 27, in require_minimum_pandas_version
ModuleNotFoundError: No module named 'pandas'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/example/example.py", line 15, in <module>
    main()
  File "/example/example.py", line 12, in main
    spark.range(0, 3, 1, 5).mapInPandas(func, "id long").show()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/map_ops.py", line 92, in mapInPandas
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 336, in pandas_udf
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/utils.py", line 34, in require_minimum_pandas_version
ImportError: Pandas >= 1.0.5 must be installed; however, it was not found.
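The same condition can be checked without triggering the traceback by asking the import system whether the modules can be found. The sketch below uses only the standard library; `missing_modules` is a hypothetical helper name, it handles top-level module names only, and it inspects just the Python process it runs in (the driver, when called from the application's main code).

```python
import importlib.util


def missing_modules(*module_names):
    """Return the top-level module names that cannot be found by this interpreter."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # An empty list means both packages are importable in this process.
    print(missing_modules("pandas", "pyarrow"))
```

Running this before and after the installation call is one way to confirm that the packages only become visible once they have been installed.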

Summary

The spark-extension package allows installing PyPI packages and Poetry projects into running PySpark sessions with a single line of code. Required packages are defined inside the Python application, which removes the need to install any dependencies outside the Python code (e.g. in a terminal). Installing packages does not require restarting PySpark sessions, making this particularly useful in interactive notebook environments. The approach supports packages with native code (compiled libraries), and installed packages are invisible outside the PySpark app.
