feat: add SparkClient API for SparkConnect session management #225
google-oss-prow[bot] merged 14 commits into kubeflow:main
Conversation
🎉 Welcome to the Kubeflow SDK! 🎉 Thanks for opening your first PR! We're happy to have you as part of our community 🚀 Here's what happens next:
Join the community:
Feel free to ask questions in the comments if you need any help or clarification!
Force-pushed fdea5fb to 20afb3a
Pull request overview
This PR implements a SparkClient API for managing SparkConnect sessions on Kubernetes, enabling high-level lifecycle management and PySpark integration.
Changes:
- Added SparkClient API with support for connecting to existing servers or auto-creating new sessions (see the usage sketch after this list)
- Implemented KubernetesSparkBackend for SparkConnect CRD operations
- Added comprehensive type definitions (Driver, Executor, SparkConnectInfo, SparkConnectState)
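For context, a minimal usage sketch assuming the API surface described above (SparkClient and connect(), with the num_executors parameter from the backend interface shown later in this review); exact signatures may differ in the merged code:

```python
# Minimal sketch, assuming the public API described in this PR; not a
# verbatim excerpt of the merged implementation.
from kubeflow.spark import SparkClient

client = SparkClient()  # defaults to the Kubernetes backend

# Auto-create a SparkConnect session and get back a PySpark SparkSession.
spark = client.connect(num_executors=2)

df = spark.range(10)
assert df.count() == 10
spark.stop()
```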
Reviewed changes
Copilot reviewed 112 out of 476 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pyproject.toml | Added spark extra dependencies (pyspark==3.5.0, grpcio, pandas, pyarrow) |
| uv.lock | Updated lock file with new dependencies and resolution markers |
| kubeflow/spark/__init__.py | Package exports for SparkClient and related types |
| kubeflow/spark/api/spark_client.py | Main SparkClient implementation with connect() method |
| kubeflow/spark/backends/base.py | Abstract base class defining backend interface |
| kubeflow/spark/backends/kubernetes/backend.py | Kubernetes backend implementing SparkConnect CRD operations |
| kubeflow/spark/backends/kubernetes/utils.py | Utility functions for CRD building and validation |
| kubeflow/spark/backends/kubernetes/constants.py | Constants for SparkConnect CRD and defaults |
| kubeflow/spark/types/types.py | Core type definitions (SparkConnectInfo, Driver, Executor, etc.) |
| kubeflow/spark/examples/*.py | Example scripts demonstrating usage |
| kubeflow/spark/integration/*.py | Integration tests with pytest fixtures |
| scripts/spark/setup-kind.sh | Kind cluster setup script for testing |
| docs/spark-connect-local-testing.md | Documentation for local testing |
Cleaning up commits.
Force-pushed 20afb3a to f140fdc
Force-pushed f140fdc to 82dd912
Thank you for this effort @Shekharrajak! Overall looks good, and I am looking forward to this!
I left my initial comments.
> ## Option 1: Docker-based Spark Connect Server (Recommended)
>
> ### Start Spark Connect Server
Interesting, so our API works even without a Kubernetes cluster?
Shall we re-use this functionality to run SparkClient() with container backend?
For example, when user runs connect() with ContainerBackend, we start spark container and allow user to connect to the Spark session.
We can add support for it in the follow up PRs.
Right now it is not automated, but as you correctly mentioned, we could have a ContainerBackendConfig:

```python
client = SparkClient(backend_config=ContainerBackendConfig())
spark = client.connect()  # Auto-starts container, connects when ready
```
We can have this enhancement issue open for future implementation. Thanks!
andreyvelich left a comment:
Thanks for the updates @Shekharrajak!
mostly lgtm, I left a few comments.
On docs/spark-integration-testing.md (Outdated):
@kramaranya recently added the docs for Kubeflow SDK: https://sdk.kubeflow.org/en/latest/
Could you please create a page for Spark there?
```bash
delete_cluster() {
    log_info "Deleting Kind cluster: $CLUSTER_NAME"
    kind delete cluster --name "$CLUSTER_NAME" 2>/dev/null || true
    log_info "Cluster deleted"
}

create_cluster() {
    if kind get clusters 2>/dev/null | grep -q "^${CLUSTER_NAME}$"; then
        log_warn "Cluster '$CLUSTER_NAME' already exists"
        return 0
    fi
```
Same comment for create and delete.
Do you want to just copy this script and add installation of Spark Operator from the helm charts?
Force-pushed 4fbea60 to 237462f
Pull Request Test Coverage Report for Build 21941935956

Warning: This coverage report may be inaccurate. This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

💛 - Coveralls
```bash
create_cluster
setup_test_namespace
install_spark_operator
if [[ "${E2E_CRD_ONLY:-0}" == "1" ]]; then
```
Installing the Spark Operator was taking more than 10 minutes and timed out. For now we only check the SparkConnect CRD and validate CRD + API acceptance (create/get/delete SparkConnect).

We are still missing validation of the end-to-end Spark session:
• client.connect() is not run in CI.
• No check that we get a real SparkSession, that spark.range(), df.count(), spark.stop() work, or that the SDK's wait/ready logic works.
• test_spark_connect_simple_example and test_spark_advanced_options_example are no longer run in CI, so we don't validate that examples/spark/spark_connect_simple.py and spark_advanced_options.py succeed against a real cluster.

Still not sure why it takes so long. Also, there is no released Spark Operator Helm chart version with the SparkConnect CRD available yet, so I was trying to build from the spark-operator codebase directly.
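For illustration, a hedged sketch of what such a CRD + API acceptance check could look like with the official kubernetes Python client. The group/version/plural values are assumptions about the Spark Operator's SparkConnect CRD, and the spec is intentionally minimal:

```python
# Hedged sketch of the create/get/delete acceptance round-trip described
# above. GROUP/VERSION/PLURAL are assumptions, not verified CRD values.
from kubernetes import client, config

config.load_kube_config()
api = client.CustomObjectsApi()

GROUP, VERSION, PLURAL, NS = "sparkoperator.k8s.io", "v1alpha1", "sparkconnects", "default"

body = {
    "apiVersion": f"{GROUP}/{VERSION}",
    "kind": "SparkConnect",
    "metadata": {"name": "acceptance-check"},
    "spec": {},  # real required fields depend on the CRD schema
}

# create / get / delete round-trip against the API server
api.create_namespaced_custom_object(GROUP, VERSION, NS, PLURAL, body)
api.get_namespaced_custom_object(GROUP, VERSION, NS, PLURAL, "acceptance-check")
api.delete_namespaced_custom_object(GROUP, VERSION, NS, PLURAL, "acceptance-check")
```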
Getting some errors in CI (error screenshots omitted).
Looks like RBAC is needed for various permissions (screenshot omitted).
In CI the Spark Connect server gets stuck forever waiting for the ready state (screenshot omitted).
GitHub Spark Examples e2e run (screenshot omitted).
```yaml
SPARK_OPERATOR_IMAGE_TAG: local
timeout-minutes: 15

- name: Build and load Spark E2E runner image (in-cluster)
```
CI test flow: Pytest (outside cluster) → creates Job → Job Pod runs Python example → SparkClient creates SparkConnect CR → Spark Operator creates server + executors → backend waits for READY and builds in-cluster URL → Python connects to server, runs DataFrame ops → script exits → Job completes → test collects logs and cleans up.
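As a rough illustration of this flow, a hedged pytest sketch; image names, namespaces, and the example path are placeholders, not the PR's actual test code:

```python
# Hedged pytest sketch of the CI flow above; all names are illustrative.
from kubernetes import client, config

def test_spark_connect_example_in_cluster():
    config.load_kube_config()
    batch = client.BatchV1Api()
    job = client.V1Job(
        metadata=client.V1ObjectMeta(name="spark-e2e-example"),
        spec=client.V1JobSpec(
            backoff_limit=0,
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(
                            name="runner",
                            image="spark-e2e-runner:local",  # assumed image tag
                            command=["python", "examples/spark/spark_connect_simple.py"],
                        )
                    ],
                )
            ),
        ),
    )
    batch.create_namespaced_job(namespace="default", body=job)
    # Poll the Job status until it succeeds or times out, then collect pod
    # logs and delete the Job (cleanup elided for brevity).
```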
GitHub checks looking good now.
- Add spark extra with pyspark[connect]==3.4.1 for grpcio, pandas, pyarrow
- Update uv.lock with resolved dependencies
- Update .gitignore for spark-related files

Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
Force-pushed e8ab2ad to d3f450e
Done.
Co-authored-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
Signed-off-by: Shekhar Prasad Rajak <5774448+Shekharrajak@users.noreply.github.com>
Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
…te_and_connect()
Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
Force-pushed 97662bb to 6c85187
andreyvelich left a comment:
We should be in good shape to merge this initial version!
Thanks a lot for this work, exciting to see SparkConnect support in Kubeflow SDK @Shekharrajak 🚀
/lgtm
/assign @astefanutti @kramaranya @Fiona-Waters @szaher @tariq-hasan
@andreyvelich: GitHub didn't allow me to assign the following users: tariq-hasan. Note that only kubeflow members with read permissions, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time.
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
Thanks @Shekharrajak for this awesome work! For reference, the follow-up discussion on the ability to pass an arbitrary URL w.r.t. backend addressability: #274

/lgtm
kramaranya left a comment:
Thanks @Shekharrajak for this incredible work!
/lgtm
I left a few comments/questions but we can open follow up PRs for those :)
It would be great to move those examples to the website https://sdk.kubeflow.org/en/latest/examples.html
That could be done in a separate PR though
```python
@abc.abstractmethod
def connect(
    self,
    num_executors: Optional[int] = None,
    resources_per_executor: Optional[dict[str, str]] = None,
    spark_conf: Optional[dict[str, str]] = None,
    driver: Optional[Driver] = None,
    executor: Optional[Executor] = None,
    options: Optional[list] = None,
) -> SparkConnectInfo:
    """Create a new SparkConnect session (INTERNAL USE ONLY).

    This is an internal method used by SparkClient.connect().
    Use SparkClient.connect() instead of calling this directly.

    Args:
        num_executors: Number of executor instances.
        resources_per_executor: Resource requirements per executor.
        spark_conf: Spark configuration properties.
        driver: Driver configuration.
        executor: Executor configuration.
        options: List of configuration options (use Name option for custom name).
    """
```
Is the intent that connect() means “create session” on the backend interface or “connect to existing session”?
It creates the SparkConnect CRD and connects to that session.
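In other words, the backend's connect() creates the CR, and SparkClient.connect() wraps the result into a live PySpark session. A hedged sketch of that relationship; the SparkConnectInfo field names (host, port) are assumptions, not the PR's actual attributes:

```python
# Hedged sketch of SparkClient.connect() delegating to the backend.
from pyspark.sql import SparkSession

class SparkClient:
    def __init__(self, backend):
        self._backend = backend

    def connect(self, **kwargs) -> SparkSession:
        # The backend creates the SparkConnect CR and waits for it to be READY.
        info = self._backend.connect(**kwargs)
        # sc:// is the Spark Connect URL scheme understood by PySpark.
        return SparkSession.builder.remote(f"sc://{info.host}:{info.port}").getOrCreate()
```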
```python
#!/usr/bin/env python3
# Copyright 2025 The Kubeflow Authors.
```
Could you please update the copyright headers in all new files?
```diff
-# Copyright 2025 The Kubeflow Authors.
+# Copyright 2026 The Kubeflow Authors.
```
Feel free to add OWNERS similar to https://github.com/kubeflow/sdk/blob/main/kubeflow/hub/OWNERS
We should also update docs in Kubeflow SDK website to include SparkClient https://sdk.kubeflow.org/en/latest/
@Shekharrajak could you please rebase the PR?
Signed-off-by: Shekhar Rajak <shekharrajak@live.com>
/lgtm
Thanks @Shekharrajak! Incredible work!!

/approve
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: kramaranya

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing…
@Shekharrajak could you please follow up on the items I mentioned in #225 (review)? Or at least create GitHub issues to track those?

Closes #163 as per the KEP.
Features
- Get a `SparkSession` from an existing server, or auto-create a new session

Quick Test
Option 1: Connect to Local Spark Connect Server (No Kubernetes required)
Option 2: With Kind Cluster + Spark Operator
Examples:
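For Option 1, a hedged sketch of what the quick test could look like; the connect_url keyword is a hypothetical parameter name (sc://localhost:15002 uses Spark Connect's default port):

```python
# Hedged sketch for Option 1; connect_url is a hypothetical kwarg, not a
# confirmed parameter of the merged SparkClient API.
from kubeflow.spark import SparkClient

client = SparkClient()
spark = client.connect(connect_url="sc://localhost:15002")  # hypothetical kwarg

spark.range(5).show()
spark.stop()
```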