Comments (10)
Just to clarify, there are two I/O paths to a Fluvio cluster. The first is to the SC, which manages metadata, and the second is to the SPU, which handles the actual consumer/producer traffic. The SC should not handle SPU connections; it is the SPU's responsibility to manage its own connections. This is fundamentally how Fluvio works.
Here's a suggestion: just add a new `ErrorCode` variant. In the SPU, `ScDispatcher` can expire the `OffsetChangeListener` that is used by `StreamFetchHandler`. `StreamFetchHandler` can then detect that and send the appropriate `ErrorCode`. This should work because the consumer always talks to the leader, which manages the replica.
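A minimal sketch of that flow, using only the standard library rather than Fluvio's real types. The `ErrorCode` variant name, `OffsetListener`, and `offset_channel` are all hypothetical stand-ins; "expiring" the listener is modeled here by dropping the publishing side of a channel, which is an assumption about how expiry could be signaled, not how the SPU does it.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for Fluvio's `ErrorCode`; the variant name is an assumption.
#[derive(Debug, PartialEq)]
pub enum ErrorCode {
    TopicDeleted,
}

/// Simplified stand-in for `OffsetChangeListener`: yields offsets until expired.
pub struct OffsetListener {
    rx: Receiver<i64>,
}

impl OffsetListener {
    /// Blocks for the next offset, or reports that the publishing side expired.
    pub fn next_offset(&self) -> Result<i64, ErrorCode> {
        // `recv` fails only once every sender has been dropped and the queue is empty.
        self.rx.recv().map_err(|_| ErrorCode::TopicDeleted)
    }
}

/// Creates a publisher/listener pair; dropping the publisher expires the listener.
pub fn offset_channel() -> (Sender<i64>, OffsetListener) {
    let (tx, rx) = channel();
    (tx, OffsetListener { rx })
}
```

In this model the dispatcher side would hold the `Sender` and drop it when the replica is removed; the fetch handler would then turn the resulting `Err` into an error response for the consumer.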
Thanks for working on this; it is not a trivial exercise.
from fluvio.
Appreciate the comments! I'll take a look at that and will knock out the tests + changes over the next day or two hopefully.
from fluvio.
Howdy, thanks for the assistance so far in Discord for helping me figure my way through this. So far, a viable solution to me is looking like the following, but would love to get some thoughts/gut checks before implementing:
In `crates/fluvio-spu/src/services/public/stream_fetch.rs`, `async fn process(...) -> Result<(), StreamFetchError>`, we do a `select!` in a loop. Here, we could listen for a new `ReplicaDeleted` event. When this event is selected, we check whether the deleted replica matches our own, and if so we can simply return a `StreamFetchError::Fetch(...)` with a new error code such as `ReplicaDeleted` and an appropriate error message. When the error is returned, `send_back_error` is called. I think this might be enough to terminate the connection, but it's not totally obvious to me because we return `Ok(())` here, which bubbles up to a loop in `crates/fluvio-spu/src/services/public/mod.rs`, `async fn respond(...)`.
The next question is how to emit the event whenever the SC sends over a deleted topic notification. Both the consumer connection and the SC dispatcher connection have access to the shared global context `ctx`. The dispatcher processes replica changes and eventually works its way down into `crates/fluvio-spu/src/core/global_context.rs`, `async fn apply_replica_actions(...)`. Here, we process replica deletions for both followers and leaders. Currently, it seems like only leaders handle file stream fetch requests. In either case, however, we can emit the replica-deleted signal at the point where we already process the replica deletion.
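To make the plan concrete, here is a synchronous sketch of the handler loop. It replaces the async `select!` with a blocking channel receive, so it only illustrates the control flow. The `Event` enum, the shape of `StreamFetchError`, and the `Vec<i64>` return value (the real `process` returns `Result<(), StreamFetchError>`) are assumptions made so the example is self-contained.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical error code for a deleted replica.
#[derive(Debug, PartialEq)]
pub enum ErrorCode {
    ReplicaDeleted,
}

/// Simplified stand-in for the SPU's `StreamFetchError`.
#[derive(Debug, PartialEq)]
pub enum StreamFetchError {
    Fetch(ErrorCode),
}

/// Events the handler loop reacts to; `ReplicaDeleted` is the proposed addition.
pub enum Event {
    OffsetChanged(i64),
    ReplicaDeleted(String),
    ConsumerClosed,
}

/// Blocking stand-in for the `select!` loop: collects offsets until the
/// consumer closes, or fails as soon as our own replica is deleted.
pub fn process(own_replica: &str, events: &Receiver<Event>) -> Result<Vec<i64>, StreamFetchError> {
    let mut delivered = Vec::new();
    loop {
        match events.recv() {
            Ok(Event::OffsetChanged(offset)) => delivered.push(offset),
            Ok(Event::ReplicaDeleted(replica)) => {
                // Deletions of other replicas are ignored.
                if replica == own_replica {
                    return Err(StreamFetchError::Fetch(ErrorCode::ReplicaDeleted));
                }
            }
            // A closed consumer or a dropped sender ends the stream normally.
            Ok(Event::ConsumerClosed) | Err(_) => return Ok(delivered),
        }
    }
}
```

In this sketch the dispatcher side would push `Event::ReplicaDeleted` onto the channel from wherever it applies replica deletions.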
Sorry for the wall of text! Let me know if that sounds like a good plan and if I should continue with an implementation.
from fluvio.
Understood on the two I/O to Fluvio. I appreciate the suggestion and would like a little clarification, if possible. I'm not entirely sure what you mean by expiring the `OffsetChangeListener`. I suppose I am interpreting that to mean something like changing the listener to return a `Result<i64, ErrorCode>` instead of just the `i64` as it is now. Then, it would be quite simple to return the appropriate error code.
It does not seem so easy to figure out which `OffsetChangeListener` to expire from the perspective of `ScDispatcher`, because the `OffsetChangeListener` lives, ultimately, in the `ConnectionContext`, which gets created for each consumer connection and which the `ScDispatcher` has no real reference to.
from fluvio.
It would be a challenge to change `OffsetChangeListener`. Instead, it could return some negative number or similar to indicate end of stream. This is similar to the approach used by other streams.
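A small sketch of the sentinel idea: the listener keeps returning a plain `i64`, and the receiving side decodes a reserved value as end of stream. The constant `END_OF_STREAM`, its value, and the `OffsetUpdate` type are hypothetical; the comment above only says "some negative number".

```rust
/// Hypothetical reserved value meaning "no more records will arrive".
pub const END_OF_STREAM: i64 = -1;

/// Decoded form of a raw listener value.
#[derive(Debug, PartialEq)]
pub enum OffsetUpdate {
    Offset(i64),
    EndOfStream,
}

/// Interprets a raw value from the listener without changing its `i64` type.
pub fn classify(raw: i64) -> OffsetUpdate {
    match raw {
        END_OF_STREAM => OffsetUpdate::EndOfStream,
        offset => OffsetUpdate::Offset(offset),
    }
}
```

The trade-off is that the listener's signature stays untouched, at the cost of every caller needing to know which values are reserved.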
from fluvio.
I posted a PR to explore these ideas and would appreciate some feedback to get it up to standards. I still have tests to add around this feature and would appreciate some guidance on what's required to be acceptable. The current solution registers connections (OffsetPublishers) with the shared leader state. This allows the ScDispatcher to trivially signal a new offset value that represents an error (TopicDelete = -2). One drawback to this is that the SharedLeaderState will accumulate (and, worse, keep alive) OffsetPublisher objects resulting in a small memory-leak for each connection. Perhaps I can downgrade these to weak Arcs to alleviate that problem. Anyways, I got it working locally and would love some feedback!
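The weak-reference idea could look roughly like the sketch below. `OffsetPublisher` and `LeaderState` here are simplified stand-ins written for this example, not the types from the PR; only the sentinel value `-2` comes from the comment above.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex, Weak};

/// Sentinel offset from the comment above (TopicDelete = -2).
pub const TOPIC_DELETED: i64 = -2;

/// Simplified stand-in for a per-connection offset publisher.
pub struct OffsetPublisher {
    current: AtomicI64,
}

impl OffsetPublisher {
    pub fn new(initial: i64) -> Arc<Self> {
        Arc::new(Self { current: AtomicI64::new(initial) })
    }
    pub fn update(&self, value: i64) {
        self.current.store(value, Ordering::SeqCst);
    }
    pub fn current(&self) -> i64 {
        self.current.load(Ordering::SeqCst)
    }
}

/// Simplified stand-in for the shared leader state.
#[derive(Default)]
pub struct LeaderState {
    publishers: Mutex<Vec<Weak<OffsetPublisher>>>,
}

impl LeaderState {
    /// Registers a connection without keeping it alive.
    pub fn register(&self, publisher: &Arc<OffsetPublisher>) {
        self.publishers.lock().unwrap().push(Arc::downgrade(publisher));
    }

    /// Signals deletion to live connections and forgets closed ones.
    /// Returns how many connections were notified.
    pub fn signal_topic_deleted(&self) -> usize {
        let mut publishers = self.publishers.lock().unwrap();
        publishers.retain(|weak| match weak.upgrade() {
            Some(publisher) => {
                publisher.update(TOPIC_DELETED);
                true
            }
            None => false,
        });
        publishers.len()
    }
}
```

Because the registry holds `Weak` handles, a closed connection's publisher is freed as soon as the connection drops its `Arc`; only the small `Weak` entry lingers until the next pass over the list.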
from fluvio.
Provided comments. A cleaner scheduler can be used to clean up stale connections. You can take a look at `cleaner.rs` in the `storage` crate as an example.
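As a rough illustration of a periodic cleaner (not a copy of `cleaner.rs`, whose details are not shown in this thread), the sketch below sweeps a registry of weak handles on a background thread. `Registry`, `sweep`, and `spawn_cleaner` are hypothetical names, and a plain OS thread stands in for whatever async scheduler the SPU would use.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical registry of per-connection handles, held weakly.
pub type Registry<T> = Arc<Mutex<Vec<Weak<T>>>>;

/// Drops entries whose connection is already gone; returns how many remain.
pub fn sweep<T>(registry: &Registry<T>) -> usize {
    let mut entries = registry.lock().unwrap();
    entries.retain(|weak| weak.strong_count() > 0);
    entries.len()
}

/// Runs `sweep` every `interval` on a background thread until `stop` is set.
/// The thread always sweeps at least once before checking `stop`.
pub fn spawn_cleaner<T: Send + Sync + 'static>(
    registry: Registry<T>,
    interval: Duration,
    stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
    thread::spawn(move || loop {
        sweep(&registry);
        if stop.load(Ordering::SeqCst) {
            break;
        }
        thread::sleep(interval);
    })
}
```

The interval is a tuning choice: a longer interval means fewer lock acquisitions but lets stale entries sit in the registry for longer.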
from fluvio.
Okay so with the merged PR (#3861), the consumer side of things is taken care of. Now, maybe I can look into the producer side of things. Also, the error message printed isn't completely the same as the expected message described, so I can look into changing the consumer CLI command to produce that error as expected (instead of "the topic is deleted").
If there's some other higher priority, I'd be happy to work on that as well.
from fluvio.
Nice work @jdafont, and thank you for looking into the producer as well.
from fluvio.
Stale issue message
from fluvio.