For a small payload at low frequency, it should not make much difference. You can tune it later if you want better performance/latency.
from fluvio.
I see that you have a sleep after every send. If that sleep is longer than the linger time, the producer will send the batch with the records it currently has batched (1 in your case). Could you try without that sleep in the producer, or increasing the default flush time (linger time)?
Updated the produce loop body to:
counter += 1;
producer.send(RecordKey::NULL, counter.to_string()).await?;
println!("[PRODUCER] sent: {}", counter);
tokio::time::sleep(Duration::from_millis(80)).await;
if counter % 5 == 0 {
    tokio::time::sleep(Duration::from_secs(5)).await;
}
Which results in:
[PRODUCER] sent: 1
[PRODUCER] sent: 2
[CONSUMER] received: 1
[CONSUMER] received: 2
[PRODUCER] sent: 3
[PRODUCER] sent: 4
[PRODUCER] sent: 5
[CONSUMER] received: 3
[CONSUMER] received: 4
[PRODUCER] sent: 6
[PRODUCER] sent: 7
[PRODUCER] sent: 8
[PRODUCER] sent: 9
[PRODUCER] sent: 10
[CONSUMER] received: 5
[CONSUMER] received: 6
[CONSUMER] received: 7
[PRODUCER] sent: 11
[PRODUCER] sent: 12
[PRODUCER] sent: 13
[PRODUCER] sent: 14
[PRODUCER] sent: 15
[CONSUMER] received: 8
[CONSUMER] received: 9
[CONSUMER] received: 10
It still seems unable to consume all of them. I was not aware that the producer determined consumer batch sizes; I was hoping for an operation where the consumer pulls as much as it can until max_bytes or the like is reached. Is this somehow possible?
I think you could do that if you create the producer with a large linger time. That way, in practice, the producer will send records when the batch reaches max bytes.
let seconds_in_a_year = 31_536_000;
let linger_time = std::time::Duration::from_secs(seconds_in_a_year);
let config = TopicProducerConfigBuilder::default()
    .linger(linger_time)
    .build()
    .expect("Failed to create topic producer config");
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", config)
    .await
    .expect("Failed to create a producer");
My bad, I think I misinterpreted your previous message. Yes, the producer is the one that creates the batches, so the consumer can only consume the batches in the order they were created and with the sizes the producer pushed.
I was not aware that the producer determined consumer batch sizes, was hoping for an operation where the consumer tries to pull as much as it can until max_bytes or the like is reached. Is this somehow possible?
The low-level API does something like that; it can be used via stream_batches_with_config.
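For reference, here is a rough sketch of what consuming whole batches with the low-level API might look like. The exact types and signatures here (ConsumerConfig, max_bytes, the item type yielded by the batch stream) are assumptions based on the fluvio client API at the time, not verified code; check the current API reference before relying on them:

```rust
use fluvio::{Fluvio, Offset};
use fluvio::consumer::ConsumerConfig;
use futures_util::StreamExt;

async fn consume_batches() -> anyhow::Result<()> {
    let fluvio = Fluvio::connect().await?;
    let consumer = fluvio.partition_consumer("my-fluvio-topic", 0).await?;

    // Cap how much data a single fetch may return (assumed builder option).
    let config = ConsumerConfig::builder()
        .max_bytes(1024 * 1024)
        .build()?;

    // Stream whole batches instead of individual records.
    let mut stream = consumer
        .stream_batches_with_config(Offset::beginning(), config)
        .await?;
    while let Some(Ok(batch)) = stream.next().await {
        println!("[CONSUMER] got a batch of {} records", batch.records().len());
    }
    Ok(())
}
```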
So in your example, maybe the confusion is that when you print:
[PRODUCER] sent: 7
the record has not been sent to fluvio yet; it is just batched in the producer. @gibbz00 Could you try manually flushing in this section:
if counter % 5 == 0 {
    producer.flush().await?;
    tokio::time::sleep(Duration::from_secs(5)).await;
}
Hmm, okay. Flushing helps some, but not enough, as long as the producer determines the batches. Bummer, really. We're writing a pipeline where producers write seldom to a topic, but where we want to be able to spin up consumers with quick reads at any point.
Hi @gibbz00, for your use case the recommendation is to flush on every write. This ensures pending writes are sent to fluvio with zero delay, so the consumer can read them immediately. Since you are not writing much, there should be no performance impact. Fluvio's producer defaults are optimized for heavy writes.
So code should be something like this:
counter += 1;
producer.send(RecordKey::NULL, counter.to_string()).await?;
producer.flush().await?;
println!("[PRODUCER] sent: {}", counter);
tokio::time::sleep(Duration::from_millis(80)).await;
The consumer should be able to see new records without delay.
Hi @sehz, thanks for your reply. Yeah, flushing on every write is what we're currently doing. Perhaps there's little overhead in making multiple requests as opposed to receiving them all in one go? That is, maybe I'm just looking at a premature optimization.
Well then, maybe this issue can be closed, seeing that this was mostly a misunderstanding of how batches are created. But it would be really nice to have an option to consume multiple records on the consumer side, regardless of how they were produced.
With a manual flush or zero linger you should get multiple records.
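For completeness, a zero-linger producer config might look like the following. This is a sketch reusing the TopicProducerConfigBuilder shown earlier in the thread; Duration::ZERO is the stdlib constant, and whether zero linger fully removes the need for flush() is an assumption to verify against the fluvio docs:

```rust
use std::time::Duration;
use fluvio::TopicProducerConfigBuilder;

// With zero linger the producer should send each record immediately,
// so no manual flush() call after send() should be needed.
let config = TopicProducerConfigBuilder::default()
    .linger(Duration::ZERO)
    .build()
    .expect("Failed to create topic producer config");
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", config)
    .await
    .expect("Failed to create a producer");
```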
from fluvio.
Closing issue as it sounds like it's resolved, but feel free to reopen if needed. (Or come by and chat in our Discord too.)