
Comments (6)

ajhunyady commented on May 31, 2024

@gibbz00 thanks for reporting, we'll take a look.

from fluvio.

gibbz00 commented on May 31, 2024

Hi, thanks for the response and for taking the time to analyze the issue 😊

I can also confirm that your conclusion seems correct after running your code over here. (Only when FLV_SHORT_RECONCILLATION was set to 3 though, not consistently with 10.) Shortening it will have to do for now :3

Again, big thank you


ajhunyady commented on May 31, 2024

@gibbz00, I'd love to connect on Discord. We are rolling out Stateful Services and you may want to join our private release. Please DM me if you are interested.


digikata commented on May 31, 2024

Thanks for the example code @gibbz00! I used it to reproduce the behavior and then analyze the issue a bit further.

Overall, it looks like this has to do with our replica startup logic. If a client starts looking at end offsets before the FLV_SHORT_RECONCILLATION interval (10 seconds by default) has elapsed, it can get different end offset values depending on which SPU it is connected to. Once the sync starts, the replicas stay in sync at a finer interval.

The FLV_SHORT_RECONCILLATION time can be customized by setting an environment variable of the same name to a value in seconds. If the cluster is started with FLV_SHORT_RECONCILLATION=3, the example works as expected. There is a tradeoff: the shorter the setting, the sooner the replicas sync, but the more network chatter it might create.
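
As a rough illustration of how a seconds-valued setting like this is commonly read, here is a minimal sketch. It is not Fluvio's actual implementation: the helper names `parse_reconciliation_secs` and `reconciliation_interval`, and the choice to fall back to the default on an invalid value, are assumptions. Only the variable name and the 10-second default come from the discussion above.

```rust
use std::env;
use std::time::Duration;

/// Default taken from the discussion above (10 seconds).
const DEFAULT_RECONCILIATION_SECS: u64 = 10;

/// Hypothetical helper: parse a raw value in seconds, falling back to the
/// default when the value is missing or not a valid non-negative integer.
fn parse_reconciliation_secs(raw: Option<&str>) -> u64 {
    raw.and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_RECONCILIATION_SECS)
}

/// Hypothetical helper: read the variable, spelled as it is in this thread.
fn reconciliation_interval() -> Duration {
    let raw = env::var("FLV_SHORT_RECONCILLATION").ok();
    Duration::from_secs(parse_reconciliation_secs(raw.as_deref()))
}
```

With the variable unset, `reconciliation_interval()` yields the 10-second default; with it set to `3`, it yields 3 seconds.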

It is a little unexpected that we don't start the sync earlier, and I have opened an issue to look at that over the longer term (and to fix the misspelling of "reconciliation"): #3790


digikata commented on May 31, 2024

Some small modifications to rule out a race between producer and consumer, and to experiment with when the End consumer starts reading:

use std::sync::Arc;
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin, Offset, RecordKey};
use futures::TryStreamExt;
use std::time::Duration;

const DELAY_MILLIS: u64 = 1000;
const MAX_RECORDS: u8 = 15;
const REC_TRIGGER_OFFSET: u8 = 11;  // release the End consumer sometime after the FLV_SHORT_RECONCILLATION setting (in seconds)
const TOPIC_NAME: &str = "dectest-offset";
const TOPIC_REPLICAS: u32 = 3;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    reset_topic().await?;

    println!("Number of topic replicas: {}", TOPIC_REPLICAS);

    let notify_start = Arc::new(tokio::sync::Notify::new());
    tokio::spawn(consume(TestOffset::Beginning, notify_start.clone()));
    tokio::spawn(consume(TestOffset::End, notify_start.clone()));

    let producer = fluvio::producer(TOPIC_NAME).await?;
    for index in 0..MAX_RECORDS {
        producer.send(RecordKey::NULL, index.to_string()).await?;
        println!("[PRODUCER] sent: {}", index);
        tokio::time::sleep(Duration::from_millis(DELAY_MILLIS)).await;
    }

    Ok(())
}

#[derive(PartialEq)]
enum TestOffset {
    Beginning,
    End,
}

async fn consume(offset: TestOffset, onotify: Arc<tokio::sync::Notify>) -> anyhow::Result<()> {
    // only the End consumer waits for the start signal
    if offset == TestOffset::End {
        onotify.notified().await;
    }

    let mut stream = fluvio::consumer(TOPIC_NAME, 0)
        .await?
        .stream(match offset {
            TestOffset::Beginning => Offset::beginning(),
            TestOffset::End => Offset::end(),
        })
        .await?;

    while let Some(record) = stream.try_next().await? {
        let index = record.get_value().as_utf8_lossy_string().parse::<u8>()?;

        println!(
            "[CONSUMER_{}] received: {}",
            match offset {
                TestOffset::Beginning => "BEGINNING",
                TestOffset::End => "END",
            },
            index
        );

        if offset == TestOffset::Beginning && index == REC_TRIGGER_OFFSET {
            onotify.notify_waiters();
        }

        if index == MAX_RECORDS - 1 {
            break;
        }
    }

    Ok(())
}

async fn reset_topic() -> anyhow::Result<()> {
    let admin = FluvioAdmin::connect().await?;
    let _ = admin.delete::<TopicSpec>(TOPIC_NAME).await;
    admin
        .create(TOPIC_NAME.to_string(), false, TopicSpec::new_computed(1, TOPIC_REPLICAS, None))
        .await?;

    Ok(())
}
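
To see why REC_TRIGGER_OFFSET is set to 11, it can help to work out roughly when the End consumer is released. The sketch below is a back-of-the-envelope calculation, not part of the repro. `end_consumer_start_millis` and `starts_after_reconciliation` are hypothetical helpers, and they assume that record `i` is sent about `i * DELAY_MILLIS` after the first send and that the reconciliation timer starts no later than the first send.

```rust
/// Approximate time (ms after the first send) at which the Beginning consumer
/// sees record `trigger_offset` and releases the End consumer.
/// Assumes record `i` is sent roughly `i * delay_millis` after the first one.
fn end_consumer_start_millis(trigger_offset: u8, delay_millis: u64) -> u64 {
    u64::from(trigger_offset) * delay_millis
}

/// True when the End consumer would start strictly after the reconciliation
/// interval (given in seconds) has elapsed, under the assumptions above.
fn starts_after_reconciliation(
    trigger_offset: u8,
    delay_millis: u64,
    reconciliation_secs: u64,
) -> bool {
    end_consumer_start_millis(trigger_offset, delay_millis) > reconciliation_secs * 1000
}
```

With the values in the repro (trigger offset 11, 1000 ms delay), the End consumer starts about 11 seconds in, just past the 10-second default. A trigger offset of 5 would start it inside the default window but after a 3-second one.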


digikata commented on May 31, 2024

I'll close this, but if you run into more problems or have further questions, feel free to reopen or create a new issue.
