Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WindowOperations and SegmentedOperations #178

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ private ResponsiveConfig(final Map<?, ?> originals, final boolean doLog) {
super(CONFIG_DEF, originals, doLog);
}

// TODO: encapsulate the SubPartitioner in the RemoteTable class
public SubPartitioner getSegmentedSubPartitioner(
final Admin admin,
final TableName name,
final String changelogTopicName
) {
throw new UnsupportedOperationException("TODO -- follow up PR");
}

public SubPartitioner getSubPartitioner(
final Admin admin,
final TableName name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;

import dev.responsive.kafka.internal.stores.ResponsiveWindowStore;
import dev.responsive.kafka.internal.utils.Stamped;
import java.nio.ByteBuffer;
import java.util.Arrays;
Expand Down Expand Up @@ -58,6 +57,16 @@ public boolean retain(final Stamped<Bytes> key) {

@Override
public int compare(final Stamped<Bytes> o1, final Stamped<Bytes> o2) {
return ResponsiveWindowStore.compareKeys(o1, o2);
return compareKeys(o1, o2);
}

// TODO remove generic from Stamped<> and just implement Comparable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by the way, we can make Stamped<K extends Comparable> if we did want to keep the generic (just fun java tip, I'm still in favor of removing the generic here)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this comment is kind of mixing concerns, ultimately I'd like to remove the generic, it just so happens that this also means we can clean up some of the comparing related code

public static int compareKeys(final Stamped<Bytes> o1, final Stamped<Bytes> o2) {
final int key = o1.key.compareTo(o2.key);
if (key != 0) {
return key;
}

return Long.compare(o1.stamp, o2.stamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ public GlobalOperations(
this.table = table;
}

@Override
public void register(final ResponsiveStoreRegistry storeRegistry) {
// we don't do anything with global tables
}

@Override
public void deregister(final ResponsiveStoreRegistry storeRegistry) {
// we don't do anything with global tables
}

@Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.partition(), context.offset(), context.timestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@

public interface KeyValueOperations extends Closeable, RecordBatchingStateRestoreCallback {

void register(ResponsiveStoreRegistry storeRegistry);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems silly to have the registry still live in the KeyValue/WindowStore while moving the registration to the KeyValue/WindowOperations class and just call through these extra de/register APIs.

Basically I moved all the Responsive logic into the Operations implementations, so that the Window/KeyValueStores themselves are now a fairly clean implementation of only what's relevant to Kafka Streams and the Responsive stuff is all encapsulated inside the various Operations


void deregister(ResponsiveStoreRegistry storeRegistry);

void put(final Bytes key, final byte[] value);

byte[] delete(final Bytes key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package dev.responsive.kafka.internal.stores;

import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadSessionClients;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadStoreRegistry;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.changelogFor;

Expand All @@ -42,13 +43,15 @@

public class PartitionedOperations implements KeyValueOperations {

@SuppressWarnings("rawtypes")
private final InternalProcessorContext context;
private final ResponsiveKeyValueParams params;
private final RemoteKVTable<?> table;
private final CommitBuffer<Bytes, RemoteKVTable<?>> buffer;
private final SubPartitioner partitioner;
private final TopicPartition changelog;
@SuppressWarnings("rawtypes")
private final InternalProcessorContext context;

private final ResponsiveStoreRegistry storeRegistry;
private final ResponsiveStoreRegistration registration;
private final ResponsiveRestoreListener restoreListener;

Expand All @@ -68,6 +71,7 @@ public static PartitionedOperations create(

final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(appConfigs);
final SessionClients sessionClients = loadSessionClients(appConfigs);
final ResponsiveStoreRegistry storeRegistry = loadStoreRegistry(appConfigs);

final TopicPartition changelog = new TopicPartition(
changelogFor(storeContext, name.kafkaName(), false),
Expand Down Expand Up @@ -110,13 +114,16 @@ public static PartitionedOperations create(
offset == -1 ? 0 : offset,
buffer::flush
);
storeRegistry.registerStore(registration);

return new PartitionedOperations(
params,
table,
buffer,
partitioner,
changelog,
context,
storeRegistry,
registration,
sessionClients.restoreListener()
);
Expand Down Expand Up @@ -153,6 +160,7 @@ public PartitionedOperations(
final SubPartitioner partitioner,
final TopicPartition changelog,
final InternalProcessorContext context,
final ResponsiveStoreRegistry storeRegistry,
final ResponsiveStoreRegistration registration,
final ResponsiveRestoreListener restoreListener
) {
Expand All @@ -162,20 +170,11 @@ public PartitionedOperations(
this.partitioner = partitioner;
this.changelog = changelog;
this.context = context;
this.storeRegistry = storeRegistry;
this.registration = registration;
this.restoreListener = restoreListener;
}

@Override
public void register(final ResponsiveStoreRegistry storeRegistry) {
storeRegistry.registerStore(registration);
}

@Override
public void deregister(final ResponsiveStoreRegistry storeRegistry) {
storeRegistry.deregisterStore(registration);
}

@Override
public void put(final Bytes key, final byte[] value) {
buffer.put(key, value, context.timestamp());
Expand Down Expand Up @@ -246,6 +245,7 @@ public void close() {
// no need to flush the buffer here, will happen through the kafka client commit as usual
buffer.close();
restoreListener.onStoreClosed(changelog, params.name().kafkaName());
storeRegistry.deregisterStore(registration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
Expand All @@ -43,7 +40,6 @@ public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> {

private final ResponsiveKeyValueParams params;
private final TableName name;
private final Function<Map<String, Object>, ResponsiveStoreRegistry> registryProvider;
private final KVOperationsProvider opsProvider;

private Position position; // TODO(IQ): update the position during restoration
Expand All @@ -52,26 +48,22 @@ public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> {
// All the fields below this are effectively final, we just can't set them until #init is called
private Logger log;
private KeyValueOperations operations;
private ResponsiveStoreRegistry storeRegistry;
private StateStoreContext context;

public ResponsiveKeyValueStore(final ResponsiveKeyValueParams params) {
this(
params,
ResponsiveKeyValueStore::provideOperations,
InternalSessionConfigs::loadStoreRegistry
ResponsiveKeyValueStore::provideOperations
);
}

// Visible for Testing
public ResponsiveKeyValueStore(
final ResponsiveKeyValueParams params,
final KVOperationsProvider opsProvider,
final Function<Map<String, Object>, ResponsiveStoreRegistry> registryProvider
final KVOperationsProvider opsProvider
) {
this.params = params;
this.name = params.name();
this.registryProvider = registryProvider;
this.position = Position.emptyPosition();
this.opsProvider = opsProvider;

Expand Down Expand Up @@ -118,9 +110,7 @@ public void init(final StateStoreContext storeContext, final StateStore root) {
operations = opsProvider.provide(params, storeContext, taskType);
log.info("Completed initializing state store");

storeRegistry = registryProvider.apply(storeContext.appConfigs());
open = true;
operations.register(storeRegistry);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to explicitly register anything now, we just take care of everything when the Operations are created (or closed)

storeContext.register(root, operations);
} catch (final InterruptedException | TimeoutException e) {
throw new ProcessorStateException("Failed to initialize store.", e);
Expand Down Expand Up @@ -214,9 +204,6 @@ public long approximateNumEntries() {

@Override
public void close() {
if (storeRegistry != null) {
operations.deregister(storeRegistry);
}
if (operations != null) {
operations.close();
}
Expand All @@ -232,8 +219,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() {
return operations.reverseAll();
}

// Visible for testing
KeyValueOperations getOperations() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't seem to be used (we can of course add it back if any test actually does need it, but I don't think we need this AND the kvOperationsProvider)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, I had written a test to make sure it creates the right operations (global vs local) but then realized the test was kind-of nonsense sine it mocked so much. I deleted it and forgot to update this

return operations;
}
}
Loading
Loading