epwalsh / batched-fn
🦀 Rust server plugin for deploying deep learning models with batched prediction
Home Page: https://crates.io/crates/batched-fn
License: Apache License 2.0
Hi, thanks for this library!
I'm trying to use the library to write a demo using rust-bert and actix. When I tried to put the model in batched_fn!, I got an error like:
error[E0277]: `*mut torch_sys::C_tensor` cannot be shared between threads safely
--> src/routes.rs:28:25
|
28 | let batch_predict = batched_fn! {
| _________________________^
29 | | handler = |batch: Vec<(Tensor, Tensor, Tensor, Tensor, Tensor)>, model: &PredictModel| -> Vec<String> {
30 | | let output = model.predict(batch.clone());
31 | | println!("Processed batch {:?} -> {:?}", batch, output);
... |
40 | | };
41 | | };
| |_____^ `*mut torch_sys::C_tensor` cannot be shared between threads safely
|
= help: within `(tch::Tensor, tch::Tensor, tch::Tensor, tch::Tensor, tch::Tensor)`, the trait `Sync` is not implemented for `*mut torch_sys::C_tensor`
= note: required because it appears within the type `tch::Tensor`
= note: required because it appears within the type `(tch::Tensor, tch::Tensor, tch::Tensor, tch::Tensor, tch::Tensor)`
note: required by a bound in `BatchedFn`
--> /Users/user/.asdf/installs/rust/1.59.0/registry/src/github.com-1ecc6299db9ec823/batched-fn-0.2.2/src/lib.rs:227:25
|
227 | T: 'static + Send + Sync + std::fmt::Debug,
| ^^^^ required by this bound in `BatchedFn`
= note: this error originates in the macro `$crate::__batched_fn_internal` (in Nightly builds, run with -Z macro-backtrace for more info)
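One thing I considered, based on the T: 'static + Send + Sync bound shown in the error, is making the batch item a plain String (which is Send + Sync) and only building the tensors inside the handler. A rough sketch; the encode and load methods on my PredictModel are hypothetical here, and I'm not sure this is the intended pattern:

let batch_predict = batched_fn! {
    // Batch items are plain Strings, so they satisfy the Send + Sync bound;
    // the tch::Tensor values only ever exist on the handler's own thread.
    handler = |batch: Vec<String>, model: &PredictModel| -> Vec<String> {
        // hypothetical: encode() builds the tensor inputs, predict() runs
        // the forward pass over the whole batch
        let inputs: Vec<_> = batch.iter().map(|text| model.encode(text)).collect();
        model.predict(inputs)
    };
    config = {
        max_batch_size: 16,
        max_delay: 100,
    };
    context = {
        model: PredictModel::load(),
    };
};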
Does the model need to be Sync and Send? I know there's the rust-dl-webserver project, but I don't quite understand the mechanism differences between actix and warp, as I'm pretty new to Rust. Can you provide a simple actix example, or help me understand the usage of batched_fn!? e.g. How do context, config, and handler work? Are they all required? Is the code inside context run only once for initialization (like loading the model)? Should one put other fields besides model in context?
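For reference, here is the rough shape I have pieced together so far, with a placeholder Model type standing in for the real rust-bert model; I am mainly unsure whether my comments about when each section runs are right:

use batched_fn::batched_fn;

// Placeholder model type, just to keep the sketch self-contained.
struct Model;

impl Model {
    fn load() -> Self {
        Model
    }
    fn predict(&self, batch: Vec<String>) -> Vec<String> {
        batch // identity "prediction", only for the sake of the sketch
    }
}

async fn predict(input: String) -> Result<String, batched_fn::Error> {
    let batch_predict = batched_fn! {
        // handler: called with a whole batch plus each context field?
        handler = |batch: Vec<String>, model: &Model| -> Vec<String> {
            model.predict(batch)
        };
        // config: batching behaviour (size / delay limits)?
        config = {
            max_batch_size: 16,
            max_delay: 50,
        };
        // context: loaded once and then shared across batches?
        context = {
            model: Model::load(),
        };
    };
    batch_predict(input).await
}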
Many thanks.
Is there any way to make the context of batched_fn! not be 'static? I want to load the models after the server has been initialized with a configuration listing the models, so I'm trying to do something like this:
let batched_generate = batched_fn! {
    handler = |batch: Vec<Vec<String>>, model: &HashMap<String, SentenceEmbeddingsModel>| -> Vec<Result<Vec<Vec<f32>>, RustBertError>> {
        let mut batched_result = Vec::with_capacity(batch.len());
        for input in batch {
            let result = model.encode(&input);
            batched_result.push(result);
        }
        batched_result
    };
    config = {
        max_batch_size: 1,
        max_delay: 100,
        channel_cap: Some(20),
    };
    context = {
        model: &self.loaded_models,
    };
};
Here self.loaded_models is created in the constructor of the struct, but it looks like the context needs to be 'static. Any thoughts on how to accomplish this?
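One workaround I have been considering, though I'm not sure it's the intended approach: keep only the model configuration (e.g. the list of model paths) in a 'static cell that the constructor fills in, and let the context block itself load the models on first use so that the context owns them. A rough sketch using once_cell; the names here are hypothetical:

use once_cell::sync::OnceCell;

// Filled in once by the server constructor, before the first request.
// Only the configuration lives here, not the loaded models themselves.
static MODEL_PATHS: OnceCell<Vec<String>> = OnceCell::new();

pub fn set_model_paths(paths: Vec<String>) {
    MODEL_PATHS.set(paths).expect("model paths already set");
}

The context block would then read MODEL_PATHS.get() and build the HashMap of models itself, so no non-'static borrow of self is needed.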
Thanks for the great package!
When tuning max_batch_size and max_delay, I wonder how I can benchmark or record the GPU execution time for a batch. I don't know where to put the timing code due to my limited knowledge of Rust; it might be easy for a single request, but I have no idea how to do it for a batch.
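Would the handler be the right place for this? Since it receives the whole batch, I was thinking of wrapping its body with std::time::Instant, something like this fragment (Model and predict are placeholders):

handler = |batch: Vec<Input>, model: &Model| -> Vec<Output> {
    let start = std::time::Instant::now();
    let output = model.predict(batch);
    // Wall-clock time for the whole batch; divide by the batch length for a
    // per-item figure. With CUDA this only reflects GPU work if predict()
    // synchronizes, e.g. by copying results back to the host.
    println!(
        "batch of {} took {:?} ({:?}/item)",
        output.len(),
        start.elapsed(),
        start.elapsed() / output.len() as u32
    );
    output
};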
Thanks.
Hi,
I'm trying to avoid writing
config = {
    max_batch_size: 32,
};
and use
config = {
    max_batch_size: max_batch_size,
};
instead, so that I can adjust the batch size dynamically or read the config from some variable or setting. But I get errors like:
error[E0435]: attempt to use a non-constant value in a constant
--> src/http/predict.rs:76:61
|
70 | let handler = batched_fn::batched_fn! {
| ___________________-
71 | | handler = |batch: Vec<Input>, model: &SentimentModel| -> Vec<Output> {
72 | | let predictions = model.predict(&batch.iter().map(String::as_str).collect::<Vec<&str>>().as_slice());
73 | | predictions.iter().map(|x| Response { score: x.score }).collect()
... |
76 | | max_batch_size: if Cuda::cudnn_is_available() { batch_size } else { 1 },
| | ^^^^^^^^^^ non-constant value
... |
85 | | };
86 | | };
| |_____- help: consider using `let` instead of `static`: `let BATCHED_FN`
For more information about this error, try `rustc --explain E0435`.
error: could not compile `axum-sst` due to previous error
Also, it seems that the batch size cannot be adjusted once the thread has started. Even so, how can I pass max_batch_size via a variable when initializing? Must the passed max_batch_size be static or const? e.g.
struct Config {
    max_batch_size: usize,
}

static PREDICT_CONFIG: Config = Config { max_batch_size: 32 }; // or `Config::from_file()`...

...

config = {
    max_batch_size: if Cuda::cudnn_is_available() { PREDICT_CONFIG.max_batch_size } else { 1 },
};
Thanks.
If I have two concurrent requesters whose inputs are batched together, how does the library know which response to send to which requester?
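In case it helps frame the question, here is a rough sketch of the pattern I would expect a batching layer like this to use (not necessarily this crate's exact internals): each input travels with its own one-shot sender, so the worker can route output[i] back to whoever submitted input[i].

use tokio::sync::{mpsc, oneshot};

// Each submitted input is paired with its own one-shot reply channel.
type Job<I, O> = (I, oneshot::Sender<O>);

async fn run_batches<I, O>(mut rx: mpsc::Receiver<Job<I, O>>, handler: impl Fn(Vec<I>) -> Vec<O>) {
    let mut jobs: Vec<Job<I, O>> = Vec::new();
    while let Some(job) = rx.recv().await {
        jobs.push(job);
        // (max_delay / timeout handling elided for brevity)
        if jobs.len() >= 4 {
            let (inputs, senders): (Vec<I>, Vec<oneshot::Sender<O>>) = jobs.drain(..).unzip();
            // The handler keeps input order, so output[i] belongs to senders[i].
            for (output, sender) in handler(inputs).into_iter().zip(senders) {
                let _ = sender.send(output); // the caller may have gone away
            }
        }
    }
}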
Hello! Thanks for this library!
I'm having trouble executing async tasks from within the closure. From what I can tell, the closure cannot be an async function, and the model inference that I'm doing needs to be .await-ed from within a tokio runtime.
Here's what I'm trying to do:
batched_fn! {
    handler = |batch: Batch<Input>, model: &Model| -> Batch<Output> {
        let output = model.predict(batch.clone()).await; // note the await
        println!("Processed batch {:?} -> {:?}", batch, output);
        output
    };
    config = {
        max_batch_size: 4,
        max_delay: 50,
    };
    context = {
        model: Model::load(),
    };
};
Any way this could be possible? I'm willing to help if you can point me in the right direction!
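One workaround I have been sketching, though I don't know if it's idiomatic here: since the handler appears to run on its own (non-async) thread, hold a tokio Runtime in the context and block_on the async call from inside the handler. Something like this, reusing the placeholder Batch/Input/Output/Model names from above and assuming the macro accepts more than one context field:

use tokio::runtime::Runtime;

batched_fn! {
    handler = |batch: Batch<Input>, model: &Model, rt: &Runtime| -> Batch<Output> {
        // Block this dedicated handler thread until the async inference
        // future completes on the runtime held in the context.
        let output = rt.block_on(model.predict(batch.clone()));
        println!("Processed batch {:?} -> {:?}", batch, output);
        output
    };
    config = {
        max_batch_size: 4,
        max_delay: 50,
    };
    context = {
        model: Model::load(),
        // Separate runtime used only to drive the async predict() calls.
        rt: Runtime::new().unwrap(),
    };
};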
I'm having an issue using gRPC where, if a request is cancelled prematurely, it kills the batched_fn background thread, and this can only be resolved by restarting the service completely. I was wondering if there is a way to re-initialize the thread if it disconnects at runtime?
I'm fairly certain it is similar to, if not the same as, the issue this poster reported in another repo:
epwalsh/rust-dl-webserver#60
Steps to reproduce with gRPC:
// Uses rust_bert for SentenceEmbeddingsModel
async fn encode_sentence(input: String) -> Result<Vec<f32>, batched_fn::Error> {
    let batch_encode = batched_fn! {
        handler = |batch: Vec<String>, model: &SentenceEmbeddingsModel| -> Vec<Vec<f32>> {
            let span = info_span!("batch_handler");
            let _enter = span.enter();
            debug!("{:?}", batch);
            model.encode(&batch).unwrap()
        };
        config = {
            max_batch_size: 16,
            max_delay: 100,
            channel_cap: Some(20),
        };
        context = {
            model: {
                let span = info_span!("batch_context");
                let _context_enter = span.enter();
                info!("Initializing Model...");
                let span = info_span!("model_load");
                let _load_enter = span.enter();
                info!("Cuda: {}", Cuda::cudnn_is_available());
                info!("Model: {}", LOCAL_MODEL);
                SentenceEmbeddingsBuilder::local(LOCAL_MODEL)
                    .with_device(tch::Device::cuda_if_available())
                    .create_model()
                    .context("Failed to initialize embedding model").unwrap()
            },
        };
    };
    batch_encode(input).await
}
#[tonic::async_trait]
impl Encoder for MyEncoder {
    #[instrument(skip_all)]
    async fn encode_sentence(
        &self,
        request: Request<SentenceRequest>,
    ) -> Result<Response<EmbeddingReply>, Status> {
        info!("EncodeSentence request received...");
        let data = encode_sentence(request.into_inner().sentence)
            .await
            .map_err(|err| {
                warn!("{:?}", err);
                Status::internal("batch encoder broken")
            })?;
        let reply = EmbeddingReply { data };
        Ok(Response::new(reply))
    }
}
2024-03-05T17:13:50.133775Z INFO example: Initializing Tonic...
2024-03-05T17:13:50.134101Z INFO tonic_startup: example: address: [::1]:50051
2024-03-05T17:13:50.134206Z INFO tonic_startup: example: Initializing Tonic Reflection...
2024-03-05T17:13:50.135073Z INFO example: Tonic Initialized...
2024-03-05T17:13:56.507081Z INFO encode_sentence: example: EncodeSentence request received...
2024-03-05T17:13:56.507693Z INFO batch_context: example: Initializing Model...
2024-03-05T17:13:56.524687Z INFO batch_context:model_load: example: Cuda: true
2024-03-05T17:14:00.489850Z INFO encode_sentence: example: EncodeSentence request received...
2024-03-05T17:14:01.497046Z INFO encode_sentence: example: EncodeSentence request received...
2024-03-05T17:14:02.012228Z INFO encode_sentence: example: EncodeSentence request received...
thread '<unnamed>' panicked at example\src\main.rs:85:24:
Channel from calling thread disconnected: "SendError(..)"
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2024-03-05T17:14:04.710488Z INFO encode_sentence: example: EncodeSentence request received...
2024-03-05T17:14:04.710735Z WARN encode_sentence: example: Disconnected
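A caller-side workaround I have been experimenting with (it doesn't fix the background thread itself): spawn the batched call onto its own tokio task, so that when the gRPC request is cancelled the receiving future isn't dropped mid-flight and the handler still has a live channel to send the result into. Roughly:

// Hypothetical wrapper around the encode_sentence() helper from above.
async fn encode_sentence_shielded(input: String) -> Result<Vec<f32>, Status> {
    // The spawned task keeps running even if this future is dropped because
    // the client cancelled the request.
    tokio::spawn(encode_sentence(input))
        .await
        .map_err(|_| Status::internal("encode task panicked"))?
        .map_err(|err| {
            warn!("{:?}", err);
            Status::internal("batch encoder broken")
        })
}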