Skip to content

Commit

Permalink
add configs to accept license and verify it (#386)
Browse files Browse the repository at this point in the history
* add configs to accept license and verify it

This patch makes the following changes to integrate license verification:
1. Adds responsive.license and responsive.license.file configs to allow
   the user to configure their license.
2. ResponsiveKafkaStreams validates the license if the user has not
   configured a cloud API key. The user could technically get around the
   license verification by configuring a bogus key. We assume that we
   won't have malicious users for now, and defer a more rigorous check
   to a future PR. We should do the check in the background and not block
   starting the app so that we don't depend on our cloud for the user's
   application to run.
3. Moves around some of the test license files so that we can use them
   from different tests.

* fix file path

* use the test license from integration tests

* lint and missing file

* fix e2e integration test

* fix some paths

* add tool to sign licenses

* fix getting pem file from jar

* fix e2e test
  • Loading branch information
rodesai authored Nov 19, 2024
1 parent 876127a commit 9ea53c1
Show file tree
Hide file tree
Showing 28 changed files with 400 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public synchronized void start() {
E2ETestUtils.maybeCreateKeyspace(properties);
// build topology after creating keyspace because we use keyspace retry
// to wait for scylla to resolve
properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key");
E2ETestUtils.retryFor(
() -> kafkaStreams = buildTopology(properties),
Duration.ofMinutes(5)
Expand Down
2 changes: 2 additions & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,6 @@ dependencies {
testImplementation(testlibs.bundles.base)
testImplementation(testlibs.bundles.testcontainers)
testImplementation(libs.kafka.streams.test.utils)
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
Expand All @@ -41,6 +42,11 @@
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.license.LicenseAuthenticator;
import dev.responsive.kafka.internal.license.LicenseChecker;
import dev.responsive.kafka.internal.license.model.LicenseDocument;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.SigningKeys;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
Expand All @@ -51,7 +57,11 @@
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -84,6 +94,8 @@

public class ResponsiveKafkaStreams extends KafkaStreams {

private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json";

private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);

private final ResponsiveMetrics responsiveMetrics;
Expand Down Expand Up @@ -196,6 +208,8 @@ protected ResponsiveKafkaStreams(final Params params) {
params.time
);

validateLicense(params.responsiveConfig);

if (params.compatibilityMode == CompatibilityMode.FULL) {
try {
ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs);
Expand Down Expand Up @@ -256,6 +270,57 @@ private static ResponsiveMetrics createMetrics(
), exportService);
}

private static void validateLicense(final ResponsiveConfig configs) {
if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) {
return;
}
final LicenseDocument licenseDocument = loadLicense(configs);
final SigningKeys signingKeys = loadSigningKeys();
final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys);
final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument);
final LicenseChecker checker = new LicenseChecker();
checker.checkLicense(licenseInfo);
}

private static SigningKeys loadSigningKeys() {
try {
return new ObjectMapper().readValue(
ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH),
SigningKeys.class
);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private static LicenseDocument loadLicense(final ResponsiveConfig configs) {
final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG);
final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG);
if (license.isEmpty() == licenseFile.isEmpty()) {
throw new ConfigException(String.format(
"Must set exactly one of %s or %s",
ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG,
ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG
));
}
final String licenseB64;
if (!license.isEmpty()) {
licenseB64 = license;
} else {
try {
licenseB64 = Files.readString(new File(licenseFile).toPath());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
final ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

/**
* Fill in the props with any overrides and all internal objects shared via the configs
* before these get finalized as a {@link StreamsConfig} object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public class ResponsiveConfig extends AbstractConfig {
public static final String RESPONSIVE_ENV_CONFIG = "responsive.env";
private static final String RESPONSIVE_ENV_DOC = "The Responsive environment slug (not the environment ID).";

public static final String RESPONSIVE_LICENSE_CONFIG = "responsive.license";
private static final String RESPONSIVE_LICENSE_DOC = "The license you're using to run Responsive";

public static final String RESPONSIVE_LICENSE_FILE_CONFIG = "responsive.license.file";
private static final String RESPONSIVE_LICENSE_FILE_DOC
= "A path to a file containing your license.";

public static final String COMPATIBILITY_MODE_CONFIG = "responsive.compatibility.mode";
private static final String COMPATIBILITY_MODE_DOC = "This configuration enables running Responsive "
+ "in compatibility mode, disabling certain features.";
Expand Down Expand Up @@ -310,6 +317,18 @@ public class ResponsiveConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.HIGH,
RESPONSIVE_ENV_DOC
).define(
RESPONSIVE_LICENSE_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_DOC
).define(
RESPONSIVE_LICENSE_FILE_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_FILE_DOC
).define(
STORAGE_BACKEND_TYPE_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import dev.responsive.kafka.internal.license.model.LicenseDocumentV1;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.SigningKeys;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -83,13 +81,7 @@ private byte[] verifyLicenseV1Signature(final LicenseDocumentV1 license) {
}

private PublicKey loadPublicKey(final SigningKeys.SigningKey signingKey) {
final File file;
try {
file = new File(this.getClass().getClassLoader().getResource(signingKey.path()).toURI());
} catch (final URISyntaxException e) {
throw new RuntimeException(e);
}
final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFile(file);
final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFileInResource(signingKey.path());
final KeyFactory keyFactory;
try {
keyFactory = KeyFactory.getInstance("RSA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@

package dev.responsive.kafka.internal.license;

import java.io.File;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

public class PublicKeyPemFileParser {
private static final String HEADER_PREFIX = "-----";
Expand All @@ -27,13 +29,19 @@ public class PublicKeyPemFileParser {
private static final String END_PUBLIC_KEY_HEADER
= HEADER_PREFIX + END_PUBLIC_KEY + HEADER_PREFIX;

public static byte[] parsePemFile(final File file) {
final List<String> lines;
try {
lines = Files.readAllLines(file.toPath());
} catch (IOException e) {
public static byte[] parsePemFileInResource(final String path) {
try (final InputStream inputStream = PublicKeyPemFileParser.class.getResourceAsStream(path)) {
return parsePemFile(inputStream);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private static byte[] parsePemFile(final InputStream inputStream) throws IOException {
final List<String> lines;
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
lines = reader.lines().collect(Collectors.toList());
}
final StringBuilder keyB64Builder = new StringBuilder();
boolean foundBegin = false;
for (final String l : lines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public abstract class LicenseDocument {
public LicenseDocument(@JsonProperty("version") final String version) {
this.version = version;
}

@JsonProperty("version")
public String version() {
return version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ public LicenseDocumentV1(
this.algo = algo;
}

@JsonProperty("info")
public String info() {
return info;
}

@JsonProperty("key")
public String key() {
return key;
}

@JsonProperty("signature")
public String signature() {
return signature;
}

@JsonProperty("algo")
public String algo() {
return algo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public abstract class LicenseInfo {
LicenseInfo(@JsonProperty("type") final String type) {
this.type = type;
}

@JsonProperty("type")
public String type() {
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ public TimedTrialV1(
this.expiresAt = expiresAt;
}

@JsonProperty("email")
public String email() {
return email;
}

@JsonProperty("expiresAt")
public long expiresAt() {
return expiresAt;
}

@JsonProperty("issuedAt")
public long issuedAt() {
return issuedAt;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA37LvpjgfWmlVAY/wefQ+
2cL79J7UNJePfsQWjzMc4p8ITBcTGpX/RalhdLJQjbD/SJymAOEfy56RHbuG8vS0
u79as6yhby8NWFaT1vNsYKOLfoUwqxi49TvxzXkBwdrLPXLbgIW7TNQpYIyM17Tl
tuceASZJGY7dECzeZY303XsmXgsLjaNHqdLlxtdzl8i8i/diqK8/I6oKL0/AsIaC
ZPfuPdQebtko4eE6p4pKjNu3qxNNdV73nV2WwHHMCE/U4CiTbH1nvpSVgr8sLBcW
EtbjDwJi+wp3OX5vblWHskLJDjdjAGbNH89UYyaWTV6C58dTGH4zmffw649Ib+80
ywB8uH8IuoAJMXgECDT5XygXX2362z8MI4apMV6ouT90KyvamIVXLV1VpNyTn9ZY
AYXkZxImUujhB89Lz/b5ctK/epzi+5/ZDQyrgNaPCwFoPnE/QLeuHmrT2gM381S8
3Y7fJmAa7sSRF0aPvTDM7hfmP2TRA29106qrEqPh2EX64mqoKCMkM6ZngCpiJgFZ
usvCRRUdBHnXKfyqFux6TBOVrKGrahJVLgkKMaFXma0U+peuXHSE1sktjtb2zUP9
kqP1bwBtPD43epWPnVxWBqIcXD3poLBu+Hj8lVBV+NQ+XtTPzWmcD8+MSC0zzCUK
umd9JPEA9pzJAqlpRYTEGaUCAwEAAQ==
-----END PUBLIC KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"keys": [
{
"type": "RSA_4096",
"keyId": "license-signing-key-0",
"path": "/responsive-license-keys/keys/license-signing-key-0.pem"
}
]
}
Loading

0 comments on commit 9ea53c1

Please sign in to comment.