From f9c242405e8b12b019c418b58d8db43b49058f73 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Fri, 15 Nov 2024 13:34:51 -0800 Subject: [PATCH 1/9] add configs to accept license and verify it This patch makes the following changes to integrate license verification: 1. Adds responsive.license and responsive.license.file configs to allow the user to configure their license. 2. ResponsiveKafkaStreams validates the license if the user has not configured a cloud API key. The user could technically get around the license verification by configuring a bogus key. We assume that we won't have malicious users for now, and defer a more rigorous check to a future PR. We should do the check in the background and not block starting the app so that we don't depend on our cloud for the user's application to run. 3. Moves around some of the test license files so that we can use them from different tests. --- .../kafka/api/ResponsiveKafkaStreams.java | 65 ++++++++ .../kafka/api/config/ResponsiveConfig.java | 19 +++ .../resources/license-keys/license-keys.json | 1 + .../kafka/api/ResponsiveKafkaStreamsTest.java | 141 ++++++++++++++++++ .../license/LicenseAuthenticatorTest.java | 11 +- .../keys/test.pem | 0 .../license-keys.json} | 2 +- .../invalid-missing-footer.pem | 0 .../invalid-missing-header.pem | 0 .../valid-real.pem | 0 .../valid-with-comment.pem | 0 .../valid.pem | 0 .../test-license-invalid-signature.json} | 0 .../test-license.json} | 0 14 files changed, 232 insertions(+), 7 deletions(-) create mode 100644 kafka-client/src/main/resources/license-keys/license-keys.json rename kafka-client/src/test/resources/{license-test/license-verifier => license-keys}/keys/test.pem (100%) rename kafka-client/src/test/resources/{license-test/license-verifier/signing-keys.json => license-keys/license-keys.json} (57%) rename kafka-client/src/test/resources/{license-test/public-key-pem-file-parser => public-key-pem-file-parser-test}/invalid-missing-footer.pem (100%) rename kafka-client/src/test/resources/{license-test/public-key-pem-file-parser => public-key-pem-file-parser-test}/invalid-missing-header.pem (100%) rename kafka-client/src/test/resources/{license-test/public-key-pem-file-parser => public-key-pem-file-parser-test}/valid-real.pem (100%) rename kafka-client/src/test/resources/{license-test/public-key-pem-file-parser => public-key-pem-file-parser-test}/valid-with-comment.pem (100%) rename kafka-client/src/test/resources/{license-test/public-key-pem-file-parser => public-key-pem-file-parser-test}/valid.pem (100%) rename kafka-client/src/test/resources/{license-test/license-verifier/license-test-invalid-signature.json => test-licenses/test-license-invalid-signature.json} (100%) rename kafka-client/src/test/resources/{license-test/license-verifier/license-test.json => test-licenses/test-license.json} (100%) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 9c3aa9a48..37aa6a800 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -32,6 +32,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import com.fasterxml.jackson.databind.ObjectMapper; import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -45,6 +46,11 @@ import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory; import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions; import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; +import dev.responsive.kafka.internal.license.LicenseAuthenticator; +import dev.responsive.kafka.internal.license.LicenseChecker; +import dev.responsive.kafka.internal.license.model.LicenseDocument; +import dev.responsive.kafka.internal.license.model.LicenseInfo; +import dev.responsive.kafka.internal.license.model.SigningKeys; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; @@ -55,7 +61,11 @@ import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.SessionUtil; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; +import java.util.Base64; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -88,6 +98,8 @@ public class ResponsiveKafkaStreams extends KafkaStreams { + private static final String SIGNING_KEYS_PATH = "license-keys/license-keys.json"; + private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class); private final ResponsiveMetrics responsiveMetrics; @@ -200,6 +212,8 @@ protected ResponsiveKafkaStreams(final Params params) { params.time ); + validateLicense(params.responsiveConfig); + if (params.compatibilityMode == CompatibilityMode.FULL) { try { ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs); @@ -260,6 +274,57 @@ private static ResponsiveMetrics createMetrics( ), exportService); } + private static void validateLicense(final ResponsiveConfig configs) { + if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) { + return; + } + final LicenseDocument licenseDocument = loadLicense(configs); + final SigningKeys signingKeys = loadSigningKeys(); + final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys); + final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument); + final LicenseChecker checker = new LicenseChecker(); + checker.checkLicense(licenseInfo); + } + + private static SigningKeys loadSigningKeys() { + try { + return new ObjectMapper().readValue( + ResponsiveKafkaStreams.class.getClassLoader().getResource(SIGNING_KEYS_PATH), + SigningKeys.class + ); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private static LicenseDocument loadLicense(final ResponsiveConfig configs) { + final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG); + final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG); + if (license.isEmpty() == licenseFile.isEmpty()) { + throw new ConfigException(String.format( + "Must set exactly one of %s or %s", + ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, + ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG + )); + } + final String licenseB64; + if (!license.isEmpty()) { + licenseB64 = license; + } else { + try { + licenseB64 = Files.readString(new File(licenseFile).toPath()); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + final ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + /** * Fill in the props with any overrides and all internal objects shared via the configs * before these get finalized as a {@link StreamsConfig} object diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index acbc1b63d..c31b3208a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -49,6 +49,13 @@ public class ResponsiveConfig extends AbstractConfig { public static final String RESPONSIVE_ENV_CONFIG = "responsive.env"; private static final String RESPONSIVE_ENV_DOC = "The Responsive environment slug (not the environment ID)."; + public static final String RESPONSIVE_LICENSE_CONFIG = "responsive.license"; + private static final String RESPONSIVE_LICENSE_DOC = "The license you're using to run Responsive"; + + public static final String RESPONSIVE_LICENSE_FILE_CONFIG = "responsive.license.file"; + private static final String RESPONSIVE_LICENSE_FILE_DOC + = "A path to a file containing your license."; + public static final String COMPATIBILITY_MODE_CONFIG = "responsive.compatibility.mode"; private static final String COMPATIBILITY_MODE_DOC = "This configuration enables running Responsive " + "in compatibility mode, disabling certain features."; @@ -314,6 +321,18 @@ public class ResponsiveConfig extends AbstractConfig { new ConfigDef.NonEmptyString(), Importance.HIGH, RESPONSIVE_ENV_DOC + ).define( + RESPONSIVE_LICENSE_CONFIG, + Type.STRING, + "", + Importance.HIGH, + RESPONSIVE_LICENSE_DOC + ).define( + RESPONSIVE_LICENSE_FILE_CONFIG, + Type.STRING, + "", + Importance.HIGH, + RESPONSIVE_LICENSE_FILE_DOC ).define( STORAGE_BACKEND_TYPE_CONFIG, Type.STRING, diff --git a/kafka-client/src/main/resources/license-keys/license-keys.json b/kafka-client/src/main/resources/license-keys/license-keys.json new file mode 100644 index 000000000..bb9a9f69c --- /dev/null +++ b/kafka-client/src/main/resources/license-keys/license-keys.json @@ -0,0 +1 @@ +{"keys": []} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java index cfb5aeaf6..6a07a27b1 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java @@ -17,7 +17,10 @@ package dev.responsive.kafka.api; import static dev.responsive.kafka.api.config.ResponsiveConfig.COMPATIBILITY_MODE_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.PLATFORM_API_KEY_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; @@ -37,14 +40,22 @@ import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.CassandraClient; import dev.responsive.kafka.internal.db.CassandraClientFactory; +import dev.responsive.kafka.internal.license.exception.LicenseAuthenticationException; +import dev.responsive.kafka.internal.license.exception.LicenseUseViolationException; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.testutils.IntegrationTestUtils; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -62,6 +73,12 @@ @ExtendWith(MockitoExtension.class) class ResponsiveKafkaStreamsTest { + private static final String DECODED_LICENSE_FILE + = "test-licenses/test-license.json"; + private static final String DECODED_INVALID_LICENSE_FILE + = "test-licenses/test-license-invalid-signature.json"; + private static final String DECODED_TRIAL_EXPIRED_LICENSE_FILE + = "test-licenses/test-license-trial-expired.json"; private final KafkaClientSupplier supplier = new DefaultKafkaClientSupplier() { @Override @@ -126,6 +143,11 @@ public void setUp() { properties.put(RESPONSIVE_ORG_CONFIG, "responsive"); properties.put(RESPONSIVE_ENV_CONFIG, "license-test"); + + properties.put( + RESPONSIVE_LICENSE_CONFIG, + getEncodedLicense(DECODED_LICENSE_FILE) + ); } @SuppressWarnings("resource") @@ -171,4 +193,123 @@ public void shouldCreateResponsiveKafkaStreamsInMetricsOnlyModeWithUnverifiedCon ks.close(); } + @Test + public void shouldAcceptLicenseInLicenseFile() { + // given: + final File licenseFile = writeLicenseFile(DECODED_LICENSE_FILE); + properties.put(RESPONSIVE_LICENSE_CONFIG, ""); + properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, licenseFile.getAbsolutePath()); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then (no throw): + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + + @Test + public void shouldThrowOnLicenseWithInvalidSignature() { + // given: + properties.put( + RESPONSIVE_LICENSE_CONFIG, + getEncodedLicense(DECODED_INVALID_LICENSE_FILE) + ); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then: + assertThrows( + LicenseAuthenticationException.class, + () -> { + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + ); + } + + @Test + public void shouldThrowOnExpiredLicense() { + // given: + properties.put( + RESPONSIVE_LICENSE_CONFIG, + getEncodedLicense(DECODED_TRIAL_EXPIRED_LICENSE_FILE) + ); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then: + assertThrows( + LicenseUseViolationException.class, + () -> { + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + ); + } + + @Test + public void shouldThrowIfNoLicenseOrApiKeyConfigured() { + // given: + properties.put(PLATFORM_API_KEY_CONFIG, ""); + properties.put(RESPONSIVE_LICENSE_CONFIG, ""); + properties.put(RESPONSIVE_LICENSE_FILE_CONFIG, ""); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then: + assertThrows( + ConfigException.class, + () -> { + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + ); + } + + @Test + public void shouldSkipLicenseCheckIfApiKeyConfigured() { + // given: + properties.put(PLATFORM_API_KEY_CONFIG, "some-api-key"); + properties.put(RESPONSIVE_LICENSE_CONFIG, ""); + properties.put(RESPONSIVE_LICENSE_FILE_CONFIG, ""); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then (no throw): + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + + private File writeLicenseFile(final String decodedLicenseFilename) { + final String encoded = getEncodedLicense(decodedLicenseFilename); + try { + final File encodedFile = File.createTempFile("rkst", null); + encodedFile.deleteOnExit(); + Files.writeString(encodedFile.toPath(), encoded); + return encodedFile; + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private String getEncodedLicense(final String filename) { + return Base64.getEncoder().encodeToString(slurpFile(filename)); + } + + private byte[] slurpFile(final String filename) { + try { + final File file = new File(ResponsiveKafkaStreamsTest.class.getClassLoader() + .getResource(filename) + .toURI() + ); + return Files.readAllBytes(file.toPath()); + } catch (final IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java index 27592107a..fcade9b6b 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java @@ -33,7 +33,7 @@ class LicenseAuthenticatorTest { @Test public void shouldVerifyLicense() { // given: - final LicenseDocument license = loadLicense("license-test.json"); + final LicenseDocument license = loadLicense("test-license.json"); // when/then (no throw): verifier.authenticate(license); @@ -42,7 +42,7 @@ public void shouldVerifyLicense() { @Test public void shouldThrowForFailedSignatureVerification() { // given: - final LicenseDocument license = loadLicense("license-test-invalid-signature.json"); + final LicenseDocument license = loadLicense("test-license-invalid-signature.json"); // when/then: assertThrows( @@ -52,18 +52,17 @@ public void shouldThrowForFailedSignatureVerification() { } private static LicenseDocument loadLicense(final String file) { - return loadResource(file, LicenseDocument.class); + return loadResource("test-licenses/" + file, LicenseDocument.class); } private static SigningKeys loadSigningKeys() { - return loadResource("signing-keys.json", SigningKeys.class); + return loadResource("license-keys/license-keys.json", SigningKeys.class); } private static T loadResource(final String path, final Class clazz) { - final String fullPath = "license-test/license-verifier/" + path; try { return MAPPER.readValue( - LicenseAuthenticatorTest.class.getClassLoader().getResource(fullPath), + LicenseAuthenticatorTest.class.getClassLoader().getResource(path), clazz ); } catch (final IOException e) { diff --git a/kafka-client/src/test/resources/license-test/license-verifier/keys/test.pem b/kafka-client/src/test/resources/license-keys/keys/test.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/license-verifier/keys/test.pem rename to kafka-client/src/test/resources/license-keys/keys/test.pem diff --git a/kafka-client/src/test/resources/license-test/license-verifier/signing-keys.json b/kafka-client/src/test/resources/license-keys/license-keys.json similarity index 57% rename from kafka-client/src/test/resources/license-test/license-verifier/signing-keys.json rename to kafka-client/src/test/resources/license-keys/license-keys.json index 1a016b667..faf26ae93 100644 --- a/kafka-client/src/test/resources/license-test/license-verifier/signing-keys.json +++ b/kafka-client/src/test/resources/license-keys/license-keys.json @@ -3,7 +3,7 @@ { "type": "RSA_4096", "keyId": "test", - "path": "license-test/license-verifier/keys/test.pem" + "path": "license-keys/keys/test.pem" } ] } \ No newline at end of file diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-footer.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-footer.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-footer.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-footer.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-header.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-header.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-header.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-header.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-real.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-real.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-real.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-real.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-with-comment.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-with-comment.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-with-comment.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-with-comment.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/valid.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/valid.pem diff --git a/kafka-client/src/test/resources/license-test/license-verifier/license-test-invalid-signature.json b/kafka-client/src/test/resources/test-licenses/test-license-invalid-signature.json similarity index 100% rename from kafka-client/src/test/resources/license-test/license-verifier/license-test-invalid-signature.json rename to kafka-client/src/test/resources/test-licenses/test-license-invalid-signature.json diff --git a/kafka-client/src/test/resources/license-test/license-verifier/license-test.json b/kafka-client/src/test/resources/test-licenses/test-license.json similarity index 100% rename from kafka-client/src/test/resources/license-test/license-verifier/license-test.json rename to kafka-client/src/test/resources/test-licenses/test-license.json From d58cc9c12b319778d1866ff9c73ec0908c92eda5 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Mon, 18 Nov 2024 15:03:29 -0800 Subject: [PATCH 2/9] fix file path --- .../kafka/internal/license/PublicKeyPemFileParserTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java index 750f3050f..172fc84ef 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java @@ -81,7 +81,7 @@ public void shouldFailToParseInvalidPemFileWithMissingHeader() { } private File getPemFile(final String filename) { - final String path = "license-test/public-key-pem-file-parser/" + filename; + final String path = "public-key-pem-file-parser-test/" + filename; try { return new File( PublicKeyPemFileParserTest.class.getClassLoader().getResource(path).toURI() From 13d90ff1bfc672f7422167e3f0c0114c630dfe2c Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Mon, 18 Nov 2024 16:14:37 -0800 Subject: [PATCH 3/9] use the test license from integration tests --- .../kafka/api/ResponsiveKafkaStreamsTest.java | 32 +++------------- .../kafka/testutils/LicenseUtils.java | 37 +++++++++++++++++++ .../kafka/testutils/ResponsiveExtension.java | 4 +- 3 files changed, 46 insertions(+), 27 deletions(-) create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java index 6a07a27b1..a32c6917c 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java @@ -44,11 +44,10 @@ import dev.responsive.kafka.internal.license.exception.LicenseUseViolationException; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.testutils.IntegrationTestUtils; +import dev.responsive.kafka.testutils.LicenseUtils; import java.io.File; import java.io.IOException; -import java.net.URISyntaxException; import java.nio.file.Files; -import java.util.Base64; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.admin.Admin; @@ -73,8 +72,6 @@ @ExtendWith(MockitoExtension.class) class ResponsiveKafkaStreamsTest { - private static final String DECODED_LICENSE_FILE - = "test-licenses/test-license.json"; private static final String DECODED_INVALID_LICENSE_FILE = "test-licenses/test-license-invalid-signature.json"; private static final String DECODED_TRIAL_EXPIRED_LICENSE_FILE @@ -146,7 +143,7 @@ public void setUp() { properties.put( RESPONSIVE_LICENSE_CONFIG, - getEncodedLicense(DECODED_LICENSE_FILE) + LicenseUtils.getLicense() ); } @@ -196,7 +193,7 @@ public void shouldCreateResponsiveKafkaStreamsInMetricsOnlyModeWithUnverifiedCon @Test public void shouldAcceptLicenseInLicenseFile() { // given: - final File licenseFile = writeLicenseFile(DECODED_LICENSE_FILE); + final File licenseFile = writeLicenseFile(LicenseUtils.getLicense()); properties.put(RESPONSIVE_LICENSE_CONFIG, ""); properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, licenseFile.getAbsolutePath()); properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); @@ -213,7 +210,7 @@ public void shouldThrowOnLicenseWithInvalidSignature() { // given: properties.put( RESPONSIVE_LICENSE_CONFIG, - getEncodedLicense(DECODED_INVALID_LICENSE_FILE) + LicenseUtils.getEncodedLicense(DECODED_INVALID_LICENSE_FILE) ); properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); final StreamsBuilder builder = new StreamsBuilder(); @@ -234,7 +231,7 @@ public void shouldThrowOnExpiredLicense() { // given: properties.put( RESPONSIVE_LICENSE_CONFIG, - getEncodedLicense(DECODED_TRIAL_EXPIRED_LICENSE_FILE) + LicenseUtils.getEncodedLicense(DECODED_TRIAL_EXPIRED_LICENSE_FILE) ); properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); final StreamsBuilder builder = new StreamsBuilder(); @@ -285,8 +282,7 @@ public void shouldSkipLicenseCheckIfApiKeyConfigured() { ks.close(); } - private File writeLicenseFile(final String decodedLicenseFilename) { - final String encoded = getEncodedLicense(decodedLicenseFilename); + private File writeLicenseFile(final String encoded) { try { final File encodedFile = File.createTempFile("rkst", null); encodedFile.deleteOnExit(); @@ -296,20 +292,4 @@ private File writeLicenseFile(final String decodedLicenseFilename) { throw new RuntimeException(e); } } - - private String getEncodedLicense(final String filename) { - return Base64.getEncoder().encodeToString(slurpFile(filename)); - } - - private byte[] slurpFile(final String filename) { - try { - final File file = new File(ResponsiveKafkaStreamsTest.class.getClassLoader() - .getResource(filename) - .toURI() - ); - return Files.readAllBytes(file.toPath()); - } catch (final IOException | URISyntaxException e) { - throw new RuntimeException(e); - } - } } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java new file mode 100644 index 000000000..8ae5c091d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java @@ -0,0 +1,37 @@ +package dev.responsive.kafka.testutils; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.Base64; + +public final class LicenseUtils { + private static final String DECODED_LICENSE_FILE + = "test-licenses/test-license.json"; + + private LicenseUtils () { + } + + public static String getLicense() { + return getEncodedLicense(DECODED_LICENSE_FILE); + } + + public static String getEncodedLicense(final String filename) { + return Base64.getEncoder().encodeToString(slurpFile(filename)); + } + + private static byte[] slurpFile(final String filename) { + try { + final File file = new File(LicenseUtils.class.getClassLoader() + .getResource(filename) + .toURI() + ); + return Files.readAllBytes(file.toPath()); + } catch (final IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java index 3ee5ee515..f0b9f28c5 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java @@ -23,6 +23,7 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE; @@ -126,7 +127,8 @@ public Object resolveParameter( INTERNAL_TASK_ASSIGNOR_CLASS, TASK_ASSIGNOR_CLASS_OVERRIDE, BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), CASSANDRA_DESIRED_NUM_PARTITION_CONFIG, -1, - CASSANDRA_CHECK_INTERVAL_MS, 100 + CASSANDRA_CHECK_INTERVAL_MS, 100, + RESPONSIVE_LICENSE_CONFIG, LicenseUtils.getLicense() )); switch (backend) { From ef1535b3730ca461d17bea6084d30088ffefcd47 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Mon, 18 Nov 2024 16:45:22 -0800 Subject: [PATCH 4/9] lint and missing file --- .../java/dev/responsive/kafka/testutils/LicenseUtils.java | 2 +- .../test-licenses/test-license-trial-expired.json | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java index 8ae5c091d..5d4fe6299 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java @@ -10,7 +10,7 @@ public final class LicenseUtils { private static final String DECODED_LICENSE_FILE = "test-licenses/test-license.json"; - private LicenseUtils () { + private LicenseUtils() { } public static String getLicense() { diff --git a/kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json b/kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json new file mode 100644 index 000000000..db87bd789 --- /dev/null +++ b/kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json @@ -0,0 +1,8 @@ +{ + "info": "eyJlbWFpbCI6InJvaGFuQHJlc3BvbnNpdmUuZGV2IiwiaXNzdWVkQXQiOjE3MzE3MDU5NzgsImV4cGlyZXNBdCI6MTczMTcwNTk3OCwidHlwZSI6InRpbWVkX3RyaWFsX3YxIn0=", + "signature": "pUgmJtjfm5icyFPV96vviJqA6YQMOJBPp7gISCdU0bih1hgGBSEblJTJ0SQVR8KtrurumJeK7NtQkPBbzeYACdsaRYOCHYmz+t6l8RYBpkBJp/qIbniR+hwipf6jtugTq1L4X2hL06Ro7LpvJxaxDt4Fm6o56n6Lr+6pSc8WO8eOaAb5stuOZzwVr1ZmnNjqoLZHpuNiNGKer2S0sV0TYRu1h3/HZCcNbVzs/h6MD4j7BvsSyJzJ/Ta9Ljrf95WANQZxW7V637xzIou1sIngX2BLnmSReuPYU2tclMS+JSFS5htIGYtkNTJdfKwcpIc07k20oEh0+dzn1UvZa0HQkHcCzCdDn3ROCZ5oAw0eLYk+qo8/4ldEC93S1lptlWrr+0c+y7hckzqFkG467oDTUiVBJ3E+sMJAuAE2CNeIPvw+l+WBFQlvSrdU0cexLY0ydXvTvR7GzijCfjuBQyO6XI5lEHGQ9WF8yS6p9Tij1rVX9xmnJyxFgoDD/QyDliP4I0pUeeXTOkacGYWj3k+UketnJs6QRdabZ4LoTrd1SucfCUieyQoRHMftEWGvb/YtZuL5GfDzVu/nxYPZgNrrv5ribymBxz7C0Rjo47imWC5X/OTbMj0vPPH05LPXG92ayuJaeL+pKO2fxepEZD1O3so9yxk83peMpNGN6Vest8s=", + "key": "test", + "algo": "RSASSA_PSS_SHA_256", + "messageType": "RAW", + "version": "1" +} From cb6a85a0393ab1827e25e6816d56dcd37f5c325d Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Mon, 18 Nov 2024 23:31:32 -0800 Subject: [PATCH 5/9] fix e2e integration test --- .../dev/responsive/examples/e2etest/E2ETestApplication.java | 1 + .../java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java | 4 ++-- .../license-keys.json | 0 .../kafka/internal/license/LicenseAuthenticatorTest.java | 2 +- .../{license-keys => responsive-license-keys}/keys/test.pem | 0 .../license-keys.json | 0 6 files changed, 4 insertions(+), 3 deletions(-) rename kafka-client/src/main/resources/{license-keys => responsive-license-keys}/license-keys.json (100%) rename kafka-client/src/test/resources/{license-keys => responsive-license-keys}/keys/test.pem (100%) rename kafka-client/src/test/resources/{license-keys => responsive-license-keys}/license-keys.json (100%) diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java index 02fc60186..b5704a61c 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java @@ -70,6 +70,7 @@ public synchronized void start() { E2ETestUtils.maybeCreateKeyspace(properties); // build topology after creating keyspace because we use keyspace retry // to wait for scylla to resolve + properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key"); E2ETestUtils.retryFor( () -> kafkaStreams = buildTopology(properties), Duration.ofMinutes(5) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 37aa6a800..69ac2381f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -98,7 +98,7 @@ public class ResponsiveKafkaStreams extends KafkaStreams { - private static final String SIGNING_KEYS_PATH = "license-keys/license-keys.json"; + private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json"; private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class); @@ -289,7 +289,7 @@ private static void validateLicense(final ResponsiveConfig configs) { private static SigningKeys loadSigningKeys() { try { return new ObjectMapper().readValue( - ResponsiveKafkaStreams.class.getClassLoader().getResource(SIGNING_KEYS_PATH), + ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH), SigningKeys.class ); } catch (final IOException e) { diff --git a/kafka-client/src/main/resources/license-keys/license-keys.json b/kafka-client/src/main/resources/responsive-license-keys/license-keys.json similarity index 100% rename from kafka-client/src/main/resources/license-keys/license-keys.json rename to kafka-client/src/main/resources/responsive-license-keys/license-keys.json diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java index fcade9b6b..12e2e7d6e 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java @@ -56,7 +56,7 @@ private static LicenseDocument loadLicense(final String file) { } private static SigningKeys loadSigningKeys() { - return loadResource("license-keys/license-keys.json", SigningKeys.class); + return loadResource("responsive-license-keys/license-keys.json", SigningKeys.class); } private static T loadResource(final String path, final Class clazz) { diff --git a/kafka-client/src/test/resources/license-keys/keys/test.pem b/kafka-client/src/test/resources/responsive-license-keys/keys/test.pem similarity index 100% rename from kafka-client/src/test/resources/license-keys/keys/test.pem rename to kafka-client/src/test/resources/responsive-license-keys/keys/test.pem diff --git a/kafka-client/src/test/resources/license-keys/license-keys.json b/kafka-client/src/test/resources/responsive-license-keys/license-keys.json similarity index 100% rename from kafka-client/src/test/resources/license-keys/license-keys.json rename to kafka-client/src/test/resources/responsive-license-keys/license-keys.json From 339b2d747e86f473ebde98fb9ccb21b9e3ac1021 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Tue, 19 Nov 2024 00:43:31 -0800 Subject: [PATCH 6/9] fix some paths --- .../kafka/internal/license/LicenseAuthenticator.java | 2 +- .../kafka/internal/license/LicenseAuthenticatorTest.java | 6 +++--- .../resources/responsive-license-keys/license-keys.json | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java index 5013e8bb6..0e3c41639 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java @@ -89,7 +89,7 @@ private byte[] verifyLicenseV1Signature(final LicenseDocumentV1 license) { private PublicKey loadPublicKey(final SigningKeys.SigningKey signingKey) { final File file; try { - file = new File(this.getClass().getClassLoader().getResource(signingKey.path()).toURI()); + file = new File(this.getClass().getResource(signingKey.path()).toURI()); } catch (final URISyntaxException e) { throw new RuntimeException(e); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java index 12e2e7d6e..c1ae5faae 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java @@ -52,17 +52,17 @@ public void shouldThrowForFailedSignatureVerification() { } private static LicenseDocument loadLicense(final String file) { - return loadResource("test-licenses/" + file, LicenseDocument.class); + return loadResource("/test-licenses/" + file, LicenseDocument.class); } private static SigningKeys loadSigningKeys() { - return loadResource("responsive-license-keys/license-keys.json", SigningKeys.class); + return loadResource("/responsive-license-keys/license-keys.json", SigningKeys.class); } private static T loadResource(final String path, final Class clazz) { try { return MAPPER.readValue( - LicenseAuthenticatorTest.class.getClassLoader().getResource(path), + LicenseAuthenticatorTest.class.getResource(path), clazz ); } catch (final IOException e) { diff --git a/kafka-client/src/test/resources/responsive-license-keys/license-keys.json b/kafka-client/src/test/resources/responsive-license-keys/license-keys.json index faf26ae93..9855c485a 100644 --- a/kafka-client/src/test/resources/responsive-license-keys/license-keys.json +++ b/kafka-client/src/test/resources/responsive-license-keys/license-keys.json @@ -3,7 +3,7 @@ { "type": "RSA_4096", "keyId": "test", - "path": "license-keys/keys/test.pem" + "path": "/responsive-license-keys/keys/test.pem" } ] } \ No newline at end of file From d5056d51d31140ed96e02824c27e2b2be77ab1a5 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Tue, 19 Nov 2024 11:53:11 -0800 Subject: [PATCH 7/9] add tool to sign licenses --- kafka-client/build.gradle.kts | 2 + .../license/model/LicenseDocument.java | 5 ++ .../license/model/LicenseDocumentV1.java | 4 ++ .../internal/license/model/LicenseInfo.java | 5 ++ .../internal/license/model/TimedTrialV1.java | 11 ++++ .../keys/license-signing-key-0.pem | 14 +++++ .../responsive-license-keys/license-keys.json | 10 +++- .../internal/utils/GenerateTrialLicense.java | 60 +++++++++++++++++++ 8 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index b084fe206..35205486c 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -95,4 +95,6 @@ dependencies { testImplementation(testlibs.bundles.base) testImplementation(testlibs.bundles.testcontainers) testImplementation(libs.kafka.streams.test.utils) + testImplementation("software.amazon.awssdk:kms:2.20.0") + testImplementation("software.amazon.awssdk:sso:2.20.0") } \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java index 8913e2d21..990355926 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java @@ -37,4 +37,9 @@ public abstract class LicenseDocument { public LicenseDocument(@JsonProperty("version") final String version) { this.version = version; } + + @JsonProperty("version") + public String version() { + return version; + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java index ea3b9598c..5e4d5fbab 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java @@ -43,18 +43,22 @@ public LicenseDocumentV1( this.algo = algo; } + @JsonProperty("info") public String info() { return info; } + @JsonProperty("key") public String key() { return key; } + @JsonProperty("signature") public String signature() { return signature; } + @JsonProperty("algo") public String algo() { return algo; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java index 3f59e6bc6..ef0188866 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java @@ -35,4 +35,9 @@ public abstract class LicenseInfo { LicenseInfo(@JsonProperty("type") final String type) { this.type = type; } + + @JsonProperty("type") + public String type() { + return type; + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java index 70178c900..567bafe91 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java @@ -38,7 +38,18 @@ public TimedTrialV1( this.expiresAt = expiresAt; } + @JsonProperty("email") + public String email() { + return email; + } + + @JsonProperty("expiresAt") public long expiresAt() { return expiresAt; } + + @JsonProperty("issuedAt") + public long issuedAt() { + return issuedAt; + } } diff --git a/kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem b/kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem new file mode 100644 index 000000000..eb627d909 --- /dev/null +++ b/kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem @@ -0,0 +1,14 @@ +-----BEGIN PUBLIC KEY----- +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA37LvpjgfWmlVAY/wefQ+ +2cL79J7UNJePfsQWjzMc4p8ITBcTGpX/RalhdLJQjbD/SJymAOEfy56RHbuG8vS0 +u79as6yhby8NWFaT1vNsYKOLfoUwqxi49TvxzXkBwdrLPXLbgIW7TNQpYIyM17Tl +tuceASZJGY7dECzeZY303XsmXgsLjaNHqdLlxtdzl8i8i/diqK8/I6oKL0/AsIaC +ZPfuPdQebtko4eE6p4pKjNu3qxNNdV73nV2WwHHMCE/U4CiTbH1nvpSVgr8sLBcW +EtbjDwJi+wp3OX5vblWHskLJDjdjAGbNH89UYyaWTV6C58dTGH4zmffw649Ib+80 +ywB8uH8IuoAJMXgECDT5XygXX2362z8MI4apMV6ouT90KyvamIVXLV1VpNyTn9ZY +AYXkZxImUujhB89Lz/b5ctK/epzi+5/ZDQyrgNaPCwFoPnE/QLeuHmrT2gM381S8 +3Y7fJmAa7sSRF0aPvTDM7hfmP2TRA29106qrEqPh2EX64mqoKCMkM6ZngCpiJgFZ +usvCRRUdBHnXKfyqFux6TBOVrKGrahJVLgkKMaFXma0U+peuXHSE1sktjtb2zUP9 +kqP1bwBtPD43epWPnVxWBqIcXD3poLBu+Hj8lVBV+NQ+XtTPzWmcD8+MSC0zzCUK +umd9JPEA9pzJAqlpRYTEGaUCAwEAAQ== +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/kafka-client/src/main/resources/responsive-license-keys/license-keys.json b/kafka-client/src/main/resources/responsive-license-keys/license-keys.json index bb9a9f69c..dc223c927 100644 --- a/kafka-client/src/main/resources/responsive-license-keys/license-keys.json +++ b/kafka-client/src/main/resources/responsive-license-keys/license-keys.json @@ -1 +1,9 @@ -{"keys": []} \ No newline at end of file +{ + "keys": [ + { + "type": "RSA_4096", + "keyId": "license-signing-key-0", + "path": "/responsive-license-keys/keys/license-signing-key-0.pem" + } + ] +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java new file mode 100644 index 000000000..0a68eee2d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java @@ -0,0 +1,60 @@ +package dev.responsive.kafka.internal.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.responsive.kafka.internal.license.model.LicenseDocumentV1; +import dev.responsive.kafka.internal.license.model.TimedTrialV1; +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.model.MessageType; +import software.amazon.awssdk.services.kms.model.SignRequest; +import software.amazon.awssdk.services.kms.model.SignResponse; +import software.amazon.awssdk.services.kms.model.SigningAlgorithmSpec; + +public class GenerateTrialLicense { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String SIGNING_KEY_ARN = System.getenv("SIGNING_KEY_ARN"); + private static final String SIGNING_KEY_ID = System.getenv("SIGNING_KEY_ID"); + private static final String AWS_PROFILE = System.getenv("AWS_PROFILE"); + private static final String SIGNING_ALGO = "RSASSA_PSS_SHA_256"; + + public static void main(final String[] args) throws IOException { + final var info = new TimedTrialV1( + "timed_trial_v1", + "test@responsive.dev", + Instant.now().getEpochSecond(), + Instant.MAX.getEpochSecond() + ); + final byte[] serialized = MAPPER.writeValueAsBytes(info); + final Region region = Region.US_WEST_2; + final byte[] signature; + try (final KmsClient kmsClient = KmsClient.builder() + .credentialsProvider(ProfileCredentialsProvider.create(AWS_PROFILE)) + .region(region) + .build() + ) { + final SdkBytes sdkBytes = SdkBytes.fromByteArray(serialized); + final SignRequest signRequest = SignRequest.builder() + .keyId(SIGNING_KEY_ARN) + .message(sdkBytes) + .messageType(MessageType.RAW) + .signingAlgorithm(SigningAlgorithmSpec.RSASSA_PSS_SHA_256) + .build(); + final SignResponse signResponse = kmsClient.sign(signRequest); + final SdkBytes sdkSignature = signResponse.signature(); + signature = sdkSignature.asByteArray(); + } + final LicenseDocumentV1 license = new LicenseDocumentV1( + "1", + Base64.getEncoder().encodeToString(serialized), + Base64.getEncoder().encodeToString(signature), + SIGNING_KEY_ID, + SIGNING_ALGO + ); + System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(license)); + } +} From 8259dd0d38a5d0be90e80499231f63d43062ed63 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Tue, 19 Nov 2024 13:33:53 -0800 Subject: [PATCH 8/9] fix getting pem file from jar --- .../examples/e2etest/E2ETestApplication.java | 3 +- .../license/LicenseAuthenticator.java | 10 +----- .../license/PublicKeyPemFileParser.java | 22 ++++++++---- .../license/PublicKeyPemFileParserTest.java | 35 ++++++++----------- 4 files changed, 32 insertions(+), 38 deletions(-) diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java index b5704a61c..16935b22b 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java @@ -70,7 +70,8 @@ public synchronized void start() { E2ETestUtils.maybeCreateKeyspace(properties); // build topology after creating keyspace because we use keyspace retry // to wait for scylla to resolve - properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key"); + //properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key"); + properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, "ewogICJ2ZXJzaW9uIiA6ICIxIiwKICAiaW5mbyIgOiAiZXlKMGVYQmxJam9pZEdsdFpXUmZkSEpwWVd4ZmRqRWlMQ0psYldGcGJDSTZJblJsYzNSQWNtVnpjRzl1YzJsMlpTNWtaWFlpTENKcGMzTjFaV1JCZENJNk1UY3pNakEwTlRVNU1pd2laWGh3YVhKbGMwRjBJam96TVRVMU5qZzRPVGcyTkRRd016RTVPWDA9IiwKICAic2lnbmF0dXJlIiA6ICJnVnQ1bWZ1eFhKK2k2YkNqSTJrMkh4ZFozUzJwamN4T2ZxcVh5elM0WjcvUUZUNmJzNWt1MWhtSzZicmZaZHIxSVZLM0NmeXZyWWI0YVNVS2dRZ1ZLaDdBZzVmVEZvSE4ybGRRSklUUFdVQ3dxTFZaQVpwdy80cVQrbmdZSWFNZVhTVUFQL1hueTNTVm1pTFpvMlJpK2FlN05BYXNXR05aNEVwODhPREwzSEh0SkJjcGFYWTlheGZKUmtoYzlvbEx0T0Y2cWhtWlRzcmozZjdpOU9IWmsvQVFYU2JuS2RVMGprSkxMNTIwVUNqeFJxek8rc25sL28zWGM2eDg1ZlBTbFFxTHBLL043Sk9ZSVpwemR1Q1hEQkRFZUZaZTVvZ1h6dkpvN1VJTkkrbUFKTzM1Z2F0WWF2K0VqaERMR2x3TzlXZzBVRHFPNkJaRzhmVVlKeEt0UXhYSUFyTUhrVHpRM3VFODJWMUw2dkdDZVBiTHFxMGtFdkpBQ1N5WnFUOE56VjVjM3B3cEhIVXRVdmE4NnpTMllRTkJXRy9TMkJsdTVvY2JKQ3hDQ0lybUJWYnROTmdtTFVEblpUUzg3ejR0ZFRrZklsWkZOZjZpUTBMdVBqRlUxMm9yRWhkQmFkaU1Cc20reU5HS3pJQXZXb24zV0V2ZGE0RGxYcm5pa2dlcDlFV0RIcWxFdUVVWXkxWEFaRGJqM0pEbzZ0OHN5YUpxR01IUktPbTc0R21mMllmVE5NZzdaUElmWFE1VksvTVBXcEdyamZtcmhITEEvQmhBQzh5ZUJQTTFUNXFKM1NQRTJ4dFZNWnlHb1g0cFh0Y2hqZzFYSXgxNXo1UWFpdzA4RExGa3gydkdJWXNXb3NDblNMTCs4Z1l4dXFPbkU3R2ovdXhEeHA2M2pFMD0iLAogICJrZXkiIDogImxpY2Vuc2Utc2lnbmluZy1rZXktMCIsCiAgImFsZ28iIDogIlJTQVNTQV9QU1NfU0hBXzI1NiIKfQo="); E2ETestUtils.retryFor( () -> kafkaStreams = buildTopology(properties), Duration.ofMinutes(5) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java index 0e3c41639..a6dfae608 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java @@ -22,9 +22,7 @@ import dev.responsive.kafka.internal.license.model.LicenseDocumentV1; import dev.responsive.kafka.internal.license.model.LicenseInfo; import dev.responsive.kafka.internal.license.model.SigningKeys; -import java.io.File; import java.io.IOException; -import java.net.URISyntaxException; import java.security.GeneralSecurityException; import java.security.KeyFactory; import java.security.NoSuchAlgorithmException; @@ -87,13 +85,7 @@ private byte[] verifyLicenseV1Signature(final LicenseDocumentV1 license) { } private PublicKey loadPublicKey(final SigningKeys.SigningKey signingKey) { - final File file; - try { - file = new File(this.getClass().getResource(signingKey.path()).toURI()); - } catch (final URISyntaxException e) { - throw new RuntimeException(e); - } - final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFile(file); + final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFileInResource(signingKey.path()); final KeyFactory keyFactory; try { keyFactory = KeyFactory.getInstance("RSA"); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java index 1f45ffdf1..8e494a5fa 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java @@ -16,11 +16,13 @@ package dev.responsive.kafka.internal.license; -import java.io.File; +import java.io.BufferedReader; import java.io.IOException; -import java.nio.file.Files; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.Base64; import java.util.List; +import java.util.stream.Collectors; public class PublicKeyPemFileParser { private static final String HEADER_PREFIX = "-----"; @@ -31,13 +33,19 @@ public class PublicKeyPemFileParser { private static final String END_PUBLIC_KEY_HEADER = HEADER_PREFIX + END_PUBLIC_KEY + HEADER_PREFIX; - public static byte[] parsePemFile(final File file) { - final List lines; - try { - lines = Files.readAllLines(file.toPath()); - } catch (IOException e) { + public static byte[] parsePemFileInResource(final String path) { + try (final InputStream inputStream = PublicKeyPemFileParser.class.getResourceAsStream(path)) { + return parsePemFile(inputStream); + } catch (final IOException e) { throw new RuntimeException(e); } + } + + private static byte[] parsePemFile(final InputStream inputStream) throws IOException { + final List lines; + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + lines = reader.lines().collect(Collectors.toList()); + } final StringBuilder keyB64Builder = new StringBuilder(); boolean foundBegin = false; for (final String l : lines) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java index 172fc84ef..954c06fb2 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java @@ -20,8 +20,6 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.io.File; -import java.net.URISyntaxException; import org.junit.jupiter.api.Test; class PublicKeyPemFileParserTest { @@ -29,10 +27,10 @@ class PublicKeyPemFileParserTest { @Test public void shouldParseValidPemFile() { // given: - final File file = getPemFile("valid.pem"); + final String file = getPemFile("valid.pem"); // when: - final byte[] key = PublicKeyPemFileParser.parsePemFile(file); + final byte[] key = PublicKeyPemFileParser.parsePemFileInResource(file); // then: assertThat(new String(key), is("foobarbaz")); @@ -41,10 +39,10 @@ public void shouldParseValidPemFile() { @Test public void shouldParsePemFileWithRealKey() { // given: - final File file = getPemFile("valid-real.pem"); + final String file = getPemFile("valid-real.pem"); // when: - final byte[] key = PublicKeyPemFileParser.parsePemFile(file); + final byte[] key = PublicKeyPemFileParser.parsePemFileInResource(file); // then: assertThat(key.length, is(550)); @@ -53,10 +51,10 @@ public void shouldParsePemFileWithRealKey() { @Test public void shouldParseValidPemFileWithComment() { // given: - final File file = getPemFile("valid-with-comment.pem"); + final String file = getPemFile("valid-with-comment.pem"); // when: - final byte[] key = PublicKeyPemFileParser.parsePemFile(file); + final byte[] key = PublicKeyPemFileParser.parsePemFileInResource(file); // then: assertThat(new String(key), is("foobarbaz")); @@ -65,29 +63,24 @@ public void shouldParseValidPemFileWithComment() { @Test public void shouldFailToParseInvalidPemFileWithMissingFooter() { // given: - final File file = getPemFile("invalid-missing-footer.pem"); + final String file = getPemFile("invalid-missing-footer.pem"); // when/then: - assertThrows(IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFile(file)); + assertThrows( + IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFileInResource(file)); } @Test public void shouldFailToParseInvalidPemFileWithMissingHeader() { // given: - final File file = getPemFile("invalid-missing-header.pem"); + final String file = getPemFile("invalid-missing-header.pem"); // when/then: - assertThrows(IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFile(file)); + assertThrows( + IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFileInResource(file)); } - private File getPemFile(final String filename) { - final String path = "public-key-pem-file-parser-test/" + filename; - try { - return new File( - PublicKeyPemFileParserTest.class.getClassLoader().getResource(path).toURI() - ); - } catch (final URISyntaxException e) { - throw new RuntimeException(e); - } + private String getPemFile(final String filename) { + return "/public-key-pem-file-parser-test/" + filename; } } \ No newline at end of file From f9642fcce9319ea112a89e4ae7f6c1463b904657 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Tue, 19 Nov 2024 13:35:31 -0800 Subject: [PATCH 9/9] fix e2e test --- .../dev/responsive/examples/e2etest/E2ETestApplication.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java index 16935b22b..b5704a61c 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java @@ -70,8 +70,7 @@ public synchronized void start() { E2ETestUtils.maybeCreateKeyspace(properties); // build topology after creating keyspace because we use keyspace retry // to wait for scylla to resolve - //properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key"); - properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, "ewogICJ2ZXJzaW9uIiA6ICIxIiwKICAiaW5mbyIgOiAiZXlKMGVYQmxJam9pZEdsdFpXUmZkSEpwWVd4ZmRqRWlMQ0psYldGcGJDSTZJblJsYzNSQWNtVnpjRzl1YzJsMlpTNWtaWFlpTENKcGMzTjFaV1JCZENJNk1UY3pNakEwTlRVNU1pd2laWGh3YVhKbGMwRjBJam96TVRVMU5qZzRPVGcyTkRRd016RTVPWDA9IiwKICAic2lnbmF0dXJlIiA6ICJnVnQ1bWZ1eFhKK2k2YkNqSTJrMkh4ZFozUzJwamN4T2ZxcVh5elM0WjcvUUZUNmJzNWt1MWhtSzZicmZaZHIxSVZLM0NmeXZyWWI0YVNVS2dRZ1ZLaDdBZzVmVEZvSE4ybGRRSklUUFdVQ3dxTFZaQVpwdy80cVQrbmdZSWFNZVhTVUFQL1hueTNTVm1pTFpvMlJpK2FlN05BYXNXR05aNEVwODhPREwzSEh0SkJjcGFYWTlheGZKUmtoYzlvbEx0T0Y2cWhtWlRzcmozZjdpOU9IWmsvQVFYU2JuS2RVMGprSkxMNTIwVUNqeFJxek8rc25sL28zWGM2eDg1ZlBTbFFxTHBLL043Sk9ZSVpwemR1Q1hEQkRFZUZaZTVvZ1h6dkpvN1VJTkkrbUFKTzM1Z2F0WWF2K0VqaERMR2x3TzlXZzBVRHFPNkJaRzhmVVlKeEt0UXhYSUFyTUhrVHpRM3VFODJWMUw2dkdDZVBiTHFxMGtFdkpBQ1N5WnFUOE56VjVjM3B3cEhIVXRVdmE4NnpTMllRTkJXRy9TMkJsdTVvY2JKQ3hDQ0lybUJWYnROTmdtTFVEblpUUzg3ejR0ZFRrZklsWkZOZjZpUTBMdVBqRlUxMm9yRWhkQmFkaU1Cc20reU5HS3pJQXZXb24zV0V2ZGE0RGxYcm5pa2dlcDlFV0RIcWxFdUVVWXkxWEFaRGJqM0pEbzZ0OHN5YUpxR01IUktPbTc0R21mMllmVE5NZzdaUElmWFE1VksvTVBXcEdyamZtcmhITEEvQmhBQzh5ZUJQTTFUNXFKM1NQRTJ4dFZNWnlHb1g0cFh0Y2hqZzFYSXgxNXo1UWFpdzA4RExGa3gydkdJWXNXb3NDblNMTCs4Z1l4dXFPbkU3R2ovdXhEeHA2M2pFMD0iLAogICJrZXkiIDogImxpY2Vuc2Utc2lnbmluZy1rZXktMCIsCiAgImFsZ28iIDogIlJTQVNTQV9QU1NfU0hBXzI1NiIKfQo="); + properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key"); E2ETestUtils.retryFor( () -> kafkaStreams = buildTopology(properties), Duration.ofMinutes(5)