feat: add SparkClient API for SparkConnect session management#225

Merged
google-oss-prow[bot] merged 14 commits into kubeflow:main from Shekharrajak:feat/sparkclient-mvp
Feb 12, 2026

Conversation

@Shekharrajak (Member) commented Jan 13, 2026

Closes #163 as per the KEP.

Features

  • SparkClient API: High-level client for SparkConnect lifecycle management
  • KubernetesSparkBackend: Backend implementation for SparkConnect CRD operations
  • SparkClient.connect(): Get PySpark SparkSession from existing server or auto-create new session

Quick Test

Option 1: Connect to Local Spark Connect Server (No Kubernetes required)

# 1. Start local Spark Connect server
docker run -d --name spark-connect -p 15002:15002 \
  apache/spark:3.5.0 \
  /opt/spark/sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.12:3.5.0
# 2. Wait ~30s for startup, then test
cd sdk
uv run python kubeflow/spark/examples/test_connect_url.py
# 3. Cleanup
docker stop spark-connect && docker rm spark-connect

Option 2: With Kind Cluster + Spark Operator

# 1. Setup Kind cluster with Spark Operator
./scripts/spark/setup-kind.sh

# 2. Run demo
uv run python kubeflow/spark/examples/demo_existing_sparkconnect.py

Examples:

# install from this branch: pip install -e ".[spark]"
# Connect to existing SparkConnect
from kubeflow.spark import SparkClient
spark = SparkClient().connect(url="sc://localhost:15002")
df = spark.range(10)
df.show()

# ---- kubernetes 
from kubeflow.spark import SparkClient
from kubeflow.common.types import KubernetesBackendConfig

client = SparkClient(backend_config=KubernetesBackendConfig(namespace="spark-test"))
# create a new session and connect
spark = client.connect(name="my-session")
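Combining the two snippets, a complete create-use-stop lifecycle against the Kubernetes backend could look like the sketch below. This is a hypothetical helper: the namespace and session name are illustrative, and the imports are deferred inside the function so the file loads without a cluster.

```python
def demo_session_lifecycle(namespace="spark-test", name="my-session"):
    # Deferred imports: requires `pip install -e ".[spark]"` and a cluster
    # with the Spark Operator installed. Values above are examples only.
    from kubeflow.common.types import KubernetesBackendConfig
    from kubeflow.spark import SparkClient

    client = SparkClient(backend_config=KubernetesBackendConfig(namespace=namespace))
    spark = client.connect(name=name)  # creates the SparkConnect session if needed
    try:
        df = spark.range(10)
        return df.count()  # round-trip through the Connect server
    finally:
        spark.stop()  # always release the remote session
```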

Copilot AI review requested due to automatic review settings January 13, 2026 14:22
@github-actions (Contributor)

🎉 Welcome to the Kubeflow SDK! 🎉

Thanks for opening your first PR! We're happy to have you as part of our community 🚀

Here's what happens next:

  • If you haven't already, please check out our Contributing Guide for repo-specific guidelines and the Kubeflow Contributor Guide for general community standards
  • Our team will review your PR soon! cc @kubeflow/kubeflow-sdk-team

Join the community:

Feel free to ask questions in the comments if you need any help or clarification!
Thanks again for contributing to Kubeflow! 🙏

Copilot AI (Contributor) left a comment


Pull request overview

This PR implements a SparkClient API for managing SparkConnect sessions on Kubernetes, enabling high-level lifecycle management and PySpark integration.

Changes:

  • Added SparkClient API with support for connecting to existing servers or auto-creating new sessions
  • Implemented KubernetesSparkBackend for SparkConnect CRD operations
  • Added comprehensive type definitions (Driver, Executor, SparkConnectInfo, SparkConnectState)

Reviewed changes

Copilot reviewed 112 out of 476 changed files in this pull request and generated 1 comment.

Summary per file (file followed by description):
pyproject.toml Added spark extra dependencies (pyspark==3.5.0, grpcio, pandas, pyarrow)
uv.lock Updated lock file with new dependencies and resolution markers
kubeflow/spark/__init__.py Package exports for SparkClient and related types
kubeflow/spark/api/spark_client.py Main SparkClient implementation with connect() method
kubeflow/spark/backends/base.py Abstract base class defining backend interface
kubeflow/spark/backends/kubernetes/backend.py Kubernetes backend implementing SparkConnect CRD operations
kubeflow/spark/backends/kubernetes/utils.py Utility functions for CRD building and validation
kubeflow/spark/backends/kubernetes/constants.py Constants for SparkConnect CRD and defaults
kubeflow/spark/types/types.py Core type definitions (SparkConnectInfo, Driver, Executor, etc.)
kubeflow/spark/examples/*.py Example scripts demonstrating usage
kubeflow/spark/integration/*.py Integration tests with pytest fixtures
scripts/spark/setup-kind.sh Kind cluster setup script for testing
docs/spark-connect-local-testing.md Documentation for local testing

@Shekharrajak (Member Author)

Cleaning up commits.

@Shekharrajak (Member Author)

from kubeflow.spark import SparkClient

# Connect to localhost after port-forwarding
spark = SparkClient().connect(url="sc://localhost:15002")
df = spark.range(10)
df.show()

@Shekharrajak (Member Author)

cd kubeflow/sdk

python3 << 'EOF'
from kubeflow.spark.api.spark_client import SparkClient

print("Creating SparkClient...")
client = SparkClient()

print("Connecting to sc://localhost:15002...")
spark = client.connect(url="sc://localhost:15002")

print("Creating DataFrame...")
df = spark.range(10)

print("Counting rows...")
count = df.count()

print(f"SUCCESS! Count = {count}")
assert count == 10
EOF

Expected output:
Creating SparkClient...
Connecting to sc://localhost:15002...
Creating DataFrame...
Counting rows...
SUCCESS! Count = 10

 Run Full Integration Test Suite

  uv run pytest kubeflow/spark/integration/test_existing_sparkconnect.py -v

  Expected output:
  test_existing_sparkconnect.py::TestExistingSparkConnect::test_connect_to_existing_server PASSED [ 16%]
  test_existing_sparkconnect.py::TestExistingSparkConnect::test_dataframe_operations PASSED [ 33%]
  test_existing_sparkconnect.py::TestExistingSparkConnect::test_sql_operations PASSED [ 50%]
  test_existing_sparkconnect.py::TestExistingSparkConnect::test_multi_column_dataframe PASSED [ 66%]
  test_existing_sparkconnect.py::TestExistingSparkConnect::test_multiple_transformations PASSED [ 83%]
  test_existing_sparkconnect.py::TestExistingSparkConnect::test_groupby_aggregations PASSED [100%]

  ================== 6 passed in 318.59s (0:05:18) ==================

  Step 3: Run Individual Tests (Optional)

  # basic connection
  uv run pytest kubeflow/spark/integration/test_existing_sparkconnect.py::TestExistingSparkConnect::test_connect_to_existing_server -v

  # Test DataFrame operations
  uv run pytest kubeflow/spark/integration/test_existing_sparkconnect.py::TestExistingSparkConnect::test_dataframe_operations -v

  # Test SQL operations
  uv run pytest kubeflow/spark/integration/test_existing_sparkconnect.py::TestExistingSparkConnect::test_sql_operations -v


@andreyvelich (Member) left a comment


Thank you for this effort @Shekharrajak! Overall looks good, and I am looking forward to this!
I left my initial comments.


## Option 1: Docker-based Spark Connect Server (Recommended)

### Start Spark Connect Server
Member

Interesting, so our API is working even without Kubernetes cluster?
Shall we re-use this functionality to run SparkClient() with container backend?
For example, when user runs connect() with ContainerBackend, we start spark container and allow user to connect to the Spark session.

We can add support for it in the follow up PRs.

@Shekharrajak (Member Author)

Right now it is not automated, but as you correctly mentioned, we could have a ContainerBackendConfig:

client = SparkClient(backend_config=ContainerBackendConfig())
spark = client.connect()  # Auto-starts container, connects when ready

@Shekharrajak (Member Author)

We can have this enhancement issue open for future implementation. Thanks!

@andreyvelich (Member) left a comment

Thanks for the updates @Shekharrajak!
mostly lgtm, I left a few comments.

Member

@kramaranya recently added the docs for Kubeflow SDK: https://sdk.kubeflow.org/en/latest/
Could you please create a page for Spark there?

Comment on lines 49 to 59
delete_cluster() {
log_info "Deleting Kind cluster: $CLUSTER_NAME"
kind delete cluster --name "$CLUSTER_NAME" 2>/dev/null || true
log_info "Cluster deleted"
}

create_cluster() {
if kind get clusters 2>/dev/null | grep -q "^${CLUSTER_NAME}$"; then
log_warn "Cluster '$CLUSTER_NAME' already exists"
return 0
fi
Member

Same comment for create and delete.
Do you want to just copy this script and add installation of Spark Operator from the helm charts?

@coveralls

coveralls commented Jan 29, 2026

Pull Request Test Coverage Report for Build 21941935956

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 1115 of 1292 (86.3%) changed or added relevant lines in 18 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+4.3%) to 72.296%

Files with missing coverage (covered lines / changed or added lines / % covered):
kubeflow/spark/backends/kubernetes/backend_test.py 214 215 99.53%
kubeflow/spark/types/options.py 88 89 98.88%
kubeflow/spark/api/spark_client.py 27 32 84.38%
kubeflow/spark/backends/base.py 17 23 73.91%
kubeflow/spark/backends/kubernetes/utils.py 95 103 92.23%
kubeflow/spark/types/validation.py 8 58 13.79%
kubeflow/spark/backends/kubernetes/backend.py 159 265 60.0%
Totals: change from base Build 21919105330: +4.3%
Covered Lines: 3943
Relevant Lines: 5454

💛 - Coveralls

create_cluster
setup_test_namespace
install_spark_operator
if [[ "${E2E_CRD_ONLY:-0}" == "1" ]]; then
@Shekharrajak (Member Author)

Installing the Spark Operator was taking more than 10 minutes and timed out. For now we only have a SparkConnect CRD check and validate CRD + API acceptance (create/get/delete SparkConnect).

We are missing validation of:

End-to-end Spark session
  • client.connect() is not run in CI.
  • No check that we get a real SparkSession, that spark.range(), df.count(), spark.stop() work, or that the SDK's wait/ready logic works.
  • test_spark_connect_simple_example and test_spark_advanced_options_example are not run in CI anymore, so we don't validate that examples/spark/spark_connect_simple.py and spark_advanced_options.py succeed against a real cluster.

Still not sure why it takes so long. Also, no released Spark Operator Helm chart version has the SparkConnect CRD available yet, so I was trying to build from the spark-operator codebase directly.

@Shekharrajak (Member Author)

Getting some errors in CI like:

kubectl logs -f  spark-connect-c74264f5-server  -n spark-test 
starting org.apache.spark.sql.connect.service.SparkConnectServer, logging to /opt/spark/logs/spark--org.apache.spark.sql.connect.service.SparkConnectServer-1-spark-connect-c74264f5-server.out
Spark Command: /opt/java/openjdk/bin/java -cp /opt/spark/conf:/opt/spark/jars/* -Xmx1g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --master k8s://https://10.96.0.1:443 --conf spark.driver.port=7078 --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/connect-name=spark-connect-c74264f5 --conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.host=10.244.0.14 --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.namespace=spark-test --conf spark.kubernetes.container.image=apache/spark:3.5.0 --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.executor.podNamePrefix=spark-connect-c74264f5 --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/connect-name=spark-connect-c74264f5 --conf spark.kubernetes.driver.pod.name=spark-connect-c74264f5-server --conf spark.executor.instances=2 --conf spark.kubernetes.executor.container.image=apache/spark:3.5.0 --class org.apache.spark.sql.connect.service.SparkConnectServer --name Spark Connect server spark-internal
========================================
26/01/29 19:11:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Error: Failed to load class org.apache.spark.sql.connect.service.SparkConnectServer.
Failed to load main class org.apache.spark.sql.connect.service.SparkConnectServer.
You need to specify Spark Connect jars with --jars or --packages.
26/01/29 19:11:01 INFO ShutdownHookManager: Shutdown hook called
26/01/29 19:11:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-1816c8d5-8944-4e69-abb9-e2bf7ee565a6

and

kubectl logs -f  spark-connect-2e2e1386-server  -n spark-test 
starting org.apache.spark.sql.connect.service.SparkConnectServer, logging to /opt/spark/logs/spark--org.apache.spark.sql.connect.service.SparkConnectServer-1-spark-connect-2e2e1386-server.out
Spark Command: /opt/java/openjdk/bin/java -cp /opt/spark/conf:/opt/spark/jars/* -Xmx1g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --master k8s://https://10.96.0.1:443 --conf spark.driver.port=7078 --conf spark.jars.packages=org.apache.spark:spark-connect_2.12:3.5.0 --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/connect-name=spark-connect-2e2e1386 --conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.host=10.244.0.17 --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.namespace=spark-test --conf spark.kubernetes.container.image=apache/spark:3.5.0 --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.executor.podNamePrefix=spark-connect-2e2e1386 --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/connect-name=spark-connect-2e2e1386 --conf spark.kubernetes.driver.pod.name=spark-connect-2e2e1386-server --conf spark.executor.instances=2 --conf spark.kubernetes.executor.container.image=apache/spark:3.5.0 --class org.apache.spark.sql.connect.service.SparkConnectServer --name Spark Connect server spark-internal
========================================
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
org.apache.spark#spark-connect_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9f61b3e3-3352-46ff-9132-7e2ea8c08393;1.0
	confs: [default]
Exception in thread "main" java.io.FileNotFoundException: /home/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-9f61b3e3-3352-46ff-9132-7e2ea8c08393-1.0.xml (No such file or directory)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(Unknown Source)
	at java.base/java.io.FileOutputStream.<init>(Unknown Source)
	at java.base/java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:71)
	at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:63)
	at org.apache.ivy.core.module.descriptor.DefaultModuleDescriptor.toIvyFile(DefaultModuleDescriptor.java:553)
	at org.apache.ivy.core.cache.DefaultResolutionCacheManager.saveResolvedModuleDescriptor(DefaultResolutionCacheManager.java:184)
	at org.apache.ivy.core.resolve.ResolveEngine.resolve(ResolveEngine.java:259)
	at org.apache.ivy.Ivy.resolve(Ivy.java:522)
	at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1535)
	at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

@Shekharrajak (Member Author)

Looks like RBAC is needed for various permissions:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1:443/api/v1/namespaces/spark-test/pods/spark-connect-9661bdcf-server. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-connect-9661bdcf-server" is forbidden: User "system:serviceaccount:spark-test:default" cannot get resource "pods" in API group "" in the namespace "spark-test".
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:671)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:651)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:597)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:560)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:140)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at io.fabric8.kubernetes.client.http.ByteArrayBodyHandler.onBodyDone(ByteArrayBodyHandler.java:52)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$OkHttpAsyncBody.doConsume(OkHttpClientImpl.java:137)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
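Based on that error, a Role/RoleBinding along these lines would grant the server pod's service account the missing pod permissions. This is a sketch only: the exact verbs and resources the Spark driver needs are assumptions, while the `spark-test` namespace and `default` service account come from the log above.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-driver-pods   # hypothetical name
  namespace: spark-test
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]   # assumed driver needs
    verbs: ["get", "list", "watch", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-driver-pods
  namespace: spark-test
subjects:
  - kind: ServiceAccount
    name: default            # the account named in the Forbidden error
    namespace: spark-test
roleRef:
  kind: Role
  name: spark-driver-pods
  apiGroup: rbac.authorization.k8s.io
```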

@Shekharrajak (Member Author) commented Jan 29, 2026

In CI the Spark Connect server is stuck forever in Ready state:

kubectl describe  sparkconnect spark-connect-554dd822 -n spark-test 
Name:         spark-connect-554dd822
Namespace:    spark-test
Labels:       <none>
Annotations:  <none>
API Version:  sparkoperator.k8s.io/v1alpha1
Kind:         SparkConnect
Metadata:
  Creation Timestamp:  2026-01-29T19:29:18Z
  Generation:          1
  Resource Version:    12234
  UID:                 33f957ed-f1c9-4534-8043-5d2ab8bc7803
Spec:
  Executor:
    Instances:  2
  Image:        apache/spark:3.5.0
  Server:
  Spark Conf:
    spark.jars:   https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar
  Spark Version:  3.5.0
Status:
  Conditions:
    Last Transition Time:  2026-01-29T19:29:19Z
    Message:               Server pod is ready
    Reason:                ServerPodReady
    Status:                True
    Type:                  ServerPodReady
  Executors:
    Running:         2
  Last Update Time:  2026-01-29T19:29:24Z
  Server:
    Pod Ip:        10.244.0.27
    Pod Name:      spark-connect-554dd822-server
    Service Name:  spark-connect-554dd822-server
  Start Time:      2026-01-29T19:29:18Z
  State:           Ready
Events:            <none>

@Shekharrajak (Member Author)

GitHub Spark Examples e2e:

    • `spark_connect_simple` (in-cluster sparkconnect created):
      • Level 1: create session (auto name) → client.connect() → spark.range(10).count(), df.show() → spark.stop().
      • Level 2: create session (named my-simple-session-<uuid>) → client.connect() → spark.range(100).count(), df.show(10) → spark.stop().
    • `spark_advanced_options` (in-cluster):
      • One example: create session with labels/annotations → client.connect() → spark.range(100).count() → spark.stop().

SPARK_OPERATOR_IMAGE_TAG: local
timeout-minutes: 15

- name: Build and load Spark E2E runner image (in-cluster)
@Shekharrajak (Member Author)

Pytest (outside cluster) → creates Job → Job Pod runs Python example → SparkClient creates SparkConnect CR → Spark Operator creates server + executors → backend waits for READY and builds in-cluster URL → Python connects to server, runs DataFrame ops → script exits → Job completes → test collects logs and cleans up.
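The "backend waits for READY" step can be sketched as a generic polling loop. This is a simplified stand-in for the SDK's actual wait logic; the `get_state` callable, timeout, and interval are assumptions. In the real backend, `get_state` would read the SparkConnect CR's `.status.state` field.

```python
import time

def wait_for_state(get_state, desired="Ready", timeout=300.0, interval=5.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until it returns `desired` or `timeout` seconds elapse."""
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state == desired:
            return state  # resource reached the desired state
        if clock() >= deadline:
            raise TimeoutError(f"state {state!r} != {desired!r} after {timeout}s")
        sleep(interval)  # back off before the next poll
```

The `sleep` and `clock` parameters are injectable so the loop can be unit-tested without real waiting.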


@Shekharrajak (Member Author)

GitHub checks are looking good now.

- Add spark extra with pyspark[connect]==3.4.1 for grpcio, pandas, pyarrow
- Update uv.lock with resolved dependencies
- Update .gitignore for spark-related files

Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
@Shekharrajak (Member Author)

Please can you squash your commits or sign them for DCO?

Done

@andreyvelich andreyvelich changed the title feat(spark): add SparkClient API for SparkConnect session management feat: add SparkClient API for SparkConnect session management Feb 9, 2026
Shekharrajak and others added 2 commits February 9, 2026 22:47
Co-authored-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
Signed-off-by: Shekhar Prasad Rajak <5774448+Shekharrajak@users.noreply.github.com>
Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
…te_and_connect()

Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
@andreyvelich (Member) left a comment


We should be in good shape to merge this initial version!
Thanks a lot for this work, exciting to see SparkConnect support in Kubeflow SDK @Shekharrajak 🚀
/lgtm

/assign @astefanutti @kramaranya @Fiona-Waters @szaher @tariq-hasan

@google-oss-prow (Contributor)

@andreyvelich: GitHub didn't allow me to assign the following users: tariq-hasan.

Note that only kubeflow members with read permissions, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time.
For more information please see the contributor guide

Details

In response to this:

We should be in good shape to merge this initial version!
Thanks a lot for this work, exciting to see SparkConnect support in Kubeflow SDK @Shekharrajak 🚀
/lgtm

/assign @astefanutti @kramaranya @Fiona-Waters @szaher @tariq-hasan

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@google-oss-prow google-oss-prow bot added the lgtm label Feb 10, 2026
@astefanutti (Contributor)

Thanks @Shekharrajak for this awesome work!

For reference, the follow-up discussion on the ability to pass an arbitrary URL w.r.t. backend addressability: #274

/lgtm

@kramaranya (Contributor) left a comment


Thanks @Shekharrajak for this incredible work!
/lgtm
I left a few comments/questions but we can open follow up PRs for those :)

Contributor

It would be great to move those examples to the website https://sdk.kubeflow.org/en/latest/examples.html
That could be done in a separate PR though

Comment on lines +30 to +51
@abc.abstractmethod
def connect(
    self,
    num_executors: Optional[int] = None,
    resources_per_executor: Optional[dict[str, str]] = None,
    spark_conf: Optional[dict[str, str]] = None,
    driver: Optional[Driver] = None,
    executor: Optional[Executor] = None,
    options: Optional[list] = None,
) -> SparkConnectInfo:
    """Create a new SparkConnect session (INTERNAL USE ONLY).

    This is an internal method used by SparkClient.connect().
    Use SparkClient.connect() instead of calling this directly.

    Args:
        num_executors: Number of executor instances.
        resources_per_executor: Resource requirements per executor.
        spark_conf: Spark configuration properties.
        driver: Driver configuration.
        executor: Executor configuration.
        options: List of configuration options (use Name option for custom name).
Contributor

Is the intent that connect() means “create session” on the backend interface or “connect to existing session”?

Member

It creates the SparkConnect CRD and connects to that session.
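To illustrate that backend contract, here is a toy stand-in. The types are simplified (the real interface and types live in kubeflow/spark/backends/base.py and kubeflow/spark/types/types.py), and FakeBackend with its values is invented for the example.

```python
import abc
from dataclasses import dataclass
from typing import Optional

@dataclass
class SparkConnectInfo:
    """Simplified stand-in for the SDK's SparkConnectInfo type."""
    name: str
    url: str

class SparkBackend(abc.ABC):
    @abc.abstractmethod
    def connect(self, num_executors: Optional[int] = None) -> SparkConnectInfo:
        """Create a session and return its connection info (internal use)."""

class FakeBackend(SparkBackend):
    """Test double: 'creates' a session and returns its in-cluster URL."""
    def connect(self, num_executors: Optional[int] = None) -> SparkConnectInfo:
        # A real backend would create the SparkConnect CR and wait for Ready.
        return SparkConnectInfo(name="spark-connect-demo",
                                url="sc://spark-connect-demo-server:15002")
```

SparkClient.connect() then only has to call the backend and hand the resulting URL to PySpark, which keeps the Kubernetes details out of the user-facing API.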

@@ -0,0 +1,123 @@
#!/usr/bin/env python3
# Copyright 2025 The Kubeflow Authors.
Contributor

Could you please update the copyright headers in all new files?

Suggested change
# Copyright 2025 The Kubeflow Authors.
# Copyright 2026 The Kubeflow Authors.

Contributor

We should also update docs in Kubeflow SDK website to include SparkClient https://sdk.kubeflow.org/en/latest/

@kramaranya (Contributor)

@Shekharrajak could you please rebase the PR?

Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
@google-oss-prow google-oss-prow bot removed the lgtm label Feb 12, 2026
@andreyvelich (Member)

/lgtm
/assign @kramaranya

@google-oss-prow google-oss-prow bot added the lgtm label Feb 12, 2026
@kramaranya (Contributor)

Thanks @Shekharrajak! Incredible work!!

/approve
/unhold

@google-oss-prow (Contributor)

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: kramaranya

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot merged commit c6a6c52 into kubeflow:main Feb 12, 2026
18 checks passed
@google-oss-prow google-oss-prow bot added this to the v0.4 milestone Feb 12, 2026
@kramaranya (Contributor)

@Shekharrajak could you please follow up on the items I mentioned #225 (review)? Or at least create github issues to track those?

