Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add configs to accept license and verify it #386

Merged
merged 9 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
Expand All @@ -45,6 +46,11 @@
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.license.LicenseAuthenticator;
import dev.responsive.kafka.internal.license.LicenseChecker;
import dev.responsive.kafka.internal.license.model.LicenseDocument;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.SigningKeys;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
Expand All @@ -55,7 +61,11 @@
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -88,6 +98,8 @@

public class ResponsiveKafkaStreams extends KafkaStreams {

private static final String SIGNING_KEYS_PATH = "license-keys/license-keys.json";

private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);

private final ResponsiveMetrics responsiveMetrics;
Expand Down Expand Up @@ -200,6 +212,8 @@ protected ResponsiveKafkaStreams(final Params params) {
params.time
);

validateLicense(params.responsiveConfig);

if (params.compatibilityMode == CompatibilityMode.FULL) {
try {
ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs);
Expand Down Expand Up @@ -260,6 +274,57 @@ private static ResponsiveMetrics createMetrics(
), exportService);
}

private static void validateLicense(final ResponsiveConfig configs) {
if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) {
return;
}
final LicenseDocument licenseDocument = loadLicense(configs);
final SigningKeys signingKeys = loadSigningKeys();
final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys);
final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument);
final LicenseChecker checker = new LicenseChecker();
checker.checkLicense(licenseInfo);
}

private static SigningKeys loadSigningKeys() {
try {
return new ObjectMapper().readValue(
ResponsiveKafkaStreams.class.getClassLoader().getResource(SIGNING_KEYS_PATH),
SigningKeys.class
);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private static LicenseDocument loadLicense(final ResponsiveConfig configs) {
final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG);
final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG);
if (license.isEmpty() == licenseFile.isEmpty()) {
throw new ConfigException(String.format(
"Must set exactly one of %s or %s",
ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG,
ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG
));
}
final String licenseB64;
if (!license.isEmpty()) {
licenseB64 = license;
} else {
try {
licenseB64 = Files.readString(new File(licenseFile).toPath());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
final ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

/**
* Fill in the props with any overrides and all internal objects shared via the configs
* before these get finalized as a {@link StreamsConfig} object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public class ResponsiveConfig extends AbstractConfig {
public static final String RESPONSIVE_ENV_CONFIG = "responsive.env";
private static final String RESPONSIVE_ENV_DOC = "The Responsive environment slug (not the environment ID).";

public static final String RESPONSIVE_LICENSE_CONFIG = "responsive.license";
private static final String RESPONSIVE_LICENSE_DOC = "The license you're using to run Responsive";

public static final String RESPONSIVE_LICENSE_FILE_CONFIG = "responsive.license.file";
private static final String RESPONSIVE_LICENSE_FILE_DOC
= "A path to a file containing your license.";

public static final String COMPATIBILITY_MODE_CONFIG = "responsive.compatibility.mode";
private static final String COMPATIBILITY_MODE_DOC = "This configuration enables running Responsive "
+ "in compatibility mode, disabling certain features.";
Expand Down Expand Up @@ -314,6 +321,18 @@ public class ResponsiveConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.HIGH,
RESPONSIVE_ENV_DOC
).define(
RESPONSIVE_LICENSE_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_DOC
).define(
RESPONSIVE_LICENSE_FILE_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_FILE_DOC
).define(
STORAGE_BACKEND_TYPE_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"keys": []}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess when we create the keys well update it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, that's right!

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package dev.responsive.kafka.api;

import static dev.responsive.kafka.api.config.ResponsiveConfig.COMPATIBILITY_MODE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.PLATFORM_API_KEY_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
Expand All @@ -37,14 +40,22 @@
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraClientFactory;
import dev.responsive.kafka.internal.license.exception.LicenseAuthenticationException;
import dev.responsive.kafka.internal.license.exception.LicenseUseViolationException;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.testutils.IntegrationTestUtils;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -62,6 +73,12 @@

@ExtendWith(MockitoExtension.class)
class ResponsiveKafkaStreamsTest {
private static final String DECODED_LICENSE_FILE
= "test-licenses/test-license.json";
private static final String DECODED_INVALID_LICENSE_FILE
= "test-licenses/test-license-invalid-signature.json";
private static final String DECODED_TRIAL_EXPIRED_LICENSE_FILE
= "test-licenses/test-license-trial-expired.json";

private final KafkaClientSupplier supplier = new DefaultKafkaClientSupplier() {
@Override
Expand Down Expand Up @@ -126,6 +143,11 @@ public void setUp() {

properties.put(RESPONSIVE_ORG_CONFIG, "responsive");
properties.put(RESPONSIVE_ENV_CONFIG, "license-test");

properties.put(
RESPONSIVE_LICENSE_CONFIG,
getEncodedLicense(DECODED_LICENSE_FILE)
);
}

@SuppressWarnings("resource")
Expand Down Expand Up @@ -171,4 +193,123 @@ public void shouldCreateResponsiveKafkaStreamsInMetricsOnlyModeWithUnverifiedCon
ks.close();
}

@Test
public void shouldAcceptLicenseInLicenseFile() {
// given:
final File licenseFile = writeLicenseFile(DECODED_LICENSE_FILE);
properties.put(RESPONSIVE_LICENSE_CONFIG, "");
properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, licenseFile.getAbsolutePath());
properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("foo").to("bar");

// when/then (no throw):
final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier);
ks.close();
}

@Test
public void shouldThrowOnLicenseWithInvalidSignature() {
// given:
properties.put(
RESPONSIVE_LICENSE_CONFIG,
getEncodedLicense(DECODED_INVALID_LICENSE_FILE)
);
properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("foo").to("bar");

// when/then:
assertThrows(
LicenseAuthenticationException.class,
() -> {
final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier);
ks.close();
}
);
}

@Test
public void shouldThrowOnExpiredLicense() {
// given:
properties.put(
RESPONSIVE_LICENSE_CONFIG,
getEncodedLicense(DECODED_TRIAL_EXPIRED_LICENSE_FILE)
);
properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("foo").to("bar");

// when/then:
assertThrows(
LicenseUseViolationException.class,
() -> {
final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier);
ks.close();
}
);
}

@Test
public void shouldThrowIfNoLicenseOrApiKeyConfigured() {
// given:
properties.put(PLATFORM_API_KEY_CONFIG, "");
properties.put(RESPONSIVE_LICENSE_CONFIG, "");
properties.put(RESPONSIVE_LICENSE_FILE_CONFIG, "");
properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("foo").to("bar");

// when/then:
assertThrows(
ConfigException.class,
() -> {
final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier);
ks.close();
}
);
}

@Test
public void shouldSkipLicenseCheckIfApiKeyConfigured() {
// given:
properties.put(PLATFORM_API_KEY_CONFIG, "some-api-key");
properties.put(RESPONSIVE_LICENSE_CONFIG, "");
properties.put(RESPONSIVE_LICENSE_FILE_CONFIG, "");
properties.put(COMPATIBILITY_MODE_CONFIG, CompatibilityMode.METRICS_ONLY.name());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("foo").to("bar");

// when/then (no throw):
final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier);
ks.close();
}

private File writeLicenseFile(final String decodedLicenseFilename) {
final String encoded = getEncodedLicense(decodedLicenseFilename);
try {
final File encodedFile = File.createTempFile("rkst", null);
encodedFile.deleteOnExit();
Files.writeString(encodedFile.toPath(), encoded);
return encodedFile;
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private String getEncodedLicense(final String filename) {
return Base64.getEncoder().encodeToString(slurpFile(filename));
}

private byte[] slurpFile(final String filename) {
try {
final File file = new File(ResponsiveKafkaStreamsTest.class.getClassLoader()
.getResource(filename)
.toURI()
);
return Files.readAllBytes(file.toPath());
} catch (final IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LicenseAuthenticatorTest {
@Test
public void shouldVerifyLicense() {
// given:
final LicenseDocument license = loadLicense("license-test.json");
final LicenseDocument license = loadLicense("test-license.json");

// when/then (no throw):
verifier.authenticate(license);
Expand All @@ -42,7 +42,7 @@ public void shouldVerifyLicense() {
@Test
public void shouldThrowForFailedSignatureVerification() {
// given:
final LicenseDocument license = loadLicense("license-test-invalid-signature.json");
final LicenseDocument license = loadLicense("test-license-invalid-signature.json");

// when/then:
assertThrows(
Expand All @@ -52,18 +52,17 @@ public void shouldThrowForFailedSignatureVerification() {
}

private static LicenseDocument loadLicense(final String file) {
return loadResource(file, LicenseDocument.class);
return loadResource("test-licenses/" + file, LicenseDocument.class);
}

private static SigningKeys loadSigningKeys() {
return loadResource("signing-keys.json", SigningKeys.class);
return loadResource("license-keys/license-keys.json", SigningKeys.class);
}

private static <T> T loadResource(final String path, final Class<T> clazz) {
final String fullPath = "license-test/license-verifier/" + path;
try {
return MAPPER.readValue(
LicenseAuthenticatorTest.class.getClassLoader().getResource(fullPath),
LicenseAuthenticatorTest.class.getClassLoader().getResource(path),
clazz
);
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void shouldFailToParseInvalidPemFileWithMissingHeader() {
}

private File getPemFile(final String filename) {
final String path = "license-test/public-key-pem-file-parser/" + filename;
final String path = "public-key-pem-file-parser-test/" + filename;
try {
return new File(
PublicKeyPemFileParserTest.class.getClassLoader().getResource(path).toURI()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{
"type": "RSA_4096",
"keyId": "test",
"path": "license-test/license-verifier/keys/test.pem"
"path": "license-keys/keys/test.pem"
}
]
}
Loading