Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add configs to accept license and verify it #386

Merged
merged 9 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public synchronized void start() {
E2ETestUtils.maybeCreateKeyspace(properties);
// build topology after creating keyspace because we use keyspace retry
// to wait for scylla to resolve
properties.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "test-api-key");
E2ETestUtils.retryFor(
() -> kafkaStreams = buildTopology(properties),
Duration.ofMinutes(5)
Expand Down
2 changes: 2 additions & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,6 @@ dependencies {
testImplementation(testlibs.bundles.base)
testImplementation(testlibs.bundles.testcontainers)
testImplementation(libs.kafka.streams.test.utils)
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
Expand All @@ -45,6 +46,11 @@
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.license.LicenseAuthenticator;
import dev.responsive.kafka.internal.license.LicenseChecker;
import dev.responsive.kafka.internal.license.model.LicenseDocument;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.SigningKeys;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
Expand All @@ -55,7 +61,11 @@
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -88,6 +98,8 @@

public class ResponsiveKafkaStreams extends KafkaStreams {

private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json";

private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);

private final ResponsiveMetrics responsiveMetrics;
Expand Down Expand Up @@ -200,6 +212,8 @@ protected ResponsiveKafkaStreams(final Params params) {
params.time
);

validateLicense(params.responsiveConfig);

if (params.compatibilityMode == CompatibilityMode.FULL) {
try {
ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs);
Expand Down Expand Up @@ -260,6 +274,57 @@ private static ResponsiveMetrics createMetrics(
), exportService);
}

private static void validateLicense(final ResponsiveConfig configs) {
if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) {
return;
}
final LicenseDocument licenseDocument = loadLicense(configs);
final SigningKeys signingKeys = loadSigningKeys();
final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys);
final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument);
final LicenseChecker checker = new LicenseChecker();
checker.checkLicense(licenseInfo);
}

private static SigningKeys loadSigningKeys() {
try {
return new ObjectMapper().readValue(
ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH),
SigningKeys.class
);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private static LicenseDocument loadLicense(final ResponsiveConfig configs) {
final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG);
final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG);
if (license.isEmpty() == licenseFile.isEmpty()) {
throw new ConfigException(String.format(
"Must set exactly one of %s or %s",
ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG,
ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG
));
}
final String licenseB64;
if (!license.isEmpty()) {
licenseB64 = license;
} else {
try {
licenseB64 = Files.readString(new File(licenseFile).toPath());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
final ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

/**
* Fill in the props with any overrides and all internal objects shared via the configs
* before these get finalized as a {@link StreamsConfig} object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public class ResponsiveConfig extends AbstractConfig {
public static final String RESPONSIVE_ENV_CONFIG = "responsive.env";
private static final String RESPONSIVE_ENV_DOC = "The Responsive environment slug (not the environment ID).";

public static final String RESPONSIVE_LICENSE_CONFIG = "responsive.license";
private static final String RESPONSIVE_LICENSE_DOC = "The license you're using to run Responsive";

public static final String RESPONSIVE_LICENSE_FILE_CONFIG = "responsive.license.file";
private static final String RESPONSIVE_LICENSE_FILE_DOC
= "A path to a file containing your license.";

public static final String COMPATIBILITY_MODE_CONFIG = "responsive.compatibility.mode";
private static final String COMPATIBILITY_MODE_DOC = "This configuration enables running Responsive "
+ "in compatibility mode, disabling certain features.";
Expand Down Expand Up @@ -314,6 +321,18 @@ public class ResponsiveConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.HIGH,
RESPONSIVE_ENV_DOC
).define(
RESPONSIVE_LICENSE_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_DOC
).define(
RESPONSIVE_LICENSE_FILE_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_FILE_DOC
).define(
STORAGE_BACKEND_TYPE_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import dev.responsive.kafka.internal.license.model.LicenseDocumentV1;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.SigningKeys;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -87,13 +85,7 @@ private byte[] verifyLicenseV1Signature(final LicenseDocumentV1 license) {
}

private PublicKey loadPublicKey(final SigningKeys.SigningKey signingKey) {
final File file;
try {
file = new File(this.getClass().getClassLoader().getResource(signingKey.path()).toURI());
} catch (final URISyntaxException e) {
throw new RuntimeException(e);
}
final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFile(file);
final byte[] publicKeyBytes = PublicKeyPemFileParser.parsePemFileInResource(signingKey.path());
final KeyFactory keyFactory;
try {
keyFactory = KeyFactory.getInstance("RSA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package dev.responsive.kafka.internal.license;

import java.io.File;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

public class PublicKeyPemFileParser {
private static final String HEADER_PREFIX = "-----";
Expand All @@ -31,13 +33,19 @@ public class PublicKeyPemFileParser {
private static final String END_PUBLIC_KEY_HEADER
= HEADER_PREFIX + END_PUBLIC_KEY + HEADER_PREFIX;

public static byte[] parsePemFile(final File file) {
final List<String> lines;
try {
lines = Files.readAllLines(file.toPath());
} catch (IOException e) {
public static byte[] parsePemFileInResource(final String path) {
try (final InputStream inputStream = PublicKeyPemFileParser.class.getResourceAsStream(path)) {
return parsePemFile(inputStream);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private static byte[] parsePemFile(final InputStream inputStream) throws IOException {
final List<String> lines;
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
lines = reader.lines().collect(Collectors.toList());
}
final StringBuilder keyB64Builder = new StringBuilder();
boolean foundBegin = false;
for (final String l : lines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public abstract class LicenseDocument {
public LicenseDocument(@JsonProperty("version") final String version) {
this.version = version;
}

@JsonProperty("version")
public String version() {
return version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,22 @@ public LicenseDocumentV1(
this.algo = algo;
}

@JsonProperty("info")
public String info() {
return info;
}

@JsonProperty("key")
public String key() {
return key;
}

@JsonProperty("signature")
public String signature() {
return signature;
}

@JsonProperty("algo")
public String algo() {
return algo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public abstract class LicenseInfo {
LicenseInfo(@JsonProperty("type") final String type) {
this.type = type;
}

@JsonProperty("type")
public String type() {
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,18 @@ public TimedTrialV1(
this.expiresAt = expiresAt;
}

@JsonProperty("email")
public String email() {
return email;
}

@JsonProperty("expiresAt")
public long expiresAt() {
return expiresAt;
}

@JsonProperty("issuedAt")
public long issuedAt() {
return issuedAt;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA37LvpjgfWmlVAY/wefQ+
2cL79J7UNJePfsQWjzMc4p8ITBcTGpX/RalhdLJQjbD/SJymAOEfy56RHbuG8vS0
u79as6yhby8NWFaT1vNsYKOLfoUwqxi49TvxzXkBwdrLPXLbgIW7TNQpYIyM17Tl
tuceASZJGY7dECzeZY303XsmXgsLjaNHqdLlxtdzl8i8i/diqK8/I6oKL0/AsIaC
ZPfuPdQebtko4eE6p4pKjNu3qxNNdV73nV2WwHHMCE/U4CiTbH1nvpSVgr8sLBcW
EtbjDwJi+wp3OX5vblWHskLJDjdjAGbNH89UYyaWTV6C58dTGH4zmffw649Ib+80
ywB8uH8IuoAJMXgECDT5XygXX2362z8MI4apMV6ouT90KyvamIVXLV1VpNyTn9ZY
AYXkZxImUujhB89Lz/b5ctK/epzi+5/ZDQyrgNaPCwFoPnE/QLeuHmrT2gM381S8
3Y7fJmAa7sSRF0aPvTDM7hfmP2TRA29106qrEqPh2EX64mqoKCMkM6ZngCpiJgFZ
usvCRRUdBHnXKfyqFux6TBOVrKGrahJVLgkKMaFXma0U+peuXHSE1sktjtb2zUP9
kqP1bwBtPD43epWPnVxWBqIcXD3poLBu+Hj8lVBV+NQ+XtTPzWmcD8+MSC0zzCUK
umd9JPEA9pzJAqlpRYTEGaUCAwEAAQ==
-----END PUBLIC KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"keys": [
{
"type": "RSA_4096",
"keyId": "license-signing-key-0",
"path": "/responsive-license-keys/keys/license-signing-key-0.pem"
}
]
}
Loading
Loading