Giter VIP home page Giter VIP logo

Comments (21)

sandeepmalik avatar sandeepmalik commented on July 16, 2024

Hi,

Just to understand more correctly, when you say a batch of 100, does this mean 100 events having different ids and payloads?

BigBen uses a bucketing approach to bucketize events, and the default size of the buckets is 1 minute. What that means is that all the events that are scheduled for say between 10:20:00-10:21:00 should have been received before the start of 10:20:00.

At the start of 10:20:00, BigBen will scan how many events are present in that bucket and work off of that. If there're events coming later for this bucket then they might get missed for processing. I think that's what's happening in your tests.

If you ran the same test but with event times after a minute, you should not be missing any events.

It is possible to run BigBen with a smaller window size (say 10 seconds) but I have not tested that fully yet.

from bigben.

ajzwu8 avatar ajzwu8 commented on July 16, 2024

Yes, each job is its own event having its own ID + payload.

I think I understand your explanation. I'll try and adjust my tests accordingly to be scheduled rounded to minute timings. So to confirm, it sounds like if i were to schedule something that was less than a minute away and in the same minute (schedule an event at 10:20:15 for 10:20:45), then there may be a chance it would not get processed. Is that correct?

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

Correct, say at the start of 10:20 BigBen would find that there're 2300 events, given that shard size is 1000, it deduces there're 3 shards to process so it schedules them. Now let's say after 10 seconds 800 more events come. Out of these 700 will go to shard 3 (since it only has 300 events) and the remaining ones will go to 4th shard. The 4th shard is totally missed because the computation of shards happened at the beginning of the minute.

For the 3rd shard, it a bit more involved, say the 2300rd event was scheduled at 10:20:45. For every shard BigBen paginates with around 400 events in a single page. (The events are sorted by time).
Once the 400 events are done, then next batch for the same shard is fetched with time > 10:20:45 (because events are sorted by time). So, if an event is scheduled at 10:20:10 for 10:20:15 then this event will be missed because at 10:20 from shard 3 300 events were fetched with last event time as 10:20:45. Any new events fetched from shard 3 will be after 10:20:45. So, it's a hit or miss scenario.

To be safe (event time - current time) > 60 seconds would be a safe delay.

Having said that, BigBen theoretically allows a configurable scan frequency but currently the lowest granularity supported is 1 minute. (schedule.scan.interval.minutes). We can look into supporting smaller granularities in future.

from bigben.

ajzwu8 avatar ajzwu8 commented on July 16, 2024

I've adjusted our tests so that now all events are set to schedule at the next minute (ex. an event created at 10:20:15, 10:20:30, and 10:20:59 are all scheduled for 10:21:00). So like you said, it seems like at 10:21:00, up to 400 of those events successfully trigger, and the rest that are scheduled at 10:21:00 are ignored.

Is there a way to have bigben fetch the next batch from the same shard starting at the same time? It seems from your explanation anything after 10:21:00 will be scheduled (so 10:21:01 onwards), but that creates the limitation that if >400 events are scheduled for the same time, then only 400 of them will trigger since the next batch is fetched from 10:21:01 onwards, not inclusive of the last successfully scheduled event.

Update on running tests: So I scheduled a batch of 400 events for the next minute on the dot (say, 12:00:00), then scheduled another batch of 400 events for 30 seconds after. The only 400 events that successfully trigger are the ones that occur on the minute, it seems.

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

Interesting. No that should not happen. All events should be fetched even if they are more than 400 and have exactly the same time. The pagination is actually on a tuple with the condition as (event time, event id) > (time, last seen id). So even if time is same the IDs will differ (the IDs are also sorted and these are not client provided event IDs but internally generated ones). That seems like a miss in the query. Thanks for pointing it out. I'll try to fix it today and release the fix.

from bigben.

whomobile avatar whomobile commented on July 16, 2024

@sandeepmalik 400 looks from below.

loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)

if (events.size >= fetchSize)

if event get processed, events.size would be decreased.

I think unit test cassandra/src/test/kotlin/com/walmartlabs/bigben/cassandra/tests/IntegrationTests.kt

fun test event loader() {

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

@whomobile, that logic seems correct. The events is an immutable list of events loaded from db, the size doesn't decrease. The logic "if (events.size >= fetchSize)" basically is saying that if in the first fetch we got 400 events then there may be more and let's try to fetch them otherwise let's consider the shard done and terminate the pagination.

The pagination query

loaderQuery = mappingManager.session.prepare("SELECT * FROM ${session.loggedKeyspace}.events WHERE bucket_id = ? AND shard = ? AND (event_time, id) > (?,?) LIMIT ?;")

seems to be correct as well.

There's something else going on here. I'll try to replicate what @ajzwu8 is saying and see what's going on.

from bigben.

whomobile avatar whomobile commented on July 16, 2024

@sandeepmalik if that's immutable, that is right. I don't think IntegrationTests.kt can cover test for ShardTask.

But here is some side thought.

IntegrationTests.kt / test event loader use following like way to paginate it.
while (l.second.isNotEmpty()) { l.second.forEach { assertEquals(events["${it.eventTime}-${it.id}"], it) events.remove("${it.eventTime}-${it.id}") } l = module<EventLoader>().load(bucket, it, fetchSize, l.second.last().eventTime!!, l.second.last().id!!, l.first).get() }

but in ShardTask module,
if (events.size >= fetchSize) loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first) else immediateFuture(rp.first to events)

from above logic, how the result of
loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)
will get processed?
else events.filter { it.status != PROCESSED }.map { e -> schedule(e). ...

may be I missed recursive logic somewhere?

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

you're right. The second loader() is missing a listener for processing. Thanks for spotting it. I'll work on fixing it.

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

This is fixed. Thanks guys.

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

Let me know if you still face any issues, otherwise I'll close the issue later.

from bigben.

whomobile avatar whomobile commented on July 16, 2024

@sandeepmalik

One question about pagination query.

Current pagination query
loaderQuery = mappingManager.session.prepare("SELECT * FROM ${session.loggedKeyspace}.events WHERE bucket_id = ? AND shard = ? AND (event_time, id) > (?,?) LIMIT ?;")
and each one mapped to
return mappingManager.session.executeAsync(loaderQuery.bind(bucketId, shard, eventTime, eventId, fetchSize)).transform { null to mappingManager.mapper(EventC::class.java).map(it!!).toList() }
and that called like
loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)

for example, event_time can be set to events.last().eventTime!!. but I am not sure whether there are any guarantee that events.last().eventTime > events.last(-1).eventTime or events.last().id > events.last(-1).id

old days in sql db, pagination can be done by "cursors" , "offset" and/or "order by". In bigint primary key based table case, could use that primary key.

I just searched how it works on cassandra case, https://stackoverflow.com/questions/26757287/results-pagination-in-cassandra-cql and it looks setFetchSize or ( pagingState, token function) is possible options.

It looks setFetchSize looks best way to cassandra.

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

@whomobile, the event time and event id are clustering keys in the event table and hence sorted by nature (both are in ASC sort). That guarantees " events.last().eventTime > events.last(-1).eventTime or events.last().id > events.last(-1).id" is always true.

As far as fetch size is concerned, the query "loaderQuery = mappingManager.session.prepare("SELECT * FROM ${session.loggedKeyspace}.events WHERE bucket_id = ? AND shard = ? AND (event_time, id) > (?,?) LIMIT ?;") "

already uses a limit in the clause where we put value 400, so there's no need to set a fetch size again.

from bigben.

whomobile avatar whomobile commented on July 16, 2024

That should work. thank you for explanation. yes, no need to set fetch size again.

Anyway it looks setFetchSize could cause other issues...
https://support.datastax.com/hc/en-us/articles/115003076346-FAQ-Why-does-Java-driver-setFetchSize-return-all-rows-

from bigben.

schiacci avatar schiacci commented on July 16, 2024

Hello,
I'm not sure if this is related, but I just pulled and deployed/built (docker_deploy.sh). When I try to run BigBen (docker_run.sh), I receive the following:

docker logs docker_bigben_run_8a26836a65a5
Using bigben config file: uri:///dist/bigben-config.yaml
Error: Main method not found in class com.walmartlabs.bigben.app.RunKt, please define the main method as:
   public static void main(String[] args)
or a JavaFX application class must extend javafx.application.Application

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

Let me rebuild and push the docker image again.

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

@schiacci, please try now, sorry about that. (do a fresh pull of the image)

from bigben.

ajzwu8 avatar ajzwu8 commented on July 16, 2024

Hey @sandeepmalik, thanks for the quick fix. We tested executing future events (scheduling for at least the next minute onwards) and it seems to be working now. I'm noticing that jobs that are scheduled "immediately" (eventTime as close to current time in payload) are lost, possibly due to the logic aforementioned.

I can bypass this for now using Kafka (instead of sending to bigben to send to application event receiver, just send immediately to app receiver), but I just wanted to let you know accordingly to confirm this as a design decision or a bug as well.

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

Hi @ajzwu8 when you say 'lost' you mean they are not triggered or not stored in the db? By design, the events that are scheduled for time < current time + lapse.offset are considered late arrivals / lapsed and are triggered immediately without saving in the db. The default offset (in bigban.yaml) is 0, so effectively any event time < current time (time at which server processes the event) are considered lapsed.

https://github.com/walmartlabs/bigben/blob/master/lib/src/main/kotlin/com/walmartlabs/bigben/api/EventReceiver.kt#L189

The rationale was that this usually would happen in two cases. Either the event is actually scheduled very close to current time. In that case you're right, I just felt that the app / client may just route the event directly to the destination (optionally waiting a few seconds for scheduling time if required to avoid client -> kafka -> bigben -> kafka overhead.

The second case is for some reason if bigben was down or lagging for some reason for an extended period then it may not be able to pull events fast enough from kafka and when it does those may already have been lapsed. In this case bigben leans towards processing those events as fast as possible to drain the lag asap by triggering them. Storing those events would have caused a bit more latency and hence less rate at which drag would go down.

However, if auditing of lapsed events is required then it should be easy to create a custom event processor which stores in db and then publishes to kafka.

I might consider adding it later as well.

Did you measure the performance? Would you be able to share how many events you are able to process and the H/W profile? Thanks.

from bigben.

whomobile avatar whomobile commented on July 16, 2024

@sandeepmalik I think default value is too small.

lapse.offset.minutes: 0
as 1 minutes is minimum interval allowed, it would be safe to set it 1

from bigben.

sandeepmalik avatar sandeepmalik commented on July 16, 2024

I am closing this issue. Thanks everyone for your help!

from bigben.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.