
Comments (10)

sehz commented on May 30, 2024

Just to clarify, there are two I/O paths to a Fluvio cluster. The first is to the SC, which manages metadata, and the second is to the SPU, which handles the actual consumer/producer traffic. The SC should not be handling SPU connections. It is the responsibility of the SPU to manage its own connections. This is fundamentally how Fluvio works.

Here's a suggestion: just add a new ErrorCode variant. In the SPU, ScDispatcher can expire the OffsetChangeListener, which is used by StreamFetchHandler. StreamFetchHandler can then detect this and send the appropriate ErrorCode. This should work because the consumer always talks to the leader, which manages the replica.

Thanks for working on this; it is not a trivial exercise.

from fluvio.

jdafont commented on May 30, 2024

Appreciate the comments! I'll take a look at that and will knock out the tests + changes over the next day or two hopefully.


jdafont commented on May 30, 2024

Howdy, thanks for the assistance so far in Discord for helping me figure my way through this. So far, a viable solution to me is looking like the following, but would love to get some thoughts/gut checks before implementing:

In crates/fluvio-spu/src/services/public/stream_fetch.rs async fn process (...) -> Result<(), StreamFetchError>, we do a select! in a loop. Here, we could listen to a new ReplicaDeleted event. When this event is selected, we check if the replica deleted matches our own, and if so we can simply return a StreamFetchError::Fetch(...) with a new appropriate error code like ReplicaDeleted with the appropriate error message. When the error is returned, send_back_error is called. I think this might be enough to terminate the connection, but it's not totally obvious to me because we return Ok(()) here, which bubbles up to a loop at crates/fluvio-spu/src/services/public/mod.rs async fn respond(...).
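A rough sketch of that loop shape, under loud assumptions: the real process function is async and uses select!, whereas this stand-in uses a blocking std::sync::mpsc channel so it stays self-contained. The Event enum, the ErrorCode::ReplicaDeleted variant, and the String replica key are all hypothetical names, not Fluvio's actual types.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical error code; the thread only proposes adding such a variant.
#[derive(Debug, PartialEq, Eq)]
enum ErrorCode {
    ReplicaDeleted(String),
}

/// Simplified stand-in for the error type returned by `process`.
#[derive(Debug, PartialEq, Eq)]
enum StreamFetchError {
    Fetch(ErrorCode),
}

/// Events the loop reacts to (hypothetical).
enum Event {
    OffsetChanged(i64),
    ReplicaDeleted(String),
}

/// Handles events until the channel closes or our own replica is deleted.
/// Returns the offsets observed before a normal shutdown.
fn process(own_replica: &str, events: Receiver<Event>) -> Result<Vec<i64>, StreamFetchError> {
    let mut seen = Vec::new();
    // Iterating a Receiver blocks for the next event and ends once all senders are dropped.
    for event in events {
        match event {
            Event::OffsetChanged(offset) => seen.push(offset),
            // Only a deletion of the replica this stream serves ends the stream.
            Event::ReplicaDeleted(name) if name == own_replica => {
                return Err(StreamFetchError::Fetch(ErrorCode::ReplicaDeleted(name)));
            }
            // Deletions of other replicas are ignored.
            Event::ReplicaDeleted(_) => {}
        }
    }
    Ok(seen)
}
```

The point of the sketch is only the matching step: a deletion event for some other replica is skipped, and a deletion of the stream's own replica is turned into an error that the caller can send back to the consumer.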

The next question is how to emit the event whenever the SC sends over a deleted-topic notification. Both the consumer connection and the SC dispatcher connection have access to the shared global context ctx. The dispatcher processes replica changes and eventually works its way down into crates/fluvio-spu/src/core/global_context.rs async fn apply_replica_actions(...). Here, we process replica deletions for both followers and leaders. Currently, it seems like only leaders handle file stream fetch requests. In either case, we can emit the replica-deleted signal at the point where we already process the replica deletion.

Sorry for the wall of text! Let me know if that sounds like a good plan and if I should continue with an implementation.


jdafont commented on May 30, 2024

Understood on the two I/O to Fluvio. I appreciate the suggestion and would like a little clarification, if possible. I'm not entirely sure what you mean by expiring the OffsetChangeListener. I suppose I am interpreting that to mean something like changing the listener to return a Result<i64, ErrorCode> instead of just the i64 as it is now. Then, it would be quite simple to return the appropriate error code.
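To make that interpretation concrete, here is a minimal sketch of a shared offset value that yields Result<i64, ErrorCode> and can be expired. OffsetCell and its methods are hypothetical stand-ins, not the real OffsetChangeListener API, and the real listener is async rather than a mutex-guarded value.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical error code used to expire a listener.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ErrorCode {
    TopicDeleted,
}

/// Shared offset state that is either a valid offset or a terminal error.
#[derive(Clone)]
struct OffsetCell {
    state: Arc<Mutex<Result<i64, ErrorCode>>>,
}

impl OffsetCell {
    fn new(initial: i64) -> Self {
        Self { state: Arc::new(Mutex::new(Ok(initial))) }
    }

    /// Publishes a new offset unless the cell has already been expired.
    fn publish(&self, offset: i64) {
        let mut state = self.state.lock().unwrap();
        if state.is_ok() {
            *state = Ok(offset);
        }
    }

    /// Expires the cell; every later read sees the error.
    fn expire(&self, code: ErrorCode) {
        *self.state.lock().unwrap() = Err(code);
    }

    /// Returns the current offset, or the error the cell was expired with.
    fn current(&self) -> Result<i64, ErrorCode> {
        self.state.lock().unwrap().clone()
    }
}
```

In this sketch expiry is sticky: once expire has been called, later publish calls are ignored, so a reader cannot miss the error.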

It does not seem so easy to figure out which OffsetChangeListener to expire from the perspective of ScDispatcher, because the OffsetChangeListener lives, ultimately, in the ConnectionContext which gets created for each consumer connection, which the ScDispatcher has no real reference to.


sehz commented on May 30, 2024

It would be a challenge to change OffsetChangeListener. Instead, it could return some negative number or other sentinel value to indicate end of stream, similar to what other streams do.
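A minimal sketch of the sentinel idea, assuming valid offsets are never negative. The concrete value -2 and the OffsetEvent and decode_offset names are illustrative assumptions, not existing Fluvio definitions.

```rust
/// Decoded meaning of a raw value read from an offset listener.
#[derive(Debug, PartialEq, Eq)]
enum OffsetEvent {
    /// A real offset: any non-negative value.
    Offset(i64),
    /// The topic backing this stream was deleted.
    TopicDeleted,
    /// Any other negative value, treated as a generic end of stream.
    EndOfStream,
}

/// Assumed sentinel for a deleted topic.
const TOPIC_DELETED: i64 = -2;

/// Maps a raw i64 to an event, keeping the listener's i64 type unchanged.
fn decode_offset(raw: i64) -> OffsetEvent {
    match raw {
        TOPIC_DELETED => OffsetEvent::TopicDeleted,
        n if n < 0 => OffsetEvent::EndOfStream,
        n => OffsetEvent::Offset(n),
    }
}
```

The trade-off is that the listener keeps its plain i64 signature, at the cost of every consumer of that value having to remember to decode it before treating it as an offset.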


jdafont commented on May 30, 2024

I posted a PR to explore these ideas and would appreciate some feedback to get it up to standards. I still have tests to add around this feature and would appreciate some guidance on what's required for it to be acceptable. The current solution registers connections (OffsetPublishers) with the shared leader state. This allows the ScDispatcher to trivially signal a new offset value that represents an error (TopicDelete = -2). One drawback is that the SharedLeaderState will accumulate (and, worse, keep alive) OffsetPublisher objects, resulting in a small memory leak for each connection. Perhaps I can downgrade these to weak Arcs to alleviate that problem. Anyway, I got it working locally and would love some feedback!
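The weak-reference idea could look roughly like the following. This is a sketch under assumptions, not the code in the PR: Publisher and Registry are hypothetical stand-ins for OffsetPublisher and the shared leader state, and an AtomicI64 replaces the real async publisher.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex, Weak};

/// Sentinel offset for a deleted topic, as mentioned in the thread.
const TOPIC_DELETED: i64 = -2;

/// Stand-in for a per-connection offset publisher (hypothetical).
struct Publisher {
    current: AtomicI64,
}

impl Publisher {
    fn new(initial: i64) -> Arc<Self> {
        Arc::new(Self { current: AtomicI64::new(initial) })
    }

    fn value(&self) -> i64 {
        self.current.load(Ordering::SeqCst)
    }
}

/// Stand-in for the shared state that tracks registered publishers.
#[derive(Default)]
struct Registry {
    publishers: Mutex<Vec<Weak<Publisher>>>,
}

impl Registry {
    /// Stores only a weak reference, so the registry never keeps a connection alive.
    fn register(&self, publisher: &Arc<Publisher>) {
        self.publishers.lock().unwrap().push(Arc::downgrade(publisher));
    }

    /// Signals every live publisher, prunes dead entries, and returns how many were signalled.
    fn signal_topic_deleted(&self) -> usize {
        let mut guard = self.publishers.lock().unwrap();
        let mut signalled = 0;
        guard.retain(|weak| match weak.upgrade() {
            Some(publisher) => {
                publisher.current.store(TOPIC_DELETED, Ordering::SeqCst);
                signalled += 1;
                true
            }
            // The connection is gone; drop its entry.
            None => false,
        });
        signalled
    }

    fn len(&self) -> usize {
        self.publishers.lock().unwrap().len()
    }
}
```

Here dead entries are only pruned when a signal is sent, so the Vec of Weak pointers can still grow between signals; a periodic cleanup pass would bound that.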

#3861


sehz commented on May 30, 2024

Provided comments. A cleaner scheduler can be used to clean up stale connections. You can take a look at cleaner.rs in the storage crate as an example.


jdafont commented on May 30, 2024

Okay so with the merged PR (#3861), the consumer side of things is taken care of. Now, maybe I can look into the producer side of things. Also, the error message printed isn't completely the same as the expected message described, so I can look into changing the consumer CLI command to produce that error as expected (instead of "the topic is deleted").

If there's some other higher priority, I'd be happy to work on that as well.


ajhunyady commented on May 30, 2024

Nice work @jdafont, and thank you for looking into the producer as well.


github-actions commented on May 30, 2024

Stale issue message

