Comments (12)
@hnaoto The way I've seen this done, specifically in librdkafka (the standard C client for Kafka), is that the credentials are stored in an object. That object is refreshed on a certain cadence (about 80% of the way through the credential lifetime set by `duration_sec`). However, this requires that the refresh mechanism is 1. scheduled and 2. running on a background thread. The credential object must be locked and unlocked on update, since multiple threads may try to access it concurrently.
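That pattern can be sketched in plain Ruby. All names here (`RefreshingCredential`, the fetch block) are illustrative, not from ruby-kafka or librdkafka; the real fetch block would call the STS endpoint:

```ruby
# Sketch of a locked, background-refreshed credential object.
class RefreshingCredential
  REFRESH_FRACTION = 0.8 # refresh ~80% of the way through the lifetime

  def initialize(duration_sec, &fetch_proc)
    @duration_sec = duration_sec
    @fetch_proc   = fetch_proc   # e.g. an STS AssumeRole call
    @mutex        = Mutex.new
    @value        = fetch_proc.call
    start_refresher
  end

  # Readers take the lock so they never observe a half-written credential.
  def value
    @mutex.synchronize { @value }
  end

  private

  def start_refresher
    @thread = Thread.new do
      loop do
        sleep(@duration_sec * REFRESH_FRACTION)
        fresh = @fetch_proc.call
        @mutex.synchronize { @value = fresh }
      end
    end
  end
end
```

The mutex is the important part: the refresher thread swaps the value while broker threads may be reading it concurrently.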
Here's what I got to in late 2021: https://github.com/garrett528/ruby-kafka/pull/2/files. It's not complete, and I don't think I ever got the thread to be scheduled properly (I'm no Rubyist, so I may be heading in the wrong direction trying to port C to Ruby).
Here's the `librdkafka` C implementation that I wrote to do this, if it helps: https://github.com/UrbanCompass/librdkafka/blob/master/src/rdkafka_sasl_aws_msk_iam.c
from ruby-kafka.
@garrett528 Thank you so much for sharing all the details. The solution that you tried sounds promising. Let me see whether I can figure out the scheduling part.
Hi @garrett528, I went through the branch that you shared. May I ask some questions? You mentioned the "thread was not scheduled properly". What kind of errors did you get? (For example, did the credentials fail to update after expiration?)
Trying to remember where I stopped... Right now, that code doesn't actually call the STS endpoint, so that definitely needs to be added. The refresh thread works, but it is fixed to `sleep(60)` instead of using the refresh duration. Additionally, it uses an infinite loop plus sleep instead of an actual scheduling mechanism, which may not be the best way to handle this, and I don't think I ever tested whether the creds pick up new values after the refresh executes.
Thanks for sharing all the information @garrett528 😃
I did some digging into the Ruby AWS SDK, and it looks like the temporary credentials (`AssumeRoleCredentials`) are refreshed in the background automatically (source code: https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-core/lib/aws-sdk-core/refreshing_credentials.rb).
If I pass the role credentials object to the Kafka client, the credentials object used by the Ruby Kafka client will get updated in the background as well. (From some people's perspective, Ruby is "pass by object reference"...)
Do you think it is still required to refresh the credentials in the Ruby Kafka client if the credentials object gets updated in the background?
As far as I know, once a connection is established, the client can keep talking to Kafka; Kafka won't proactively kick away a client with expired credentials. The code change in MR #951 is working, but I haven't figured out a way to verify the behavior of the client when it needs to re-establish a connection.
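As a wiring sketch of what passing the self-refreshing SDK object looks like (the role ARN is a placeholder, and how the object is handed to the Kafka client depends on MR #951, so that part is omitted):

```ruby
require 'aws-sdk-core'

# Aws::AssumeRoleCredentials includes the SDK's RefreshingCredentials
# module, so the underlying keys are renewed automatically near expiry.
credentials = Aws::AssumeRoleCredentials.new(
  role_arn: 'arn:aws:iam::123456789012:role/msk-access', # placeholder ARN
  role_session_name: 'ruby-kafka-session'
)

# Because Ruby passes object references, any client holding this object
# sees the refreshed keys whenever it re-reads them:
creds = credentials.credentials
creds.access_key_id  # current (possibly refreshed) key
creds.session_token  # current session token
```

This is why a reconnect that re-reads the credentials object picks up valid keys without the Kafka client scheduling its own refresh.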
Oh, that's a good find! I'm not sure what the mechanism is for how the `ruby-kafka` client itself manages things once the connection requires a reset. The way I tested this on the C client was to set up an IAM-enabled test MSK cluster and write a script that instantiates a producer and consumer on a fixed interval (say, 1 minute) and runs for longer than the refresh duration. That helped me pinpoint that the C client actually requires a background thread to refresh the credential, and then see what happens when the broker thread disconnects from MSK because the credential expired.
You should be able to set the credential duration to a minimum of 15 minutes, so it's a bit annoying to test.
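A generic skeleton of that fixed-interval soak test, in plain Ruby (the block is where you would build a producer/consumer against the cluster; `soak_test` is an illustrative name, not an existing helper):

```ruby
# Run a block repeatedly on a fixed interval until a deadline passes.
# For the MSK test above: duration longer than the credential lifetime
# (minimum 15 minutes), each round producing and consuming one message.
def soak_test(duration:, interval:)
  deadline = Time.now + duration
  rounds = 0
  while Time.now < deadline
    yield rounds        # e.g. build a producer, send, consume, close
    rounds += 1
    sleep interval
  end
  rounds
end
```

If a round starts failing only after the credential lifetime elapses, the refresh (or the reconnect path) is the culprit.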
Thanks so much for sharing the testing method @garrett528! I tried that with an MSK cluster and let the program run for around 8 hours. It worked fine.
I also took a further look at the source code of the Ruby Kafka client. At a high level, the authentication process appears to work in the following fashion:
- client: init `sasl_authenticator` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L110)
  - pass `sasl_authenticator` to a method to init `@connection_builder` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L110)
  - pass `@connection_builder` to a method to init `broker_pool` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L820)
- broker_pool: use `@connection_builder` to create `broker` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker_pool.rb#L13)
- broker: all the core methods of `broker` rely on the method `send_request` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker.rb#L199). `send_request` relies on `@connection`. (`@connection` will be set to nil if there is an `IdleConnection` or `ConnectionError` error.)
  - Depending on the scenario, sometimes upstream will raise `ConnectionError`, and sometimes upstream will retry. A retry should eventually trigger `@connection_builder.build_connection` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker.rb#L214)
- connection_builder: `build_connection` will call `@sasl_authenticator.authenticate!(connection)`
- sasl_authenticator: `sasl_authenticator` will call the appropriate method to authenticate the client (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl_authenticator.rb#L71). In this particular scenario, it will call `authenticate!` of `awsmskiam` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl/awsmskiam.rb#L30). `authenticate!` will create a payload that wraps the instance variables `@access_key_id` and `@session_token`.
To sum up: if there is no connection error, the client will keep working even after the credentials have expired.
If there is a connection error, then depending on the scenario, the Kafka client might just raise the connection error and crash. The application code is responsible for re-creating the client at that point. Since the assume-role credentials have been refreshed in the background, the newly created client will use valid credentials.
If the Kafka client attempts to recreate the connection, it will eventually invoke `build_connection` and `authenticate!` (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl/awsmskiam.rb#L30). The credentials used by `awsmskiam.rb` are encapsulated in an object that is updated automatically in the background, so the newly created connection should work as expected.
Apologies for the long write-up. I might be missing some nuances of how connection errors are handled by the Ruby Kafka client, but I think the authentication process should be similar across clients written in different languages, and the current implementation should be sufficient. Does the analysis sound reasonable to you?
@hnaoto That makes sense to me. Do you know which scenarios cause a crash vs. a retry? If those scenarios are well-defined, it will help devs understand the circumstances that cause failure and allow them to make an educated decision on whether to catch and restart the client or let it fail and investigate.
outside of that, this is great work!
I checked how `ConnectionError` exceptions are handled. `ConnectionError` can be thrown in a number of situations: https://github.com/zendesk/ruby-kafka/search?q=ConnectionError
From the producer's perspective, the pattern looks like:
- The `ConnectionError` will be caught upstream: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L405
- The upstream will retry until the counter hits `@max_retries`: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L422 (`max_retries` is a client/producer config)
- A connection can be re-established during a retry. Otherwise, the client will crash after all the retries are exhausted.
OK, so it does have a retry mechanism that it hits. Sounds like it's less of an issue with this client anyway, since the Ruby AWS SDK handles the credential refresh automatically (the C client does not!).
Yes, there is a retry mechanism, but once the client crashes, the messages inside the local buffer will be lost as well. (The behavior of the buffer is briefly mentioned in this section: https://github.com/zendesk/ruby-kafka#buffering-and-error-handling.) Ideally, the client should be able to re-establish the connection before it hits the retry limit if the network issue is only intermittent.
😄 It would be much more complicated to implement credential refreshing manually, and I think the people who worked on the Ruby AWS SDK had to overcome some hurdles as well. (Related threads: aws/aws-sdk-ruby#2641 and aws/aws-sdk-ruby#2642)
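One application-level way to act on this, sketched with stand-in names (not ruby-kafka API; `DeliveryGaveUp` stands in for the terminal error the client raises after exhausting retries): keep your own copy of the messages, rebuild the client on failure (it picks up the refreshed credentials), and re-send. Note the naive retry below can produce duplicates.

```ruby
# Stand-in for the terminal delivery error, for illustration only.
class DeliveryGaveUp < StandardError; end

def send_with_rebuild(messages, build_client:, max_rebuilds: 1)
  rebuilds = 0
  client = build_client.call
  begin
    messages.each { |m| client.deliver(m) }
  rescue DeliveryGaveUp
    rebuilds += 1
    raise if rebuilds > max_rebuilds
    client = build_client.call  # fresh client sees refreshed credentials
    retry                       # naive: re-sends ALL messages (duplicates possible)
  end
end
```

This trades the buffer-loss problem for a possible-duplicates problem, which is the usual at-least-once compromise.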
Issue has been marked as stale due to a lack of activity.