amazon-kinesis-connectors's People

Contributors

afitzgibbon, alexconlin, cory-bradshaw, dependabot[bot], gauravgh, jamesiri, jawsthegame, jganoff, manango, pfifer, rmahfoud, tkawachi, wanis

amazon-kinesis-connectors's Issues

Compatibility with AWS Kinesis Aggregator library?

Hello Team,

I am using this library to store data from Kinesis stream to S3.

I appreciate the overall design of the library, but I want to aggregate some data (e.g. calculate the sum of time spans grouped by visitor ID) before uploading it to S3.

After some googling I found another library, https://github.com/awslabs/amazon-kinesis-aggregators, but I am not sure how to use it together with the connector library.

I guess I should follow the instructions under this section: https://github.com/awslabs/amazon-kinesis-aggregators#integrating-aggregators-into-existing-applications

but where should I put the aggregate(), checkpoint(), initialize(), and shutdown() calls? Which class(es) should I extend to do so?
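One hedged sketch of where those calls could live (not an officially supported integration): wrap the connector's record processor in a KCL IRecordProcessor of your own and drive the aggregator alongside it. The Aggregator interface below is a hypothetical stand-in built only from the method names cited above; the IRecordProcessor shape follows KCL 1.x, and package locations vary by KCL version.

import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

// Hypothetical stand-in for the aggregators library's surface.
interface Aggregator {
    void initialize(String shardId);
    void aggregate(List<Record> records);
    void checkpoint();
    void shutdown();
}

public class AggregatingRecordProcessor implements IRecordProcessor {
    private final IRecordProcessor delegate; // e.g. the connector's KinesisConnectorRecordProcessor
    private final Aggregator aggregator;

    public AggregatingRecordProcessor(IRecordProcessor delegate, Aggregator aggregator) {
        this.delegate = delegate;
        this.aggregator = aggregator;
    }

    @Override
    public void initialize(String shardId) {
        aggregator.initialize(shardId);
        delegate.initialize(shardId);
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        aggregator.aggregate(records);                  // aggregate first...
        delegate.processRecords(records, checkpointer); // ...then buffer/emit as usual
        aggregator.checkpoint();
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        aggregator.shutdown();
        delegate.shutdown(checkpointer, reason);
    }
}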

Amazon Redshift connector deployment

Hi All,

I've managed to get the Amazon Redshift connector running locally on my virtual machine; however, I would like to use this library as part of our production deployment.

Does anyone have an example of what the deployment topology would look like? For example, what is the throughput to Redshift when running this library on a single EC2 instance? Does it need to be deployed using a distributed mechanism? If it can be distributed, will it integrate with Apache Spark (Streaming)?

Thanks in advance for any help, Mike.

ITransformer: Using java.lang.Exception instead of IOException

Hi

I feel that restricting toClass & fromClass to throwing IOException is very constraining.
If I am performing some validation and want to throw a custom exception, I cannot do that.
Also, the KinesisConnectorRecordProcessor only handles IOExceptions; it does not even handle other runtime exceptions that may occur, and one such exception is not a valid reason to stop processing the rest of the records.
Hence I request that you change it to use java.lang.Exception, or at the very least RuntimeException, everywhere instead.
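For reference, here is what the constraint forces today: a domain-specific validation failure has to be flattened into an IOException. A minimal sketch (the JSON field and validation rule are hypothetical):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.model.Record;

public class ValidatingTransformer implements ITransformer<String, byte[]> {
    @Override
    public String toClass(Record record) throws IOException {
        String json = new String(record.getData().array(), StandardCharsets.UTF_8);
        if (!json.contains("\"visitorId\"")) { // hypothetical validation rule
            // A domain-specific ValidationException would be clearer, but the
            // interface only permits IOException, so the failure is flattened.
            throw new IOException("validation failed for record " + record.getSequenceNumber());
        }
        return json;
    }

    @Override
    public byte[] fromClass(String record) throws IOException {
        return record.getBytes(StandardCharsets.UTF_8);
    }
}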

Emitters should be more extensible

I'm working on a project where we need to customize the library a bit. This is quite hard to do because many methods are private and some fields are final.

Some examples:

Problem 1: Partition key

  • Requirement: Use a custom partition key to publish files to the manifest stream. This may be required when you need to run COPY commands in parallel against different tables.
//S3ManifestEmitter.java
// Use constant partition key to ensure file order
putRecordRequest.setPartitionKey(manifestStream);
  • Problem: To change that piece of code, we had to copy-paste the whole emit method.

Problem 2: Mock S3Client

  • Requirement: I want to unit test a module that extends S3Emitter. For that, I want to mock AmazonS3Client so it does not call the real service.
//S3Emitter.java
 public S3Emitter(KinesisConnectorConfiguration configuration) {
        s3Bucket = configuration.S3_BUCKET;
        s3Endpoint = configuration.S3_ENDPOINT;
        s3client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
        if (s3Endpoint != null) {
            s3client.setEndpoint(s3Endpoint);
        }
    }
  • Problem: There is no way to mock the S3Client because it is created inside the constructor. There is no constructor that receives the client. I ended up copy-pasting the whole class.
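One shape the fix could take, sketched against the constructor quoted above (a proposal, not the library's current API): keep the config-only constructor but delegate to an overload that accepts the client, so a unit test can inject a mock.

//S3Emitter.java (sketch of a possible change)
public S3Emitter(KinesisConnectorConfiguration configuration) {
    this(configuration, new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER));
}

// Additional constructor for tests: the client is injected.
public S3Emitter(KinesisConnectorConfiguration configuration, AmazonS3Client s3client) {
    s3Bucket = configuration.S3_BUCKET;
    s3Endpoint = configuration.S3_ENDPOINT;
    this.s3client = s3client;
    if (s3Endpoint != null) {
        s3client.setEndpoint(s3Endpoint);
    }
}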

Runtime Exceptions allow buffer to grow unbounded

Hi,

If a runtime exception happens in our emitter code (preventing buffer.clear()), it bubbles up and is swallowed in the KCL's ProcessTask.java. The KCL continues feeding new records to the buffer. This allows the buffer to grow unbounded.

Then if the application recovers (before OutOfMemory), the large buffer is flushed all at once.

It would be great if there were the ability to:

  1. Pause processing / respond to back-pressure if we detect an issue, and resume once it goes away
  2. Have KinesisConnectorRecordProcessor.java's transformToOutput(buffer) use the buffer configuration to cap how much data is flushed, i.e. only output n items, or n bytes of items, at a time (see the sketch below).
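A sketch of suggestion 2, assuming the emit path can be factored behind a callback; the helper below is hypothetical, not part of the library:

import java.util.List;
import java.util.function.Consumer;

public final class ChunkedEmit {
    private ChunkedEmit() {}

    // Flush at most maxPerEmit items per call, so that an emitter recovering
    // from an outage never receives an oversized backlog in one shot.
    public static <U> void emitInChunks(List<U> items, int maxPerEmit, Consumer<List<U>> emitChunk) {
        for (int start = 0; start < items.size(); start += maxPerEmit) {
            int end = Math.min(start + maxPerEmit, items.size());
            emitChunk.accept(items.subList(start, end));
        }
    }
}

The same cap could be expressed in bytes by tracking a running byte count per chunk instead of an item count.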

Uncertain How to Use `redshiftbasic` Example

I'd like to use the redshiftbasic example in order to hook up my Kinesis Stream to dump into Redshift:

  • either every N minutes
  • when the bucket fills up to N bytes
  • etc.

It looks like, per the docs, that using the redshiftbasic example is the way to go.

However, I'm not sure how to use this code.

Do I simply need to take the children of the folder amazon-kinesis-connectors/src/main/samples/redshiftbasic? Then, do I need to modify a few of the source files, such as KinesisMessageModelRedshiftTransformer.java, to match my data set?

When I run the build script via ant, do I run it once, after which the Kinesis stream is wired up to always talk to Redshift?

Or does the program run continuously? If it stops (say, due to CTRL + Z), do I no longer get the functionality of Kinesis dumping into Redshift?

Also, how do I compile this project? Thanks.

Incorrect handling of exceptions during checkpointing.

I have taken the S3 emitter sample and modified it to suit our needs. The core library code (under src/main/java) is unmodified. I left it running overnight for several nights for stability testing and encountered the following in the logs one morning (and have been unable to reproduce it since):

2014-03-25 03:45:04,105 63555519 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2014-03-25 03:45:04,198 63555612 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 successfully took 1 leases: shardId-000000000000
2014-03-25 03:45:06,339 63557753 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2014-03-25 03:45:06,431 63557845 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 successfully took 1 leases: shardId-000000000000
[snip]
2014-03-25 03:49:13,416 63804830 INFO  [pool-1-thread-3] com.mycompany.mypackage.S3Emitter - Successfully emitted 1002 records to S3 in s3://my-bucket/my-stream/2014-03-25/03/startid-stopid
2014-03-25 03:49:13,450 63804864 INFO  [pool-1-thread-3] com.amazonaws.services.kinesis.leases.impl.LeaseRenewer - Worker abcd:-1234:efgh:-8000 refusing to update lease with key shardId-000000000000 because concurrency tokens don't match
2014-03-25 03:49:13,451 63804865 ERROR [pool-1-thread-3] com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor - com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard

The last set of messages kept repeating. It continued to process items, but Kinesis wasn't acknowledging the processing. When I restarted the program, it re-processed all of those items. I have no idea what caused the problem, as I have left it running for days on end with no problems and the same levels of load going through Kinesis.

It appears that it was actually continuing to process new items rather than reprocessing the same set over and over again.

I have narrowed the problem down to KinesisConnectorRecordProcessor.emit(IRecordProcessorCheckpointer,List). https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorRecordProcessor.java#L134

  1. emitter.emit on line 139 did not encounter any problems and returned an empty list, since nothing failed.
  2. The call to checkpointer.checkpoint(); on line 153 threw a ShutdownException, which was then properly caught.
  3. The call to emitter.fail(unprocessed) in the catch block on line 157 passed in the empty list returned above.

It would seem to me that if checkpointer.checkpoint() throws, emitter.fail() should be called with all emitItems. Additionally, at least when a ShutdownException is encountered, it would be nice to attempt to re-initialize the Kinesis consumer.

In the meantime, we should be able to make do with terminating our program any time emitter.fail() is called, and having a supervisor simply restart the program when it terminates.
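A sketch of the fix proposed above (not the shipped code; the broad catch is for brevity, and a real fix would distinguish ShutdownException from other checkpoint failures):

// Inside a KinesisConnectorRecordProcessor-like class with an IEmitter<T> "emitter" field.
private void emitAndCheckpoint(IRecordProcessorCheckpointer checkpointer,
                               UnmodifiableBuffer<T> emitBuffer,
                               List<T> emitItems) throws IOException {
    List<T> unprocessed = emitter.emit(emitBuffer);
    if (!unprocessed.isEmpty()) {
        emitter.fail(unprocessed);
    }
    try {
        checkpointer.checkpoint();
    } catch (Exception e) { // e.g. the ShutdownException seen in the log above
        // The checkpoint did not land, so nothing in this batch is durably
        // acknowledged: fail the whole batch, not the (possibly empty)
        // "unprocessed" list that emit() returned.
        emitter.fail(emitItems);
    }
}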

Getting the below error when I try to run the DynamoDB sample

Getting the below error when I try to run the DynamoDB sample. Thanks in advance.

[java] INFO: Created new shardConsumer for shardId: shardId-000000000000, concurrencyToken: 800bb2cc-1a82-46b9-8ec8-f5d6287c9a69
[java] Nov 25, 2014 4:18:13 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
[java] INFO: No need to block on parents [] of shard shardId-000000000000
[java] Nov 25, 2014 4:18:13 PM com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable publishMetrics
[java] INFO: Successfully published 20 datums.
[java] Nov 25, 2014 4:18:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
[java] Nov 25, 2014 4:18:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000001 with TRIM_HORIZON
[java] Nov 25, 2014 4:18:16 PM com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter performBatchRequest
[java] SEVERE: Amazon DynamoDB Client could not perform batch request
[java] com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: PCQTJ5602VPM99LCCU7EHGUUFFVV4KQNSO5AEMVJF66Q9ASUAAJG)
[java] at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1077)
[java] at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:725)
[java] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460)
[java] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295)
[java] at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3106)
[java] at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.batchWriteItem(AmazonDynamoDBClient.java:771)
[java] at com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter.performBatchRequest(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter.emit(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.emit(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.processRecords(Unknown Source)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:125)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
[java] at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[java] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[java] at java.lang.Thread.run(Thread.java:744)

ElasticSearch connector does not work against AWS-ES service

It appears that the connector does not work with ES as managed by AWS. Although the initial connection appears to be successful (using the endpoint at port 443), the first call after that makes it disconnect again:

2016-05-12 12:40:49,485 @main INFO ElasticsearchEmitter ElasticsearchEmitter using elasticsearch endpoint search-<...>.eu-west-1.es.amazonaws.com:443 ElasticsearchEmitter.java(118)
2016-05-12 12:40:49,640 @main INFO plugins [Leap-Frog] loaded [], sites [] PluginsService.java(151)
2016-05-12 12:41:44,595 @LeaseCoordinator-1 INFO LeaseTaker Worker cfe43d56bde02857:-651163f0:154a48db5ff:-8000 saw 36 total leases, 35 available leases, 1 workers. Target is 36 leases, I have 1 leases, I will take 35 leases LeaseTaker.java(403)
2016-05-12 12:42:51,789 @main INFO transport [Leap-Frog] failed to get node info for [#transport#-1][pbox-VirtualBox][inet[search-<...>.eu-west-1.es.amazonaws.com/52.17.232.195:443]], disconnecting... TransportClientNodesService.java(398)
org.elasticsearch.transport.SendRequestTransportException: [][inet[search-<...>.eu-west-1.es.amazonaws.com/52.17.232.195:443]][cluster:monitor/nodes/info]
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:286)
at org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:243)
at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:376)

A similar issue is reported at amazon-archives/cloudwatch-logs-subscription-consumer#9.

Perhaps the elasticsearch-hadoop (Spark) receiver gives some hints on how to approach this: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/reference.html; specifically the option:

conf.set("es.nodes.wan.only", "true"); (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html)

PostgreSQL driver for redshiftmanifest sample

From the directions:

https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/samples/redshiftmanifest/build.xml#L4

I downloaded the PG JDBC driver from http://jdbc.postgresql.org/download.html and put it in the following directory:

src/jdbc/lib/postgresql-9.3-1101.jdbc41.jar

I added the driver path to build.xml:

<path id="samples.classpath">
  <!-- removed -->
  <fileset dir="${basedir}/../jdbc/lib" includes="**/*.jar" />
</path>

In RedshiftManifestSample.properties I have the URL:

redshiftURL = jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com

When I run the program with ant run, I see the following:

[java] Mar 28, 2014 1:21:48 PM samples.utils.RedshiftUtils tableExists
[java] SEVERE: java.sql.SQLException: No suitable driver found for jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com
[java] Exception in thread "main" java.lang.IllegalStateException: Could not create Redshift file table kinesisFiles
[java]  at samples.KinesisConnectorExecutor.createRedshiftFileTable(Unknown Source)
[java]  at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
[java]  at samples.KinesisConnectorExecutor.<init>(Unknown Source)
[java]  at samples.redshiftmanifest.RedshiftManifestExecutor.<init>(Unknown Source)
[java]  at samples.redshiftmanifest.RedshiftManifestExecutor.main(Unknown Source)
[java] Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://ht-redshift-test.chzuqbjvtyx7.us-east-1.redshift.amazonaws.com
[java]  at java.sql.DriverManager.getConnection(DriverManager.java:596)
[java]  at java.sql.DriverManager.getConnection(DriverManager.java:187)
[java]  at samples.utils.RedshiftUtils.createRedshiftTable(Unknown Source)
[java]  ... 5 more
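"No suitable driver found" generally means no driver registered itself with DriverManager at runtime; with ant, the jar has to be on the <java> task's runtime classpath, not just the compile classpath. A minimal probe to isolate the problem (the port, database name, and credentials below are placeholders; Redshift connections normally need the port and database in the URL):

import java.sql.Connection;
import java.sql.DriverManager;

public class DriverCheck {
    public static void main(String[] args) throws Exception {
        // Forces registration; throws ClassNotFoundException if the PostgreSQL
        // jar is missing from the runtime classpath.
        Class.forName("org.postgresql.Driver");
        try (Connection c = DriverManager.getConnection(
                "jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com:5439/mydb",
                "myuser", "mypassword")) {
            System.out.println("connected: " + !c.isClosed());
        }
    }
}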

Handling duplicate records

We are using the latest build to read messages from a multi-shard Kinesis stream, using workers running across multiple EC2 instances, and to write them to S3. In this context, please see the following doc:

http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-duplicates.html

It talks about cases where we can end up reading duplicate records from the stream. The S3Emitter uses buffer.firstSeqNumber and buffer.lastSeqNumber to determine the S3 filename. In the case of retries, this approach might not guarantee duplicate removal, because the buffer can be filled differently, with a different size or count. The approach suggested by the doc does not work for the S3Emitter, as there is no way to control the exact number of records in the buffer (the emit is triggered by count, time, OR size). Any thoughts on how we can overcome this limitation?

Thanks.

Compiling

I've spent a ton of time trying to get anything to build with this project. There are no clear instructions for building the sample application. You try, and then you find that you need a specific jar file of the Kinesis Client Library, placed into a specific location, from here: #1

Then you find that it still fails because it wants an "external" directory in the source tree. Create that empty dir and try again.

Next, the problem is 29 errors, e.g.:

ElasticsearchEmitter.java:241: error: cannot find symbol
    [javac]         } else if (response.getStatus().equals(ClusterHealthStatus.GREEN)) {
    [javac]                                                ^
    [javac]   symbol:   variable ClusterHealthStatus
    [javac]   location: class ElasticsearchEmitter   

Another possible clue:

warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'

I am using Java 1.7, but given this error, I even tried version 1.6, which did not resolve it. I also tried both the OpenJDK and the Oracle JDK.

All I want to do is compile a sample file and move on; I'm pulling my hair out in frustration here...

Monitoring and rescheduling

Hi,
I am trying to implement the Kinesis-S3 connector. My question is how I can monitor the job (from a production perspective) and, in case of any exception, how I can take action (send mail, reschedule the job).
Is CloudWatch a good option? What are other recommended ways?

Thanks
Ankur

Understanding `redshiftbasic`

At a high-level, what does this sample do?

I am trying to run it, but I admit that if I understood what it did at a high level, it would help me to run the sample.

Does it populate data from users.txt into S3, and then put the data into Kinesis? Finally, does it put the records into Redshift (from Kinesis) depending upon either a time or byte threshold?

I'm confused about what it does at a high level.

Additionally, I modified src/main/samples/redshiftbasic/RedshiftBasicSample.properties to create a Redshift cluster + Kinesis stream.

Yet, after I run ant run, I don't see the stream or cluster created when running aws kinesis list-streams or looking at the AWS Management Console's Cluster view for Redshift.

Unicode handling

Either the attached code or the ant configuration does not properly handle Unicode characters when writing to output files. Has anyone experienced this issue and found the appropriate lines of code that need to be modified to handle Unicode characters?
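Unicode usually survives only if every decode/encode step names a charset explicitly; relying on the platform default (which ant inherits from the OS unless told otherwise) is the common culprit. A self-contained sketch of explicit UTF-8 handling; the file name is arbitrary:

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class Utf8WriteDemo {
    public static void main(String[] args) throws Exception {
        byte[] kinesisData = "visitor-ü-日本語".getBytes(StandardCharsets.UTF_8);
        // Decode with an explicit charset, never new String(bytes) alone.
        String text = new String(kinesisData, StandardCharsets.UTF_8);
        // Encode with an explicit charset on the way out, too.
        try (Writer w = new OutputStreamWriter(
                Files.newOutputStream(Paths.get("out.txt")), StandardCharsets.UTF_8)) {
            w.write(text);
        }
    }
}

On the build side, ant's javac task also accepts an encoding attribute (e.g. encoding="UTF-8") so that source files and string literals are read consistently.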

`Unable to load AWS credentials from any provider in the chain` error message when trying to run any of the samples

I've been trying, unsuccessfully, to run the different samples, e.g. the S3 sample.

To do that, I have:

  1. Created an AwsCredentials.properties file in each of the sample folders (e.g. amazon-kinesis-connector/src/main/samples/s3/AwsCredentials.properties), with the accessKey and secretKey fields set. (Note that this is an exact copy of the AwsCredentials.properties file I used to run the aws-java-sdk-1.6.12 samples successfully.)
  2. Copied the aws-java-sdk-1.6.12 and the Kinesis Client Library into the locations specified in the build.xml file. (As an aside, this meant copying the libraries into amazon-kinesis-connector/src/aws-java-sdk-1.6.12/ and amazon-kinesis-connector/src/KinesisClientLibrary/lib/amazon-kinesis-client-1.0.0.jar, which was a bit weird...)
  3. Ran ant run from the command line in e.g. amazon-kinesis-connector/main/samples/s3

I keep getting the following error:

[java] Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
     [java]     at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2120)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:296)
     [java]     at samples.utils.KinesisUtils.streamExists(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createAndWaitForStreamToBecomeAvailable(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createInputStream(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.<init>(Unknown Source)
     [java]     at samples.s3.S3Executor.<init>(Unknown Source)
     [java]     at samples.s3.S3Executor.main(Unknown Source)
     [java] Java Result: 1

It seems like the AwsCredentials.properties file is not being loaded... Is that right? What am I doing wrong? Any help would be much appreciated.
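If the sample falls back to the SDK's DefaultAWSCredentialsProviderChain (whether it does depends on how the sample builds its provider), one way to rule out file-loading problems is to supply the credentials through the environment instead, since that chain also checks environment variables:

export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
ant run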

HTTP proxy support

I am wondering if the library supports an HTTP proxy. We are trying to use the ES connector with the AWS CloudWatch consumer, and for some reason the API is unable to store the lease information in DynamoDB. I get an HTTP 'connect timed out' error.

I tried passing the proxy settings through JAVA_OPTIONS as system properties in the call to java -jar, but that doesn't seem to make any difference.

Any known fixes for this?
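For context on why JAVA_OPTIONS doesn't help: the AWS SDK's HTTP client takes proxy settings from ClientConfiguration, not from the standard JVM proxy system properties. A sketch of what a fix would need to look like if the connector/KCL exposed a way to pass the configuration through (the proxy host and port are placeholders):

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;

public class ProxyExample {
    public static void main(String[] args) {
        ClientConfiguration cc = new ClientConfiguration()
                .withProxyHost("proxy.example.com")
                .withProxyPort(8080);
        // The lease-table calls in the trace below go through a DynamoDB client
        // like this one; the KCL would need to be handed this configuration.
        AmazonDynamoDBClient dynamo =
                new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain(), cc);
        System.out.println("client configured: " + dynamo);
    }
}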

Here's the stack trace to that effect.

2016-02-20 02:57:10,741 INFO AmazonHttpClient - Unable to execute HTTP request: connect timed out
java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:656)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:524)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:403)
at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:118)
at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:304)
at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:611)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:446)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:706)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3240)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:1047)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:373)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:317)
at com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase.run(KinesisConnectorExecutorBase.java:95)
at com.amazonaws.services.logs.connectors.samples.elasticsearch.ElasticsearchConnector.main(ElasticsearchConnector.java:38)

Thanks

Hibernate - Redshift.

Hi, I have an application which is developed using Struts and Hibernate. Now I am trying to connect to AWS Redshift. My settings are like this (in the Hibernate cfg file):
1) driver_class: org.postgresql.Driver
connection.url: jdbc:postgresql://host_name.redshift.amazonaws.com:5439/user
dialect: org.hibernate.dialect.PostgreSQL82Dialect

2) driver_class: com.amazon.redshift.jdbc4.Driver
connection.url: jdbc:redshift://host_name.redshift.amazonaws.com:5439/user
dialect: org.hibernate.dialect.PostgreSQL82Dialect (using this since I don't know the dialect name here)

In both cases I am getting:
Error: Could not open connection: No suitable driver found for jdbc:postgresql:

I have added the Redshift JDBC and PostgreSQL jars as well. I have no idea about this; can anybody help me with it?

Kinesis back-pressure applied from the KCL

If my emitter is down for some reason, I see the size of my buffer growing larger and larger until my VM runs out of memory. The reason is that the record processor continues to read records from Kinesis regardless of the size of my buffer: a separate thread reads records from Kinesis and drains them into my buffer continuously, whether I can emit the records at that moment or not.

Is there some way to provide back-pressure information to the thread that is polling Kinesis for additional records in these cases? Wouldn't any process that emits records much more slowly than they are read from Kinesis end up in an OOM situation?

We use batch semantics in our emitter to write to Elasticsearch, which can handle about a 10 MB payload, so we restrict our in-memory buffer to 10 MB. However, if Elasticsearch is down for a minute or so, our buffer grows to 100 MB in size. Doesn't anyone who uses the KCL have to consider segmenting the records passed to their emitter, given that they could be passed many, many more records than they expect from their buffer configuration parameters? (A sketch of one workaround follows.)
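There is no built-in back-pressure hook, but one workaround used with the KCL is to block inside processRecords (or inside the buffer it feeds): the KCL fetches and processes a shard on the same task thread, so blocking processRecords stalls further reads for that shard. A self-contained sketch of the idea; wiring it into IBuffer.consumeRecord and the emitter is left out:

import java.util.concurrent.Semaphore;

// Hypothetical byte-budget gate: acquire before buffering a record, release
// after the emitter successfully flushes.
public class ByteBudget {
    private final Semaphore bytes;

    public ByteBudget(int maxBufferedBytes) {
        this.bytes = new Semaphore(maxBufferedBytes);
    }

    /** Blocks the KCL processing thread until the emitter frees capacity. */
    public void acquire(int recordBytes) throws InterruptedException {
        bytes.acquire(recordBytes);
    }

    /** Call from the emitter after a successful flush of flushedBytes. */
    public void release(int flushedBytes) {
        bytes.release(flushedBytes);
    }
}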

Emitter for Amazon Elasticsearch Service

For writing to Amazon Elasticsearch Service, we should add Signature Version 4 signing to our requests. Do you plan to support that? If not, I'll try to implement it. The basic idea is implementing an AmazonElasticsearchServiceEmitter for requests to Amazon Elasticsearch Service. ElasticsearchObject and ElasticsearchTransformer may work well with Amazon Elasticsearch Service too.

KinesisConnectorRecordProcessor

How do we handle the case where not all records in the stream have the same processing method? Do we need to override the processRecords method in KinesisConnectorRecordProcessor to add custom processing?
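Overriding processRecords is one option; another that stays within the pipeline interfaces is to dispatch inside the ITransformer. A sketch, with a hypothetical JSON type marker doing the routing:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.model.Record;

public class DispatchingTransformer implements ITransformer<String, byte[]> {
    @Override
    public String toClass(Record record) throws IOException {
        String payload = new String(record.getData().array(), StandardCharsets.UTF_8);
        if (payload.contains("\"type\":\"click\"")) { // hypothetical marker
            return transformClick(payload);
        }
        return transformDefault(payload);
    }

    @Override
    public byte[] fromClass(String out) throws IOException {
        return out.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-ins for per-type processing.
    private String transformClick(String payload) { return payload; }
    private String transformDefault(String payload) { return payload; }
}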

Add a fail callback when the message can not be transformed.

The emitter interface contains a #fail(List records) method. That's quite useful, because if for some reason a message cannot be emitted after N retries, we just publish it to an SQS dead-letter queue.

We would like to do the same when the message cannot be transformed to T. At the moment, if the transformer throws an exception, the KinesisConnectorRecordProcessor class just logs the exception, and that's it.

It would be cool if, in addition to logging the exception, it could call a callback on the transformer (or on whichever interface you think is better), so we can then do something with the messages that cannot be transformed (for example, invalid JSON, or JSON missing a field that the pipeline assumes is present).
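The requested hook might look something like this (purely hypothetical; nothing like it exists in the library today):

import com.amazonaws.services.kinesis.model.Record;

// Hypothetical addition: invoked by the record processor when
// ITransformer.toClass throws, so un-transformable records can be routed to a
// dead-letter queue instead of only being logged.
public interface ITransformFailureHandler {
    void onTransformFailure(Record record, Exception cause);
}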

Unable to load AWS credentials from any provider in the chain

I am trying to run the RedshiftBasic sample, and I have included the aws.accessKeyId and aws.secretKey properties at the end of the RedshiftBasicSample.properties file per the instructions. When I run ant run, the following trace comes out:

run:
    [javac] /Users/andrewguy/www/amazon-kinesis-connectors/src/main/samples/redshiftbasic/build.xml:48: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 28 source files to /Users/andrewguy/www/amazon-kinesis-connectors/src/main/build
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.8'
    [javac] 1 warning
     [java] Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
     [java]     at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2486)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:861)
     [java]     at samples.utils.KinesisUtils.streamExists(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createAndWaitForStreamToBecomeAvailable(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createInputStream(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.<init>(Unknown Source)
     [java]     at samples.redshiftbasic.RedshiftBasicExecutor.<init>(Unknown Source)
     [java]     at samples.redshiftbasic.RedshiftBasicExecutor.main(Unknown Source)
     [java] Java Result: 1

BUILD SUCCESSFUL
Total time: 3 seconds

What am I missing?

Thanks.

Time-based buffer sample

In the samples provided, we have the following property defined in the s3.properties file:

bufferMillisecondsLimit = 3600000

Flush when buffer exceeds 25 Amazon Kinesis records, 1 MB size limit or when time since last buffer exceeds 1 hour

This property seems to indicate that the buffer's contents will be emitted when the configured time elapses (among other limits). However, I do not see this property referenced anywhere in the source. Even BasicMemoryBuffer.java uses only the other two limits (count and size), not this time-based one. What would be the right place to implement time-based emitting of records from the buffer? I checked KinesisConnectorRecordProcessor.java, but I feel that checkpointing would be difficult without some global state. Any pointers would greatly help.
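Since the record processor consults IBuffer.shouldFlush(), one low-ceremony place for the time check is a buffer subclass. A sketch, assuming BasicMemoryBuffer is subclassable and takes the configuration in its constructor (verify against your version); note that shouldFlush() is only evaluated when records arrive, so an idle shard still will not flush on time alone:

import com.amazonaws.services.kinesis.connectors.BasicMemoryBuffer;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

public class TimedMemoryBuffer<T> extends BasicMemoryBuffer<T> {
    private final long millisLimit;
    private long lastFlushAt = System.currentTimeMillis();

    public TimedMemoryBuffer(KinesisConnectorConfiguration config, long millisLimit) {
        super(config);
        this.millisLimit = millisLimit; // e.g. the bufferMillisecondsLimit value
    }

    @Override
    public boolean shouldFlush() {
        return super.shouldFlush()
                || System.currentTimeMillis() - lastFlushAt >= millisLimit;
    }

    @Override
    public void clear() {
        super.clear();
        lastFlushAt = System.currentTimeMillis();
    }
}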

AWS ES service version conflict

The AWS ES service is using ES 1.5.2, but your library depends on 1.2.1.
There is an exception when I try to connect to the AWS ES service.

org.elasticsearch.client.transport.NoNodeAvailableException: No node available

Any thoughts on this? I am not sure if I am right.

Incorrect bufferByteSizeLimit Calculation for Batched Kinesis Records

In some cases, data is batched on the client side before sending it to Kinesis via a PutRecord request. (For instance, the data blob may be a JSON array containing objects that should each be handled separately. I'm doing something like this, looking to store each object on its own line in a newline-delimited JSON file on S3.) The connector library supports this by including both the ITransformer and ICollectionTransformer interfaces, and the processRecords method in KinesisConnectorRecordProcessor contains the following:

...
if (transformer instanceof ITransformer) {
    ITransformer<T, U> singleTransformer = (ITransformer<T, U>) transformer;
    filterAndBufferRecord(singleTransformer.toClass(record), record);
} else if (transformer instanceof ICollectionTransformer) {
    ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) transformer;
    Collection<T> transformedRecords = listTransformer.toClass(record);
    for (T transformedRecord : transformedRecords) {
        filterAndBufferRecord(transformedRecord, record);
    }
}
...

And filterAndBufferRecord is implemented as:

private void filterAndBufferRecord(T transformedRecord, Record record) {
    if (filter.keepRecord(transformedRecord)) {
        buffer.consumeRecord(transformedRecord, record.getData().array().length, record.getSequenceNumber());
    }
}

Notice that filterAndBufferRecord uses record.getData().array().length as the size of the record. So when the transformer is an ICollectionTransformer implementation, each sub-record is passed to buffer.consumeRecord with the size of the whole batch rather than the size of the individual sub-record. This means the buffer will flush too often, because the size it is tracking is inflated.

My workaround for now is just setting bufferByteSizeLimit to Integer.MAX_VALUE and relying on bufferRecordCountLimit to flush the buffer. (One possible fix is sketched below.)
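One possible fix, sketched against the loop quoted above (an approximation: it spreads the batch's bytes evenly across its sub-records rather than measuring each one individually):

// Replacement for the ICollectionTransformer branch shown above.
Collection<T> transformedRecords = listTransformer.toClass(record);
int share = record.getData().array().length / Math.max(1, transformedRecords.size());
for (T transformedRecord : transformedRecords) {
    if (filter.keepRecord(transformedRecord)) {
        // Charge an even share of the batch size instead of the whole batch.
        buffer.consumeRecord(transformedRecord, share, record.getSequenceNumber());
    }
}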

GetShardIterator... invalid because it did not come from this stream

When running ant run for the redshiftmanifest sample project, I'm seeing the following error:

[java] com.amazonaws.services.kinesis.model.InvalidArgumentException: Status Code: 400, AWS Service: AmazonKinesis, AWS Request ID: ba19e440-b6bd-11e3-a5c0-c73f0bed95c2, AWS Error Code: InvalidArgumentException, AWS Error Message:
StartingSequenceNumber 49537979178794861780081237450294896636708704732336619521 used in GetShardIterator on shard shardId-000000000000 in stream primaryManifestStream under account xxxxxxxxxxx is invalid because it did not come from this stream.

Here is more output from the terminal:

[java] INFO: Initialization complete. Starting worker loop.
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 needed 1 leases but none were expired, so it will steal lease shardId-000000000000 from b8c3c4f597fd6f29:4c01183b:1450a7aca8c:-8000
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 saw 2 total leases, 0 available leases, 2 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 successfully took 1 leases: shardId-000000000000
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog infoForce
[java] INFO: Created new shardConsumer for shardId: shardId-000000000000, concurrencyToken: 8add86f6-1135-4511-b5a4-717bcf7ffbc8
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
[java] INFO: No need to block on parents [] of shard shardId-000000000000
[java] Mar 28, 2014 2:12:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000000 with 49537979178794861780081237450294896636708704732336619521
[java] Mar 28, 2014 2:12:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask call
[java] SEVERE: Caught exception:
[java] com.amazonaws.services.kinesis.model.InvalidArgumentException: Status Code: 400, AWS Service: AmazonKinesis, AWS Request ID: ba19e440-b6bd-11e3-a5c0-c73f0bed95c2, AWS Error Code: InvalidArgumentException, AWS Error Message: StartingSequenceNumber 49537979178794861780081237450294896636708704732336619521 used in GetShardIterator on shard shardId-000000000000 in stream primaryManifestStream under account xxxxxxxxxxx is invalid because it did not come from this stream.
[java]  at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
[java]  at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
[java]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
[java]  at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2152)
[java]  at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:406)
[java]  at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getIterator(KinesisProxy.java:237)
[java]  at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.getIterator(MetricsCollectingKinesisProxyDecorator.java:121)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getIterator(KinesisDataFetcher.java:142)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorAfterSequenceNumber(KinesisDataFetcher.java:104)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorAfter(KinesisDataFetcher.java:122)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.initialize(KinesisDataFetcher.java:94)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:67)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
[java]  at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[java]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[java]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[java]  at java.lang.Thread.run(Thread.java:744)

Following the directions for running the samples results in a failed build.

BUILD FAILED
/Users/aconbere/Packages/amazon/amazon-kinesis-connectors/src/main/samples/redshiftmanifest/build.xml:25: /Users/aconbere/Packages/amazon/amazon-kinesis-connectors/src/aws-java-sdk-1.6.9/third-party does not exist.

It's not clear to me exactly how the project expects those files to be present. Should I just go get the AWS SDK and put it there?

/third-party does not exist. BUILD FAILED

hadoop@ip-10-102-180-39:/mnt/streamingtest/amazon-kinesis-connectors/src/main$ ant -buildfile samples/s3/build.xml
Buildfile: /mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml

run:
[mkdir] Created dir: /mnt/streamingtest/amazon-kinesis-connectors/src/main/build
[javac] /mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml:45: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
[javac] Compiling 56 source files to /mnt/streamingtest/amazon-kinesis-connectors/src/main/build

BUILD FAILED
/mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml:45: /third-party does not exist.

Total time: 0 seconds

S3Emitter to allow folders

We are currently using the S3Emitter with the KCL to transfer data to a bucket. The current file format is <firstSeq>-<lastSeq>.

The bucket grows to contain thousands of files really quickly, and we found it really useful to prefix the file name with yyyy/MM/dd/HH/.

Is this something the AWS team would find useful and be interested in adding to this project? (A sketch of one approach follows.)
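A sketch of one way to do it today, assuming S3Emitter exposes a protected getS3FileName(firstSeq, lastSeq) hook (verify against your version of the library):

import java.text.SimpleDateFormat;
import java.util.Date;

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.s3.S3Emitter;

public class DatePrefixedS3Emitter extends S3Emitter {
    // Instance field because SimpleDateFormat is not thread-safe.
    private final SimpleDateFormat prefixFormat = new SimpleDateFormat("yyyy/MM/dd/HH/");

    public DatePrefixedS3Emitter(KinesisConnectorConfiguration config) {
        super(config);
    }

    @Override
    protected String getS3FileName(String firstSeq, String lastSeq) {
        return prefixFormat.format(new Date()) + super.getS3FileName(firstSeq, lastSeq);
    }
}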

How to scale it

Hi Team,

We are evaluating this connector library. As of now we are sending records into Kinesis at a rate of 3MP/s,
and using this library we are saving the data to S3.
The connector library works well when data ingestion is slow, but as we increase the speed, the connector library continues saving records at the same pace...
Is there any way we can scale it out, or enable auto scaling?

Upgrade to a newer Elasticsearch version ?

The requirements state that the Elasticsearch connector depends on Elasticsearch 1.2.1,

but that version is about a year old, and they're now all the way up to 1.6.x.

Not sure if upgrading is an easy change or fairly involved.

Using JSONPaths to `COPY` Objects into RedShift from Kinesis?

Manually, I imported JSON data into Redshift by doing the following:

  • create Redshift cluster
  • log onto the Redshift DB via JDBC driver
  • run a create table ... command
  • run the COPY command

My copy command looked something like:

copy TABLE_NAME
from PATH_TO_S3_OBJECT
credentials ...
json 'PATH_TO_S3_OBJECT_JSONPATH_FILE'

My json argument pointed to a file that looked like:

{
    "jsonpaths": [
        "$.name",
        "$.phone_number"
     ]
}

Note that I used the 'COPY from JSON' approach in order to copy JSON data into Redshift columns.

Could you please tell me how to use this library for importing JSON into Redshift?

Application logs

We would like to ship the application log (for this KCL app) to CloudWatch. Does it generate any specific log file? If not, can I at least add one when running the app?

Missing Elasticsearch.template CloudFormation script for Elasticsearch sample

Would it be possible to get the "Elasticsearch.template" CloudFormation script referenced in src/main/samples/elasticsearch/ElasticsearchSample.properties added?

It is referenced as being available in the blog post announcing the Elasticsearch connector:

You can use the CloudFormation template in our sample to quickly create an Elasticsearch cluster on Amazon Elastic Compute Cloud (EC2), fully managed by Auto Scaling.

I'm getting an AWSCredentials.properties file not found error

I ran the following command:

/opt/storm/bin/storm jar /shared/KinesisStormSpout27.jar SampleTopology sample.properties RemoteMode

I'm getting an error message:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

I checked my KinesisStormSpout27.jar package, which I built with Eclipse, converted to a Maven Java project, and loaded dependencies for, and I know and verified that this "missing" class is present in the packaged jar, inside aws-java-sdk-1.7.13.jar.

Any help is appreciated.

Sam

See complete error below:

[vagrant@supervisor1 ~]$ /opt/storm/bin/storm jar /shared/KinesisStormSpout27.jar SampleTopology sample.properties RemoteMode
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/kryo-2.21.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/commons-io-2.4.jar:/opt/storm/lib/httpclient-4.3.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-
