Skip to content

Commit

Permalink
add workaround for property
Browse files Browse the repository at this point in the history
  • Loading branch information
SnuK87 committed Oct 19, 2023
1 parent 070c027 commit ec96d17
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 83 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ KAFKA_MAX_BLOCK_MS=10000

A full list of available configurations can be found in the [official kafka docs](https://kafka.apache.org/documentation/#producerconfigs).


| :warning: WARNING |
|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Because some environments have difficulties with empty string variables, a workaround for `SSL_ENDPOINT_IDENTIFICATION_ALGORITHM` was implemented. To disable the host name verification set the value to `disabled`. The module will transfer the value to an empty string when creating the kafka client. |

### Kafka client using secure connection
As mentioned above the kafka client can be configured by passing parameters to the start command. To make kafka open a SSL/TLS secured connection you can add the following parameters:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,100 +2,102 @@

import java.util.HashMap;
import java.util.Map;

import org.keycloak.Config.Scope;

public class KafkaProducerConfig {

// https://kafka.apache.org/documentation/#producerconfigs
// https://kafka.apache.org/documentation/#producerconfigs

public static Map<String, Object> init(Scope scope) {
Map<String, Object> propertyMap = new HashMap<>();
KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();
public static Map<String, Object> init(Scope scope) {
Map<String, Object> propertyMap = new HashMap<>();
KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();

for (KafkaProducerProperty property : producerProperties) {
String propertyEnv = System.getenv("KAFKA_" + property.name());
for (KafkaProducerProperty property : producerProperties) {
String propertyEnv = System.getenv("KAFKA_" + property.name());

if (property.getName() != null && scope.get(property.getName(), propertyEnv) != null) {
propertyMap.put(property.getName(), scope.get(property.getName(), propertyEnv));
}
}
if (property == KafkaProducerProperty.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM &&
("disabled").equalsIgnoreCase(scope.get(property.getName(), propertyEnv))) {
propertyMap.put(property.getName(), "");
} else if (property.getName() != null && scope.get(property.getName(), propertyEnv) != null) {
propertyMap.put(property.getName(), scope.get(property.getName(), propertyEnv));
}
}

return propertyMap;
}
return propertyMap;
}

enum KafkaProducerProperty {
ACKS("acks"), //
BUFFER_MEMORY("buffer.memory"), //
COMPRESSION_TYPE("compression.type"), //
RETRIES("retries"), //
SSL_KEY_PASSWORD("ssl.key.password"), //
SSL_KEYSTORE_CERTIFICATE_CHAIN("ssl.keystore.certificate.chain"), //
SSL_KEYSTORE_LOCATION("ssl.keystore.location"), //
SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), //
SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), //
SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), //
BATCH_SIZE("batch.size"), //
CLIENT_DNS_LOOKUP("client.dns.lookup"), //
CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), //
DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), //
LINGER_MS("linger.ms"), //
MAX_BLOCK_MS("max.block.ms"), //
MAX_REQUEST_SIZE("max.request.size"), //
PARTITIONER_CLASS("partitioner.class"), //
RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), //
REQUEST_TIMEOUT_MS("request.timeout.ms"), //
SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), //
SASL_JAAS_CONFIG("sasl.jaas.config"), //
SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), //
SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), //
SASL_LOGIN_CLASS("sasl.login.class"), //
SASL_MECHANISM("sasl.mechanism"), //
SECURITY_PROTOCOL("security.protocol"), //
SEND_BUFFER_BYTES("send.buffer.bytes"), //
SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), //
SSL_KEYSTORE_TYPE("ssl.keystore.type"), //
SSL_PROTOCOL("ssl.protocol"), //
SSL_PROVIDER("ssl.provider"), //
SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), //
ENABLE_IDEMPOTENCE("enable.idempotence"), //
INTERCEPTOR_CLASS("interceptor.classes"), //
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), //
METADATA_MAX_AGE_MS("metadata.max.age.ms"), //
METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), //
METRIC_REPORTERS("metric.reporters"), //
METRIC_NUM_SAMPLES("metrics.num.samples"), //
METRICS_RECORDING_LEVEL("metrics.recording.level"), //
METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), //
RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), //
RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), //
RETRY_BACKOFF_MS("retry.backoff.ms"), //
SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), //
SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), //
SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), //
SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), //
SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), //
SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), //
SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), //
SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), //
SECURITY_PROVIDERS("security.providers"), //
SSL_CIPHER_SUITES("ssl.cipher.suites"), //
SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), //
SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), //
SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), //
SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), //
TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), //
TRANSACTION_ID("transactional.id");
enum KafkaProducerProperty {
ACKS("acks"), //
BUFFER_MEMORY("buffer.memory"), //
COMPRESSION_TYPE("compression.type"), //
RETRIES("retries"), //
SSL_KEY_PASSWORD("ssl.key.password"), //
SSL_KEYSTORE_CERTIFICATE_CHAIN("ssl.keystore.certificate.chain"), //
SSL_KEYSTORE_LOCATION("ssl.keystore.location"), //
SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), //
SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), //
SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), //
BATCH_SIZE("batch.size"), //
CLIENT_DNS_LOOKUP("client.dns.lookup"), //
CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), //
DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), //
LINGER_MS("linger.ms"), //
MAX_BLOCK_MS("max.block.ms"), //
MAX_REQUEST_SIZE("max.request.size"), //
PARTITIONER_CLASS("partitioner.class"), //
RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), //
REQUEST_TIMEOUT_MS("request.timeout.ms"), //
SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), //
SASL_JAAS_CONFIG("sasl.jaas.config"), //
SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), //
SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), //
SASL_LOGIN_CLASS("sasl.login.class"), //
SASL_MECHANISM("sasl.mechanism"), //
SECURITY_PROTOCOL("security.protocol"), //
SEND_BUFFER_BYTES("send.buffer.bytes"), //
SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), //
SSL_KEYSTORE_TYPE("ssl.keystore.type"), //
SSL_PROTOCOL("ssl.protocol"), //
SSL_PROVIDER("ssl.provider"), //
SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), //
ENABLE_IDEMPOTENCE("enable.idempotence"), //
INTERCEPTOR_CLASS("interceptor.classes"), //
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), //
METADATA_MAX_AGE_MS("metadata.max.age.ms"), //
METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), //
METRIC_REPORTERS("metric.reporters"), //
METRIC_NUM_SAMPLES("metrics.num.samples"), //
METRICS_RECORDING_LEVEL("metrics.recording.level"), //
METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), //
RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), //
RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), //
RETRY_BACKOFF_MS("retry.backoff.ms"), //
SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), //
SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), //
SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), //
SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), //
SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), //
SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), //
SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), //
SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), //
SECURITY_PROVIDERS("security.providers"), //
SSL_CIPHER_SUITES("ssl.cipher.suites"), //
SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), //
SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), //
SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), //
SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), //
TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), //
TRANSACTION_ID("transactional.id");

private String name;
private String name;

private KafkaProducerProperty(String name) {
this.name = name;
}
private KafkaProducerProperty(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
public String getName() {
return name;
}
}

}

0 comments on commit ec96d17

Please sign in to comment.