-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add WindowOperations and SegmentedOperations #178
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,10 +23,6 @@ | |
|
||
public interface KeyValueOperations extends Closeable, RecordBatchingStateRestoreCallback { | ||
|
||
void register(ResponsiveStoreRegistry storeRegistry); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems silly to have the registry still live in the KeyValue/WindowStore while moving the registration to the KeyValue/WindowOperations class and just call through these extra de/register APIs. Basically I moved all the Responsive logic into the Operations implementations, so that the Window/KeyValueStores themselves are now a fairly clean implementation of only what's relevant to Kafka Streams and the Responsive stuff is all encapsulated inside the various Operations |
||
|
||
void deregister(ResponsiveStoreRegistry storeRegistry); | ||
|
||
void put(final Bytes key, final byte[] value); | ||
|
||
byte[] delete(final Bytes key); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,12 +19,9 @@ | |
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; | ||
|
||
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; | ||
import dev.responsive.kafka.internal.config.InternalSessionConfigs; | ||
import dev.responsive.kafka.internal.utils.TableName; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Function; | ||
import org.apache.kafka.common.utils.Bytes; | ||
import org.apache.kafka.common.utils.LogContext; | ||
import org.apache.kafka.streams.KeyValue; | ||
|
@@ -43,7 +40,6 @@ public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> { | |
|
||
private final ResponsiveKeyValueParams params; | ||
private final TableName name; | ||
private final Function<Map<String, Object>, ResponsiveStoreRegistry> registryProvider; | ||
private final KVOperationsProvider opsProvider; | ||
|
||
private Position position; // TODO(IQ): update the position during restoration | ||
|
@@ -52,26 +48,22 @@ public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> { | |
// All the fields below this are effectively final, we just can't set them until #init is called | ||
private Logger log; | ||
private KeyValueOperations operations; | ||
private ResponsiveStoreRegistry storeRegistry; | ||
private StateStoreContext context; | ||
|
||
public ResponsiveKeyValueStore(final ResponsiveKeyValueParams params) { | ||
this( | ||
params, | ||
ResponsiveKeyValueStore::provideOperations, | ||
InternalSessionConfigs::loadStoreRegistry | ||
ResponsiveKeyValueStore::provideOperations | ||
); | ||
} | ||
|
||
// Visible for Testing | ||
public ResponsiveKeyValueStore( | ||
final ResponsiveKeyValueParams params, | ||
final KVOperationsProvider opsProvider, | ||
final Function<Map<String, Object>, ResponsiveStoreRegistry> registryProvider | ||
final KVOperationsProvider opsProvider | ||
) { | ||
this.params = params; | ||
this.name = params.name(); | ||
this.registryProvider = registryProvider; | ||
this.position = Position.emptyPosition(); | ||
this.opsProvider = opsProvider; | ||
|
||
|
@@ -118,9 +110,7 @@ public void init(final StateStoreContext storeContext, final StateStore root) { | |
operations = opsProvider.provide(params, storeContext, taskType); | ||
log.info("Completed initializing state store"); | ||
|
||
storeRegistry = registryProvider.apply(storeContext.appConfigs()); | ||
open = true; | ||
operations.register(storeRegistry); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to explicitly register anything now, we just take care of everything when the Operations are created (or closed) |
||
storeContext.register(root, operations); | ||
} catch (final InterruptedException | TimeoutException e) { | ||
throw new ProcessorStateException("Failed to initialize store.", e); | ||
|
@@ -214,9 +204,6 @@ public long approximateNumEntries() { | |
|
||
@Override | ||
public void close() { | ||
if (storeRegistry != null) { | ||
operations.deregister(storeRegistry); | ||
} | ||
if (operations != null) { | ||
operations.close(); | ||
} | ||
|
@@ -232,8 +219,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() { | |
return operations.reverseAll(); | ||
} | ||
|
||
// Visible for testing | ||
KeyValueOperations getOperations() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't seem to be used (we can of course add it back if any test actually does need it, but I don't think we need this AND the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah yes, I had written a test to make sure it creates the right operations (global vs local) but then realized the test was kind-of nonsense sine it mocked so much. I deleted it and forgot to update this |
||
return operations; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by the way, we can make
Stamped<K extends Comparable>
if we did want to keep the generic (just fun java tip, I'm still in favor of removing the generic here)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this comment is kind of mixing concerns, ultimately I'd like to remove the generic, it just so happens that this also means we can clean up some of the comparing related code