sjrusso8 / spark-connect-rs
Apache Spark Connect Client for Rust
Home Page: https://docs.rs/spark-connect-rs
License: Apache License 2.0
Create similar bindings as with Rust but available in server-side JS (node, deno, bun, ...). The SDK should closely resemble the Rust one, and only deviate when necessary due to napi limitations or when the Rust approach is unidiomatic in JS.
napi.rs seems to be a good crate to leverage and is relatively easy to use.
The branch feat/napi contains a super quick pass at creating the bindings. The experiment only covers these areas: .sql, select and filter, count(), and show().
There is a lot of use of clone(), and some not-great implementations that create a new empty dataframe to satisfy the napi requirements. The polars JS interop is a good example of how the bindings might function.
The collect() call panics when returning a large result; the arrow-ipc StreamReader is not parsing the data correctly.
Example:
use spark_connect_rs;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
            .build()
            .await?;

    spark
        .clone()
        .range(None, 100000, 1, Some(1))
        .collect()
        .await
        .unwrap();

    Ok(())
}
This results in a panic:
thread 'main' panicked at /home/sjrusso/Documents/code/projects/rust-projects/spark-connect-rs/src/session.rs:191:30:
called `Result::unwrap()` on an `Err` value: IpcError("Not expecting a schema when messages are read")
stack backtrace:
0: rust_begin_unwind
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:72:14
2: core::result::unwrap_failed
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/result.rs:1649:5
3: core::result::Result<T,E>::unwrap
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/result.rs:1073:23
4: spark_connect_rs::session::SparkSession::consume_plan::{{closure}}
at ./src/session.rs:191:12
5: spark_connect_rs::dataframe::DataFrame::collect::{{closure}}
at ./src/dataframe.rs:99:14
6: sql::main::{{closure}}
at ./examples/sql.rs:21:10
7: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:63
8: tokio::runtime::coop::with_budget
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5
9: tokio::runtime::coop::budget
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5
10: tokio::runtime::park::CachedParkThread::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:31
11: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/blocking.rs:66:9
12: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
13: tokio::runtime::context::runtime::enter_runtime
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/runtime.rs:65:16
14: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
15: tokio::runtime::runtime::Runtime::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/runtime.rs:349:45
16: sql::main
at ./examples/sql.rs:46:5
17: core::ops::function::FnOnce::call_once
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Implement the ability to use positional/keyword args with sql. Because of the differences between Python and Rust, the function arguments need to be clearly implemented.
The PySpark process for sql allows literals and dataframes to be passed in one argument. However, Rust probably won't take too kindly to that input arg. If a user passes in a DataFrame, it will need to be handled with a SubqueryAlias, and if it's a literal it will be passed in as either a positional or a keyword argument.
We might want to only allow two input parameters, both as Options. Something like this?
sql<T: ToLiteral>(self, sql_query: &str, col_args: Option<HashMap<String, T>>, df_args: Option<HashMap<String, DataFrame>>) -> DataFrame
This could allow a user to do these variations.
spark.sql("SELECT * FROM table", None, None).await?;
let df = spark.range(...);
// create the hashmap
spark.sql("SELECT * FROM {df}" None, Some(df_hashmap)).await?;
let col = "name";
// create the hashmap
spark.sql("SELECT {col} FROM {df}", Some(col_hashmap), Some(df_hashmap)).await?;
Or should positional SQL be a completely different method altogether, like sql_params? That way a user doesn't need to fuss with adding None x2 to all their sql statements.
Create the DataFrameStatFunctions object and implement the remaining methods for approxQuantile, corr, cov, crossTab, freqItems, and sampleBy.
There are many functions for Spark, and most of them are created via a macro. However, not all of them have unit test coverage. Create additional unit tests based on similar test conditions from the existing Spark API test cases.
I have been mirroring the docstring tests from the PySpark API for reference.
Implement the missing methods for checkpoint and localCheckpoint on the DataFrame.
The examples currently use paths that are for the Docker workflow.
It would be cool if the examples could also work with non-Docker setups (e.g. when I manually spin up Spark Connect on localhost).
Perhaps we can check all those data files into this repo, so these examples work out of the box with both Docker and a localhost Spark Connect setup.
This issue will be the organizing issue for all the remaining Spark functions to implement as methods.
Based on the readme, here is the list.
Create the DataFrameNaFunctions object and implement the remaining methods for drop, fill, and replace.
Spark 4.0 implements changes to the connect proto. We will need to analyze the spec and identify what has changed.
Additionally, we will need to support both a client for Spark 3.5 and Spark 4.0. There should be a feature flag on the client for 3_5 or 4_0.
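One possible shape for the version gating, assuming hypothetical feature names spark_3_5 / spark_4_0 and a build.rs that writes the generated proto code into per-version subdirectories of OUT_DIR (both names and paths are illustrative, not decided):

```rust
// Hypothetical feature names; only one generated proto module is compiled
// into the client per build.
#[cfg(feature = "spark_3_5")]
pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/spark3_5/spark.connect.rs"));
}

#[cfg(feature = "spark_4_0")]
pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/spark4_0/spark.connect.rs"));
}
```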
Being able to compile the Rust bindings into wasm32-unknown-unknown and/or wasm32-wasi would be interesting. This could allow some interesting interactions between a browser and Spark. A wasm32-wasi target would allow Spark programs to run on any runtime.
A feature flag under the core bindings for wasm already exists and does compile successfully to the targets mentioned above. The issue arises when trying to send an HTTP/1.1 request with grpc-web to the Spark Connect server, which only accepts normal HTTP/2 gRPC requests. There are ways of standing up a proxy server with Envoy to forward the gRPC browser request to the backend server, but this feels like a lot of effort to push onto the client.
The branch feat/wasm contains the early experiment of trying to run the wasm build with wasmtime. Issues arise with using async code in wasm. There is probably a way to code it correctly, but I don't have time to finish the experiment.
Implement the methods for createTable and createExternalTable on the Catalog.
Commands I ran (I am a total n00b):
- rustup
- cargo new spark-connect-gist
- updated Cargo.toml to include the spark-connect-rs and tokio dependencies:
  [dependencies]
  spark-connect-rs = "0.0.1-beta.3"
  tokio = { version = "1", features = ["full"] }
- arch -arm64 brew install cmake
- brew install protobuf
- main.rs: https://gist.github.com/sjrusso8/2b4e43af462367a15f91db5a33627449
- cargo run
Here is the error message:
Compiling spark-connect-gist v0.1.0 (/Users/matthew.powers/Documents/code/my_apps/spark-connect-gist)
Finished dev [unoptimized + debuginfo] target(s) in 1.00s
Running `target/debug/spark-connect-gist`
Error: tonic::transport::Error(Transport, hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })))
The overall documentation needs to be reviewed and matched against the Spark Core Classes and Functions. For instance, the README should be accurate about which functions and methods are currently implemented compared to the existing Spark API.
However, there are probably a few misses that are either currently implemented but marked as open, or were accidentally excluded. Might consider adding a few sections for other classes like StreamingQueryManager, DataFrameNaFunctions, DataFrameStatFunctions, etc.
Implement the initial methods to read and write .csv, .json, .orc, .parquet, and .text.
Consider creating a ConfigOpts trait and having a custom struct represent the options for each of those file types. The user creates the options object, modifies it, and passes it into the method.
let mut opts = CsvOptions::new();
opts.header = true;
opts.delimiter = b'|';

let df = spark.read().csv(path, opts);
Example of what the function signature might look like:
impl DataFrameReader {
    ....
    pub fn csv<C: ConfigOpts>(path: &str, opts: Option<C>)
}
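To make the idea more concrete, a rough sketch of the trait/struct pairing; the ConfigOpts and CsvOptions names come from the snippets above, while the to_options method and the exact fields are assumptions for illustration only:

```rust
use std::collections::HashMap;

// Each file format gets its own typed options struct; the trait flattens the
// struct into the string key/value map the read/write request expects.
pub trait ConfigOpts {
    fn to_options(&self) -> HashMap<String, String>;
}

#[derive(Default)]
pub struct CsvOptions {
    pub header: bool,
    pub delimiter: u8,
}

impl CsvOptions {
    pub fn new() -> Self {
        Self::default()
    }
}

impl ConfigOpts for CsvOptions {
    fn to_options(&self) -> HashMap<String, String> {
        let mut opts = HashMap::new();
        opts.insert("header".to_string(), self.header.to_string());
        opts.insert("delimiter".to_string(), (self.delimiter as char).to_string());
        opts
    }
}
```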
The initial DataFrameWriter is created, but there is also another way to write data via DataFrameWriterV2. This writer has a slightly different implementation and leverages a different proto command message.
I think the methods should mirror the ones found on the Spark API guide, and a new method should be added onto the DataFrame for writeTo.
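A possible shape for that entry point, purely as a sketch; the write_to name mirrors Spark's writeTo, and the DataFrameWriterV2 constructor shown here is an assumption, not the crate's current API:

```rust
impl DataFrame {
    // Returns the v2 writer builder targeting the named table,
    // mirroring PySpark's DataFrame.writeTo(table).
    pub fn write_to(self, table: &str) -> DataFrameWriterV2 {
        DataFrameWriterV2::new(self, table)
    }
}
```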
When creating a dataframe from spark.sql and then using select, an error is thrown. Expected to be able to select the column from the data.
Git submodules are annoying, and the current submodule is only ever checked out at a specific release tag. It would be easier to just have the folder containing the copied protobufs.
I think it might look something like this in the repo:
├── core            <- core implementation in Rust
│   ├── spark4_0    <- protobuf for spark 4.0.0
│   └── spark3_5    <- protobuf for spark 3.5.1
Related to #61
There is an error when the release-to-cargo pipeline is run, so I have been running it manually.
Error run 9025621050
I already have a fix in mind for this issue, and I can make a PR if this issue is reviewed and approved, thanks!
The current spark-connect-rs library configures all endpoints to the https scheme. When the tls feature is enabled, a connection cannot be made to a server without TLS configured, for example, when connecting to a Spark cluster set up on localhost.
When the tls feature is enabled in the spark-connect-rs crate, connections to servers both with and without TLS configured should be successful.
Default the endpoint scheme to http, and set it to https only when use_ssl=true is specified in the connection string.
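Roughly the behavior being proposed, as a standalone sketch; the helper name and the way the connection string is split are assumptions, only the use_ssl=true parameter comes from the Spark Connect connection-string format:

```rust
// Picks the endpoint scheme from the connection string, defaulting to http.
fn endpoint_scheme(conn_str: &str) -> &'static str {
    // e.g. "sc://localhost:15002/;use_ssl=true;user_id=example"
    let use_ssl = conn_str
        .split(';')
        .any(|kv| kv.trim().eq_ignore_ascii_case("use_ssl=true"));

    if use_ssl { "https" } else { "http" }
}
```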
Hi, thanks a lot for this nice crate!
I'd like to report a deadlock issue when a spark session is cloned and used concurrently.
#46 demonstrates a possible workflow leading to a deadlock.
The gist is: everywhere #[allow(clippy::await_holding_lock)] is used poses a possibility of a deadlock when a spark session is cloned and used concurrently. When a task is suspended while holding a lock, another task will wait for the lock to be released without yielding the executor. This is a very common "dangerous" asynchronous programming pattern that should always be avoided at all cost.
Therefore, I would suggest that we either remove the clone method from SparkSession, or replace the RwLock with an asynchronous lock. What do you think?
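For illustration only (not the crate's actual code), a minimal sketch of the hazard and of the async-lock alternative mentioned above:

```rust
use std::sync::{Arc, RwLock};

async fn some_io() { /* stands in for an awaited gRPC call */ }

// Dangerous: the std RwLock guard is held across an .await, so a task parked
// here blocks every other task that needs the lock without yielding progress.
async fn dangerous(shared: Arc<RwLock<Vec<u8>>>) {
    let _guard = shared.write().unwrap();
    some_io().await;
}

// Safer: tokio's RwLock is async-aware, so waiting on the lock (and holding
// it across .await) cooperates with the executor instead of deadlocking it.
async fn safer(shared: Arc<tokio::sync::RwLock<Vec<u8>>>) {
    let _guard = shared.write().await;
    some_io().await;
}
```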
Hi @sjrusso8, can you tell me the rationale behind exposing a camel-cased API?
It seems like we should be fine just using idiomatic Rust snake casing.
On a separate note, do you have an IM channel of some kind to chat about this project? I would love to iterate more rapidly with you.
Lots of other programming languages can access gRPC and Arrow, which means many different languages will be recreating the core client logic to handle the client requests and response handling.
The idea is that there could be one kernel client that all other programming languages use, and then each specific language is left to implement the specifics of the core Spark objects.
- Move client.rs into core, and move all other Rust-specific implementations (the parts of client.rs that are only for the rust library) into rust
- Update client.rs to create a new ConnectClientError error type (it currently leverages SparkError)
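A sketch of what a dedicated ConnectClientError might look like, assuming thiserror is available; the variants shown are illustrative only, not a proposed final set:

```rust
#[derive(Debug, thiserror::Error)]
pub enum ConnectClientError {
    // failures establishing the gRPC channel
    #[error("transport error: {0}")]
    Transport(#[from] tonic::transport::Error),
    // non-OK statuses returned by the Spark Connect service
    #[error("rpc status: {0}")]
    Status(#[from] tonic::Status),
    // failures decoding the arrow IPC payloads in responses
    #[error("arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),
}
```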