Porpose of the project is to create extensions to default set of serializers and deserializers provided by Apache Kafka.
- Chained serializer execution.
- Protect payload with symmetric or asymmetric encryption algorithm.
- Create random initialization vector when required.
- Serializer implementing hybrid encryption.
- Generate and verify digital signature to guarantee authentication and data integrity.
- Installation
- Chained Serializer
- Encryption Serializer
- Hybrid Encryption Serializer
- Digital Signature Serializer
- Tutorial
- Download latest release ZIP archive from GitHub and extract its content to temporary folder.
- Copy kafka-serde-ext-${version}.jar with all third-party dependencies to classpath of Kafka producers and consumers.
- Version 1.0.0 depends only on Bouncy Castle security provider.
- Configure Kafka producers and consumers according to below documentation.
Chained serializer allows to execute ordered list of serializers passing output of one to another. Keeping in mind that Kafka org.apache.kafka.common.serialization.Serializer
interface transforms given Java type always to byte array, it make sense to add extra serializers performing modifications to byte array itself. Let us take an example of application
leveraging org.apache.kafka.common.serialization.StringSerializer
to encode value of Kafka record. If we wanted to encrypt the payload, we can use chained serializer to transform
Java String
to byte array, followed by encryption serializer. Plese note that application code does not require any further updates.
Example producer configuration:
value.serializer = io.macronova.kafka.common.serialization.ChainedSerializer
0.serializer = org.apache.kafka.common.serialization.StringSerializer
1.serializer = io.macronova.kafka.common.serialization.EncryptSerializer
1.transformation = AES/ECB/PKCS5Padding
1.secret = 770A8A65DA156D24EE2A093277530142
Serializers are ordered by position prefix (0 based) and allow to take extra configuration parameters. In the above example, transformation and secret properties
will be passed to io.macronova.kafka.common.serialization.EncryptSerializer
during initialization.
Example consumer configuration:
value.deserializer = io.macronova.kafka.common.serialization.ChainedDeserializer
0.deserializer = io.macronova.kafka.common.serialization.DecryptDeserializer
0.transformation = AES/ECB/PKCS5Padding
0.secret = 770A8A65DA156D24EE2A093277530142
1.deserializer = org.apache.kafka.common.serialization.StringDeserializer
Sequence of serializers needs to be reversed in case of deserialization. First we decrypt payload and only once successful, construct Java object from decoded byte array.
Please review src/examples folder for complete code sample.
Encryption serializer allows to encrypt and decrypt stream of bytes using symmetric or asymmetric cryptography. For complete list of supported algorithms, consult Bouncy Castly documentation. In case given transformation requires initialization vector, serializer will generate one composed from random bytes and prepend it to the output array.
Serializer class: io.macronova.kafka.common.serialization.EncryptSerializer
Deserializer class: io.macronova.kafka.common.serialization.DecryptDeserializer
Configuration parameters:
Property Name | Mandatory | Description |
---|---|---|
transformation | Always | Cryptography transformation. |
secret | shared secret encryption | Hexadecimal representation of secret key. |
key.store.path | key pair encryption | Key store path. |
key.store.type | No | Key store type. Default: JKS . |
key.store.password | key pair encryption | Key store password. |
key.store.alias | key pair encryption | Key alias. |
key.store.alias.password | key pair encryption, deserializer | Alias password. |
Users are required to specify either secret
(for shared passphrase encryption) or key.store.path
property (for asymmetric cryptography algorithm).
Below diagram presents output data format.
+------------------------------------+
| initialization vector | encrypted |
| (optional, 8-16 bytes) | data |
+------------------------------------+
Please review src/examples directory for complete code sample.
Example configuration of symmetric encryption / decryption using shared secret key:
transformation = AES/ECB/PKCS5Padding
secret = 770A8A65DA156D24EE2A093277530142
Example of asymmetric encryption (private and public keys retrieved from key store):
transformation = RSA/None/PKCS1Padding
key.store.path = /tmp/keystore.jks
key.store.password = changeit
key.store.alias = key1
key.store.alias.password = changeit # Required only by deserializer (Kafka consumer).
Classic asymmetric encryption does not allow to encode content larger than key size. To overcome mentioned limitation, hybrid approach does the following:
- Generate random secret key.
- Encrypt hereby key with asymmetric algorithm (e.g. RSA).
- Encrypt payload with symmetric algorithm (e.g. AES) using key generated in step 1.
Note that Kafka producer and consumer need only to know asymmetric key pair to successfully encrypt and decrypt data.
Serializer class: io.macronova.kafka.common.serialization.HybridEncryptSerializer
Deserializer class: io.macronova.kafka.common.serialization.HybridDecryptDeserializer
Configuration parameters (all required unless specified otherwise):
Property Name | Description |
---|---|
symmetric.transformation | Symmetric transformation. |
asymmetric.transformation | Asymmetric transformation, requires padding. |
asymmetric.key.store.path | Key store path. |
asymmetric.key.store.type | Key store type. Not mandatory, default: JKS . |
asymmetric.key.store.password | Key store password. |
asymmetric.key.store.alias | Key alias. |
asymmetric.key.store.alias.password | Alias password. Mandatory only for deserializer. |
Below diagram presents output data format.
+--------------------------------------------------------------+
| RSA encoded secret | initialization vector | AES encrypted |
| key used for AES | (optional, 8-16 bytes) | data |
+--------------------------------------------------------------+
Note: Always use asymmetric algorithm with padding. For classic
RSA
this would meanRSA/None/PKCS1Padding
.
symmetric.transformation = AES/ECB/PKCS5Padding
asymmetric.transformation = RSA/None/PKCS1Padding
asymmetric.key.store.path = /tmp/keystore.jks
asymmetric.key.store.password = changeit
asymmetric.key.store.alias = key1
asymmetric.key.store.alias.password = changeit # Required only by deserializer (Kafka consumer).
Digital signature serializer allows to generate signature and check its correctness during deserialization to guarantee authentication and integrity of data.
For complete list of supported algorithms, consult Bouncy Castly documentation.
Deserializer throws org.apache.kafka.common.errors.SerializationException
if verification turns out invalid. Digital signature is prepended to output byte array.
Serializer class: io.macronova.kafka.common.serialization.GenerateSignatureSerializer
Deserializer class: io.macronova.kafka.common.serialization.VerifySignatureDeserializer
Configuration parameters (all required unless specified otherwise):
Property Name | Description |
---|---|
algorithm | Digital signature algorithm. |
key.store.path | Key store path. |
key.store.type | Key store type. Not mandatory, default: JKS . |
key.store.password | Key store password. |
key.store.alias | Key alias. |
key.store.alias.password | Alias password. Mandatory only for serializer. |
Below diagram presents output data format.
+------------------+
| signature | data |
+------------------+
algorithm = SHA256withRSA
key.store.path = /tmp/keystore.jks
key.store.password = changeit
key.store.alias = key1
key.store.alias.password = changeit # Required only by serializer (Kafka producer).
Read our five minute blog post.