diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java index 02fc60186..b5704a61c 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java @@ -70,6 +70,7 @@ public synchronized void start() { E2ETestUtils.maybeCreateKeyspace(properties); // build topology after creating keyspace because we use keyspace retry // to wait for scylla to resolve + properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key"); E2ETestUtils.retryFor( () -> kafkaStreams = buildTopology(properties), Duration.ofMinutes(5) diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index b084fe206..35205486c 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -95,4 +95,6 @@ dependencies { testImplementation(testlibs.bundles.base) testImplementation(testlibs.bundles.testcontainers) testImplementation(libs.kafka.streams.test.utils) + testImplementation("software.amazon.awssdk:kms:2.20.0") + testImplementation("software.amazon.awssdk:sso:2.20.0") } \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 9c3aa9a48..69ac2381f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -32,6 +32,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import com.fasterxml.jackson.databind.ObjectMapper; import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -45,6 +46,11 @@ import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory; import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions; import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; +import dev.responsive.kafka.internal.license.LicenseAuthenticator; +import dev.responsive.kafka.internal.license.LicenseChecker; +import dev.responsive.kafka.internal.license.model.LicenseDocument; +import dev.responsive.kafka.internal.license.model.LicenseInfo; +import dev.responsive.kafka.internal.license.model.SigningKeys; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; @@ -55,7 +61,11 @@ import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.SessionUtil; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; +import java.util.Base64; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -88,6 +98,8 @@ public class ResponsiveKafkaStreams extends KafkaStreams { + private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json"; + private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class); private final ResponsiveMetrics responsiveMetrics; @@ -200,6 +212,8 @@ protected ResponsiveKafkaStreams(final Params params) { params.time ); + validateLicense(params.responsiveConfig); + if (params.compatibilityMode == CompatibilityMode.FULL) { try { ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs); @@ -260,6 +274,57 @@ private static ResponsiveMetrics createMetrics( ), exportService); } + private static void validateLicense(final ResponsiveConfig configs) { + if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) { + return; + } + final LicenseDocument licenseDocument = loadLicense(configs); + final SigningKeys signingKeys = loadSigningKeys(); + final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys); + final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument); + final LicenseChecker checker = new LicenseChecker(); + checker.checkLicense(licenseInfo); + } + + private static SigningKeys loadSigningKeys() { + try { + return new ObjectMapper().readValue( + ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH), + SigningKeys.class + ); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private static LicenseDocument loadLicense(final ResponsiveConfig configs) { + final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG); + final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG); + if (license.isEmpty() == licenseFile.isEmpty()) { + throw new ConfigException(String.format( + "Must set exactly one of %s or %s", + ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, + ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG + )); + } + final String licenseB64; + if (!license.isEmpty()) { + licenseB64 = license; + } else { + try { + licenseB64 = Files.readString(new File(licenseFile).toPath()); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + final ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + /** * Fill in the props with any overrides and all internal objects shared via the configs * before these get finalized as a {@link StreamsConfig} object diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index acbc1b63d..c31b3208a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -49,6 +49,13 @@ public class ResponsiveConfig extends AbstractConfig { public static final String RESPONSIVE_ENV_CONFIG = "responsive.env"; private static final String RESPONSIVE_ENV_DOC = "The Responsive environment slug (not the environment ID)."; + public static final String RESPONSIVE_LICENSE_CONFIG = "responsive.license"; + private static final String RESPONSIVE_LICENSE_DOC = "The license you're using to run Responsive"; + + public static final String RESPONSIVE_LICENSE_FILE_CONFIG = "responsive.license.file"; + private static final String RESPONSIVE_LICENSE_FILE_DOC + = "A path to a file containing your license."; + public static final String COMPATIBILITY_MODE_CONFIG = "responsive.compatibility.mode"; private static final String COMPATIBILITY_MODE_DOC = "This configuration enables running Responsive " + "in compatibility mode, disabling certain features."; @@ -314,6 +321,18 @@ public class ResponsiveConfig extends AbstractConfig { new ConfigDef.NonEmptyString(), Importance.HIGH, RESPONSIVE_ENV_DOC + ).define( + RESPONSIVE_LICENSE_CONFIG, + Type.STRING, + "", + Importance.HIGH, + RESPONSIVE_LICENSE_DOC + ).define( + RESPONSIVE_LICENSE_FILE_CONFIG, + Type.STRING, + "", + Importance.HIGH, + RESPONSIVE_LICENSE_FILE_DOC ).define( STORAGE_BACKEND_TYPE_CONFIG, Type.STRING, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java index 5013e8bb6..a6dfae608 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseAuthenticator.java @@ -22,9 +22,7 @@ import dev.responsive.kafka.internal.license.model.LicenseDocumentV1; import dev.responsive.kafka.internal.license.model.LicenseInfo; import dev.responsive.kafka.internal.license.model.SigningKeys; -import java.io.File; import java.io.IOException; -import java.net.URISyntaxException; import java.security.GeneralSecurityException; import java.security.KeyFactory; import java.security.NoSuchAlgorithmException; @@ -87,13 +85,7 @@ private byte[] verifyLicenseV1Signature(final LicenseDocumentV1 license) { } private PublicKey loadPublicKey(final SigningKeys.SigningKey signingKey) { - final File file; - try { - file = new File(this.getClass().getClassLoader().getResource(signingKey.path()).toURI()); - } catch (final URISyntaxException e) { - throw new RuntimeException(e); - } - final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFile(file); + final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFileInResource(signingKey.path()); final KeyFactory keyFactory; try { keyFactory = KeyFactory.getInstance("RSA"); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java index 1f45ffdf1..8e494a5fa 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParser.java @@ -16,11 +16,13 @@ package dev.responsive.kafka.internal.license; -import java.io.File; +import java.io.BufferedReader; import java.io.IOException; -import java.nio.file.Files; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.Base64; import java.util.List; +import java.util.stream.Collectors; public class PublicKeyPemFileParser { private static final String HEADER_PREFIX = "-----"; @@ -31,13 +33,19 @@ public class PublicKeyPemFileParser { private static final String END_PUBLIC_KEY_HEADER = HEADER_PREFIX + END_PUBLIC_KEY + HEADER_PREFIX; - public static byte[] parsePemFile(final File file) { - final List lines; - try { - lines = Files.readAllLines(file.toPath()); - } catch (IOException e) { + public static byte[] parsePemFileInResource(final String path) { + try (final InputStream inputStream = PublicKeyPemFileParser.class.getResourceAsStream(path)) { + return parsePemFile(inputStream); + } catch (final IOException e) { throw new RuntimeException(e); } + } + + private static byte[] parsePemFile(final InputStream inputStream) throws IOException { + final List lines; + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + lines = reader.lines().collect(Collectors.toList()); + } final StringBuilder keyB64Builder = new StringBuilder(); boolean foundBegin = false; for (final String l : lines) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java index 8913e2d21..990355926 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocument.java @@ -37,4 +37,9 @@ public abstract class LicenseDocument { public LicenseDocument(@JsonProperty("version") final String version) { this.version = version; } + + @JsonProperty("version") + public String version() { + return version; + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java index ea3b9598c..5e4d5fbab 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseDocumentV1.java @@ -43,18 +43,22 @@ public LicenseDocumentV1( this.algo = algo; } + @JsonProperty("info") public String info() { return info; } + @JsonProperty("key") public String key() { return key; } + @JsonProperty("signature") public String signature() { return signature; } + @JsonProperty("algo") public String algo() { return algo; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java index 3f59e6bc6..ef0188866 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java @@ -35,4 +35,9 @@ public abstract class LicenseInfo { LicenseInfo(@JsonProperty("type") final String type) { this.type = type; } + + @JsonProperty("type") + public String type() { + return type; + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java index 70178c900..567bafe91 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java @@ -38,7 +38,18 @@ public TimedTrialV1( this.expiresAt = expiresAt; } + @JsonProperty("email") + public String email() { + return email; + } + + @JsonProperty("expiresAt") public long expiresAt() { return expiresAt; } + + @JsonProperty("issuedAt") + public long issuedAt() { + return issuedAt; + } } diff --git a/kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem b/kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem new file mode 100644 index 000000000..eb627d909 --- /dev/null +++ b/kafka-client/src/main/resources/responsive-license-keys/keys/license-signing-key-0.pem @@ -0,0 +1,14 @@ +-----BEGIN PUBLIC KEY----- +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA37LvpjgfWmlVAY/wefQ+ +2cL79J7UNJePfsQWjzMc4p8ITBcTGpX/RalhdLJQjbD/SJymAOEfy56RHbuG8vS0 +u79as6yhby8NWFaT1vNsYKOLfoUwqxi49TvxzXkBwdrLPXLbgIW7TNQpYIyM17Tl +tuceASZJGY7dECzeZY303XsmXgsLjaNHqdLlxtdzl8i8i/diqK8/I6oKL0/AsIaC +ZPfuPdQebtko4eE6p4pKjNu3qxNNdV73nV2WwHHMCE/U4CiTbH1nvpSVgr8sLBcW +EtbjDwJi+wp3OX5vblWHskLJDjdjAGbNH89UYyaWTV6C58dTGH4zmffw649Ib+80 +ywB8uH8IuoAJMXgECDT5XygXX2362z8MI4apMV6ouT90KyvamIVXLV1VpNyTn9ZY +AYXkZxImUujhB89Lz/b5ctK/epzi+5/ZDQyrgNaPCwFoPnE/QLeuHmrT2gM381S8 +3Y7fJmAa7sSRF0aPvTDM7hfmP2TRA29106qrEqPh2EX64mqoKCMkM6ZngCpiJgFZ +usvCRRUdBHnXKfyqFux6TBOVrKGrahJVLgkKMaFXma0U+peuXHSE1sktjtb2zUP9 +kqP1bwBtPD43epWPnVxWBqIcXD3poLBu+Hj8lVBV+NQ+XtTPzWmcD8+MSC0zzCUK +umd9JPEA9pzJAqlpRYTEGaUCAwEAAQ== +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/kafka-client/src/main/resources/responsive-license-keys/license-keys.json b/kafka-client/src/main/resources/responsive-license-keys/license-keys.json new file mode 100644 index 000000000..dc223c927 --- /dev/null +++ b/kafka-client/src/main/resources/responsive-license-keys/license-keys.json @@ -0,0 +1,9 @@ +{ + "keys": [ + { + "type": "RSA_4096", + "keyId": "license-signing-key-0", + "path": "/responsive-license-keys/keys/license-signing-key-0.pem" + } + ] +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java index cfb5aeaf6..a32c6917c 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java @@ -17,7 +17,10 @@ package dev.responsive.kafka.api; import static dev.responsive.kafka.api.config.ResponsiveConfig.COMPATIBILITY_MODE_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.PLATFORM_API_KEY_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; @@ -37,14 +40,21 @@ import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.CassandraClient; import dev.responsive.kafka.internal.db.CassandraClientFactory; +import dev.responsive.kafka.internal.license.exception.LicenseAuthenticationException; +import dev.responsive.kafka.internal.license.exception.LicenseUseViolationException; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.testutils.IntegrationTestUtils; +import dev.responsive.kafka.testutils.LicenseUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -62,6 +72,10 @@ @ExtendWith(MockitoExtension.class) class ResponsiveKafkaStreamsTest { + private static final String DECODED_INVALID_LICENSE_FILE + = "test-licenses/test-license-invalid-signature.json"; + private static final String DECODED_TRIAL_EXPIRED_LICENSE_FILE + = "test-licenses/test-license-trial-expired.json"; private final KafkaClientSupplier supplier = new DefaultKafkaClientSupplier() { @Override @@ -126,6 +140,11 @@ public void setUp() { properties.put(RESPONSIVE_ORG_CONFIG, "responsive"); properties.put(RESPONSIVE_ENV_CONFIG, "license-test"); + + properties.put( + RESPONSIVE_LICENSE_CONFIG, + LicenseUtils.getLicense() + ); } @SuppressWarnings("resource") @@ -171,4 +190,106 @@ public void shouldCreateResponsiveKafkaStreamsInMetricsOnlyModeWithUnverifiedCon ks.close(); } + @Test + public void shouldAcceptLicenseInLicenseFile() { + // given: + final File licenseFile = writeLicenseFile(LicenseUtils.getLicense()); + properties.put(RESPONSIVE_LICENSE_CONFIG, ""); + properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, licenseFile.getAbsolutePath()); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then (no throw): + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + + @Test + public void shouldThrowOnLicenseWithInvalidSignature() { + // given: + properties.put( + RESPONSIVE_LICENSE_CONFIG, + LicenseUtils.getEncodedLicense(DECODED_INVALID_LICENSE_FILE) + ); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then: + assertThrows( + LicenseAuthenticationException.class, + () -> { + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + ); + } + + @Test + public void shouldThrowOnExpiredLicense() { + // given: + properties.put( + RESPONSIVE_LICENSE_CONFIG, + LicenseUtils.getEncodedLicense(DECODED_TRIAL_EXPIRED_LICENSE_FILE) + ); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then: + assertThrows( + LicenseUseViolationException.class, + () -> { + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + ); + } + + @Test + public void shouldThrowIfNoLicenseOrApiKeyConfigured() { + // given: + properties.put(PLATFORM_API_KEY_CONFIG, ""); + properties.put(RESPONSIVE_LICENSE_CONFIG, ""); + properties.put(RESPONSIVE_LICENSE_FILE_CONFIG, ""); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then: + assertThrows( + ConfigException.class, + () -> { + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + ); + } + + @Test + public void shouldSkipLicenseCheckIfApiKeyConfigured() { + // given: + properties.put(PLATFORM_API_KEY_CONFIG, "some-api-key"); + properties.put(RESPONSIVE_LICENSE_CONFIG, ""); + properties.put(RESPONSIVE_LICENSE_FILE_CONFIG, ""); + properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name()); + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("foo").to("bar"); + + // when/then (no throw): + final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier); + ks.close(); + } + + private File writeLicenseFile(final String encoded) { + try { + final File encodedFile = File.createTempFile("rkst", null); + encodedFile.deleteOnExit(); + Files.writeString(encodedFile.toPath(), encoded); + return encodedFile; + } catch (final IOException e) { + throw new RuntimeException(e); + } + } } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java index 27592107a..c1ae5faae 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java @@ -33,7 +33,7 @@ class LicenseAuthenticatorTest { @Test public void shouldVerifyLicense() { // given: - final LicenseDocument license = loadLicense("license-test.json"); + final LicenseDocument license = loadLicense("test-license.json"); // when/then (no throw): verifier.authenticate(license); @@ -42,7 +42,7 @@ public void shouldVerifyLicense() { @Test public void shouldThrowForFailedSignatureVerification() { // given: - final LicenseDocument license = loadLicense("license-test-invalid-signature.json"); + final LicenseDocument license = loadLicense("test-license-invalid-signature.json"); // when/then: assertThrows( @@ -52,18 +52,17 @@ public void shouldThrowForFailedSignatureVerification() { } private static LicenseDocument loadLicense(final String file) { - return loadResource(file, LicenseDocument.class); + return loadResource("/test-licenses/" + file, LicenseDocument.class); } private static SigningKeys loadSigningKeys() { - return loadResource("signing-keys.json", SigningKeys.class); + return loadResource("/responsive-license-keys/license-keys.json", SigningKeys.class); } private static T loadResource(final String path, final Class clazz) { - final String fullPath = "license-test/license-verifier/" + path; try { return MAPPER.readValue( - LicenseAuthenticatorTest.class.getClassLoader().getResource(fullPath), + LicenseAuthenticatorTest.class.getResource(path), clazz ); } catch (final IOException e) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java index 750f3050f..954c06fb2 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/PublicKeyPemFileParserTest.java @@ -20,8 +20,6 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.io.File; -import java.net.URISyntaxException; import org.junit.jupiter.api.Test; class PublicKeyPemFileParserTest { @@ -29,10 +27,10 @@ class PublicKeyPemFileParserTest { @Test public void shouldParseValidPemFile() { // given: - final File file = getPemFile("valid.pem"); + final String file = getPemFile("valid.pem"); // when: - final byte[] key = PublicKeyPemFileParser.parsePemFile(file); + final byte[] key = PublicKeyPemFileParser.parsePemFileInResource(file); // then: assertThat(new String(key), is("foobarbaz")); @@ -41,10 +39,10 @@ public void shouldParseValidPemFile() { @Test public void shouldParsePemFileWithRealKey() { // given: - final File file = getPemFile("valid-real.pem"); + final String file = getPemFile("valid-real.pem"); // when: - final byte[] key = PublicKeyPemFileParser.parsePemFile(file); + final byte[] key = PublicKeyPemFileParser.parsePemFileInResource(file); // then: assertThat(key.length, is(550)); @@ -53,10 +51,10 @@ public void shouldParsePemFileWithRealKey() { @Test public void shouldParseValidPemFileWithComment() { // given: - final File file = getPemFile("valid-with-comment.pem"); + final String file = getPemFile("valid-with-comment.pem"); // when: - final byte[] key = PublicKeyPemFileParser.parsePemFile(file); + final byte[] key = PublicKeyPemFileParser.parsePemFileInResource(file); // then: assertThat(new String(key), is("foobarbaz")); @@ -65,29 +63,24 @@ public void shouldParseValidPemFileWithComment() { @Test public void shouldFailToParseInvalidPemFileWithMissingFooter() { // given: - final File file = getPemFile("invalid-missing-footer.pem"); + final String file = getPemFile("invalid-missing-footer.pem"); // when/then: - assertThrows(IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFile(file)); + assertThrows( + IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFileInResource(file)); } @Test public void shouldFailToParseInvalidPemFileWithMissingHeader() { // given: - final File file = getPemFile("invalid-missing-header.pem"); + final String file = getPemFile("invalid-missing-header.pem"); // when/then: - assertThrows(IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFile(file)); + assertThrows( + IllegalArgumentException.class, () -> PublicKeyPemFileParser.parsePemFileInResource(file)); } - private File getPemFile(final String filename) { - final String path = "license-test/public-key-pem-file-parser/" + filename; - try { - return new File( - PublicKeyPemFileParserTest.class.getClassLoader().getResource(path).toURI() - ); - } catch (final URISyntaxException e) { - throw new RuntimeException(e); - } + private String getPemFile(final String filename) { + return "/public-key-pem-file-parser-test/" + filename; } } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java new file mode 100644 index 000000000..0a68eee2d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/GenerateTrialLicense.java @@ -0,0 +1,60 @@ +package dev.responsive.kafka.internal.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.responsive.kafka.internal.license.model.LicenseDocumentV1; +import dev.responsive.kafka.internal.license.model.TimedTrialV1; +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.model.MessageType; +import software.amazon.awssdk.services.kms.model.SignRequest; +import software.amazon.awssdk.services.kms.model.SignResponse; +import software.amazon.awssdk.services.kms.model.SigningAlgorithmSpec; + +public class GenerateTrialLicense { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String SIGNING_KEY_ARN = System.getenv("SIGNING_KEY_ARN"); + private static final String SIGNING_KEY_ID = System.getenv("SIGNING_KEY_ID"); + private static final String AWS_PROFILE = System.getenv("AWS_PROFILE"); + private static final String SIGNING_ALGO = "RSASSA_PSS_SHA_256"; + + public static void main(final String[] args) throws IOException { + final var info = new TimedTrialV1( + "timed_trial_v1", + "test@responsive.dev", + Instant.now().getEpochSecond(), + Instant.MAX.getEpochSecond() + ); + final byte[] serialized = MAPPER.writeValueAsBytes(info); + final Region region = Region.US_WEST_2; + final byte[] signature; + try (final KmsClient kmsClient = KmsClient.builder() + .credentialsProvider(ProfileCredentialsProvider.create(AWS_PROFILE)) + .region(region) + .build() + ) { + final SdkBytes sdkBytes = SdkBytes.fromByteArray(serialized); + final SignRequest signRequest = SignRequest.builder() + .keyId(SIGNING_KEY_ARN) + .message(sdkBytes) + .messageType(MessageType.RAW) + .signingAlgorithm(SigningAlgorithmSpec.RSASSA_PSS_SHA_256) + .build(); + final SignResponse signResponse = kmsClient.sign(signRequest); + final SdkBytes sdkSignature = signResponse.signature(); + signature = sdkSignature.asByteArray(); + } + final LicenseDocumentV1 license = new LicenseDocumentV1( + "1", + Base64.getEncoder().encodeToString(serialized), + Base64.getEncoder().encodeToString(signature), + SIGNING_KEY_ID, + SIGNING_ALGO + ); + System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(license)); + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java new file mode 100644 index 000000000..5d4fe6299 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java @@ -0,0 +1,37 @@ +package dev.responsive.kafka.testutils; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.Base64; + +public final class LicenseUtils { + private static final String DECODED_LICENSE_FILE + = "test-licenses/test-license.json"; + + private LicenseUtils() { + } + + public static String getLicense() { + return getEncodedLicense(DECODED_LICENSE_FILE); + } + + public static String getEncodedLicense(final String filename) { + return Base64.getEncoder().encodeToString(slurpFile(filename)); + } + + private static byte[] slurpFile(final String filename) { + try { + final File file = new File(LicenseUtils.class.getClassLoader() + .getResource(filename) + .toURI() + ); + return Files.readAllBytes(file.toPath()); + } catch (final IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java index 3ee5ee515..f0b9f28c5 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java @@ -23,6 +23,7 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE; @@ -126,7 +127,8 @@ public Object resolveParameter( INTERNAL_TASK_ASSIGNOR_CLASS, TASK_ASSIGNOR_CLASS_OVERRIDE, BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), CASSANDRA_DESIRED_NUM_PARTITION_CONFIG, -1, - CASSANDRA_CHECK_INTERVAL_MS, 100 + CASSANDRA_CHECK_INTERVAL_MS, 100, + RESPONSIVE_LICENSE_CONFIG, LicenseUtils.getLicense() )); switch (backend) { diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-footer.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-footer.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-footer.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-footer.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-header.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-header.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/invalid-missing-header.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/invalid-missing-header.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-real.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-real.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-real.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-real.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-with-comment.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-with-comment.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid-with-comment.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/valid-with-comment.pem diff --git a/kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid.pem b/kafka-client/src/test/resources/public-key-pem-file-parser-test/valid.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/public-key-pem-file-parser/valid.pem rename to kafka-client/src/test/resources/public-key-pem-file-parser-test/valid.pem diff --git a/kafka-client/src/test/resources/license-test/license-verifier/keys/test.pem b/kafka-client/src/test/resources/responsive-license-keys/keys/test.pem similarity index 100% rename from kafka-client/src/test/resources/license-test/license-verifier/keys/test.pem rename to kafka-client/src/test/resources/responsive-license-keys/keys/test.pem diff --git a/kafka-client/src/test/resources/license-test/license-verifier/signing-keys.json b/kafka-client/src/test/resources/responsive-license-keys/license-keys.json similarity index 57% rename from kafka-client/src/test/resources/license-test/license-verifier/signing-keys.json rename to kafka-client/src/test/resources/responsive-license-keys/license-keys.json index 1a016b667..9855c485a 100644 --- a/kafka-client/src/test/resources/license-test/license-verifier/signing-keys.json +++ b/kafka-client/src/test/resources/responsive-license-keys/license-keys.json @@ -3,7 +3,7 @@ { "type": "RSA_4096", "keyId": "test", - "path": "license-test/license-verifier/keys/test.pem" + "path": "/responsive-license-keys/keys/test.pem" } ] } \ No newline at end of file diff --git a/kafka-client/src/test/resources/license-test/license-verifier/license-test-invalid-signature.json b/kafka-client/src/test/resources/test-licenses/test-license-invalid-signature.json similarity index 100% rename from kafka-client/src/test/resources/license-test/license-verifier/license-test-invalid-signature.json rename to kafka-client/src/test/resources/test-licenses/test-license-invalid-signature.json diff --git a/kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json b/kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json new file mode 100644 index 000000000..db87bd789 --- /dev/null +++ b/kafka-client/src/test/resources/test-licenses/test-license-trial-expired.json @@ -0,0 +1,8 @@ +{ + "info": "eyJlbWFpbCI6InJvaGFuQHJlc3BvbnNpdmUuZGV2IiwiaXNzdWVkQXQiOjE3MzE3MDU5NzgsImV4cGlyZXNBdCI6MTczMTcwNTk3OCwidHlwZSI6InRpbWVkX3RyaWFsX3YxIn0=", + "signature": "pUgmJtjfm5icyFPV96vviJqA6YQMOJBPp7gISCdU0bih1hgGBSEblJTJ0SQVR8KtrurumJeK7NtQkPBbzeYACdsaRYOCHYmz+t6l8RYBpkBJp/qIbniR+hwipf6jtugTq1L4X2hL06Ro7LpvJxaxDt4Fm6o56n6Lr+6pSc8WO8eOaAb5stuOZzwVr1ZmnNjqoLZHpuNiNGKer2S0sV0TYRu1h3/HZCcNbVzs/h6MD4j7BvsSyJzJ/Ta9Ljrf95WANQZxW7V637xzIou1sIngX2BLnmSReuPYU2tclMS+JSFS5htIGYtkNTJdfKwcpIc07k20oEh0+dzn1UvZa0HQkHcCzCdDn3ROCZ5oAw0eLYk+qo8/4ldEC93S1lptlWrr+0c+y7hckzqFkG467oDTUiVBJ3E+sMJAuAE2CNeIPvw+l+WBFQlvSrdU0cexLY0ydXvTvR7GzijCfjuBQyO6XI5lEHGQ9WF8yS6p9Tij1rVX9xmnJyxFgoDD/QyDliP4I0pUeeXTOkacGYWj3k+UketnJs6QRdabZ4LoTrd1SucfCUieyQoRHMftEWGvb/YtZuL5GfDzVu/nxYPZgNrrv5ribymBxz7C0Rjo47imWC5X/OTbMj0vPPH05LPXG92ayuJaeL+pKO2fxepEZD1O3so9yxk83peMpNGN6Vest8s=", + "key": "test", + "algo": "RSASSA_PSS_SHA_256", + "messageType": "RAW", + "version": "1" +} diff --git a/kafka-client/src/test/resources/license-test/license-verifier/license-test.json b/kafka-client/src/test/resources/test-licenses/test-license.json similarity index 100% rename from kafka-client/src/test/resources/license-test/license-verifier/license-test.json rename to kafka-client/src/test/resources/test-licenses/test-license.json