sjrusso8 / spark-connect-rs Goto Github PK
View Code? Open in Web Editor NEWApache Spark Connect Client for Rust
Home Page: https://docs.rs/spark-connect-rs
License: Apache License 2.0
Apache Spark Connect Client for Rust
Home Page: https://docs.rs/spark-connect-rs
License: Apache License 2.0
When creating a dataframe from spark.sql
and then using select
throws an error. Expected to be able to select the column from the data.
Commands I ran (I am a total n00b):
rustup
cargo new spark-connect-gist
Cargo.toml
to include spark connect rs and tokio
dependencies:[dependencies]
spark-connect-rs = "0.0.1-beta.3"
tokio = { version = "1", features = ["full"] }
arch -arm64 brew install cmake
brew install protobuf
main.rs
: https://gist.github.com/sjrusso8/2b4e43af462367a15f91db5a33627449cargo run
Here is the error message:
Compiling spark-connect-gist v0.1.0 (/Users/matthew.powers/Documents/code/my_apps/spark-connect-gist)
Finished dev [unoptimized + debuginfo] target(s) in 1.00s
Running `target/debug/spark-connect-gist`
Error: tonic::transport::Error(Transport, hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })))
Hi @sjrusso8 can you tell me the rationale behind exposing a camel cased API?
Seems like we should be fine just using idiomatic rust snake casing.
On a separate note, do you have an IM channel of some kind to chat about this project? I would love to iterate more rapidly with you.
Create similar bindings as with Rust but available in server side js (node, deno, bun, ...). The sdk should closely resemble the rust one, and only deviate when either necessary due to napi limitations, or when it is unidiomatic in JS.
napi.rs seems to be good crate to leverage and is relatively easy to use.
The branch feat/napi contains a super quick pass at creating the bindings. The experiment only covers these areas
.sql
select
, and filter
count()
show()
There is a lot of use of clone()
and some not great implementations to create a new empty dataframe to satisfy the napi requirements. The polars js interop is a good example of how the bindings might function.
The collect()
panics when returning a large result. The arrow-ipc streamreader is not parsing the data correctly.
Example:
use spark_connect_rs;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
.build()
.await?;
spark
.clone()
.range(None, 100000, 1, Some(1))
.collect()
.await
.unwrap();
Ok(())
}
This results in a panic
thread 'main' panicked at /home/sjrusso/Documents/code/projects/rust-projects/spark-connect-rs/src/session.rs:191:30:
called `Result::unwrap()` on an `Err` value: IpcError("Not expecting a schema when messages are read")
stack backtrace:
0: rust_begin_unwind
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:72:14
2: core::result::unwrap_failed
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/result.rs:1649:5
3: core::result::Result<T,E>::unwrap
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/result.rs:1073:23
4: spark_connect_rs::session::SparkSession::consume_plan::{{closure}}
at ./src/session.rs:191:12
5: spark_connect_rs::dataframe::DataFrame::collect::{{closure}}
at ./src/dataframe.rs:99:14
6: sql::main::{{closure}}
at ./examples/sql.rs:21:10
7: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:63
8: tokio::runtime::coop::with_budget
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5
9: tokio::runtime::coop::budget
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5
10: tokio::runtime::park::CachedParkThread::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:31
11: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/blocking.rs:66:9
12: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
13: tokio::runtime::context::runtime::enter_runtime
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/runtime.rs:65:16
14: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
15: tokio::runtime::runtime::Runtime::block_on
at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/runtime.rs:349:45
16: sql::main
at ./examples/sql.rs:46:5
17: core::ops::function::FnOnce::call_once
at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
The initial DataFrameWriter
is created but there is also another way to write data via DataFrameWriterV2
. This method has a slightly different implementation and leverages a different proto command message.
I think the methods should mirror the ones found on the Spark API guide, and a new method should be added onto the DataFrame
for writeTo
.
Being able to compile the rust bindings into wasm32-unknown-unknown
and/or wasm32-wasi
would be interesting. This could allow some interesting interactions between a browser and spark. A wasm32-wasi
target would allow spark programs to run on any runtime.
A feature flag under the core
bindings for wasm
already exists and does compile successfully to those targets mentioned above. The issue arises when trying to send a HTTP1.1 request with grpc-web
to the Spark Connect server, which only accepts normal HTTP2 grpc requests. There are methods of standing up a proxy server with envoy to forward the gRPC browser request to the backend server. But this feels like a lot of effort for the client to do.
The branch feat/wasm contains the early experiment and trying to run wasm with wasmtime. Issue arises with using async code in wasm. There is probably a way to code it correctly, but I don't have time to finish the experiment
There are many functions for Spark, and most of them are created via a macro. However, not all of them have unit test coverage. Create additional unit tests based on similar test conditions from the exiting Spark API test cases.
I have been mirror the docstring tests from the PySpark API for reference.
The overall documentation needs to be reviewed and matched against the Spark Core Classes and Functions. For instance, the README
should be accurate to what functions and methods are currently implemented compared to the existing Spark API.
However, there are probably a few misses that are either currently implemented but marked as open
, or were accidentally excluded. Might consider adding a few sections for other classes like StreamingQueryManager
, DataFrameNaFunctions
, DataFrameStatFunctions
, etc.
Implement the ability to use positional/keyword args with sql
. Because of the differences between python and rust, the function arguments need to be clearly implemented.
The pyspark process for sql allows for literals and dataframes to be in one argument. However, rust probably won't take to kindly to that input arg. If a user passes in a DataFrame
it will need to be handled with a SubqueryAlias
and if it's a literal it will be passed in as either a positional or a keyword argument.
We might want to only allow for to inputs parameters are options. Something like this?
sql<T: ToLiteral>(self, sql_query: &str, col_args: Option<HashMap<String, T>>, df_args: Option<HashMap<String, DataFrame>>) -> DataFrame
This could allow a user to do these variations.
spark.sql("SELECT * FROM table", None, None).await?;
let df = spark.range(...);
// create the hashmap
spark.sql("SELECT * FROM {df}" None, Some(df_hashmap)).await?;
let col = "name";
// create the hashmap
spark.sql("SELECT {col} FROM {df}", Some(col_hashmap), Some(df_hashmap)).await?;
Or should positional SQL be a completely different method all together? like sql_params
? So that a user doesn't need to fuss with adding None
x2 to all their sql statements.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.