amazon-archives / amazon-kinesis-connectors
License: Apache License 2.0
Hello Team,
I am using this library to store data from Kinesis stream to S3.
I appreciate the overall design of the library, but I want to aggregate some data (e.g. calculate the sum of timespan grouped by visitor id) before uploading it to S3.
After some googling I found another library, https://github.com/awslabs/amazon-kinesis-aggregators, but I am not sure how to use it together with the connector library.
I guess I should follow the instructions under this section: https://github.com/awslabs/amazon-kinesis-aggregators#integrating-aggregators-into-existing-applications
but where should I put the aggregate(), checkpoint(), initialize(), and shutdown() calls? Which class(es) should I extend to do so?
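For what it's worth, here is a rough sketch of where those lifecycle calls could go relative to the record processor's own lifecycle. The Aggregator interface below is a simplified stand-in for the real aggregators API, and runLifecycle mirrors what a KinesisConnectorRecordProcessor subclass might do; all names and signatures are illustrative, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;

class AggregatingProcessorSketch {
    /** Stand-in for the aggregator's lifecycle methods (illustrative only). */
    interface Aggregator {
        void initialize();                    // call once, when the processor is initialized
        void aggregate(List<String> records); // call from processRecords, per batch
        void checkpoint();                    // call when the connector checkpoints
        void shutdown();                      // call when the processor shuts down
    }

    /** Drives one processor lifecycle, showing where each call belongs. */
    static void runLifecycle(Aggregator agg, List<List<String>> batches) {
        agg.initialize();                // from IRecordProcessor.initialize()
        for (List<String> batch : batches) {
            agg.aggregate(batch);        // from IRecordProcessor.processRecords()
            agg.checkpoint();            // after a successful emit/checkpoint
        }
        agg.shutdown();                  // from IRecordProcessor.shutdown()
    }

    /** Helper that records the order of lifecycle calls, for inspection. */
    static List<String> callOrder(List<List<String>> batches) {
        final List<String> calls = new ArrayList<>();
        runLifecycle(new Aggregator() {
            public void initialize() { calls.add("initialize"); }
            public void aggregate(List<String> r) { calls.add("aggregate"); }
            public void checkpoint() { calls.add("checkpoint"); }
            public void shutdown() { calls.add("shutdown"); }
        }, batches);
        return calls;
    }
}
```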
Hi All,
I've managed to get the Amazon Redshift connector running locally on my virtual machine; however, I would like to use this library as part of our production deployment.
Does anyone have an example of what the deployment topology would look like? For example, what is the throughput to Redshift when running this library on single EC2 instance? Does it need to be deployed using a distributed mechanism? If it can be distributed, will it integrate with Apache Spark (Streaming)?
Thanks in advance for any help, Mike.
Hi
I feel that defining toClass & fromClass to throw IOException is very constraining.
If I am performing some validation and want to throw a custom exception, I cannot do that.
Also, KinesisConnectorRecordProcessor only handles IOExceptions; it doesn't handle any other runtime exceptions that may occur, and an exception in one record is not a valid reason to stop processing the rest of the records.
Hence I request that you use java.lang.Exception, or at the very least RuntimeException, everywhere instead.
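In the meantime, one workaround is to wrap the custom exception in the IOException that toClass is allowed to throw, so the original cause survives for logging upstream. A minimal sketch; ValidationException and the parsing logic are made up for illustration:

```java
import java.io.IOException;

class WrapValidationException {
    /** Hypothetical domain exception raised during record validation. */
    static class ValidationException extends Exception {
        ValidationException(String message) { super(message); }
    }

    /** Parses a record, translating validation failures into IOException. */
    static String toClass(String raw) throws IOException {
        try {
            if (raw.isEmpty()) {
                throw new ValidationException("empty record");
            }
            return raw.trim();
        } catch (ValidationException e) {
            throw new IOException(e); // cause stays recoverable via getCause()
        }
    }

    /** Helper: returns the cause of the failure, or null on success. */
    static Throwable failureCause(String raw) {
        try {
            toClass(raw);
            return null;
        } catch (IOException e) {
            return e.getCause();
        }
    }
}
```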
At this stage, we only have CPU usage as per Auto scaling logic - if 80% CPU for 10 minutes, add new instance.
Do you have any other recommendations? Based on this matrices
http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html
I'm working on a project where we need to customize the library a bit. This is quite hard to do because many methods are private and some fields are final.
Some examples:
Problem 1: Partition key
//S3ManifestEmitter.java
// Use constant partition key to ensure file order
putRecordRequest.setPartitionKey(manifestStream);
Problem 2: Mock S3Client
//S3Emitter.java
public S3Emitter(KinesisConnectorConfiguration configuration) {
    s3Bucket = configuration.S3_BUCKET;
    s3Endpoint = configuration.S3_ENDPOINT;
    s3client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
    if (s3Endpoint != null) {
        s3client.setEndpoint(s3Endpoint);
    }
}
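One way to address the mockability problem would be a protected factory method that a test subclass overrides. A sketch of the pattern, using a hypothetical StorageClient interface in place of AmazonS3Client so the example stays self-contained:

```java
import java.util.HashMap;
import java.util.Map;

class InjectableEmitterSketch {
    /** Hypothetical stand-in for the S3 client interface. */
    interface StorageClient {
        void put(String key, String body);
    }

    static class Emitter {
        final StorageClient client;

        Emitter() {
            this.client = createClient(); // subclasses can swap the client
        }

        /** Override point; the real emitter would build an AmazonS3Client here. */
        protected StorageClient createClient() {
            throw new UnsupportedOperationException("no real client in this sketch");
        }

        void emit(String key, String body) {
            client.put(key, body);
        }
    }

    /** Test double that records puts in memory instead of calling S3. */
    static class RecordingEmitter extends Emitter {
        static final Map<String, String> store = new HashMap<>();

        @Override
        protected StorageClient createClient() {
            return (key, body) -> store.put(key, body);
        }
    }
}
```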
Hi,
If a runtime exception happens in our emitter code (preventing buffer.clear()) it bubbles out and is swallowed in the KCL's ProcessTask.java. The KCL continues feeding new records to the buffer. This allows the buffer to grow unbounded.
Then if the application recovers (before OutOfMemory), the large buffer is flushed all at once.
It would be great if there were the ability to:
I'd like to use the redshiftbasic example in order to hook up my Kinesis stream to dump into Redshift.
It looks like, per the docs, using the redshiftbasic example is the way to go.
However, I'm not sure how to use this code.
Do I simply need to take the children of the folder amazon-kinesis-connectors/src/main/samples/redshiftbasic? Then, I need to modify a few of the source files, such as KinesisMessageModelRedshiftTransformer.java, to match my data set?
When I run the build script via ant, do I run it once and then the Kinesis stream is wired up to always talk to Redshift?
Or does the program run continuously? If it stops (say, due to Ctrl+Z), do I no longer get the functionality of Kinesis dumping into Redshift?
Also - how do I compile this project? Thanks
I have taken the S3 emitter sample and modified it to our needs. The core library code (under src/main/java) is unmodified. I left it running overnight for several nights for stability testing and encountered the following in the logs one morning (and have been unable to reproduce since):
2014-03-25 03:45:04,105 63555519 INFO [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2014-03-25 03:45:04,198 63555612 INFO [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 successfully took 1 leases: shardId-000000000000
2014-03-25 03:45:06,339 63557753 INFO [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2014-03-25 03:45:06,431 63557845 INFO [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 successfully took 1 leases: shardId-000000000000
[snip]
2014-03-25 03:49:13,416 63804830 INFO [pool-1-thread-3] com.mycompany.mypackage.S3Emitter - Successfully emitted 1002 records to S3 in s3://my-bucket/my-stream/2014-03-25/03/startid-stopid
2014-03-25 03:49:13,450 63804864 INFO [pool-1-thread-3] com.amazonaws.services.kinesis.leases.impl.LeaseRenewer - Worker abcd:-1234:efgh:-8000 refusing to update lease with key shardId-000000000000 because concurrency tokens don't match
2014-03-25 03:49:13,451 63804865 ERROR [pool-1-thread-3] com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor - com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
The last set of messages kept repeating. It continued to process items, but Kinesis wasn't acknowledging the processing. When I restarted the program, it re-processed all of those items. I have no idea what caused the problem, as I have left it running for days on end with no problems and the same levels of load going through Kinesis.
It appears that it was actually continuing to process new items instead of reprocessing the same set over and over again.
I have narrowed the problem down to KinesisConnectorRecordProcessor.emit(IRecordProcessorCheckpointer,List). https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorRecordProcessor.java#L134
emitter.emit on line 139 did not encounter any problems and returned an empty list since nothing failed.
The call to checkpointer.checkpoint(); on line 153 threw a ShutdownException, which was then properly caught.
The call to emitter.fail(unprocessed) in the catch block on 157 passed in the empty list returned above.
It would seem to me that if checkpointer.checkpoint() throws, emitter.fail() should be called with all emitItems. Additionally, at least if a ShutdownException is encountered, it would be nice to attempt to re-initialize the Kinesis consumer.
In the meantime, we should be able to make do with terminating our program any time emitter.fail() is called, and having a supervisor simply restart the program when it terminates.
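The proposed behavior can be stated as a tiny decision rule; this is a sketch of the suggestion, not the library's actual code:

```java
import java.util.List;

class CheckpointFailurePolicy {
    /**
     * When checkpoint() throws, every record emitted in this cycle is
     * effectively unacknowledged to Kinesis, so fail() should receive the
     * full emit list rather than the (possibly empty) unprocessed list.
     */
    static <T> List<T> recordsToFail(List<T> emitted, List<T> unprocessed,
                                     boolean checkpointFailed) {
        return checkpointFailed ? emitted : unprocessed;
    }
}
```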
I get the error below when I try to run the DynamoDB sample. Thanks in advance.
[java] INFO: Created new shardConsumer for shardId: shardId-000000000000, concurrencyToken: 800bb2cc-1a82-46b9-8ec8-f5d6287c9a69
[java] Nov 25, 2014 4:18:13 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
[java] INFO: No need to block on parents [] of shard shardId-000000000000
[java] Nov 25, 2014 4:18:13 PM com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable publishMetrics
[java] INFO: Successfully published 20 datums.
[java] Nov 25, 2014 4:18:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
[java] Nov 25, 2014 4:18:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000001 with TRIM_HORIZON
[java] Nov 25, 2014 4:18:16 PM com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter performBatchRequest
[java] SEVERE: Amazon DynamoDB Client could not perform batch request
[java] com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: PCQTJ5602VPM99LCCU7EHGUUFFVV4KQNSO5AEMVJF66Q9ASUAAJG)
[java] at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1077)
[java] at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:725)
[java] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460)
[java] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295)
[java] at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3106)
[java] at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.batchWriteItem(AmazonDynamoDBClient.java:771)
[java] at com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter.performBatchRequest(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter.emit(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.emit(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.processRecords(Unknown Source)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:125)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
[java] at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[java] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[java] at java.lang.Thread.run(Thread.java:744)
It seems very logical to allow a connector application to emit to multiple files.
I think the only way to implement this currently may be to have multiple connectors read from the same stream, each emitting to a different file/table.
A Stack Overflow question for the same problem has been open for the past 6 months without any answers:
https://stackoverflow.com/questions/26108368/can-i-use-amazon-kinesis-connectors-to-send-a-stream-to-two-destinations-two-em
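Until the library supports this, a fan-out emitter can be built on top of the existing contract. A sketch, with Emitter as a simplified stand-in for IEmitter; how to combine the per-destination failure lists (any-failed vs all-failed) is an open design choice:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CompositeEmitterSketch {
    /** Simplified stand-in for IEmitter<T>: emit returns the failed records. */
    interface Emitter<T> {
        List<T> emit(List<T> buffer);
    }

    static class CompositeEmitter<T> implements Emitter<T> {
        private final List<Emitter<T>> delegates;

        @SafeVarargs
        CompositeEmitter(Emitter<T>... delegates) {
            this.delegates = Arrays.asList(delegates);
        }

        @Override
        public List<T> emit(List<T> buffer) {
            List<T> failed = new ArrayList<>();
            for (Emitter<T> delegate : delegates) {
                // Report a record as failed if ANY destination rejected it.
                failed.addAll(delegate.emit(buffer));
            }
            return failed;
        }
    }
}
```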
In reference to:
https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923
It appears that the connector does not work with Elasticsearch as managed by AWS. Although the initial connect appears to be successful (using the endpoint at port 443), the first call after that makes it disconnect again:
2016-05-12 12:40:49,485 @main INFO ElasticsearchEmitter ElasticsearchEmitter using elasticsearch endpoint search-<...>.eu-west-1.es.amazonaws.com:443 ElasticsearchEmitter.java(118)
2016-05-12 12:40:49,640 @main INFO plugins [Leap-Frog] loaded [], sites [] PluginsService.java(151)
2016-05-12 12:41:44,595 @LeaseCoordinator-1 INFO LeaseTaker Worker cfe43d56bde02857:-651163f0:154a48db5ff:-8000 saw 36 total leases, 35 available leases, 1 workers. Target is 36 leases, I have 1 leases, I will take 35 leases LeaseTaker.java(403)
2016-05-12 12:42:51,789 @main INFO transport [Leap-Frog] failed to get node info for [#transport#-1][pbox-VirtualBox][inet[search-<...>.eu-west-1.es.amazonaws.com/52.17.232.195:443]], disconnecting... TransportClientNodesService.java(398)
org.elasticsearch.transport.SendRequestTransportException: [][inet[search-<...>.eu-west-1.es.amazonaws.com/52.17.232.195:443]][cluster:monitor/nodes/info]
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:286)
at org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:243)
at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:376)
A similar issue is reported at amazon-archives/cloudwatch-logs-subscription-consumer#9.
Perhaps the elasticsearch-hadoop (Spark) receiver gives some hints on how to approach this: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/reference.html; specifically the option:
conf.set("es.nodes.wan.only", "true"); (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html)
UnmodifiableBuffer previously compared its own records object against itself rather than against the records object of the other buffer. This meant that equals would return true even if two buffers contained different records.
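A minimal illustration of the corrected comparison; RecordBuffer is a stripped-down stand-in, not the actual UnmodifiableBuffer:

```java
import java.util.Arrays;
import java.util.List;

class RecordBuffer {
    private final List<String> records;

    RecordBuffer(String... records) {
        this.records = Arrays.asList(records);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof RecordBuffer)) return false;
        RecordBuffer other = (RecordBuffer) obj;
        // The buggy version effectively compared records.equals(records),
        // which is always true; compare against the OTHER buffer's records.
        return this.records.equals(other.records);
    }

    @Override
    public int hashCode() {
        return records.hashCode();
    }
}
```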
KinesisMessageModelRedshiftTransformer#toDelimitedString() does not escape REDSHIFT_DATA_DELIMITER. The output will be broken when a record contains the delimiter.
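One possible fix is to backslash-escape the delimiter before joining fields; the Redshift COPY command would then need a matching ESCAPE option. A sketch (this escaping scheme is an assumption, not what the library does):

```java
class DelimiterEscaper {
    /** Backslash-escapes the delimiter and the escape character itself. */
    static String escape(String field, char delimiter) {
        StringBuilder sb = new StringBuilder(field.length());
        for (char c : field.toCharArray()) {
            if (c == delimiter || c == '\\') {
                sb.append('\\'); // prefix both the delimiter and backslash
            }
            sb.append(c);
        }
        return sb.toString();
    }
}
```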
From the directions:
Downloaded the PostgreSQL JDBC driver from http://jdbc.postgresql.org/download.html and put it in the following directory:
src/jdbc/lib/postgresql-9.3-1101.jdbc41.jar
Added the driver path to build.xml
<path id="samples.classpath">
/* removed */
<fileset dir="${basedir}/../jdbc/lib" includes="**/*.jar" />
</path>
In RedshiftManifestSample.properties I have the URL:
redshiftURL = jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com
When I run the program with ant run, I see the following:
[java] Mar 28, 2014 1:21:48 PM samples.utils.RedshiftUtils tableExists
[java] SEVERE: java.sql.SQLException: No suitable driver found for jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com
[java] Exception in thread "main" java.lang.IllegalStateException: Could not create Redshift file table kinesisFiles
[java] at samples.KinesisConnectorExecutor.createRedshiftFileTable(Unknown Source)
[java] at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
[java] at samples.KinesisConnectorExecutor.<init>(Unknown Source)
[java] at samples.redshiftmanifest.RedshiftManifestExecutor.<init>(Unknown Source)
[java] at samples.redshiftmanifest.RedshiftManifestExecutor.main(Unknown Source)
[java] Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://ht-redshift-test.chzuqbjvtyx7.us-east-1.redshift.amazonaws.com
[java] at java.sql.DriverManager.getConnection(DriverManager.java:596)
[java] at java.sql.DriverManager.getConnection(DriverManager.java:187)
[java] at samples.utils.RedshiftUtils.createRedshiftTable(Unknown Source)
[java] ... 5 more
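"No suitable driver found" means that no registered java.sql.Driver accepted the URL at getConnection time, typically because the driver jar isn't on the runtime classpath, or (with older JDBC drivers) because Class.forName("org.postgresql.Driver") was never called to trigger registration. The self-contained demo below registers a dummy driver just to show the mechanism; it is not the PostgreSQL driver:

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

class DummyDriverDemo implements Driver {
    /** Registers this dummy driver, as a real driver's static initializer would. */
    static void register() {
        try {
            DriverManager.registerDriver(new DummyDriverDemo());
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /** True if some registered driver accepts the URL. */
    static boolean driverAvailable(String url) {
        try {
            return DriverManager.getDriver(url) != null;
        } catch (SQLException e) {
            return false; // thrown when no registered driver accepts the URL
        }
    }

    @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:dummy:"); }
    @Override public Connection connect(String url, Properties info) { return null; } // demo only
    @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) { return new DriverPropertyInfo[0]; }
    @Override public int getMajorVersion() { return 1; }
    @Override public int getMinorVersion() { return 0; }
    @Override public boolean jdbcCompliant() { return false; }
    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException();
    }
}
```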
We are using the latest build to read messages from a multi-shard Kinesis stream using workers running across multiple EC2 instances and write them to S3. In this context, please see the following doc:
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-duplicates.html
It talks about cases where we can end up with reading duplicate records from the stream. The S3Emitter uses the buffer.firstSeqNumber and buffer.lastSeqNumber to determine the S3 filename. In the case of retries, this approach might not guarantee duplicate removal as the buffer can be filled differently with a different size or count. The approach suggested by the doc does not work in the case of S3Emitter as there is no way to control the exact number of records in the buffer (due to the emit being based on count, time OR size). Any thoughts on how we can overcome this limitation?
Thanks.
I've spent a ton of time trying to get anything to build with this project. There are no clear instructions for building the sample application. You try, and then you find that you need the specific jar file of the Kinesis Client, placed into a specific location, from here: #1
Then, you find that it still fails because it wants an "external" directory in the source tree. Create that empty dir, and try again.
Next, the problem is 29 errors, e.g.:
ElasticsearchEmitter.java:241: error: cannot find symbol
[javac] } else if (response.getStatus().equals(ClusterHealthStatus.GREEN)) {
[javac] ^
[javac] symbol: variable ClusterHealthStatus
[javac] location: class ElasticsearchEmitter
Another possible clue:
warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotation Processor' less than -source '1.7'
I am using Java 1.7, but given this warning, I even tried version 1.6, which did not resolve the issue. I also tried both the OpenJDK and the Oracle JDK.
All I want to do is compile a sample file and move on; I'm pulling out my hair and stabbing my eyeballs out in frustration here.
Hi,
I am trying to implement the Kinesis-S3 connector. My question is how I can monitor the job (from a production perspective), and in case of any exception, how I can take action (send mail, reschedule the job).
Is CloudWatch a good option? What are other recommended ways?
Thanks
Ankur
Hi !
I am not able to find any help on the topic of integrating Kinesis connectors with the Python helper.
Kalyan :)
At a high-level, what does this sample do?
I am trying to run it, but I admit that if I understood what it did at a high level, I think it'd help me to run the sample.
Does it populate data from users.txt into S3, and then put the data into Kinesis? Finally, does it put the records into Redshift (from Kinesis) depending upon either a time or byte threshold?
I'm confused about what it does at a high level.
Additionally, I modified src/main/samples/redshiftbasic/RedshiftBasicSample.properties to create a Redshift cluster + Kinesis stream.
Yet, after I run ant run, I don't see the stream or cluster created when running aws kinesis list-streams or looking at the AWS Management Console's cluster view for Redshift.
Either the attached code or the Ant configuration does not properly handle Unicode characters when writing to output files. Has anyone experienced this issue and found the lines of code that need to be modified to handle Unicode characters?
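One common culprit (an assumption here, since the failing line isn't identified) is converting between bytes and Strings using the platform default charset instead of UTF-8. Being explicit on both sides of the conversion is the usual fix:

```java
import java.nio.charset.StandardCharsets;

class Utf8Conversion {
    /** Encodes with an explicit charset, never the platform default. */
    static byte[] toUtf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes with the same explicit charset, so round-trips are lossless. */
    static String fromUtf8Bytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```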
I've been trying unsuccessfully to run the different samples e.g. the S3 sample.
To do that, I have:
- Created an AwsCredentials.properties file in each of the sample folders (e.g. amazon-kinesis-connector/src/main/samples/s3/AwsCredentials.properties), with the accessKey and secretKey fields set. (Note that this is an exact copy of the AwsCredentials.properties file I used to run the aws-java-sdk-1.6.12 samples successfully.)
- Put the dependencies under amazon-kinesis-connector/src/aws-java-sdk-1.6.12/ and amazon-kinesis-connector/src/KinesisClientLibrary/lib/amazon-kinesis-client-1.0.0.jar (which was a bit weird...)
- Run ant run from the command line in e.g. amazon-kinesis-connector/main/samples/s3
I keep getting the following error:
[java] Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
[java] at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
[java] at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2120)
[java] at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:296)
[java] at samples.utils.KinesisUtils.streamExists(Unknown Source)
[java] at samples.utils.KinesisUtils.createAndWaitForStreamToBecomeAvailable(Unknown Source)
[java] at samples.utils.KinesisUtils.createInputStream(Unknown Source)
[java] at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
[java] at samples.KinesisConnectorExecutor.<init>(Unknown Source)
[java] at samples.s3.S3Executor.<init>(Unknown Source)
[java] at samples.s3.S3Executor.main(Unknown Source)
[java] Java Result: 1
It seems like the AwsCredentials.properties file is not being loaded... Is that right? What am I doing wrong? Any help would be much appreciated.
I am wondering if the library supports an HTTP proxy. We are trying to use the ES connector via the AWS CloudWatch consumer, and for some reason the API is unable to store the lease information in DynamoDB; I get an HTTP connection timed out error.
I tried passing the proxy settings through JAVA_OPTIONS in the call to "java -jar" as system properties, but that doesn't seem to make any difference.
Any known fixes for this?
Here's the stack trace to that effect.
2016-02-20 02:57:10,741 INFO AmazonHttpClient - Unable to execute HTTP request: connect timed out
java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:656)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:524)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:403)
at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:118)
at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:304)
at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:611)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:446)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:706)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3240)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:1047)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:373)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:317)
at com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase.run(KinesisConnectorExecutorBase.java:95)
at com.amazonaws.services.logs.connectors.samples.elasticsearch.ElasticsearchConnector.main(ElasticsearchConnector.java:38)
Thanks
Hi, I have an application developed using Struts and Hibernate. Now I am trying to connect to AWS Redshift. My settings are as follows (in the Hibernate cfg file):
1)driver_class: org.postgresql.Driver
connection.url: jdbc:postgresql://host_name.redshift.amazonaws.com:5439/user
dialect: org.hibernate.dialect.PostgreSQL82Dialect
2)driver_class: com.amazon.redshift.jdbc4.Driver
connection.url: jdbc:redshift://host_name.redshift.amazonaws.com:5439/user
dialect: org.hibernate.dialect.PostgreSQL82Dialect (using this since I don't know the dialect name here)
In both cases I am getting:
Error: Could not open connection: No suitable driver found for jdbc:postgresql:
I have added the Redshift JDBC & PostgreSQL jars as well. I have no idea about this; can anybody help me with it?
If my emitter is down for some reason, I see the size of my buffer growing larger and larger until my VM runs out of memory. The reason is the record processor continues to read records from Kinesis, regardless of the size of my buffer. This separate thread is reading the records from Kinesis and draining to my buffer continuously, whether I can emit the records at that moment or not.
Is there some way to provide back-pressure information to the thread that is polling Kinesis for additional records in these cases? Wouldn't any process that emits records much slower than they are read from Kinesis end up in an OoM situation?
We use batch semantics in our emitter to write to Elasticsearch, which can handle about a 10MB payload, so we restrict our in-memory buffer to 10MB. However, if Elasticsearch is down for a minute or so, our buffer grows to 100MB in size. Doesn't anyone who uses the KCL have to consider segmenting the records passed to their emitter, because they could be passed many, many more records than they expect given their buffer configuration parameters?
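The segmenting described above can be done with a small helper that splits the emitter's input into byte-bounded batches. A sketch; the byte[] payloads and the ceiling value are illustrative, and a single record larger than the ceiling still gets its own batch:

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {
    /** Splits records into batches whose summed payload stays under maxBytes. */
    static List<List<byte[]>> chunkByBytes(List<byte[]> records, int maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] record : records) {
            if (!current.isEmpty() && currentBytes + record.length > maxBytes) {
                batches.add(current);        // close the batch before it overflows
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += record.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```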
For writing to Amazon Elasticsearch Service, we should add a Signature Version 4 signature to our requests. Do you plan to support that? If not, I'll try to implement it. The basic idea is implementing an AmazonElasticsearchServiceEmitter for requests to Amazon Elasticsearch Service. ElasticsearchObject and ElasticsearchTransformer may work well with Amazon Elasticsearch Service too.
Hello,
Is it possible to add support for dateformat and timeformat in RedshiftBasicEmitter?
Thanks,
The Elasticsearch Java library is also required from version 1.1.1 onward, which is missing from the README.
How do we handle the case where not all records in the stream have the same processing method? Do we need to override the processRecords method in KinesisConnectorRecordProcessor to add custom processing?
The emitter interface contains a #fail(List records) method. That's quite useful because if for some reason one message cannot be emitted after N retries, we just publish that to an sqs dead letter queue.
We would like to do the same when the message cannot be transformed to T. At the moment, if the transformer throws an exception, the KinesisConnectorRecordProcessor class just logs the exception and that's it.
It would be cool if, in addition to logging the exception, it could call a callback on the transformer (or on whichever interface you think is better), so that we can do something with the messages that cannot be transformed (for example, invalid JSON, or JSON without a field that the pipeline assumes is present).
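A sketch of what such a hook could look like; FallibleTransformer and onTransformFailure are invented names for illustration, not part of the library:

```java
import java.util.ArrayList;
import java.util.List;

class TransformFailureSketch {
    interface FallibleTransformer<T> {
        T toClass(String raw) throws Exception;
        void onTransformFailure(String raw, Exception cause); // e.g. push to a DLQ
    }

    /** Transforms a batch, routing failures to the callback instead of only logging. */
    static <T> List<T> transformAll(List<String> raws, FallibleTransformer<T> t) {
        List<T> out = new ArrayList<>();
        for (String raw : raws) {
            try {
                out.add(t.toClass(raw));
            } catch (Exception e) {
                t.onTransformFailure(raw, e); // previously: just logged and dropped
            }
        }
        return out;
    }
}
```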
I am trying to run the RedshiftBasic sample and I have included aws.accessKeyId and aws.secretKey properties at the end of the RedshiftBasicSample.properties file per the instructions. When I run 'ant run', the following trace comes out:
run:
[javac] /Users/andrewguy/www/amazon-kinesis-connectors/src/main/samples/redshiftbasic/build.xml:48: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
[javac] Compiling 28 source files to /Users/andrewguy/www/amazon-kinesis-connectors/src/main/build
[javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.8'
[javac] 1 warning
[java] Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
[java] at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
[java] at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2486)
[java] at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:861)
[java] at samples.utils.KinesisUtils.streamExists(Unknown Source)
[java] at samples.utils.KinesisUtils.createAndWaitForStreamToBecomeAvailable(Unknown Source)
[java] at samples.utils.KinesisUtils.createInputStream(Unknown Source)
[java] at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
[java] at samples.KinesisConnectorExecutor.<init>(Unknown Source)
[java] at samples.redshiftbasic.RedshiftBasicExecutor.<init>(Unknown Source)
[java] at samples.redshiftbasic.RedshiftBasicExecutor.main(Unknown Source)
[java] Java Result: 1
BUILD SUCCESSFUL
Total time: 3 seconds
What am I missing?
Thanks.
In the samples provided, we have the following property defined in s3.properties file:
bufferMillisecondsLimit = 3600000
This property seems to indicate that the buffer's contents will be emitted when the configured time elapses (among other limits). However, I do not see this property being referenced anywhere in the source. Even BasicMemoryBuffer.java uses only the other two limits (count & size) and not this time-based property. What would be the right place to implement time-based emitting of records from the buffer? I checked KinesisConnectorRecordProcessor.java, but I feel that checkpointing would be difficult without some global state. Any pointers would greatly help.
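For reference, the flush decision with the time limit added might look like the predicate below. This is a sketch with illustrative names; where to store lastEmitMillis and how to checkpoint safely remain the open questions:

```java
class FlushPolicy {
    /**
     * A buffer should flush when ANY of the three limits is reached:
     * record count, byte size, or elapsed time since the last emit.
     */
    static boolean shouldFlush(long nowMillis, long lastEmitMillis, long millisLimit,
                               int recordCount, int countLimit,
                               long byteCount, long byteLimit) {
        return recordCount >= countLimit
                || byteCount >= byteLimit
                || (nowMillis - lastEmitMillis) >= millisLimit; // the missing time check
    }
}
```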
Would be nice to see integration tests.
The AWS ES service is using ES 1.5.2, but your library depends on 1.2.1.
There is an exception when I try to connect to the AWS ES service:
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
Any thoughts on this? I am not sure if I am right.
It would be great to get the latest version, with all the latest fixes, available on Maven Central so we can stop using our fork!
In some cases, data is batched on the client side before sending it to Kinesis via a PutRecord request. (For instance, the data blob may be a JSON array containing objects that should each be handled separately. I'm doing something like this, and looking to store each object on its own line in a newline-delimited JSON file on S3.) The connector library supports this by including both ITransformer and ICollectionTransformer interfaces, and the processRecords method in KinesisConnectorRecordProcessor contains the following:
...
if (transformer instanceof ITransformer) {
    ITransformer<T, U> singleTransformer = (ITransformer<T, U>) transformer;
    filterAndBufferRecord(singleTransformer.toClass(record), record);
} else if (transformer instanceof ICollectionTransformer) {
    ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) transformer;
    Collection<T> transformedRecords = listTransformer.toClass(record);
    for (T transformedRecord : transformedRecords) {
        filterAndBufferRecord(transformedRecord, record);
    }
}
...
And filterAndBufferRecord is implemented as:
private void filterAndBufferRecord(T transformedRecord, Record record) {
    if (filter.keepRecord(transformedRecord)) {
        buffer.consumeRecord(transformedRecord, record.getData().array().length, record.getSequenceNumber());
    }
}
Notice that filterAndBufferRecord uses record.getData().array().length as the size of the record. So when transformer is an ICollectionTransformer implementation, each sub-record is passed to buffer.consumeRecord with the size of the whole batch instead of the size of the individual sub-record. This means the buffer will flush too often, because the size it's keeping track of is inflated.
My workaround for now is just setting bufferByteSizeLimit to Integer.MAX_VALUE and relying on bufferRecordCountLimit to flush the buffer.
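Short of re-serializing each sub-record to get its exact size, one approximation is to apportion the batch record's total byte size across its sub-records. A sketch; this evenly-split approach is an assumption, not library behavior:

```java
class BatchSizeApportioner {
    /**
     * Splits totalBytes across subRecordCount sub-records so that the
     * per-record sizes sum exactly to the batch size, instead of charging
     * the whole batch size to every sub-record.
     */
    static int[] apportion(int totalBytes, int subRecordCount) {
        int[] sizes = new int[subRecordCount];
        int base = totalBytes / subRecordCount;
        int remainder = totalBytes % subRecordCount;
        for (int i = 0; i < subRecordCount; i++) {
            sizes[i] = base + (i < remainder ? 1 : 0); // distribute the remainder
        }
        return sizes;
    }
}
```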
When running ant run for the redshiftmanifest sample project, I see the following error:
[java] com.amazonaws.services.kinesis.model.InvalidArgumentException: Status Code: 400, AWS Service: AmazonKinesis, AWS Request ID: ba19e440-b6bd-11e3-a5c0-c73f0bed95c2, AWS Error Code: InvalidArgumentException, AWS Error Message:
StartingSequenceNumber 49537979178794861780081237450294896636708704732336619521 used in GetShardIterator on shard shardId-000000000000 in stream primaryManifestStream under account xxxxxxxxxxx is invalid because it did not come from this stream.
Here is more output from the terminal:
[java] INFO: Initialization complete. Starting worker loop.
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 needed 1 leases but none were expired, so it will steal lease shardId-000000000000 from b8c3c4f597fd6f29:4c01183b:1450a7aca8c:-8000
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 saw 2 total leases, 0 available leases, 2 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 successfully took 1 leases: shardId-000000000000
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog infoForce
[java] INFO: Created new shardConsumer for shardId: shardId-000000000000, concurrencyToken: 8add86f6-1135-4511-b5a4-717bcf7ffbc8
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
[java] INFO: No need to block on parents [] of shard shardId-000000000000
[java] Mar 28, 2014 2:12:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000000 with 49537979178794861780081237450294896636708704732336619521
[java] Mar 28, 2014 2:12:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask call
[java] SEVERE: Caught exception:
[java] com.amazonaws.services.kinesis.model.InvalidArgumentException: Status Code: 400, AWS Service: AmazonKinesis, AWS Request ID: ba19e440-b6bd-11e3-a5c0-c73f0bed95c2, AWS Error Code: InvalidArgumentException, AWS Error Message: StartingSequenceNumber 49537979178794861780081237450294896636708704732336619521 used in GetShardIterator on shard shardId-000000000000 in stream primaryManifestStream under account xxxxxxxxxxx is invalid because it did not come from this stream.
[java] at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
[java] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
[java] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
[java] at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2152)
[java] at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:406)
[java] at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getIterator(KinesisProxy.java:237)
[java] at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.getIterator(MetricsCollectingKinesisProxyDecorator.java:121)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getIterator(KinesisDataFetcher.java:142)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorAfterSequenceNumber(KinesisDataFetcher.java:104)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorAfter(KinesisDataFetcher.java:122)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.initialize(KinesisDataFetcher.java:94)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:67)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
[java] at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[java] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[java] at java.lang.Thread.run(Thread.java:744)
BUILD FAILED
/Users/aconbere/Packages/amazon/amazon-kinesis-connectors/src/main/samples/redshiftmanifest/build.xml:25: /Users/aconbere/Packages/amazon/amazon-kinesis-connectors/src/aws-java-sdk-1.6.9/third-party does not exist.
It's not clear to me exactly how the project expects those files to be present. Should I just go get the AWS SDK and put it there?
Hi, we are actively working on reading Kinesis data and storing it in HBase (EMR). Is there a Java connector example or code sample for this? Thanks.
Can you provide a tutorial on using this library for DynamoDB?
hadoop@ip-10-102-180-39:/mnt/streamingtest/amazon-kinesis-connectors/src/main$ ant -buildfile samples/s3/build.xml
Buildfile: /mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml
run:
[mkdir] Created dir: /mnt/streamingtest/amazon-kinesis-connectors/src/main/build
[javac] /mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml:45: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
[javac] Compiling 56 source files to /mnt/streamingtest/amazon-kinesis-connectors/src/main/build
BUILD FAILED
/mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml:45: /third-party does not exist.
Total time: 0 seconds
We are currently using the S3Emitter with the KCL to transfer data to a bucket. The current file name format is <firstSeq>-<lastSeq>.
The bucket grows to contain thousands of files very quickly, and we found it really useful to prefix the file name with yyyy/MM/dd/HH/.
Is this something the AWS team would find useful and be interested in adding to this project?
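For reference, a minimal sketch of the requested naming scheme; the class and method names are hypothetical, not the S3Emitter's actual API. It prefixes the <firstSeq>-<lastSeq> key with a UTC yyyy/MM/dd/HH/ path so objects are partitioned by hour.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DatePrefixedKeyExample {
    // Build an S3 key of the form yyyy/MM/dd/HH/<firstSeq>-<lastSeq>.
    static String s3Key(String firstSeq, String lastSeq, Date now) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd/HH/");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // stable prefixes across hosts
        return fmt.format(now) + firstSeq + "-" + lastSeq;
    }

    public static void main(String[] args) {
        // Epoch start, so the prefix is deterministic: 1970/01/01/00/
        System.out.println(s3Key("49537", "49601", new Date(0L)));
    }
}
```

Pinning the formatter to UTC keeps keys consistent when emitters run on instances in different time zones.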
Hi Team,
We are evaluating this connector library. As of now we are sending records to Kinesis at a rate of 3MP/s and using this library to save the data to S3.
The connector library works well when data ingestion is slow, but as we increase the rate, it keeps saving records at the same speed...
Is there any way we can scale it out, or enable auto scaling?
The requirements state that the Elasticsearch connector depends on Elasticsearch 1.2.1, but that version is about a year old, and Elasticsearch is now all the way up to 1.6.x.
I'm not sure whether upgrading is an easy change or fairly involved.
Manually, I imported JSON data into Redshift by running a create table ... command followed by a COPY command. My COPY command looked something like:
copy TABLE_NAME
from PATH_TO_S3_OBJECT
credentials ...
json 'PATH_TO_S3_OBJECT_JSONPATH_FILE'
My json argument pointed to a file that looked like:
{
    "jsonpaths": [
        "$.name",
        "$.phone_number"
    ]
}
Note that I used the COPY from JSON approach in order to copy JSON data into Redshift columns.
Could you please tell me how to use this library for importing JSON into Redshift?
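Not an answer, but for illustration: one plausible way to feed JSON through this library is a transformer whose output is one JSON document per record, so the S3 objects it produces can be loaded with the COPY json option. The class and method names below are hypothetical; a real implementation would implement the library's ITransformer and use a JSON library such as Jackson instead of string concatenation.

```java
public class JsonLineExample {
    // Emit one JSON document per line, matching the $.name and
    // $.phone_number jsonpaths. Hand-rolled JSON for brevity; this does
    // no escaping, which a real implementation would need.
    static String toJsonLine(String name, String phoneNumber) {
        return "{\"name\":\"" + name + "\",\"phone_number\":\"" + phoneNumber + "\"}\n";
    }

    public static void main(String[] args) {
        System.out.print(toJsonLine("Ada", "555-0100"));
    }
}
```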
We would like to ship the application log (for this KCL app) to CloudWatch. Does it generate any specific log file? If not, can I at least add one when running the app?
Would it be possible to get the "Elasticsearch.template" CloudFormation script referenced in src/main/samples/elasticsearch/ElasticsearchSample.properties added?
It is referenced as being available in the blog post announcing the Elasticsearch connector:
You can use the CloudFormation template in our sample to quickly create an Elasticsearch cluster on Amazon Elastic Compute Cloud (EC2), fully managed by Auto Scaling.
Would be nice if we had the jarfile for this on maven central (sans samples).
I ran the following command:
/opt/storm/bin/storm jar /shared/KinesisStormSpout27.jar SampleTopology sample.properties RemoteMode
Getting an error message:
Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
I checked my KinesisStormSpout27.jar, which I built with Eclipse (converted to a Maven Java project and loaded the dependencies), and I verified that this "missing" class is there in the packaged jar, inside aws-java-sdk-1.7.13.jar.
Any help is appreciated.
Sam
See complete error below:
[vagrant@supervisor1 ~]$ /opt/storm/bin/storm jar /shared/KinesisStormSpout27.jar SampleTopology sample.properties RemoteMode
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/kryo-2.21.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/commons-io-2.4.jar:/opt/storm/lib/httpclient-4.3.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-