Use Ibis with sparkctl¶
In this tutorial you will learn how to use Ibis with a sparkctl-managed Spark cluster on HPC. Ibis provides a unified Python dataframe API that can target multiple backends, including PySpark.
This tutorial covers three approaches:
Option A: Managed Cluster - Use sparkctl as a Python library with Spark Connect (recommended)
Option B: CLI with Spark Connect - Start the cluster via CLI, connect with Spark Connect
Option C: CLI with Traditional PySpark - Start the cluster via CLI, connect with full PySpark
Prerequisites¶
Install Ibis with the PySpark backend in your Python environment. You must pin pyspark to
version 4.0.0 to match sparkctl’s requirements:
$ pip install 'ibis-framework[pyspark]' 'pyspark==4.0.0'
This installs Ibis with PySpark support at the version compatible with sparkctl.
Option A: Managed Cluster (Spark Connect)¶
This approach uses sparkctl as a Python library to manage the entire cluster lifecycle within your script. The cluster starts when you enter the context manager and stops automatically when you exit. This approach uses Spark Connect under the hood.
Allocate compute nodes, for example with Slurm. This example uses a heterogeneous job: the first component (before the `:`) acquires 4 CPUs and 30 GB of memory for the Spark master process, your application, and the Spark driver; the second component acquires 2 complete nodes for the Spark workers.
$ salloc -t 01:00:00 -n4 --partition=shared --mem=30G : -N2 --account=<your-account> --mem=240G
Activate the Python environment that contains sparkctl and ibis-pyspark.
$ module load python
$ source ~/python-envs/sparkctl
Add the code below to a Python script. This will configure and start the Spark cluster, create an Ibis connection, run your queries, and then stop the cluster.
import ibis
from sparkctl import ClusterManager, make_default_spark_config

# Load your global sparkctl configuration file (~/.sparkctl.toml).
config = make_default_spark_config()
# Set runtime options as desired.
# config.runtime.driver_memory_gb = 20
# config.runtime.use_local_storage = True
mgr = ClusterManager(config)

with mgr.managed_cluster() as spark:
    # Create an Ibis connection and set it as the default backend
    con = ibis.pyspark.connect(session=spark)
    ibis.set_backend(con)

    # Create an in-memory table
    t = ibis.memtable({"a": [1, 2, 3], "b": [4, 5, 6]})

    # Query using Ibis
    result = t.filter(t.a > 1).select(t.a, t.b, c=t.a + t.b)
    print(result.execute())
Run the script.
$ python my_ibis_job.py
Option B: CLI with Spark Connect¶
This approach uses the sparkctl CLI to start the cluster with Spark Connect Server, then connects from a separate Python session. This is useful when you want to start the cluster once and run multiple scripts or interactive sessions against it.
Allocate compute nodes, for example with Slurm. This example uses a heterogeneous job: the first component (before the `:`) acquires 4 CPUs and 30 GB of memory for the Spark master process, your application, and the Spark driver; the second component acquires 2 complete nodes for the Spark workers.
$ salloc -t 01:00:00 -n4 --partition=shared --mem=30G : -N2 --account=<your-account> --mem=240G
Activate the Python environment that contains sparkctl.
$ module load python
$ source ~/python-envs/sparkctl
Configure the Spark cluster with the Connect Server enabled. The sparkctl code will detect the compute nodes based on Slurm environment variables.
$ sparkctl configure --connect-server
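sparkctl reads the Slurm environment variables to discover the allocated nodes; SLURM_JOB_NODELIST is the standard variable listing them. As a simplified illustration (not sparkctl's actual parser, and ignoring compact ranges such as node[01-04], which require hostlist expansion):

```python
import os

def simple_slurm_nodes() -> list[str]:
    """Split a comma-separated SLURM_JOB_NODELIST into hostnames.
    Real Slurm node lists may use compact ranges such as node[01-04],
    which this simplified sketch does not expand."""
    raw = os.environ.get("SLURM_JOB_NODELIST", "")
    return [name for name in raw.split(",") if name]
```
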
Start the cluster.
$ sparkctl start
Set the environment variables SPARK_CONF_DIR and JAVA_HOME. This ensures that your application uses the Spark settings created by the configure step above. Instructions will be printed to the console. For example:
$ export SPARK_CONF_DIR=$(pwd)/conf
$ export JAVA_HOME=/datasets/images/apache_spark/jdk-21.0.7
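Before connecting from Python, you can check that the Connect Server is reachable. A minimal stdlib sketch, assuming the default Spark Connect port 15002:

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Spark Connect listens on port 15002 by default.
# if not port_open("localhost", 15002):
#     raise RuntimeError("Spark Connect Server is not reachable yet")
```
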
Connect with Ibis. You can do this interactively or in a script.
$ python
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
>>> con = ibis.pyspark.connect(session=spark)
>>> ibis.set_backend(con)
>>> # Create and query tables
>>> t = ibis.memtable({"x": [10, 20, 30], "y": [1.1, 2.2, 3.3]})
>>> t.filter(t.x > 15).execute()
When finished, stop the cluster.
$ sparkctl stop
Option C: CLI with Traditional PySpark¶
This approach uses the sparkctl CLI to start the cluster, then connects using traditional PySpark (not Spark Connect). Use this if you need full PySpark functionality or encounter compatibility issues with Spark Connect.
Allocate compute nodes, for example with Slurm. This example uses a heterogeneous job: the first component (before the `:`) acquires 4 CPUs and 30 GB of memory for the Spark master process, your application, and the Spark driver; the second component acquires 2 complete nodes for the Spark workers.
$ salloc -t 01:00:00 -n4 --partition=shared --mem=30G : -N2 --account=<your-account> --mem=240G
Activate the Python environment that contains sparkctl.
$ module load python
$ source ~/python-envs/sparkctl
Configure the Spark cluster without the Connect Server.
$ sparkctl configure --no-connect-server
Start the cluster.
$ sparkctl start
Set the environment variables. Instructions will be printed to the console. For example:
$ export SPARK_CONF_DIR=$(pwd)/conf
$ export JAVA_HOME=/datasets/images/apache_spark/jdk-21.0.7
$ export PYSPARK_PYTHON=$(which python)
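A mismatch between the driver's interpreter and PYSPARK_PYTHON is a common source of errors in traditional PySpark. A small stdlib sketch to verify the variable points at the interpreter running your script:

```python
import os
import sys

def pyspark_python_matches() -> bool:
    """Return True if PYSPARK_PYTHON resolves to the current interpreter."""
    configured = os.environ.get("PYSPARK_PYTHON")
    if configured is None:
        return False
    return os.path.realpath(configured) == os.path.realpath(sys.executable)
```
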
Connect with Ibis using a traditional SparkSession.
$ python
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> import socket
>>> master_url = f"spark://{socket.gethostname()}:7077"
>>> spark = SparkSession.builder.master(master_url).appName("ibis").getOrCreate()
>>> con = ibis.pyspark.connect(session=spark)
>>> ibis.set_backend(con)
>>> # Create and query tables
>>> t = ibis.memtable({"x": [10, 20, 30], "y": [1.1, 2.2, 3.3]})
>>> t.filter(t.x > 15).execute()
When finished, stop the cluster.
$ sparkctl stop
Working with Data Files¶
Ibis with PySpark can read data files directly. Here are examples of common operations:
import ibis
from sparkctl import ClusterManager, make_default_spark_config

config = make_default_spark_config()
mgr = ClusterManager(config)

with mgr.managed_cluster() as spark:
    con = ibis.pyspark.connect(session=spark)
    ibis.set_backend(con)

    # Read a Parquet file
    t = con.read_parquet("/path/to/data.parquet")

    # Read a CSV file
    t = con.read_csv("/path/to/data.csv")

    # Perform Ibis operations
    result = (
        t.group_by("category")
        .agg(
            count=t.count(),
            total=t.value.sum(),
            avg_value=t.value.mean(),
        )
        .order_by(ibis.desc("total"))
    )

    # Execute and get results
    print(result.execute())

    # Write results back to Parquet
    result.to_parquet("/path/to/output.parquet")
Choosing Between Approaches¶
| Approach | Connection Type | Best For |
|---|---|---|
| Option A: Managed Cluster | Spark Connect | Scripts with defined start/end, automated pipelines, automatic cleanup |
| Option B: CLI + Spark Connect | Spark Connect | Interactive exploration, multiple scripts against same cluster |
| Option C: CLI + Traditional | Full PySpark | Full PySpark API access, Spark Connect compatibility issues |
Spark Connect vs Traditional PySpark:
Spark Connect is a lightweight client protocol. The driver runs on the cluster, and your Python code communicates with it over gRPC. This is the default for sparkctl's managed_cluster().

Traditional PySpark runs the driver in your Python process. This gives you access to the full PySpark API but requires PYSPARK_PYTHON to be set correctly.
Option A (Managed Cluster) is recommended for most workflows because it ensures the cluster is properly stopped even if your script encounters an error.
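The cleanup guarantee in Option A comes from Python's context-manager protocol. A generic sketch (not sparkctl's actual implementation) shows why the stop step runs even when the body raises:

```python
from contextlib import contextmanager

@contextmanager
def managed(start, stop):
    """Generic resource manager: stop() runs even if the body raises."""
    resource = start()
    try:
        yield resource
    finally:
        stop()
```

managed_cluster() behaves analogously: leaving the `with` block, normally or via an exception, triggers cluster shutdown.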