add kv store implementation for rs3 (#362)
* add kv store implementation for rs3

This patch adds a key-value store implementation for RS3:
Adds the rs3 repo as a submodule so that we can access the rs3 proto file.

Extends the flush writing path to pass the relevant changelog offset to the
flush manager. RS3 uses this to specify the end offset of the WAL segments
that it writes during flush.
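
A rough sketch of the resulting surface, using simplified stand-in types rather than the real dev.responsive.kafka.internal.db interfaces (the actual signature change is in the FlushManager diff further down):

// Sketch only: after this change, writer creation receives the consumed
// changelog offset so the RS3 flush manager can stamp it as the end offset
// of the WAL segment it writes for that flush.
interface FlushManagerSketch<K, P> {
  RemoteWriterSketch<K> createWriter(P tablePartition, long consumedOffset);
}

interface RemoteWriterSketch<K> {
  void insert(K key, byte[] value, long epochMillis);
  void delete(K key);
}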

Adds the RS3 RemoteKVTable implementation. The RS3 table uses the RS3 gRPC
protocol to do writes/reads to/from RS3. The gRPC protocol sits behind a more
generic RS3Client type that exposes APIs for writing WAL segments, reading
keys, and getting the current written/flushed offsets.
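
That client surface might look roughly like the sketch below; the method names, signatures, and types are illustrative assumptions, not the actual RS3Client API:

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

// Hypothetical sketch of the RS3Client surface described above.
interface Rs3ClientSketch {

  // a single put or delete destined for a WAL segment
  record WalEntry(byte[] key, byte[] value, boolean tombstone) { }

  // write one WAL segment to a PSS, with endOffset as the segment's end changelog offset
  CompletionStage<Void> writeWalSegment(int pssId, List<WalEntry> entries, long endOffset);

  // point lookup of a key from the PSS that owns it
  Optional<byte[]> get(int pssId, byte[] key);

  // the latest offsets the PSS has accepted and durably flushed, respectively
  Optional<Long> writtenOffset(int pssId);

  Optional<Long> flushedOffset(int pssId);
}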

When the table is initialized, it fetches the current written offset for all
PSSs, and uses the lowest of these as the restore point.
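
That is, restore starts from the offset every PSS has durably written; a minimal sketch of the computation (illustrative only, the real initialization path differs):

import java.util.List;
import java.util.function.IntToLongFunction;

// Sketch only: the restore point is the minimum written offset across the PSSs
// backing this LSS/partition, so restore replays anything any PSS might be missing.
final class RestorePointSketch {
  static long restorePoint(List<Integer> pssIds, IntToLongFunction writtenOffsetForPss) {
    return pssIds.stream()
        .mapToLong(writtenOffsetForPss::applyAsLong)
        .min()
        .orElseThrow(() -> new IllegalStateException("no PSSs mapped for this LSS"));
  }
}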

On a write, RS3KVFlushManager builds up a WAL segment for each PSS for a given
LSS/partition, adding entries to the segment based on the record key. This
mapping (from key to PSS) is provided by PssPartitioner. Once it has all the
writes in the current flush, it completes the WAL segment write RPCs to
RS3.
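
A simplified sketch of that grouping step (the record and segment types are stand-ins for the real flush-manager internals, and PssPartitioner#pss is reduced to a key-only lookup):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the per-PSS WAL segment grouping described above.
final class WalSegmentGroupingSketch {

  record Write(byte[] key, byte[] value, boolean tombstone) { }

  interface KeyToPss {
    int pss(byte[] key);  // simplified stand-in for PssPartitioner#pss(byte[], LssId)
  }

  // one pending WAL segment per PSS; the flush manager completes the segment
  // writes once every buffered write in the current flush has been added
  static Map<Integer, List<Write>> groupByPss(List<Write> writes, KeyToPss partitioner) {
    final Map<Integer, List<Write>> segments = new HashMap<>();
    for (final Write write : writes) {
      segments
          .computeIfAbsent(partitioner.pss(write.key()), pss -> new ArrayList<>())
          .add(write);
    }
    return segments;
  }
}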

On a read, RS3KVTable fetches the value from the PSS that corresponds to the
requested key. Range scans are currently not supported.
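
Roughly, a point lookup only needs the partitioner and the client (sketch only, with illustrative names):

import java.util.Optional;

// Read-path sketch: a get() is routed to the single PSS that owns the key, which is
// also why range scans are not supported yet -- a scan would have to merge results
// from every PSS behind the LSS.
final class Rs3ReadSketch {

  interface KeyToPss {
    int pss(byte[] key);
  }

  interface PssReader {
    Optional<byte[]> get(int pssId, byte[] key);
  }

  static byte[] get(byte[] key, KeyToPss partitioner, PssReader client) {
    return client.get(partitioner.pss(key), key).orElse(null);
  }
}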

Adds a new store type called RS3 for specifying an RS3 store.

Adds some simple config for talking to the rs3 server that allows specifying
the hostname/port. The store->PSS mapping is still hard-coded. I'll make it
configurable in a follow-up.
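
For example (the config keys and defaults are the ones added to ResponsiveConfig in this change; the hostname value is a placeholder, and selecting the RS3 backend itself still goes through the existing store-type config, not shown here):

import dev.responsive.kafka.api.config.ResponsiveConfig;
import java.util.Properties;

final class Rs3ConfigExample {
  static Properties rs3Props() {
    // responsive.rs3.hostname defaults to "" and responsive.rs3.port defaults to 50051
    final Properties props = new Properties();
    props.put(ResponsiveConfig.RS3_HOSTNAME_CONFIG, "rs3.example.internal"); // placeholder hostname
    props.put(ResponsiveConfig.RS3_PORT_CONFIG, 50051);
    return props;
  }
}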

* add token to checkout

* use the published server

* apply basic feedback

* fix build errors in ttdkeyvaluetable

* clean up psspartitioner interface

* add direct partitioner that maps lss 1:1 to pss

* missed final

* remove PssRangePartitioner for now

* rebase

* fix test
rodesai authored Nov 27, 2024
1 parent 2669a78 commit d7f3641
Showing 48 changed files with 1,972 additions and 57 deletions.
1 change: 1 addition & 0 deletions .github/workflows/github-main.yaml
@@ -24,6 +24,7 @@ jobs:
- name: "Checkout"
uses: actions/checkout@v3
with:
token: ${{ secrets.TOOLS_GHA_TOKEN }}
submodules: recursive
fetch-depth: 0

1 change: 1 addition & 0 deletions .github/workflows/github-pr.yaml
@@ -25,6 +25,7 @@ jobs:
- name: "Checkout"
uses: actions/checkout@v3
with:
token: ${{ secrets.TOOLS_GHA_TOKEN }}
submodules: recursive
fetch-depth: 0

3 changes: 3 additions & 0 deletions .gitmodules
@@ -2,3 +2,6 @@
path = controller-api/src/main/external-protos/opentelemetry-proto
url = https://github.com/open-telemetry/opentelemetry-proto.git
branch = main
[submodule "kafka-client/src/main/external-protos/rs3"]
path = kafka-client/src/main/external-protos/rs3
url = https://github.com/responsivedev/rs3.git
46 changes: 45 additions & 1 deletion kafka-client/build.gradle.kts
@@ -28,9 +28,51 @@ import java.io.ByteArrayOutputStream
* limitations under the License.
*/

import com.google.protobuf.gradle.id

buildscript {
dependencies {
classpath("com.google.protobuf:protobuf-gradle-plugin:0.8.8")
}
}

plugins {
id("responsive.java-library-conventions")
id("java")
id("com.google.protobuf").version("0.9.2")
}

sourceSets {
main {
proto {
srcDir("src/main/external-protos/rs3/proto")
}
}
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.22.3"
}

plugins {
id("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:${libs.versions.grpc.orNull}"
}
}

generateProtoTasks {
all().forEach {
it.plugins {
// Apply the "grpc" plugin whose spec is defined above, without
// options. Note the braces cannot be omitted, otherwise the
// plugin will not be added. This is because of the implicit way
// NamedDomainObjectContainer binds the methods.
id("grpc") {
}
}
}
}
}

/*********** Generated Resources ***********/
@@ -97,6 +139,8 @@ dependencies {
implementation(libs.bundles.commons)
implementation(libs.mongodb.driver.sync)
implementation(libs.bundles.otel)
implementation(libs.bundles.grpc)
implementation(libs.protobuf.java.util)
implementation(libs.guava)

testImplementation(libs.kafka.clients) {
@@ -109,4 +153,4 @@ testImplementation(libs.kafka.streams.test.utils)
testImplementation(libs.kafka.streams.test.utils)
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
}
}
1 change: 1 addition & 0 deletions kafka-client/src/main/external-protos/rs3
Submodule rs3 added at e55c11
@@ -20,6 +20,8 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_PASSWORD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_USERNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_HOSTNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_PORT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE;
import static dev.responsive.kafka.internal.metrics.ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
@@ -42,6 +44,7 @@
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.db.rs3.RS3TableFactory;
import dev.responsive.kafka.internal.license.LicenseAuthenticator;
import dev.responsive.kafka.internal.license.LicenseChecker;
import dev.responsive.kafka.internal.license.model.LicenseDocument;
@@ -540,7 +543,7 @@ public Params build() {
final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals());
if (compatibilityMode == CompatibilityMode.METRICS_ONLY) {
sessionClients = new SessionClients(
Optional.empty(), Optional.empty(), false, admin);
Optional.empty(), Optional.empty(), Optional.empty(), false, admin);
return this;
}

@@ -551,6 +554,7 @@ public Params build() {
sessionClients = new SessionClients(
Optional.empty(),
Optional.of(cassandraFactory.createClient(cqlSession, responsiveConfig)),
Optional.empty(),
false,
admin
);
@@ -576,19 +580,33 @@ public Params build() {
CollectionCreationOptions.fromConfig(responsiveConfig)
)),
Optional.empty(),
Optional.empty(),
false,
admin
);
break;
case IN_MEMORY:
LOG.info("using in-memory responsive store");
sessionClients = new SessionClients(
Optional.empty(),
Optional.empty(),
Optional.empty(),
true,
admin
);
break;
case RS3:
LOG.info("using rs3 responsive store");
final var rs3Host = responsiveConfig.getString(RS3_HOSTNAME_CONFIG);
final var rs3Port = responsiveConfig.getInt(RS3_PORT_CONFIG);
sessionClients = new SessionClients(
Optional.empty(),
Optional.empty(),
Optional.of(new RS3TableFactory(rs3Host, rs3Port)),
false,
admin
);
break;
default:
throw new IllegalStateException("Unexpected value: " + backendType);
}
@@ -111,6 +111,14 @@ public class ResponsiveConfig extends AbstractConfig {
+ "with for best results. However it is important to note that this cannot be changed for "
+ "an active application. Messing with this can corrupt existing state!";

// ------------------ RS3 specific configurations ----------------------

public static final String RS3_HOSTNAME_CONFIG = "responsive.rs3.hostname";
private static final String RS3_HOSTNAME_DOC = "The hostname to use when connecting to RS3.";

public static final String RS3_PORT_CONFIG = "responsive.rs3.port";
private static final String RS3_PORT_DOC = "The port to use when connecting to RS3.";

// ------------------ ScyllaDB specific configurations ----------------------

public static final String CASSANDRA_USERNAME_CONFIG = "responsive.cassandra.username";
@@ -584,6 +592,18 @@ public class ResponsiveConfig extends AbstractConfig {
RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT,
Importance.LOW,
RESTORE_OFFSET_REPAIR_ENABLED_DOC
).define(
RS3_HOSTNAME_CONFIG,
Type.STRING,
"",
Importance.MEDIUM,
RS3_HOSTNAME_DOC
).define(
RS3_PORT_CONFIG,
Type.INT,
50051,
Importance.MEDIUM,
RS3_PORT_DOC
);

/**
@@ -17,7 +17,8 @@
public enum StorageBackend {
CASSANDRA,
MONGO_DB,
IN_MEMORY;
IN_MEMORY,
RS3;

public static String[] names() {
return Arrays.stream(values()).map(Enum::name).toArray(String[]::new);
@@ -47,7 +47,7 @@ public FlushResult<K, P> flushWriteBatch(
) {
final var batchWriters = new BatchWriters<>(flushManager, kafkaPartition);

prepareBatch(batchWriters, bufferedWrites, keySpec);
prepareBatch(batchWriters, bufferedWrites, keySpec, consumedOffset);

final var preFlushResult = flushManager.preFlush();
if (!preFlushResult.wasApplied()) {
@@ -89,10 +89,11 @@ public FlushResult<K, P> flushWriteBatch(
private static <K extends Comparable<K>, P> void prepareBatch(
final BatchWriters<K, P> batchWriters,
final Collection<Result<K>> bufferedWrites,
final KeySpec<K> keySpec
final KeySpec<K> keySpec,
final long consumedOffset
) {
for (final Result<K> result : bufferedWrites) {
final RemoteWriter<K, P> writer = batchWriters.findOrAddWriter(result.key);
final RemoteWriter<K, P> writer = batchWriters.findOrAddWriter(result.key, consumedOffset);
if (result.isTombstone) {
writer.delete(result.key);
} else if (keySpec.retain(result.key)) {
@@ -38,12 +38,16 @@ public int numTablePartitionsInBatch() {
}

public RemoteWriter<K, P> findOrAddWriter(
final K key
final K key,
final long consumedOffset
) {
flushManager.writeAdded(key);

final P tablePartition = flushManager.partitioner().tablePartition(kafkaPartition, key);
return batchWriters.computeIfAbsent(tablePartition, flushManager::createWriter);
return batchWriters.computeIfAbsent(
tablePartition,
tp -> flushManager.createWriter(tp, consumedOffset)
);
}

public Collection<RemoteWriter<K, P>> allWriters() {
@@ -49,7 +49,10 @@ public TablePartitioner<Bytes, Integer> partitioner() {
}

@Override
public RemoteWriter<Bytes, Integer> createWriter(final Integer tablePartition) {
public RemoteWriter<Bytes, Integer> createWriter(
final Integer tablePartition,
final long consumedOffset
) {
return new FactSchemaWriter<>(
client,
table,
@@ -55,7 +55,10 @@ public TablePartitioner<Bytes, Integer> partitioner() {
}

@Override
public RemoteWriter<Bytes, Integer> createWriter(final Integer tablePartition) {
public RemoteWriter<Bytes, Integer> createWriter(
final Integer tablePartition,
final long consumedOffset
) {
return new LwtWriter<>(
client,
() -> table.ensureEpoch(tablePartition, epoch),
@@ -66,7 +66,8 @@ public TablePartitioner<WindowedKey, SegmentPartition> partitioner() {

@Override
public RemoteWriter<WindowedKey, SegmentPartition> createWriter(
final SegmentPartition tablePartition
final SegmentPartition tablePartition,
final long consumedOffset
) {
return new LwtWriter<>(
client,
@@ -21,7 +21,7 @@ public interface FlushManager<K, P> {

TablePartitioner<K, P> partitioner();

RemoteWriter<K, P> createWriter(final P tablePartition);
RemoteWriter<K, P> createWriter(final P tablePartition, final long consumedOffset);

void writeAdded(final K key);

@@ -61,7 +61,10 @@ public TablePartitioner<Bytes, Integer> partitioner() {
}

@Override
public RemoteWriter<Bytes, Integer> createWriter(final Integer tablePartition) {
public RemoteWriter<Bytes, Integer> createWriter(
final Integer tablePartition,
final long consumedOffset
) {
return new MongoWriter<>(table, kafkaPartition, tablePartition, () -> kvDocs);
}

@@ -70,7 +70,10 @@ public TablePartitioner<SessionKey, SegmentPartition> partitioner() {
}

@Override
public RemoteWriter<SessionKey, SegmentPartition> createWriter(final SegmentPartition segment) {
public RemoteWriter<SessionKey, SegmentPartition> createWriter(
final SegmentPartition segment,
final long consumedOffset
) {
log.debug("Creating writer for segment {}", segment);

return new MongoWriter<>(
@@ -69,7 +69,10 @@ public TablePartitioner<WindowedKey, SegmentPartition> partitioner() {
}

@Override
public RemoteWriter<WindowedKey, SegmentPartition> createWriter(final SegmentPartition segment) {
public RemoteWriter<WindowedKey, SegmentPartition> createWriter(
final SegmentPartition segment,
final long consumedOffset
) {
log.debug("Creating writer for segment {}", segment);

return new MongoWriter<>(
@@ -189,7 +189,10 @@ public TablePartitioner<Bytes, Integer> partitioner() {
}

@Override
public RemoteWriter<Bytes, Integer> createWriter(Integer tablePartition) {
public RemoteWriter<Bytes, Integer> createWriter(
final Integer tablePartition,
final long consumedOffset
) {
return new RemoteWriter<>() {
@Override
public void insert(Bytes key, byte[] value, long epochMillis) {
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.internal.db.rs3.client.LssId;
import java.util.List;

/**
* PSS partitioner that does a 1:1 mapping from LSS to PSS
*/
public class PssDirectPartitioner implements PssPartitioner {
@Override
public int pss(byte[] key, LssId lssId) {
return lssId.id();
}

@Override
public List<Integer> pssForLss(LssId lssId) {
return List.of(lssId.id());
}
}
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.internal.db.rs3.client.LssId;
import java.util.List;

public interface PssPartitioner {
int pss(byte[] key, LssId lssId);

List<Integer> pssForLss(LssId lssId);
}