-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add WindowOperations and SegmentedOperations #178
Conversation
edaead3
to
26e0088
Compare
@@ -23,10 +23,6 @@ | |||
|
|||
public interface KeyValueOperations extends Closeable, RecordBatchingStateRestoreCallback { | |||
|
|||
void register(ResponsiveStoreRegistry storeRegistry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems silly to have the registry still live in the KeyValue/WindowStore while moving the registration to the KeyValue/WindowOperations class and just call through these extra de/register APIs.
Basically I moved all the Responsive logic into the Operations implementations, so that the Window/KeyValueStores themselves are now a fairly clean implementation of only what's relevant to Kafka Streams and the Responsive stuff is all encapsulated inside the various Operations
open = true; | ||
operations.register(storeRegistry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to explicitly register anything now, we just take care of everything when the Operations are created (or closed)
@@ -232,8 +219,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() { | |||
return operations.reverseAll(); | |||
} | |||
|
|||
// Visible for testing | |||
KeyValueOperations getOperations() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't seem to be used (we can of course add it back if any test actually does need it, but I don't think we need this AND the kvOperationsProvider
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yes, I had written a test to make sure it creates the right operations (global vs local) but then realized the test was kind-of nonsense sine it mocked so much. I deleted it and forgot to update this
4e1912d
to
54f56b8
Compare
54f56b8
to
990c359
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I think there's one bug we should fix or make sure to put on our radar (unrelated to this PR)
return compareKeys(o1, o2); | ||
} | ||
|
||
// TODO remove generic from Stamped<> and just implement Comparable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by the way, we can make Stamped<K extends Comparable>
if we did want to keep the generic (just fun java tip, I'm still in favor of removing the generic here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this comment is kind of mixing concerns, ultimately I'd like to remove the generic, it just so happens that this also means we can clean up some of the comparing related code
@@ -232,8 +219,4 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() { | |||
return operations.reverseAll(); | |||
} | |||
|
|||
// Visible for testing | |||
KeyValueOperations getOperations() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yes, I had written a test to make sure it creates the right operations (global vs local) but then realized the test was kind-of nonsense sine it mocked so much. I deleted it and forgot to update this
|
||
final int subPartition = partitioner.partition(changelog.partition(), key); | ||
return Iterators.windowed( | ||
new LocalRemoteKvIterator<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think LocalRemoteKVIterator
may have a bug in reverse fetching since it assumes keys are compared in ascending order - we should probably pass in the iteration order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch -- I'm just going to file a followup ticket for this for now and tackle it after the main window store PR. I added another new kind of iterator in the next PR for segmented range scans but was planning to leave the implementation of it for a followup. I'll tack this onto that patch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import org.apache.kafka.streams.state.KeyValueIterator; | ||
import org.apache.kafka.streams.state.WindowStoreIterator; | ||
|
||
public class SegmentedOperations implements WindowOperations { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shame on me for not having done this for the other *Operations
classes (we can do this one as well as a follow up) but we should figure out a testing strategy for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually filed a ticket a while ago for building out a unit testing framework for the state store APIs -- I'll think about it once the baseline window store implementation is done
Just a minor step in the window store implementation, most of the interesting stuff will be in the followup with the subpartitioner (which for now is just a TODO)
This PRs adds a WindowOperations interface with just the single SegmentedOperations implementation for now. Although there's only the one implementation of this, it's still nice to break up everything that's currently stuffed into the
ResponsiveWindowStore
implementation.With this, all of the Streams/WindowStore semantics and logic stays in the ResponsiveWindowStore class while the actual storage engine/implementation details will go in the SegmentedOperations