Comments (7)
It's an interesting idea, but KinesisClient doesn't provide a callback based api, so I'm not sure how you would use get-records with core.async. And the Kinesis Client Library already uses an unbounded thread pool under the hood...
from amazonica.
Maybe I am misunderstanding - the intent is not to use get-records. The idea is to use IRecordProcessor almost the same as today, i.e. adapt processRecords in processor-factory as outlined in Step 1.
from amazonica.
I guess I'd be interested to see more specifics about the actual use case. core.async is useful to let you program synchronously while still working with a callback based api. A good example would be where you're making an async http call, with something like http-kit, and you want to do something with the response. But the processing didn't originate with that http call, there was some thread of execution that existed before, a request for a stock quote, whatever, that needs to make the http call and do something with the response. So it's nice to trade that callback api for core.async channels. But with Kinesis, you've just got this stream, the whole application begins with the function you provide. There's technically a callback api with the worker, but you don't have to return any value to some other thread of execution looking to do something with the result.
The other benefit of core.async is optimization, as you're tying up fewer threads than if you were using futures for everything. But the KCL is already invoking every call to processRecords in its own thread, so you're not saving anything there. (Technically, you'd actually be tying up more threads as soon as you introduce core.async.)
So basically, the only advantage could be the API, and I guess I'm just not seeing what the advantage of <!'ing off a channel would be compared to a plain old function, when every value that you take off the channel would have been put there on its own thread from the KCL pool.
(Technically, you could supply your own ExecutorService to KCL, but I think that'll prove to be non-trivial.)
from amazonica.
I agree that a guiding use case would be very helpful. I don't have a specific one, but with Directed Acyclic Graphs (referred to in the Kinesis docs for more complex stream processing) it would be much better to create a DAG with channels than to emit from one Kinesis stream to another Kinesis stream.
A major advantage is in the API as you say. Also the benefit of separating stream processing components from each other (and from Kinesis). The result might be similar to Apache Storm but lacking the exactly-once semantics.
Food for thought anyway as core.async evolves.
from amazonica.
I am still learning the details of Kinesis and other AWS services, but here is one generic use case to consider: taking Kinesis data, processing it, and storing it reliably in DynamoDB, Redshift, or S3. Apologies for the length.
How this use case is addressed in Java
- "The Kinesis Client Library can be used in conjunction with the Kinesis Connector Library to reliably move data from Amazon Kinesis to Amazon DynamoDB, Amazon Redshift, and Amazon S3"
http://docs.aws.amazon.com/kinesis/latest/dev/before-you-begin.html
- KinesisConnectorRecordProcessor is the base class for any KinesisConnector. It implements IRecordProcessor.
https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorRecordProcessor.java
- The processRecords method uses application-defined filter and transformer callbacks, and stores the new records in a buffer
- Buffered records are periodically stored and checkpointed, not necessarily on every ingestion from Kinesis
- Java users implement the processing pipeline by subclassing from Connector Library classes; for DynamoDB for example:
https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/samples/dynamodb/DynamoDBMessageModelPipeline.java
Aggregation of records and checkpointing
Key design issue: records cannot be checkpointed until they are stored (in DynamoDB, Redshift, S3, etc.), otherwise data can be lost
- If records are stored one at a time (or whatever amount Kinesis delivers each iteration) the basic IRecordProcessor-based approach works without risk of data loss; but often it is better to aggregate records and store them in chunks according to a defined policy.
- In any situation, records cannot be checkpointed before they are stored (in DynamoDB, Redshift or S3), otherwise data could be lost in case of a failure
- Allowing records to be aggregated for storage independent of the Kinesis arrival rate is useful
- In Clojure this could be done if something like the KinesisConnectorRecordProcessor scheme was used and the app supplied the appropriate callback methods. Or, channels could be used to decouple the processing pipeline and checkpointing from record ingestion.
- Whatever the implementation, the complete system would ingest, transform, aggregate, store and only then checkpoint to avoid data loss.
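The store-before-checkpoint ordering described above can be sketched roughly as follows. This is only an illustration in Python, not Amazonica or KCL code: `BufferingProcessor`, `store`, `checkpoint` and `max_buffered` are hypothetical names, and records are assumed to be `(sequence_number, data)` pairs.

```python
class BufferingProcessor:
    """Buffers records and checkpoints only after a successful store."""

    def __init__(self, store, checkpoint, max_buffered=3):
        self.store = store            # hypothetical: writes a batch to DynamoDB, S3, etc.
        self.checkpoint = checkpoint  # hypothetical: records the last safely stored sequence number
        self.max_buffered = max_buffered
        self.buffer = []

    def process_records(self, records):
        """Called once per delivery with a list of (sequence_number, data) pairs."""
        self.buffer.extend(records)
        if len(self.buffer) >= self.max_buffered:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        batch = self.buffer
        self.store(batch)              # raises on failure, so the checkpoint below is skipped
        self.checkpoint(batch[-1][0])  # advance only past records that are now stored
        self.buffer = []               # clear the buffer only after both steps succeed
```

If `store` raises, the buffer is left untouched and no checkpoint is written, so the same records are retried on the next flush or redelivered after a restart.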
Summary
For this use case, the main benefits of using core.async channels might be:
- An Amazonica app could independently use Kinesis and DynamoDB or other storage services, and bulk load data reliably independent of the Kinesis arrival rate
- Checkpointing and processing are decoupled from record ingestion, without the baggage of Java classes
- Easier to implement in Amazonica and in the app compared to using the Kinesis Connector Library
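As a rough picture of the decoupling in the list above, here is a sketch where a bounded queue stands in for a core.async channel. It is written in Python purely for illustration; `run_pipeline`, `consume`, `transform`, `store` and `checkpoint` are hypothetical names, not part of Amazonica or the KCL.

```python
import queue
import threading

DONE = object()  # sentinel marking the end of the stream in this sketch


def consume(channel, transform, store, checkpoint, batch_size):
    """Takes records off the channel, then stores and checkpoints them in batches."""
    batch = []
    while True:
        item = channel.get()
        if item is DONE:
            break
        seq, data = item
        batch.append((seq, transform(data)))
        if len(batch) >= batch_size:
            store(batch)
            checkpoint(batch[-1][0])  # only after the batch has been stored
            batch = []
    if batch:  # flush whatever is left when the stream ends
        store(batch)
        checkpoint(batch[-1][0])


def run_pipeline(records, transform, store, checkpoint, batch_size=2):
    """Ingestion side: only puts records on the channel; it never stores or checkpoints."""
    channel = queue.Queue(maxsize=10)  # bounded, so slow storage applies back-pressure
    worker = threading.Thread(
        target=consume, args=(channel, transform, store, checkpoint, batch_size)
    )
    worker.start()
    for record in records:
        channel.put(record)
    channel.put(DONE)
    worker.join()
```

The ingestion loop and the storage policy only share the channel, so the batch size can be chosen without regard to how many records arrive per delivery.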
More info
from amazonica.
@mcohen01 I have almost finished a complete implementation, including a test framework that mocks Kinesis so that tests can be done outside of AWS. It won't be merge-worthy yet, but I will send a pull request soon anyway to show the general direction and get any feedback.
from amazonica.
This is a relevant discussion thread, just fyi:
https://forums.aws.amazon.com/message.jspa?messageID=531052
from amazonica.