
kafka-connect-sqs

The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS queue into a Kafka topic) and a sink (out of a Kafka topic into an SQS queue).

Compatibility matrix

Connector version | Kafka Connect API | AWS SDK
1.4               | 3.1.1             | 1.12.241
1.5               | 3.3.2             | 1.12.409
1.6               | 3.4.1             | 1.12.669

Building the distributable

You can build the connector with Maven using the standard lifecycle goals:

mvn clean
mvn package

Source connector

The SQS source connector reads messages from an AWS SQS queue and publishes them to a Kafka topic.

Required properties:

  • topics: Kafka topic to be written to.
  • sqs.queue.url: URL of the SQS queue to be read from.

Optional properties:

  • sqs.region: AWS region of the SQS queue to be read from.
  • sqs.endpoint.url: Override value for the AWS region specific endpoint.
  • sqs.max.messages: Maximum number of messages to read from the SQS queue per poll. Valid range is 0 - 10; default is 1.
  • sqs.wait.time.seconds: Duration (in seconds) to wait for a message to arrive in the queue. Default is 1.
  • sqs.message.attributes.enabled: If true, the connector reads the SQS MessageAttributes and inserts them as Kafka headers (only string attributes are currently supported). Default is false.
  • sqs.message.attributes.include.list: Comma-separated list of MessageAttribute names to include; if empty, all MessageAttributes are included. Default is the empty string.
  • sqs.message.attributes.partition.key: The name of a single SQS MessageAttribute to use as the partition key. If not specified, the SQS message ID is used as the partition key.
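
A minimal source connector configuration using these properties might look like the following; the connector name, topic, queue URL, and region are placeholder values to adapt:

name=my-sqs-source
connector.class=com.nordstrom.kafka.connect.sqs.SqsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=hello-sqs-source
sqs.queue.url=https://sqs.us-west-2.amazonaws.com/<AWS_ACCOUNT>/my-queue
sqs.region=us-west-2
sqs.max.messages=5
sqs.wait.time.seconds=5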

Sample IAM policy

When using this connector, ensure the authentication principal has privileges to read messages from the SQS queue.

{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "kafka-connect-sqs-source",
    "Effect": "Allow",
    "Action": [
      "sqs:DeleteMessage",
      "sqs:GetQueueUrl",
      "sqs:ListQueues",
      "sqs:ReceiveMessage"
    ],
    "Resource": "arn:aws:sqs:*:*:*"
  }]
}

Sink connector

The SQS sink connector reads messages from a Kafka topic and publishes them to an AWS SQS queue.

Required properties:

  • topics: Kafka topic to be read from.
  • sqs.queue.url: URL of the SQS queue to be written to.

Optional properties:

  • sqs.region: AWS region of the SQS queue to be written to.
  • sqs.endpoint.url: Override value for the AWS region specific endpoint.
  • sqs.message.attributes.enabled: If true, the connector reads the Kafka headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false.
  • sqs.message.attributes.include.list: Comma-separated list of header names to include; if empty, all headers are included. Default is the empty string.
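
A minimal sink connector configuration using these properties might look like the following; the connector name, topic, queue URL, and region are placeholder values to adapt:

name=my-sqs-sink
connector.class=com.nordstrom.kafka.connect.sqs.SqsSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=hello-sqs-sink
sqs.queue.url=https://sqs.us-west-2.amazonaws.com/<AWS_ACCOUNT>/my-queue
sqs.region=us-west-2
sqs.message.attributes.enabled=true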

Sample SQS queue policy

Define a corresponding SQS queue policy that allows the connector to send messages to the SQS queue:

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-2:<AWS_ACCOUNT>:my-queue/SQSDefaultPolicy",
  "Statement": [
    {
      "Sid": "kafka-connect-sqs-sink",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Your principal ARN>"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-west-2:<AWS_ACCOUNT>:my-queue"
    }
  ]
}

Sample IAM policy

When using this connector, ensure the authentication principal has privileges to send messages to the SQS queue.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "kafka-connect-sqs-sink",
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:*"
    }
  ]
}

AWS authentication

By default, the connector uses the AWS SDK DefaultAWSCredentialsProviderChain to determine the identity of the connector. This works well in simple scenarios where the connector uses the privileges granted to the Kafka Connect worker (e.g., environment variables, EC2 instance metadata, etc.).

When the identity of the connector must be separate from the worker, set sqs.credentials.provider.class to an implementation available on the worker's classpath. Two implementations are included directly within this library:

  • com.nordstrom.kafka.connect.auth.AWSUserCredentialsProvider
  • com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider

AWSUserCredentialsProvider

Use this credentials provider to cause the connector to authenticate as a specific IAM user.

Required properties:

  • sqs.credentials.provider.class: Must be com.nordstrom.kafka.connect.auth.AWSUserCredentialsProvider
  • sqs.credentials.provider.accessKeyId: AWS access key of the IAM user
  • sqs.credentials.provider.secretKey: AWS secret key of the IAM user
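
For example, the IAM-user provider can be configured like this (the key values are placeholders):

sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSUserCredentialsProvider
sqs.credentials.provider.accessKeyId=<ACCESS_KEY_ID>
sqs.credentials.provider.secretKey=<SECRET_KEY>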

AWSAssumeRoleCredentialsProvider

Use this credentials provider to cause the connector to assume an IAM role.

Required properties:

  • sqs.credentials.provider.class: Must be com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider
  • sqs.credentials.provider.role.arn: ARN of the IAM role to assume
  • sqs.credentials.provider.session.name: A session name specific to this connector

Optional properties:

  • sqs.credentials.provider.external.id: External ID to pass when assuming the role; it should match the sts:ExternalId condition in the role's trust policy, if one is defined.

The IAM role will have a corresponding trust policy. For example:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AWS_ACCOUNT>:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "my-external-id"
        }
      }
    }
  ]
}
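
Putting these together, the assume-role credentials configuration for a connector might look like this (account ID, role name, session name, and external ID are placeholders):

sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider
sqs.credentials.provider.role.arn=arn:aws:iam::<AWS_ACCOUNT>:role/my-connector-role
sqs.credentials.provider.session.name=my-connector-session
sqs.credentials.provider.external.id=my-external-id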

Running the connector

This example demonstrates using the sink connector to send a message to an SQS queue from Kafka.

  • Set up an SQS queue
  • Set up Kafka; use the cluster defined in docker-compose.yaml if you don't have one
  • Customize the files in the config directory, for example config/sink-connector.properties.example (a minimal worker configuration is sketched below)
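
A minimal standalone worker configuration (config/connect-worker.properties) might look like the following; the bootstrap server, offset file, and plugin path are assumptions to adapt to your environment:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/path/to/kafka-connect-sqs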

Now, start the sink connector in standalone mode:

$KAFKA_HOME/bin/connect-standalone.sh \
  config/connect-worker.properties config/sink-connector.properties

Use a tool such as kafka-console-producer to produce messages to the Kafka topic:

bin/kafka-console-producer --bootstrap-server localhost:9092 \
    --topic hello-sqs-sink \
    --property parse.headers=true \
    --property 'headers.delimiter=\t'
>test:abc\t{"hello":"world"}

kafka-connect-sqs's People

Contributors

adamweyant, aksinghgrd, dependabot[bot], dylanmei, koosie0507, peterehik-px, rajki, satyam1210, sgtpepperlhcb, snyk-bot, twoi, valmoz


kafka-connect-sqs's Issues

SQS connector doesn't work without AWS_REGION env variable.

Hi, so I was trying to run the source connector in MSK Connect and kept getting errors when trying to connect because it couldn't figure out what region my SQS queue was in. When using MSK Connect, we can only set the worker/connector configurations; there's no way to set environment variables, so I created this PR to allow the user to set the SQS region using a connector configuration.

I've tested this out locally to make sure it works, and it does. This connector doesn't work on MSK Connect when using IAM auth, but that's not because of anything wrong with the connector; MSK Connect is just weird and buggy. I created a support ticket with AWS about 3 days ago and they can't figure out why the connector doesn't work. The issue has to do with the plugin classloader; my guess is they put the aws-iam-auth jar in the plugin path instead of in the kafka/libs folder or some other classpath. It's a known issue: aws/aws-msk-iam-auth#11

Anyway, as soon as I brought up a local Kafka Connect instance, it worked perfectly with IAM auth; I was able to connect to SQS and sink messages to my Kafka topic.

Thanks y'all for making this open source, it's been super helpful to me already.

Here's my PR: https://github.com/Nordstrom/kafka-connect-sqs/pull/21/files

Usage of this library in Amazon Managed Streaming for Apache Kafka (MSK) and MSK Connect

We are trying to use the SQS connector with SQS as a source and AWS MSK as the destination using MSK Connect (https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect.html)

MSK Connect explicitly says:
MSK Connect uses Kafka Connect 2.7.1, an open-source framework for connecting Apache Kafka clusters with external systems such as databases, search indexes, and file systems

Reviewing the latest versions of this library, they state:
Running the connector on versions of Kafka Connect prior to 3.0 is not recommended.

We haven't been successful in getting the connector running and are debugging, but if I'm reading this correctly, should I assume that this wouldn't be a recommended configuration? That this library isn't really suitable for sending messages to AWS MSK using MSK Connect?

Timeout error on Source

We're currently using the connector from AWS MSK.
While executing the load test, we encountered the following error:

WorkerSourceTask{id=test-1-5-7-0} flushing 7423 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)

WorkerSourceTask{id=test-1-5-7-0} Failed to flush, timed out while waiting for producer to flush outstanding 6365 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)

WorkerSourceTask{id=test-1-5-7-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)

Is there some config that we could apply to avoid this issue, or is it something related to the config of the MSK cluster?

Thanks
Mauro

Add the ability to write messages with a partition key

Our Kafka topics are set up with a key-based retention policy. The keys we want to use correspond to values from the payload on the SQS messages. Currently we have set up intermediate topics that the connector writes to and consumer/producers that read the messages and write to another topic with the correct partition key.

Is there appetite for a configuration of the connect agent that can set the partition key? In our case the message payload is JSON, and I know that's not always the case, but I can see an implementation that relies on the MessageAttributes of the SQS messages being useful and more generally applicable.

It's been a long time since I've written any Java, but I'd be willing to try my hand at a PR if the feature is something that could be accepted.

Cheers!

1.6.2 java.lang.NoClassDefFoundError: com/amazonaws/services/sqs/AmazonSQSClientBuilder

Hi,

Since the new version 1.6.2 claims sqs.endpoint.url is now truly optional (for which, thank you!), I wanted to update and verify this. Unfortunately, I'm running into some issues with the new version.

[2024-06-04 07:23:23,391] WARN .get-class:class=com.amazonaws.auth.DefaultAWSCredentialsProviderChain (com.nordstrom.kafka.connect.sqs.SqsClient)
[2024-06-04 07:23:23,391] ERROR WorkerSinkTask{id=xxxxxxxxx} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NoClassDefFoundError: com/amazonaws/services/sqs/AmazonSQSClientBuilder
  at com.nordstrom.kafka.connect.sqs.SqsClient.<init>(SqsClient.java:54)
  at com.nordstrom.kafka.connect.sqs.SqsSinkConnectorTask.start(SqsSinkConnectorTask.java:63)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)

It is running on a Connect cluster using Confluent Platform version 7.6. The Connect cluster runs in EKS, where IRSA is used. The configuration of my connector is as follows:

{
    "config": {
      "connector.class": "com.nordstrom.kafka.connect.sqs.SqsSinkConnector",
      "topics": "topic",
      "tasks.max": "2",
      "name": "connector",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry",
      "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
      "sqs.queue.url": "https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXX/kafka-connect-sink",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "sqs.message.attributes.enabled": true,
      "sqs.endpoint.url": ""
    },
    "name": "connector"
  }

Please let me know if you need any other information

MSK Connect failing with - Unable to execute HTTP request: Connect to sts.amazonaws.com:443

I'm trying to set up MSK connect with SQS as a source and MSK as a sink.

I tried to use AssumeRole as described in the README.md example.
I created an IAM role kafka-connect-sqs-role, gave it to the MSK connector, set up the credentials properties, and got Unable to execute HTTP request: Connect to sts.amazonaws.com:443.

The credentials properties:

sqs.credentials.provider.session.name=my-kafka-connector-source-sqs-session
sqs.credentials.provider.role.arn=arn:aws:iam::<ACCOUNT-ID>:role/kafka-connect-sqs-role
sqs.credentials.provider.external.id=my-queue-external-id
sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider

When I tried to remove the AssumeRole, as explained in issue #30, I got the same error, only with SQS instead of STS (Unable to execute HTTP request: Connect to sqs.amazonaws.com:443).

connect properties

connector.class=com.nordstrom.kafka.connect.sqs.SqsSourceConnector
sqs.wait.time.seconds=5
topics=event-via-sqs
tasks.max=3
sqs.credentials.provider.session.name=my-kafka-connector-source-sqs-session
sqs.max.messages=5
sqs.credentials.provider.role.arn=arn:aws:iam::<ACCOUNT-ID>:role/kafka-connect-sqs
sqs.queue.url=https://sqs.us-east-1.amazonaws.com/<ACCOUNT-ID>/matan-test-sqs.fifo
sqs.region=us-east-1
sqs.credentials.provider.external.id=my-queue-external-id
value.converter=org.apache.kafka.connect.storage.StringConverter
sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider
key.converter=org.apache.kafka.connect.storage.StringConverter

kafka-connect-sqs-role

MSK permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:<ACCOUNT-ID>:cluster/matan-test-msk/7e6aa63b-3875-48ed-aad1-4d6c9fa7432d-22"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:<ACCOUNT-ID>:topic/matan-test-msk/7e6aa63b-3875-48ed-aad1-4d6c9fa7432d-22/*"
            ]
        }
    ]
}

SQS permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sqs:*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

STS permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "sts:*",
            "Resource": "*"
        }
    ]
}

Exception log:

[Worker-0e2d8e6aecab5dfeb] [2022-07-21 07:56:47,598] ERROR [matan2|task-1] WorkerSourceTask{id=matan2-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0e2d8e6aecab5dfeb] com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/54.239.29.25] failed: connect timed out
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1219)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1165)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.doInvoke(AWSSecurityTokenServiceClient.java:1727)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1694)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1683)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.executeAssumeRole(AWSSecurityTokenServiceClient.java:532)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.assumeRole(AWSSecurityTokenServiceClient.java:501)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.newSession(STSAssumeRoleSessionCredentialsProvider.java:348)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.access$000(STSAssumeRoleSessionCredentialsProvider.java:44)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:93)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:90)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.RefreshableTask.refreshValue(RefreshableTask.java:295)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:251)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:192)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:320)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:43)
[Worker-0e2d8e6aecab5dfeb] 	at com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider.getCredentials(AWSAssumeRoleCredentialsProvider.java:43)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1269)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:845)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:794)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2329)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2296)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2285)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1715)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1683)
[Worker-0e2d8e6aecab5dfeb] 	at com.nordstrom.kafka.connect.sqs.SqsClient.receive(SqsClient.java:116)
[Worker-0e2d8e6aecab5dfeb] 	at com.nordstrom.kafka.connect.sqs.SqsSourceConnectorTask.poll(SqsSourceConnectorTask.java:80)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-0e2d8e6aecab5dfeb] Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/54.239.29.25] failed: connect timed out
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.conn.$Proxy42.connect(Unknown Source)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1346)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
[Worker-0e2d8e6aecab5dfeb] 	... 47 more
[Worker-0e2d8e6aecab5dfeb] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-0e2d8e6aecab5dfeb] 	at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
[Worker-0e2d8e6aecab5dfeb] 	at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
[Worker-0e2d8e6aecab5dfeb] 	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
[Worker-0e2d8e6aecab5dfeb] 	... 63 more

Feature Request: Optional delete

Our use case is fairly unique, but I feel like this could also be helpful for other edge cases.

Currently, the SqsSourceConnectorTask class (https://github.com/Nordstrom/kafka-connect-sqs/blob/master/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java#L115) is set to always delete, which can make both testing and monitoring solutions difficult.

For our case specifically, we would like to pseudo-replicate our SQS messages into Kafka for analysis, to gain visibility with SQL where SQS leaves us fairly blind and forces us to rely on logging or tracing.

If this were to delete the record on the consumer, this would cause production issues for us.

A simple feature flag or environment variable would be great.

Happy to provide a PR

Allow AmazonSQS client to be more configurable

Seems somewhat related to this issue.

It would be useful to be able to provide a provider class for the entire AmazonSQS client object to allow for a host of configuration options, rather than just the credentials provider class as today.

My use case is to provide an endpoint configuration to use for local testing, but I'm sure there are countless others (e.g. enabling SDK metrics). Examples of custom client configuration are here:

https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/creating-clients.html

Perhaps this project could include an interface such as the following and allow users to provide a class that implements it. Here, config is the Kafka Connect name/value config map, so the client could be created from connector config and not have to rely on system properties or environment variables that might not be settable when using a hosted Kafka Connect such as MSK Connect.

public interface AmazonSQSProvider {
   AmazonSQS provide(Map<String, String> config);
}
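
As an illustration only, an implementation of such an interface could build the client from the config map, for example to point at a local endpoint; the property names and defaults below are hypothetical, not part of the current plugin:

import java.util.Map;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class LocalEndpointSQSProvider implements AmazonSQSProvider {
   @Override
   public AmazonSQS provide(Map<String, String> config) {
      // Hypothetical keys read from the connector config rather than the environment.
      String endpoint = config.getOrDefault("sqs.endpoint.url", "http://localhost:4566");
      String region = config.getOrDefault("sqs.region", "us-east-1");
      // Build the SQS client against the configured endpoint and signing region.
      return AmazonSQSClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
            .build();
   }
}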

Critical vulnerabilities in dependency jackson-databind:jar:2.6.7.4

A Trivy report picks up critical vulnerabilities in com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.4, which is being pulled in by com.amazonaws:aws-java-sdk-core:jar:1.11.1034:

+---------------------------------------------+------------------+----------+-------------------+--------------------------+---------------------------------------------------------------+
|                   LIBRARY                   | VULNERABILITY ID | SEVERITY | INSTALLED VERSION |      FIXED VERSION       |                             TITLE                             |
+---------------------------------------------+------------------+----------+-------------------+--------------------------+---------------------------------------------------------------+
| com.fasterxml.jackson.core:jackson-databind | CVE-2017-15095   | CRITICAL | 2.6.7.4           | 2.7.9.2, 2.8.10, 2.9.1   | jackson-databind: Unsafe                                      |
| (kafka-connect-sqs-1.3.0.jar)               |                  |          |                   |                          | deserialization due to                                        |
|                                             |                  |          |                   |                          | incomplete black list (incomplete                             |
|                                             |                  |          |                   |                          | fix for CVE-2017-7525)...                                     |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2017-15095                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2018-11307   |          |                   | 2.7.9.4, 2.8.11.2, 2.9.6 | jackson-databind: Potential                                   |
|                                             |                  |          |                   |                          | information exfiltration with                                 |
|                                             |                  |          |                   |                          | default typing, serialization                                 |
|                                             |                  |          |                   |                          | gadget from MyBatis                                           |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2018-11307                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2018-7489    |          |                   | 2.7.9.3, 2.8.11.1, 2.9.5 | jackson-databind: incomplete fix                              |
|                                             |                  |          |                   |                          | for CVE-2017-7525 permits unsafe                              |
|                                             |                  |          |                   |                          | serialization via c3p0 libraries                              |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2018-7489                          |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-14540   |          |                   | 2.9.10                   | jackson-databind:                                             |
|                                             |                  |          |                   |                          | Serialization gadgets in                                      |
|                                             |                  |          |                   |                          | com.zaxxer.hikari.HikariConfig                                |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-14540                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-14893   |          |                   | 2.8.11.5, 2.9.10         | jackson-databind:                                             |
|                                             |                  |          |                   |                          | Serialization gadgets in                                      |
|                                             |                  |          |                   |                          | classes of the xalan package                                  |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-14893                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-16335   |          |                   | 2.9.10                   | jackson-databind:                                             |
|                                             |                  |          |                   |                          | Serialization gadgets in                                      |
|                                             |                  |          |                   |                          | com.zaxxer.hikari.HikariDataSource                            |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-16335                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-16942   |          |                   | 2.9.10.1                 | jackson-databind:                                             |
|                                             |                  |          |                   |                          | Serialization gadgets in                                      |
|                                             |                  |          |                   |                          | org.apache.commons.dbcp.datasources.*                         |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-16942                         |
+                                             +------------------+          +                   +                          +---------------------------------------------------------------+
|                                             | CVE-2019-16943   |          |                   |                          | jackson-databind:                                             |
|                                             |                  |          |                   |                          | Serialization gadgets in                                      |
|                                             |                  |          |                   |                          | com.p6spy.engine.spy.P6DataSource                             |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-16943                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-17267   |          |                   | 2.9.10                   | jackson-databind: Serialization                               |
|                                             |                  |          |                   |                          | gadgets in classes of                                         |
|                                             |                  |          |                   |                          | the ehcache package                                           |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-17267                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-17531   |          |                   | 2.9.10.1                 | jackson-databind:                                             |
|                                             |                  |          |                   |                          | Serialization gadgets in                                      |
|                                             |                  |          |                   |                          | org.apache.log4j.receivers.db.*                               |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-17531                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2019-20330   |          |                   | 2.8.11.5, 2.9.10.2       | jackson-databind: lacks                                       |
|                                             |                  |          |                   |                          | certain net.sf.ehcache blocking                               |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2019-20330                         |
+                                             +------------------+----------+                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2018-5968    | HIGH     |                   | 2.7.9.5, 2.8.11.1, 2.9.4 | jackson-databind: unsafe                                      |
|                                             |                  |          |                   |                          | deserialization due to incomplete                             |
|                                             |                  |          |                   |                          | blacklist (incomplete fix                                     |
|                                             |                  |          |                   |                          | for CVE-2017-7525 and...                                      |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2018-5968                          |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2020-35490   |          |                   | 2.9.10.8                 | jackson-databind: mishandles the interaction                  |
|                                             |                  |          |                   |                          | between serialization gadgets and typing, related to          |
|                                             |                  |          |                   |                          | org.apache.commons.dbcp2.datasources.PerUserPoolDataSource... |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2020-35490                         |
+                                             +------------------+          +                   +                          +---------------------------------------------------------------+
|                                             | CVE-2020-35491   |          |                   |                          | jackson-databind: mishandles the interaction                  |
|                                             |                  |          |                   |                          | between serialization gadgets and typing, related to          |
|                                             |                  |          |                   |                          | org.apache.commons.dbcp2.datasources.SharedPoolDataSource...  |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2020-35491                         |
+                                             +------------------+          +                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2020-36518   |          |                   | 2.12.6.1, 2.13.2.1       | jackson-databind: denial of service                           |
|                                             |                  |          |                   |                          | via a large depth of nested objects                           |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2020-36518                         |
+                                             +------------------+----------+                   +--------------------------+---------------------------------------------------------------+
|                                             | CVE-2018-1000873 | MEDIUM   |                   | 2.9.8                    | jackson-modules-java8: DoS due                                |
|                                             |                  |          |                   |                          | to an Improper Input Validation                               |
|                                             |                  |          |                   |                          | -->avd.aquasec.com/nvd/cve-2018-1000873                       |
+---------------------------------------------+------------------+----------+-------------------+--------------------------+---------------------------------------------------------------+

AWS region parameter

Hello!
I'm trying to create the connector with Amazon Managed Streaming for Apache Kafka (MSK) and am facing an issue with the region not being defined:

[Worker-096e720ff778a1351] com.amazonaws.SdkClientException: Unable to find a region via the region provider chain. Must provide an explicit region in the builder or setup environment to supply a region.

As far as I understand, the region can be passed via the environment only and not via parameters, as per the docker-compose example.

private final String AWS_REGION = "AWS_REGION";

https://github.com/Nordstrom/kafka-connect-sqs/blob/master/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java#L31

Is there any way to pass AWS_REGION via a parameter? AWS MSK doesn't seem to support environment variables.

Missing SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY configuration in config def - SQS Source connector

org.apache.kafka.common.config.ConfigException: Unknown configuration 'sqs.message.attributes.partition.key'
  at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:162)
  at org.apache.kafka.common.config.AbstractConfig.getString(AbstractConfig.java:197)
  at com.nordstrom.kafka.connect.sqs.SqsConnectorConfig.<init>(SqsConnectorConfig.java:33)
  at com.nordstrom.kafka.connect.sqs.SqsSourceConnectorConfig.<init>(SqsSourceConnectorConfig.java:51)
  at com.nordstrom.kafka.connect.sqs.SqsSourceConnectorTask.start(SqsSourceConnectorTask.java:61)
  at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:277)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)

The configuration for SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY is missing from the ConfigDef CONFIG_DEF in the SqsSourceConnectorConfig class.

Deployment in AWS MSK Connect fails with sts "connect timed out"

Hello,

Thank you very much for this nice connector, I wish I could use it!
I am deploying it in AWS MSK Connect and the exception that I see is the following:

com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/X.Y.Z.V] failed: connect timed out

Any help will be appreciated!

Configuration

The role that I have provided to the MSK Connector (test-kafka-connect-sqs-source-role) has more than enough permissions in the policies and the trust policy looks like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kafkaconnect.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

The trust in kafkaconnect.amazonaws.com is needed so the service can use this IAM role.

The configuration of the connector is the following:

name=TestSQSToKafka
connector.class=com.nordstrom.kafka.connect.sqs.SqsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=event-via-sqs
tasks.max=3
sqs.wait.time.seconds=5
sqs.max.messages=5
sqs.queue.url=https://sqs.eu-west-1.amazonaws.com/123456789012/test-event
sqs.region=eu-west-1
sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider
sqs.credentials.provider.role.arn=arn:aws:iam::123456789012:role/test-kafka-connect-sqs-source-role
sqs.credentials.provider.session.name=my-kafka-connector-source-sqs-session
sqs.credentials.provider.external.id=test-kafka-connect-sqs-source-role

Kafka authentication method is set to "None"

Versions

Apache Kafka version: 2.6.2
Apache Kafka Connect version: 2.7.1
kafka-connect-sqs: 1.4.0

Logs

The full exception is the following:

 [2022-06-23 13:22:30,701] ERROR [TestSQSToKafka|task-0] WorkerSourceTask{id=TestSQSToKafka-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)"
 com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/X.Y.Z.V] failed: connect timed out
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1219)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1165)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.doInvoke(AWSSecurityTokenServiceClient.java:1727)
 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1694)
 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1683)
 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.executeAssumeRole(AWSSecurityTokenServiceClient.java:532)
 	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.assumeRole(AWSSecurityTokenServiceClient.java:501)
 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.newSession(STSAssumeRoleSessionCredentialsProvider.java:348)
 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.access$000(STSAssumeRoleSessionCredentialsProvider.java:44)
 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:93)
 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:90)
 	at com.amazonaws.auth.RefreshableTask.refreshValue(RefreshableTask.java:295)
 	at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:251)
 	at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:192)
 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:320)
 	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:43)
 	at com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider.getCredentials(AWSAssumeRoleCredentialsProvider.java:43)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1269)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:845)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:794)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
 	at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2329)
 	at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2296)
 	at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2285)
 	at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1715)
 	at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1683)
 	at com.nordstrom.kafka.connect.sqs.SqsClient.receive(SqsClient.java:116)
 	at com.nordstrom.kafka.connect.sqs.SqsSourceConnectorTask.poll(SqsSourceConnectorTask.java:80)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 	at java.base/java.lang.Thread.run(Thread.java:829)
 Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/209.54.177.164] failed: connect timed out
 	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
 	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
 	at jdk.internal.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 	at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
 	at com.amazonaws.http.conn.$Proxy42.connect(Unknown Source)
 	at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
 	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
 	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
 	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
 	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
 	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
 	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1346)
 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
 	... 47 more
 Caused by: java.net.SocketTimeoutException: connect timed out
 	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
 	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
 	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
 	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
 	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 	at java.base/java.net.Socket.connect(Socket.java:609)
 	at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
 	at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
 	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
 	... 62 more
 [2022-06-23 13:22:30,702] INFO [TestSQSToKafka|task-0] task.stop:OK (com.nordstrom.kafka.connect.sqs.SqsSourceConnectorTask:126)"

Support for IAM auth connectivity

I'm not well versed in Java, but the new MSK Serverless requires at least enabling IAM auth in the client configuration: https://docs.aws.amazon.com/msk/latest/developerguide/serverless.html "MSK Serverless requires IAM access control for all clusters. For more information, see IAM access control."

From the documentation: https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#how-to-use-iam-access-control

I implemented kafka-ui which required this setting to be set: https://github.com/provectus/kafka-ui/blob/master/documentation/guides/AWS_IAM.md

Is it as simple as adding "aws-msk-iam-auth" to the pom.xml and adding a few properties?
