Skip to content

Commit

Permalink
add WindowOperations and SegmentedOperations
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Oct 21, 2023
1 parent d0b9d3c commit 26e0088
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ private ResponsiveConfig(final Map<?, ?> originals, final boolean doLog) {
super(CONFIG_DEF, originals, doLog);
}

// TODO: encapsulate the SubPartitioner in the RemoteTable class
public SubPartitioner getSegmentedSubPartitioner(
final Admin admin,
final TableName name,
final String changelogTopicName
) {
throw new UnsupportedOperationException("TODO -- follow up PR");
}

public SubPartitioner getSubPartitioner(
final Admin admin,
final TableName name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public boolean retain(final Stamped<Bytes> key) {

@Override
public int compare(final Stamped<Bytes> o1, final Stamped<Bytes> o2) {
return ResponsiveWindowStore.compareKeys(o1, o2);
return compareKeys(o1, o2);
}

// TODO remove generic from Stamped<> and just implement Comparable
public static int compareKeys(final Stamped<Bytes> o1, final Stamped<Bytes> o2) {
final int key = o1.key.compareTo(o2.key);
if (key != 0) {
return key;
}

return Long.compare(o1.stamp, o2.stamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ public GlobalOperations(
this.table = table;
}

@Override
public void register(final ResponsiveStoreRegistry storeRegistry) {
// we don't do anything with global tables
}

@Override
public void deregister(final ResponsiveStoreRegistry storeRegistry) {
// we don't do anything with global tables
}

@Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.partition(), context.offset(), context.timestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@

public interface KeyValueOperations extends Closeable, RecordBatchingStateRestoreCallback {

void register(ResponsiveStoreRegistry storeRegistry);

void deregister(ResponsiveStoreRegistry storeRegistry);

void put(final Bytes key, final byte[] value);

byte[] delete(final Bytes key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package dev.responsive.kafka.internal.stores;

import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadSessionClients;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadStoreRegistry;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.changelogFor;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.db.BytesKeySpec;
import dev.responsive.kafka.internal.db.CassandraTableSpecFactory;
import dev.responsive.kafka.internal.db.RemoteKVTable;
Expand All @@ -42,13 +44,15 @@

public class PartitionedOperations implements KeyValueOperations {

@SuppressWarnings("rawtypes")
private final InternalProcessorContext context;
private final ResponsiveKeyValueParams params;
private final RemoteKVTable<?> table;
private final CommitBuffer<Bytes, RemoteKVTable<?>> buffer;
private final SubPartitioner partitioner;
private final TopicPartition changelog;
@SuppressWarnings("rawtypes")
private final InternalProcessorContext context;

private final ResponsiveStoreRegistry storeRegistry;
private final ResponsiveStoreRegistration registration;
private final ResponsiveRestoreListener restoreListener;

Expand All @@ -68,6 +72,7 @@ public static PartitionedOperations create(

final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(appConfigs);
final SessionClients sessionClients = loadSessionClients(appConfigs);
final ResponsiveStoreRegistry storeRegistry = loadStoreRegistry(appConfigs);

final TopicPartition changelog = new TopicPartition(
changelogFor(storeContext, name.kafkaName(), false),
Expand Down Expand Up @@ -110,13 +115,16 @@ public static PartitionedOperations create(
offset == -1 ? 0 : offset,
buffer::flush
);
storeRegistry.registerStore(registration);

return new PartitionedOperations(
params,
table,
buffer,
partitioner,
changelog,
context,
storeRegistry,
registration,
sessionClients.restoreListener()
);
Expand Down Expand Up @@ -153,6 +161,7 @@ public PartitionedOperations(
final SubPartitioner partitioner,
final TopicPartition changelog,
final InternalProcessorContext context,
final ResponsiveStoreRegistry storeRegistry,
final ResponsiveStoreRegistration registration,
final ResponsiveRestoreListener restoreListener
) {
Expand All @@ -162,20 +171,11 @@ public PartitionedOperations(
this.partitioner = partitioner;
this.changelog = changelog;
this.context = context;
this.storeRegistry = storeRegistry;
this.registration = registration;
this.restoreListener = restoreListener;
}

@Override
public void register(final ResponsiveStoreRegistry storeRegistry) {
storeRegistry.registerStore(registration);
}

@Override
public void deregister(final ResponsiveStoreRegistry storeRegistry) {
storeRegistry.deregisterStore(registration);
}

@Override
public void put(final Bytes key, final byte[] value) {
buffer.put(key, value, context.timestamp());
Expand Down Expand Up @@ -246,6 +246,7 @@ public void close() {
// no need to flush the buffer here, will happen through the kafka client commit as usual
buffer.close();
restoreListener.onStoreClosed(changelog, params.name().kafkaName());
storeRegistry.deregisterStore(registration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
Expand All @@ -43,7 +40,6 @@ public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> {

private final ResponsiveKeyValueParams params;
private final TableName name;
private final Function<Map<String, Object>, ResponsiveStoreRegistry> registryProvider;
private final KVOperationsProvider opsProvider;

private Position position; // TODO(IQ): update the position during restoration
Expand All @@ -52,26 +48,22 @@ public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> {
// All the fields below this are effectively final, we just can't set them until #init is called
private Logger log;
private KeyValueOperations operations;
private ResponsiveStoreRegistry storeRegistry;
private StateStoreContext context;

public ResponsiveKeyValueStore(final ResponsiveKeyValueParams params) {
this(
params,
ResponsiveKeyValueStore::provideOperations,
InternalSessionConfigs::loadStoreRegistry
ResponsiveKeyValueStore::provideOperations
);
}

// Visible for Testing
public ResponsiveKeyValueStore(
final ResponsiveKeyValueParams params,
final KVOperationsProvider opsProvider,
final Function<Map<String, Object>, ResponsiveStoreRegistry> registryProvider
final KVOperationsProvider opsProvider
) {
this.params = params;
this.name = params.name();
this.registryProvider = registryProvider;
this.position = Position.emptyPosition();
this.opsProvider = opsProvider;

Expand Down Expand Up @@ -118,9 +110,7 @@ public void init(final StateStoreContext storeContext, final StateStore root) {
operations = opsProvider.provide(params, storeContext, taskType);
log.info("Completed initializing state store");

storeRegistry = registryProvider.apply(storeContext.appConfigs());
open = true;
operations.register(storeRegistry);
storeContext.register(root, operations);
} catch (final InterruptedException | TimeoutException e) {
throw new ProcessorStateException("Failed to initialize store.", e);
Expand Down Expand Up @@ -214,9 +204,6 @@ public long approximateNumEntries() {

@Override
public void close() {
if (storeRegistry != null) {
operations.deregister(storeRegistry);
}
if (operations != null) {
operations.close();
}
Expand All @@ -232,8 +219,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() {
return operations.reverseAll();
}

// Visible for testing
KeyValueOperations getOperations() {
return operations;
}
}
Loading

0 comments on commit 26e0088

Please sign in to comment.