
Comments (11)

sehz avatar sehz commented on May 29, 2024 1

For small payloads at low frequency, it should not make much difference. You can tune it later if you want better performance/latency.

from fluvio.

morenol avatar morenol commented on May 29, 2024

I see that you have a sleep after every send. If that sleep is longer than the linger time, the producer will send the batch with only the records it has batched so far (1 in your case). Could you try without that sleep in the producer, or with a larger default flush time (linger time)?


gibbz00 avatar gibbz00 commented on May 29, 2024

Updated the produce loop body to:

counter += 1;
producer.send(RecordKey::NULL, counter.to_string()).await?;
println!("[PRODUCER] sent: {}", counter);
tokio::time::sleep(Duration::from_millis(80)).await;

if counter % 5 == 0 {
    tokio::time::sleep(Duration::from_secs(5)).await;
}

Which results in:

[PRODUCER] sent: 1
[PRODUCER] sent: 2
[CONSUMER] received: 1
[CONSUMER] received: 2
[PRODUCER] sent: 3
[PRODUCER] sent: 4
[PRODUCER] sent: 5
[CONSUMER] received: 3
[CONSUMER] received: 4
[PRODUCER] sent: 6
[PRODUCER] sent: 7
[PRODUCER] sent: 8
[PRODUCER] sent: 9
[PRODUCER] sent: 10
[CONSUMER] received: 5
[CONSUMER] received: 6
[CONSUMER] received: 7
[PRODUCER] sent: 11
[PRODUCER] sent: 12
[PRODUCER] sent: 13
[PRODUCER] sent: 14
[PRODUCER] sent: 15
[CONSUMER] received: 8
[CONSUMER] received: 9
[CONSUMER] received: 10

It still seems unable to consume "all of them". I was not aware that the producer determined consumer batch sizes; I was hoping for an operation where the consumer pulls as much as it can until max_bytes or the like is reached. Is this somehow possible?


morenol avatar morenol commented on May 29, 2024

I think you could do that if you create the producer with a large linger time. That way, in practice, the producer will send records when a batch reaches max bytes.

let seconds_in_a_year = 31_536_000;
let linger_time = std::time::Duration::from_secs(seconds_in_a_year);
let config = TopicProducerConfigBuilder::default()
    .linger(linger_time)
    .build()
    .expect("Failed to create topic producer config");
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", config)
    .await
    .expect("Failed to create a producer");


morenol avatar morenol commented on May 29, 2024

My bad, I think I misinterpreted your previous message. Yes, it is the producer that creates the batches, so the consumer can only consume batches in the order they were created and with the sizes the producer pushed.

I was not aware that the producer determined consumer batch sizes, was hoping for an operation where the consumer tries to pull as much as it can until max_bytes or the like is reached. Is this somehow possible

The low-level API does something like that; it can be used via stream_batches_with_config.
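Consuming by batch might look something like the following. This is a hedged, untested sketch: it assumes a running Fluvio cluster, and method names like `partition_consumer`, `stream_batches_with_config`, and the `ConsumerConfig` builder follow my reading of the fluvio crate's consumer API; the topic name and `max_bytes` value are placeholders, so check the crate docs for exact signatures.

```rust
use fluvio::{consumer::ConsumerConfig, Fluvio, Offset};
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let fluvio = Fluvio::connect().await?;
    // "my-fluvio-topic" and partition 0 are placeholders
    let consumer = fluvio.partition_consumer("my-fluvio-topic", 0).await?;

    // max_bytes caps how much a single fetch pulls from the cluster;
    // it does not change how the producer originally batched the records
    let config = ConsumerConfig::builder().max_bytes(1_000_000).build()?;

    let mut stream = consumer
        .stream_batches_with_config(Offset::beginning(), config)
        .await?;

    // each item is one batch as stored on the cluster
    while let Some(Ok(batch)) = stream.next().await {
        println!("batch with {} records", batch.records().len());
    }
    Ok(())
}
```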

So in your example, maybe the confusion is that when you print:

[PRODUCER] sent: 7

the record has not been sent to fluvio yet; it is just batched in the producer. @gibbz00 Could you try manually flushing in this section:

if counter % 5 == 0 {
    producer.flush().await?;
    tokio::time::sleep(Duration::from_secs(5)).await;
}


gibbz00 avatar gibbz00 commented on May 29, 2024

Hmm, okay. Flushing helps somewhat, but not enough as long as the producer determines the batches. A bummer, really. We're writing a pipeline where producers write to a topic infrequently, but where we want to be able to spin up consumers with fast reads at any point.


sehz avatar sehz commented on May 29, 2024

Hi @gibbz00, for your use case the recommendation is to flush on every write. This ensures pending writes are sent to fluvio with zero delay, so the consumer can read them immediately. Since you are not writing much, there should be no performance impact. The fluvio producer's default settings are optimized for heavy writes.

So code should be something like this:

counter += 1;
producer.send(RecordKey::NULL, counter.to_string()).await?;
producer.flush().await?;
println!("[PRODUCER] sent: {}", counter);
tokio::time::sleep(Duration::from_millis(80)).await;

The consumer should be able to see new records without delay.
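For completeness, that loop body might sit in a program shaped like this. It is a hedged sketch, untested: it assumes a running Fluvio cluster, and the topic name "my-fluvio-topic" and the loop bound of 15 are placeholders, not anything from the thread.

```rust
use std::time::Duration;

use fluvio::{Fluvio, RecordKey};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let fluvio = Fluvio::connect().await?;
    // "my-fluvio-topic" is a placeholder topic name
    let producer = fluvio.topic_producer("my-fluvio-topic").await?;

    for counter in 1..=15u32 {
        producer.send(RecordKey::NULL, counter.to_string()).await?;
        // flush on every write: pending records go out immediately,
        // trading batching efficiency for minimal consumer latency
        producer.flush().await?;
        println!("[PRODUCER] sent: {}", counter);
        tokio::time::sleep(Duration::from_millis(80)).await;
    }
    Ok(())
}
```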


gibbz00 avatar gibbz00 commented on May 29, 2024

Hi @sehz, thanks for your reply. Yeah, flushing on every write is what we're currently doing. Perhaps there's little overhead in making multiple requests as opposed to receiving them all in one go? That is, maybe I'm just looking at a premature optimization.


gibbz00 avatar gibbz00 commented on May 29, 2024

Well then, maybe this issue can be closed, seeing that this was mostly a misunderstanding of how batches are created. But it would be really nice to have an option to consume multiple records on the consumer side, regardless of how they were produced.


sehz avatar sehz commented on May 29, 2024

With a manual flush or zero linger, you should get multiple records.
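The zero-linger variant mentioned here could be configured along the lines of the builder shown earlier in the thread. A config fragment, untested, and assuming the same `TopicProducerConfigBuilder` API:

```rust
use std::time::Duration;

use fluvio::TopicProducerConfigBuilder;

// zero linger: each send is dispatched immediately instead of waiting
// for more records to accumulate into a batch
let config = TopicProducerConfigBuilder::default()
    .linger(Duration::from_millis(0))
    .build()
    .expect("Failed to create topic producer config");
```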


digikata avatar digikata commented on May 29, 2024

Closing issue as it sounds like it's resolved, but feel free to reopen if needed. (Or come by and chat in our Discord too.)

