Comments (6)
@gibbz00 thanks for reporting, we'll take a look.
from fluvio.
Hi, thanks for the response and for taking the time to analyze the issue 😊
I can also confirm that your conclusion seems correct by running your code over here (the example only behaved as expected with FLV_SHORT_RECONCILLATION set to 3, not consistently with 10). Shortening it will have to do for now :3
Again, big thank you
@gibbz00, I'd love to connect on Discord. We are rolling out Stateful Services and you may want to join our private release. Please DM me if you are interested.
Thanks for the example code @gibbz00! I used it to reproduce the behavior and then analyze the issue a bit further.
Overall, it looks like this has to do with our replica startup logic. If a client starts looking at end offsets before the FLV_SHORT_RECONCILLATION interval (default 10 seconds) has elapsed, it can get different end offset values depending on which SPU it is connected to. Once the sync starts, the replicas stay in sync at a finer interval.
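To make the timing effect concrete, here is a toy model, not Fluvio's replication code. It assumes one record is produced per second, that the leader always reports the true end offset, and that a follower reports an empty log until its first sync and then keeps up. The function name `end_offset_seen` is hypothetical.

```rust
/// Hypothetical toy model (not Fluvio code): the end offset a client would see
/// at `t_secs`, depending on whether it asks the leader or a follower that
/// performs its first sync at `first_sync_secs`.
fn end_offset_seen(t_secs: u64, from_leader: bool, first_sync_secs: u64) -> u64 {
    // Assumption: one record per second, so the leader's end offset equals elapsed seconds.
    let leader_end = t_secs;
    if from_leader || t_secs >= first_sync_secs {
        // Leader, or a follower that has started syncing (assumed to keep up).
        leader_end
    } else {
        // Follower has not synced yet and still reports an empty log.
        0
    }
}
```

Under these assumptions, a read at 5 seconds disagrees between leader and follower with a 10-second first sync, but agrees with a 3-second one, which matches the behavior described in this thread.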
The FLV_SHORT_RECONCILLATION interval can be customized by setting an environment variable of the same name to a value in seconds. If the cluster is started with FLV_SHORT_RECONCILLATION=3, the example works as expected. There is a tradeoff, though: a shorter setting may create more network chatter.
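As a sketch of how a process might interpret such a setting, the snippet below parses a value in seconds and falls back to the 10-second default mentioned above. This is not Fluvio's implementation: the helper names are hypothetical, and treating a missing or non-numeric value as the default is an assumption.

```rust
use std::env;
use std::time::Duration;

/// Default interval in seconds, as described in the comment above.
const DEFAULT_RECONCILIATION_SECS: u64 = 10;

/// Hypothetical helper: turn a raw FLV_SHORT_RECONCILLATION value (seconds)
/// into a Duration, using the default when it is missing or not an integer.
fn parse_reconciliation(raw: Option<&str>) -> Duration {
    let secs = raw
        .and_then(|v| v.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_RECONCILIATION_SECS);
    Duration::from_secs(secs)
}

/// Hypothetical helper: read the variable (spelled as in the real variable name)
/// from the current process environment.
fn reconciliation_interval() -> Duration {
    parse_reconciliation(env::var("FLV_SHORT_RECONCILLATION").ok().as_deref())
}
```

Keeping the parsing separate from the environment lookup makes the fallback behavior easy to check without mutating process-wide environment variables.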
It is a little unexpected that we don't start the sync earlier, and I have opened an issue to look at that over the longer term (and to fix the misspelling of reconciliation): #3790
Some small modifications to rule out a race between producer and consumer, and to experiment with when the End consumer starts reading:
```rust
use std::sync::Arc;
use std::time::Duration;

use fluvio::{metadata::topic::TopicSpec, FluvioAdmin, Offset, RecordKey};
use futures::TryStreamExt;

const DELAY_MILLIS: u64 = 1000;
const MAX_RECORDS: u8 = 15;
// Start the End consumer some time after the FLV_SHORT_RECONCILLATION interval (in seconds).
// With one record sent per DELAY_MILLIS, the record index roughly equals seconds elapsed.
const REC_TRIGGER_OFFSET: u8 = 11;
const TOPIC_NAME: &str = "dectest-offset";
const TOPIC_REPLICAS: u32 = 3;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    reset_topic().await?;
    println!("Number of topic replicas: {}", TOPIC_REPLICAS);

    // Signals the End consumer to start once the Beginning consumer sees REC_TRIGGER_OFFSET.
    let notify_start = Arc::new(tokio::sync::Notify::new());
    tokio::spawn(consume(TestOffset::Beginning, notify_start.clone()));
    tokio::spawn(consume(TestOffset::End, notify_start.clone()));

    let producer = fluvio::producer(TOPIC_NAME).await?;
    for index in 0..MAX_RECORDS {
        producer.send(RecordKey::NULL, index.to_string()).await?;
        println!("[PRODUCER] sent: {}", index);
        tokio::time::sleep(Duration::from_millis(DELAY_MILLIS)).await;
    }
    Ok(())
}

#[derive(PartialEq)]
enum TestOffset {
    Beginning,
    End,
}

async fn consume(offset: TestOffset, onotify: Arc<tokio::sync::Notify>) -> anyhow::Result<()> {
    // The End consumer waits for the start signal before opening its stream.
    if offset == TestOffset::End {
        onotify.notified().await;
    }
    let mut stream = fluvio::consumer(TOPIC_NAME, 0)
        .await?
        .stream(match offset {
            TestOffset::Beginning => Offset::beginning(),
            TestOffset::End => Offset::end(),
        })
        .await?;
    while let Some(record) = stream.try_next().await? {
        let index = record.get_value().as_utf8_lossy_string().parse::<u8>()?;
        println!(
            "[CONSUMER_{}] received: {}",
            match offset {
                TestOffset::Beginning => "BEGINNING",
                TestOffset::End => "END",
            },
            index
        );
        if offset == TestOffset::Beginning && index == REC_TRIGGER_OFFSET {
            // notify_one stores a permit, so the signal is not lost even if the
            // End consumer has not reached `notified().await` yet.
            onotify.notify_one();
        }
        if index == MAX_RECORDS - 1 {
            break;
        }
    }
    Ok(())
}

async fn reset_topic() -> anyhow::Result<()> {
    let admin = FluvioAdmin::connect().await?;
    // Ignore the error if the topic does not exist yet.
    let _ = admin.delete::<TopicSpec>(TOPIC_NAME).await;
    admin
        .create(
            TOPIC_NAME.to_string(),
            false, // dry_run
            TopicSpec::new_computed(1, TOPIC_REPLICAS, None), // 1 partition, TOPIC_REPLICAS replicas
        )
        .await?;
    Ok(())
}
```
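One detail of the start signal: in tokio, `Notify::notify_waiters` only wakes tasks that are already waiting and stores no permit, whereas `notify_one` stores a permit for the next `notified().await` if no task is waiting. The sketch below illustrates the same "keep the signal until someone waits" idea using std channels instead of tokio (a swapped-in technique for illustration, not what the example uses). The function name is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: send a one-shot start signal from another thread
/// *before* the waiting side calls `recv`, then wait on it.
/// Returns true if the early signal was still delivered.
fn early_signal_is_kept() -> bool {
    let (gate_tx, gate_rx) = mpsc::channel::<()>();
    // The signalling thread sends and exits before we start waiting.
    thread::spawn(move || gate_tx.send(()).is_ok())
        .join()
        .expect("signalling thread panicked");
    // The message is buffered in the channel, so `recv` returns it
    // even though the sender has already been dropped.
    gate_rx.recv().is_ok()
}
```

A buffered signal like this (or a stored `notify_one` permit) means the gated reader cannot miss its start signal because of scheduling order.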
I'll close this, but if you run into more problems or have additional questions, feel free to reopen it or create a new issue.