Comments (21)
Hi,
Just to understand correctly: when you say a batch of 100, do you mean 100 events, each with a different id and payload?
BigBen uses a bucketing approach to group events, and the default bucket size is 1 minute. That means all events scheduled for, say, 10:20:00-10:21:00 should have been received before the start of 10:20:00.
At the start of 10:20:00, BigBen scans how many events are present in that bucket and works off of that. If events arrive later for this bucket, they may be missed for processing. I think that's what's happening in your tests.
If you ran the same test with event times at least a minute out, you should not be missing any events.
It is possible to run BigBen with a smaller window size (say 10 seconds), but I have not tested that fully yet.
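The bucketing described above can be sketched as a simple truncation to the minute (an illustrative sketch, not BigBen's actual code; `bucketOf` is a hypothetical name):

```kotlin
import java.time.Instant
import java.time.temporal.ChronoUnit

// Illustrative: with 1-minute buckets, an event's bucket is its event time
// truncated to the minute. All events truncating to the same minute land in
// the same bucket, which is why that bucket should be fully populated before
// the minute starts.
fun bucketOf(eventTime: Instant): Instant = eventTime.truncatedTo(ChronoUnit.MINUTES)
```

So an event scheduled for 10:20:45 belongs to the 10:20:00 bucket, and must already be in the store when the 10:20:00 scan runs.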
from bigben.
Yes, each job is its own event having its own ID + payload.
I think I understand your explanation. I'll adjust my tests so that events are scheduled on rounded minute boundaries. To confirm: if I were to schedule something less than a minute away and within the same minute (e.g. at 10:20:15, schedule an event for 10:20:45), there's a chance it would not get processed. Is that correct?
Correct. Say at the start of 10:20 BigBen finds 2300 events. Given a shard size of 1000, it deduces there are 3 shards to process, so it schedules them. Now suppose 800 more events arrive 10 seconds later. Of these, 700 go to shard 3 (since it only had 300 events) and the remaining 100 go to a 4th shard. The 4th shard is missed entirely, because the shard computation happened at the beginning of the minute.
The 3rd shard is a bit more involved. Say the 2300th event was scheduled for 10:20:45. For every shard, BigBen paginates with around 400 events per page (the events are sorted by time).
Once those 400 events are done, the next page for the same shard is fetched with time > 10:20:45 (because events are sorted by time). So if an event is created at 10:20:10 with an event time of 10:20:15, it will be missed: at 10:20, shard 3's 300 events were fetched with 10:20:45 as the last event time, so any new events fetched from shard 3 must come after 10:20:45. It's a hit-or-miss scenario.
To be safe, (event time - current time) > 60 seconds is a safe delay.
Having said that, BigBen theoretically allows a configurable scan frequency, but currently the lowest supported granularity is 1 minute (schedule.scan.interval.minutes). We can look into supporting smaller granularities in the future.
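The shard arithmetic in the example above can be sketched as follows (the shard size and names are illustrative assumptions, not BigBen internals):

```kotlin
// With a shard size of 1000, 2300 events at the start of the minute yield
// 3 shards (ceiling division). 800 late arrivals bring the total to 3100,
// which would need a 4th shard, but the scheduler computed only 3 shards at
// the start of the minute, so the 4th is never processed.
const val SHARD_SIZE = 1000

fun shardCount(eventCount: Int): Int = (eventCount + SHARD_SIZE - 1) / SHARD_SIZE
```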
I've adjusted our tests so that all events are now scheduled for the next minute (e.g. events created at 10:20:15, 10:20:30, and 10:20:59 are all scheduled for 10:21:00). As you said, it seems that at 10:21:00, up to 400 of those events successfully trigger, and the rest scheduled for 10:21:00 are ignored.
Is there a way to have BigBen fetch the next batch from the same shard starting at the same time? From your explanation, only events after 10:21:00 will be scheduled (10:21:01 onwards), which creates the limitation that if >400 events are scheduled for the same time, only 400 of them will trigger, since the next batch is fetched from 10:21:01 onwards, not inclusive of the last successfully scheduled event's time.
Update on running tests: I scheduled a batch of 400 events for the next minute on the dot (say, 12:00:00), then scheduled another batch of 400 events for 30 seconds after. It seems only the 400 events that occur on the minute successfully trigger.
Interesting. No, that should not happen. All events should be fetched even if there are more than 400 with exactly the same time. The pagination is actually on a tuple, with the condition (event time, event id) > (time, last seen id). So even if the time is the same, the IDs will differ (the IDs are also sorted, and they are not client-provided event IDs but internally generated ones). That seems like a miss in the query. Thanks for pointing it out. I'll try to fix it today and release the fix.
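The tuple-based paging condition can be illustrated as follows (the types and names are hypothetical; BigBen's real ids are internally generated):

```kotlin
// Events are sorted by (time, id). Paging with the strict tuple comparison
// (time, id) > (lastTime, lastId) never skips events that share a time,
// because the id breaks the tie at the page boundary.
data class Key(val time: Long, val id: String)

fun nextPage(sorted: List<Key>, after: Key?, limit: Int): List<Key> =
    sorted.filter {
        after == null || it.time > after.time || (it.time == after.time && it.id > after.id)
    }.take(limit)
```

Here `sorted` is assumed to be pre-sorted by (time, id), mirroring the clustering order in the table.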
@sandeepmalik the 400 looks like it comes from the check below:

```kotlin
if (events.size >= fetchSize)
```

If events get processed, `events.size` would decrease.
I think the relevant unit test is cassandra/src/test/kotlin/com/walmartlabs/bigben/cassandra/tests/IntegrationTests.kt, ``fun `test event loader`()``.
@whomobile, that logic seems correct. `events` is an immutable list of events loaded from the db; its size doesn't decrease. The logic `if (events.size >= fetchSize)` is basically saying: if the first fetch returned a full page of 400 events, there may be more, so try to fetch them; otherwise, consider the shard done and terminate the pagination.
The pagination query seems to be correct as well.
There's something else going on here. I'll try to replicate what @ajzwu8 is describing and see what's going on.
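The termination heuristic described above can be sketched generically (a hypothetical loader signature, not the actual ShardTask code):

```kotlin
// A full page (size >= fetchSize) means there may be more events, so fetch
// again from the last-seen cursor; a short page means the shard is drained.
fun <T> drainShard(fetchSize: Int, load: (last: T?) -> List<T>): List<T> {
    val all = mutableListOf<T>()
    var page = load(null)
    all += page
    while (page.size >= fetchSize) {
        page = load(page.last())
        all += page
    }
    return all
}
```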
@sandeepmalik if that list is immutable, that part is right. I don't think IntegrationTests.kt can cover a test for ShardTask.
But here is a side thought. The `test event loader` test in IntegrationTests.kt paginates like this:

```kotlin
while (l.second.isNotEmpty()) {
    l.second.forEach {
        assertEquals(events["${it.eventTime}-${it.id}"], it)
        events.remove("${it.eventTime}-${it.id}")
    }
    l = module<EventLoader>().load(bucket, it, fetchSize, l.second.last().eventTime!!, l.second.last().id!!, l.first).get()
}
```

But in the ShardTask module:

```kotlin
if (events.size >= fetchSize)
    loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)
else immediateFuture(rp.first to events)
```

From the above logic, how will the result of the second `loader.load(...)` call get processed? The processing happens only in the else branch:

```kotlin
else events.filter { it.status != PROCESSED }.map { e -> schedule(e). ...
```

Maybe I missed the recursive logic somewhere?
You're right. The second `loader.load()` is missing a listener for processing. Thanks for spotting it. I'll work on fixing it.
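The shape of such a fix might look like the following sketch (hypothetical names and a synchronous simplification, not the actual patch): every fetched page, including pages after the first, must flow through the same processing step.

```kotlin
// Each page is processed before deciding whether to fetch the next one, so
// no page's results are left unhandled.
fun <T> loadAndProcess(fetchSize: Int, load: (T?) -> List<T>, process: (T) -> Unit, last: T? = null) {
    val page = load(last)
    page.forEach(process)
    if (page.size >= fetchSize) loadAndProcess(fetchSize, load, process, page.last())
}
```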
This is fixed. Thanks guys.
Let me know if you still face any issues, otherwise I'll close the issue later.
One question about the pagination query.
The current pagination query is:

```kotlin
loaderQuery = mappingManager.session.prepare("SELECT * FROM ${session.loggedKeyspace}.events WHERE bucket_id = ? AND shard = ? AND (event_time, id) > (?,?) LIMIT ?;")
```

and it is executed as:

```kotlin
return mappingManager.session.executeAsync(loaderQuery.bind(bucketId, shard, eventTime, eventId, fetchSize))
    .transform { null to mappingManager.mapper(EventC::class.java).map(it!!).toList() }
```

and called like:

```kotlin
loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)
```

For example, event_time can be set to `events.last().eventTime!!`, but I am not sure whether there is any guarantee that `events.last().eventTime > events.last(-1).eventTime` or `events.last().id > events.last(-1).id`.
In the old days with SQL databases, pagination could be done with cursors, OFFSET, and/or ORDER BY; with a bigint primary key, you could page on that key. I searched for how this works in Cassandra (https://stackoverflow.com/questions/26757287/results-pagination-in-cassandra-cql), and it looks like setFetchSize or (pagingState, the token function) are possible options.
setFetchSize looks like the best option for Cassandra.
@whomobile, the event time and event id are clustering keys in the events table and hence sorted by nature (both in ASC order). That guarantees `events.last().eventTime > events.last(-1).eventTime or events.last().id > events.last(-1).id` is always true.
As far as fetch size is concerned, the pagination query already uses a LIMIT clause, where we pass the value 400, so there's no need to set a fetch size again.
That should work, thank you for the explanation. Yes, there's no need to set the fetch size again.
In any case, it looks like setFetchSize could cause other issues:
https://support.datastax.com/hc/en-us/articles/115003076346-FAQ-Why-does-Java-driver-setFetchSize-return-all-rows-
Hello,
I'm not sure if this is related, but I just pulled and built/deployed (docker_deploy.sh). When I try to run BigBen (docker_run.sh), I receive the following:

```
$ docker logs docker_bigben_run_8a26836a65a5
Using bigben config file: uri:///dist/bigben-config.yaml
Error: Main method not found in class com.walmartlabs.bigben.app.RunKt, please define the main method as:
   public static void main(String[] args)
or a JavaFX application class must extend javafx.application.Application
```
Let me rebuild and push the docker image again.
@schiacci, please try now, sorry about that. (do a fresh pull of the image)
Hey @sandeepmalik, thanks for the quick fix. We tested executing future events (scheduled for at least the next minute onwards) and it seems to be working now. I'm noticing that jobs scheduled "immediately" (with eventTime as close as possible to the current time in the payload) are lost, possibly due to the logic described above.
I can bypass this for now using Kafka (instead of sending to BigBen, which forwards to the application event receiver, just send directly to the app receiver), but I wanted to let you know so you can confirm whether this is a design decision or also a bug.
Hi @ajzwu8, when you say 'lost', do you mean they are not triggered, or not stored in the db? By design, events scheduled for time < current time + lapse.offset are considered late arrivals / lapsed and are triggered immediately without being saved in the db. The default offset (in bigben.yaml) is 0, so effectively any event with time < current time (the time at which the server processes the event) is considered lapsed.
The rationale was that this usually happens in two cases. Either the event is actually scheduled very close to the current time; in that case you're right, I just felt that the app/client may route the event directly to the destination (optionally waiting a few seconds for the scheduled time, if required) to avoid the client -> kafka -> bigben -> kafka overhead.
The second case is when BigBen was down or lagging for an extended period: it may not be able to pull events from Kafka fast enough, and by the time it does, they may already have lapsed. In this case BigBen leans towards processing those events as fast as possible, draining the lag asap by triggering them immediately. Storing those events would have added latency and slowed the rate at which the lag goes down.
However, if auditing of lapsed events is required, it should be easy to create a custom event processor which stores them in the db and then publishes to Kafka.
I might consider adding that later as well.
Did you measure the performance? Would you be able to share how many events you are able to process, and the H/W profile? Thanks.
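The lapse decision described above can be sketched as a simple predicate (illustrative names, not the real BigBen API):

```kotlin
import java.time.Duration
import java.time.Instant

// An event is lapsed if its event time is earlier than now + lapse.offset.
// With the default offset of 0, any event time in the past counts as lapsed,
// and the event is triggered immediately without being stored.
fun isLapsed(eventTime: Instant, now: Instant, lapseOffset: Duration = Duration.ZERO): Boolean =
    eventTime.isBefore(now.plus(lapseOffset))
```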
@sandeepmalik I think the default value is too small:
`lapse.offset.minutes: 0`
Since 1 minute is the minimum interval allowed, it would be safer to set it to 1.
I am closing this issue. Thanks everyone for your help!