Disclaimer: This is an UNOFFICIAL community project!
Kryptonite for Kafka provides a turn-key ready Kafka Connect single message transformation (SMT) called CipherField. The simple examples below show how to install, configure, and apply the SMT to encrypt and decrypt record fields.
Either build this project from sources via Maven or download pre-built, self-contained packages of the latest artefacts. Starting with Kryptonite for Kafka 0.4.0, the pre-built Kafka Connect SMT can be downloaded directly from the release pages.
In order to deploy this custom SMT, put the root folder of the extracted archive into your Kafka Connect plugin path, i.e. a location that is configured to be scanned during bootstrap of the Kafka Connect worker node(s).
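For instance - assuming the hypothetical location /opt/connect-plugins for extracted plugin archives - the relevant worker setting might look like this:

# Kafka Connect worker configuration, e.g. connect-distributed.properties
# hypothetical path: the extracted archive's root folder resides below /opt/connect-plugins
plugin.path=/opt/connect-plugins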
The following fictional data record value without schema - represented in JSON-encoded format - is used to illustrate a simple encrypt/decrypt scenario:
{
"id": "1234567890",
"myString": "some foo bla text",
"myInt": 42,
"myBoolean": true,
"mySubDoc1": {"myString":"hello json"},
"myArray1": ["str_1","str_2","...","str_N"],
"mySubDoc2": {"k1":9,"k2":8,"k3":7}
}
Let's assume the fields "myString", "myArray1", and "mySubDoc2" of the above data record should get encrypted. The CipherField SMT can then be configured like so:
{
//...
"transforms":"cipher",
"transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "ENCRYPT",
"transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
"transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
"transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
"transforms.cipher.field_mode": "OBJECT",
//...
}
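For orientation, the //... placeholders above stand for the remaining connector settings. A minimal, purely illustrative connector definition embedding this SMT might look as follows (the connector name, connector class, and tasks.max are generic placeholders for this example, not part of Kryptonite):

{
"name": "my-source-connector",
"config": {
"connector.class": "<YOUR_CONNECTOR_CLASS>",
"tasks.max": "1",
"transforms": "cipher",
"transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "ENCRYPT",
"transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]",
"transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
"transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
"transforms.cipher.field_mode": "OBJECT"
}
}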
The result after applying this SMT is a record in which all the fields specified in the field_config parameter are encrypted using the keyset configured by its id with the cipher_data_key_identifier parameter. The keysets themselves are configured using the parameter cipher_data_keys, where the key material itself is specified according to a Tink keyset configuration in JSON format (here is a concrete example). Obviously, the configured key materials have to be treated with utmost secrecy, for leaking any of the keyset materials renders encryption useless. The recommended way of doing this for now is to either
- indirectly reference keyset materials by externalizing them into a separate properties file (find a few details here),
or
- NOT store the keyset materials at the client-side in the first place, but instead resolve keysets at runtime from a cloud KMS such as Azure Key Vault, which is supported as well.
In general though, this can be considered a "chicken-and-egg" problem, since the confidential settings needed to access a remote KMS also have to be stored somewhere somehow.
Since the configuration parameter field_mode is set to OBJECT, complex field types are processed as a whole instead of element-wise; the latter can be achieved by choosing ELEMENT mode (an illustrative sketch follows the example and note below).
Below is an exemplary JSON-encoded record after the encryption:
{
"id": "1234567890",
"myString": "M007MIScg8F0A/cAddWbayvUPObjxuGFxisu5MUckDhBss6fo3gMWSsR4xOLPEfs4toSDDCxa7E=",
"myInt": 42,
"myBoolean": true,
"mySubDoc1": {"myString":"hello json"},
"myArray1": "UuEKnrv91bLImQvKqXTET7RTP93XeLfNRhzJaXVc6OGA4E+mbvGFs/q6WEFCAFy9wklJE5EPXJ+P85nTBCiVrTkU+TR+kUWB9zNplmOL70sENwwwsWux",
"mySubDoc2": "fLAnBod5U8eS+LVNEm3vDJ1m32/HM170ASgJLKdPF78qDxcsiWj+zOkvZBsk2g44ZWHiSDy3JrI1btmUQhJc4OTnmqIPB1qAADqKhJztvyfcffOfM+y0ISsNk4+V6k0XHBdaT1tJXqLTsyoQfWmSZsnwpM4WARo5/cQWdAwwsWux"
}
NOTE: Encrypted fields are always represented as Base64-encoded strings which contain both the ciphertext of the fields' original values and authenticated but unencrypted(!) meta-data. If you want to learn a few more details, look here.
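To illustrate the difference between the two field modes, here is a purely hypothetical sketch of the same record encrypted with field_mode set to ELEMENT instead: complex fields keep their container structure while each array element or map value gets encrypted individually. The ciphertexts below are shortened placeholders, not real SMT output:

{
"id": "1234567890",
"myString": "<BASE64_CIPHERTEXT>",
"myInt": 42,
"myBoolean": true,
"mySubDoc1": {"myString":"hello json"},
"myArray1": ["<BASE64_CIPHERTEXT_1>","<BASE64_CIPHERTEXT_2>","<BASE64_CIPHERTEXT_3>","<BASE64_CIPHERTEXT_N>"],
"mySubDoc2": {"k1":"<BASE64_CIPHERTEXT_A>","k2":"<BASE64_CIPHERTEXT_B>","k3":"<BASE64_CIPHERTEXT_C>"}
}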
Provided that the keyset used to encrypt the original data record is made available to a specific sink connector, the CipherField SMT can be configured to decrypt the data as follows:
{
//...
"transforms":"cipher",
"transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "DECRYPT",
"transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
"transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
"transforms.cipher.field_mode": "OBJECT",
//...
}
The result after applying this SMT is a record in which all the fields specified in the field_config parameter are decrypted using the keyset that was used to encrypt the original data. Obviously, this can work if and only if the keyset is properly configured.
Below is an exemplary JSON-encoded record after the decryption, which is equal to the original record:
{
"id": "1234567890",
"myString": "some foo bla text",
"myInt": 42,
"myBoolean": true,
"mySubDoc1": {"myString":"hello json"},
"myArray1": ["str_1","str_2","...","str_N"],
"mySubDoc2": {"k1":9,"k2":8,"k3":7}
}
The following example is based on an Avro value record and is used to illustrate a simple encrypt/decrypt scenario for data records with schema. The schema could be defined as follows, with arbitrary record names added to make the definition valid Avro:
{
"type": "record",
"name": "MyRecord",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "myString", "type": "string" },
{ "name": "myInt", "type": "int" },
{ "name": "myBoolean", "type": "boolean" },
{ "name": "mySubDoc1", "type": {
"type": "record",
"name": "MySubDoc1",
"fields": [
{ "name": "myString", "type": "string" }
]
}
},
{ "name": "myArray1", "type": { "type": "array", "items": "string" } },
{ "name": "mySubDoc2", "type": { "type": "map", "values": "int" } }
]
}
The data of one such fictional record - represented by its Struct.toString() output - might look as follows:
Struct{
id=1234567890,
myString=some foo bla text,
myInt=42,
myBoolean=true,
mySubDoc1=Struct{myString=hello json},
myArray1=[str_1, str_2, ..., str_N],
mySubDoc2={k1=9, k2=8, k3=7}
}
Let's assume the fields "myString", "myArray1", and "mySubDoc2" of the above data record should get encrypted. The CipherField SMT can be configured as follows:
{
//...
"transforms":"cipher",
"transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "ENCRYPT",
"transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
"transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
"transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
"transforms.cipher.field_mode": "OBJECT",
//...
}
The result after applying this SMT is a record in which all the fields specified in the field_config parameter are encrypted using the keyset configured by its id with the cipher_data_key_identifier parameter. The keysets themselves are configured using the parameter cipher_data_keys, where the key material itself is specified according to a Tink keyset configuration in JSON format (here is a concrete example). Obviously, the configured key materials have to be treated with utmost secrecy, for leaking any of the keyset materials renders encryption useless. The recommended way of doing this for now is to either
- indirectly reference keyset materials by externalizing them into a separate properties file (find a few details here),
or
- NOT store the keyset materials at the client-side in the first place, but instead resolve keysets at runtime from a cloud KMS such as Azure Key Vault, which is supported as well.
In general though, this can be considered a "chicken-and-egg" problem, since the confidential settings needed to access a remote KMS also have to be stored somewhere somehow.
Since the configuration parameter field_mode in the configuration above is set to OBJECT, complex field types are processed as a whole instead of element-wise; the latter can be achieved by choosing ELEMENT mode.
Below is an exemplary Struct.toString() output of the record after the encryption:
Struct{
id=1234567890,
myString=MwpKn9k5V4prVVGvAZdm6iOp8GnVUR7zyT+Ljb+bhcrFaGEx9xSNOpbZaJZ4YeBsJAj7DDCxa7E=,
myInt=42,
myBoolean=true,
mySubDoc1=Struct{myString=hello json},
myArray1=Ujlij/mbI48akEIZ08q363zOfV+OMJ+ZFewZEMBiaCnk7NuZZH+mfw6HGobtRzvxeavRhTL3lKI1jYPz0CYl7PqS7DJOJtJ1ccKDa5FLAgP0BQwwsWux,
mySubDoc2=fJxvxo1LX1ceg2/Ba4+vq2NlgyJNiWGZhjWh6rkHQzuG+C7I8lNW8ECLxqJkNhuYuMMlZjK51gAZfID4HEWcMPz026HexzurptZdgkM1fqJMTMIryDKVlAicXc8phZ7gELZCepQWE0XKmQg0UBXr924V46x9I9QwaWUAdgwwsWux
}
NOTE 1: Encrypted fields are always represented as Base64-encoded strings which contain both the ciphertext of the fields' original values and authenticated but unencrypted(!) meta-data about the field in question. If you want to learn a few more details, look here.
NOTE 2: Obviously, in order to support this, the original schema of the data record is automatically redacted such that any encrypted fields can be stored as strings, even though the original data types of the fields in question were different.
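To illustrate the effect of this schema redaction - this is a conceptual sketch, not actual SMT output - the redacted schema for the Avro example above would type the three encrypted fields as strings:

{
"type": "record",
"name": "MyRecord",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "myString", "type": "string" },
{ "name": "myInt", "type": "int" },
{ "name": "myBoolean", "type": "boolean" },
{ "name": "mySubDoc1", "type": { "type": "record", "name": "MySubDoc1", "fields": [ { "name": "myString", "type": "string" } ] } },
{ "name": "myArray1", "type": "string" },
{ "name": "mySubDoc2", "type": "string" }
]
}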
Provided that the keyset used to encrypt the original data record is made available to a specific sink connector, the CipherField SMT can be configured to decrypt the data as follows:
{
//...
"transforms":"cipher",
"transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "DECRYPT",
"transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
"transforms.cipher.field_config": "[{\"name\":\"myString\",\"schema\": {\"type\": \"STRING\"}},{\"name\":\"myArray1\",\"schema\": {\"type\": \"ARRAY\",\"valueSchema\": {\"type\": \"STRING\"}}},{\"name\":\"mySubDoc2\",\"schema\": { \"type\": \"MAP\", \"keySchema\": { \"type\": \"STRING\" }, \"valueSchema\": { \"type\": \"INT32\"}}}]",
"transforms.cipher.field_mode": "OBJECT",
//...
}
Take note of the extended field_config parameter settings. For the decryption of schema-aware data, the SMT configuration expects the original schema information to be explicitly specified for each field to decrypt. This allows the SMT to redact the encrypted record's schema towards a compatible decrypted record's schema upfront, such that the resulting plaintext field values can be stored in accordance with their original data types.
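For readability, the escaped field_config value from this configuration corresponds to the following unescaped JSON array:

[
{ "name": "myString", "schema": { "type": "STRING" } },
{ "name": "myArray1", "schema": { "type": "ARRAY", "valueSchema": { "type": "STRING" } } },
{ "name": "mySubDoc2", "schema": { "type": "MAP", "keySchema": { "type": "STRING" }, "valueSchema": { "type": "INT32" } } }
]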
The result after applying this SMT is a record in which all the fields specified in the field_config parameter are decrypted using the keyset that was used to encrypt the original data. Obviously, this can work if and only if the keyset is properly configured.
Below is the decrypted data - represented by its Struct.toString() output - which is equal to the original record:
Struct{
id=1234567890,
myString=some foo bla text,
myInt=42,
myBoolean=true,
mySubDoc1=Struct{myString=hello json},
myArray1=[str_1, str_2, ..., str_N],
mySubDoc2={k1=9, k2=8, k3=7}
}
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
cipher_data_key_identifier | keyset identifier to be used as the default data encryption keyset for all fields which don't refer to a specific keyset identifier in their field_config | string | !no default! | | high |
cipher_data_keys | JSON array with plain or encrypted data key objects specifying the key identifiers together with keysets for encryption / decryption, defined in Tink's key specification format. The contained keyset objects are mandatory if kms_type=NONE, but the array may be left empty, e.g. for kms_type=AZ_KV_SECRETS, in order to resolve keysets from a remote KMS such as Azure Key Vault. NOTE: Irrespective of their origin, all plain or encrypted keysets (see the example values in the right column) are expected to be valid Tink keyset descriptions in JSON format. | string | [] | JSON array, either empty or holding N data key config objects, each of which refers to a Tink keyset in JSON format (see "material" field); plain keyset example: [ { "identifier": "my-demo-secret-key-123", "material": { "primaryKeyId": 123456789, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "<BASE64_ENCODED_KEY_HERE>", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 123456789, "outputPrefixType": "TINK" } ] } } ] encrypted keyset example: [ { "identifier": "my-demo-secret-key-123", "material": { "encryptedKeyset": "<ENCRYPTED_AND_BASE64_ENCODED_KEYSET_HERE>", "keysetInfo": { "primaryKeyId": 123456789, "keyInfo": [ { "typeUrl": "type.googleapis.com/google.crypto.tink.AesSivKey", "status": "ENABLED", "keyId": 123456789, "outputPrefixType": "TINK" } ] } } } ] | high |
cipher_mode | defines whether the data should get encrypted or decrypted | string | !no default! | ENCRYPT, DECRYPT | high |
field_config | JSON array with field config objects specifying which fields, together with their settings, should get either encrypted / decrypted (nested field names are expected to be separated by . per default, or by a custom path_delimiter) | string | !no default! | JSON array holding at least one valid field config object, e.g. [ { "name": "my-field-abc" }, { "name": "my-nested.field-xyz" } ] | high |
key_source | defines the nature and origin of the keysets: sourced from the config itself, either plain (CONFIG) or encrypted (CONFIG_ENCRYPTED), or resolved from a remote/cloud KMS, again either plain (KMS) or encrypted (KMS_ENCRYPTED). When resolving keysets from a remote KMS refer to the kms_type and kms_config settings. When using encrypted data keysets refer to the kek_type, kek_config and kek_uri settings as well. | string | CONFIG | CONFIG, CONFIG_ENCRYPTED, KMS, KMS_ENCRYPTED | high |
kms_type | defines if and which remote/cloud KMS is used to resolve keysets: none at all (NONE) or Azure Key Vault secrets (AZ_KV_SECRETS) | string | NONE | NONE, AZ_KV_SECRETS | medium |
kms_config | JSON object specifying KMS-specific client authentication settings; currently only supported for Azure Key Vault (kms_type=AZ_KV_SECRETS) | string | {} | JSON object defining the KMS-specific client authentication settings, e.g. for Azure Key Vault: { "clientId": "...", "tenantId": "...", "clientSecret": "...", "keyVaultUrl": "..." } | medium |
kek_type | defines if KMS key encryption - currently only supported for Google Cloud KMS - is used for encrypting data keysets; must be specified when using key_source=CONFIG_ENCRYPTED or key_source=KMS_ENCRYPTED | string | NONE | NONE, GCP | medium |
kek_config | JSON object specifying KMS-specific client authentication settings; currently only supported for Google Cloud KMS (kek_type=GCP) | string | {} | JSON object specifying the KMS-specific client authentication settings, e.g. for Google Cloud KMS: { "type": "service_account", "project_id": "...", "private_key_id": "...", "private_key": "-----BEGIN PRIVATE KEY----- ... -----END PRIVATE KEY-----\n", "client_email": "...", "client_id": "...", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "..." } | medium |
kek_uri | URI referring to the key encryption key stored in the respective remote/cloud KMS; currently only supported for Google Cloud KMS | string | !no default! | a valid and supported Tink key encryption key URI, e.g. pointing to a key in Google Cloud KMS (kek_type=GCP): gcp-kms://... | medium |
field_mode | defines how to process complex field types (maps, lists, structs), either as full objects or element-wise | string | ELEMENT | ELEMENT, OBJECT | medium |
cipher_algorithm | default cipher algorithm used for data encryption if not specified for a field in its field_config | string | TINK/AES_GCM | TINK/AES_GCM, TINK/AES_GCM_SIV | medium |
cipher_text_encoding | defines the encoding of the resulting ciphertext bytes (currently only BASE64 is supported) | string | BASE64 | BASE64 | low |
path_delimiter | path delimiter used as field name separator when referring to nested fields in the input record | string | . | non-empty string | low |
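Putting some of these parameters together, a purely illustrative configuration fragment that resolves plain keysets at runtime from Azure Key Vault instead of specifying them in the config might look like this; all kms_config values are placeholders and should themselves be externalized as described in the next section:

{
//...
"transforms":"cipher",
"transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "DECRYPT",
"transforms.cipher.cipher_data_keys": "[]",
"transforms.cipher.key_source": "KMS",
"transforms.cipher.kms_type": "AZ_KV_SECRETS",
"transforms.cipher.kms_config": "{\"clientId\":\"...\",\"tenantId\":\"...\",\"clientSecret\":\"...\",\"keyVaultUrl\":\"...\"}",
"transforms.cipher.field_config": "[{\"name\":\"myString\"}]",
"transforms.cipher.field_mode": "OBJECT",
//...
}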
The problem with directly specifying configuration parameters which contain sensitive data, such as keyset materials, is that they are exposed via Kafka Connect's REST API. This means that for Connect clusters shared among teams, the configured keyset materials would leak, which would be unacceptable. The way to deal with this for now is to indirectly reference such configuration parameters from external property files.
This approach can be used to configure any kind of sensitive data, such as the keyset materials themselves or KMS-specific client authentication settings, in case the keysets aren't sourced from the config directly but rather retrieved from a cloud KMS such as Azure Key Vault.
Below is a quick example of what such a configuration looks like:
- Before you can make use of configuration parameters from external sources you have to customize your Kafka Connect worker configuration by adding the following two settings:
connect.config.providers=file
connect.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
- Then you create the external properties file, e.g. classified.properties, which contains the keyset materials. This file needs to be available on all Kafka Connect workers on which you want to run Kryptonite. Let's pretend the file is located at the path /secrets/kryptonite/classified.properties on your worker nodes:
cipher_data_keys=[{"identifier":"my-demo-secret-key-123","material":{<TINK_KEYSET_SPEC_JSON_HERE>}}]
- Finally, you simply reference this file and the corresponding property key therein from your SMT configuration like so:
{
//...
"transforms":"cipher",
"transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "ENCRYPT",
"transforms.cipher.cipher_data_keys": "${file:/secrets/kryptonite/classified.properties:cipher_data_keys}",
"transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
"transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
"transforms.cipher.field_mode": "OBJECT",
//...
}
In case you want to learn more about configuration parameter externalization there is e.g. this nice blog post from the Debezium team showing how to externalize username and password settings using a docker-compose example.
Key material is configured in the cipher_data_keys property of the CipherField SMT, which takes an array of JSON objects. The material field in one such JSON object represents a keyset and might look as follows:
{
"primaryKeyId": 1234567890,
"key": [
{
"keyData": {
"typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey",
"value": "<BASE64_ENCODED_KEY_HERE>",
"keyMaterialType": "SYMMETRIC"
},
"status": "ENABLED",
"keyId": 1234567890,
"outputPrefixType": "TINK"
}
]
}
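For comparison, when working with encrypted keysets (key_source=CONFIG_ENCRYPTED or KMS_ENCRYPTED), the material object follows the encrypted keyset structure shown in the parameter reference above; pretty-printed, it might look as follows:

{
"encryptedKeyset": "<ENCRYPTED_AND_BASE64_ENCODED_KEYSET_HERE>",
"keysetInfo": {
"primaryKeyId": 123456789,
"keyInfo": [
{
"typeUrl": "type.googleapis.com/google.crypto.tink.AesSivKey",
"status": "ENABLED",
"keyId": 123456789,
"outputPrefixType": "TINK"
}
]
}
}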
Note that any such keyset JSON needs to be specified either:
- as a single-line JSON object in an external config file (.properties):
... "material": { "primaryKeyId": 1234567890, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "<BASE64_ENCODED_KEY_HERE>", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1234567890, "outputPrefixType": "TINK" } ] } ...
or
- as a single-line escaped/quoted JSON string if included directly within a connector's JSON configuration:
"... \"material\": { \"primaryKeyId\": 1234567890, \"key\": [ { \"keyData\": { \"typeUrl\": \"type.googleapis.com/google.crypto.tink.AesGcmKey\", \"value\": \"<BASE64_ENCODED_KEY_HERE>\", \"keyMaterialType\": \"SYMMETRIC\" }, \"status\": \"ENABLED\", \"keyId\": 1234567890, \"outputPrefixType\": \"TINK\" } ] } ..."