Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Row-level TTL PR 1: new API #370

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

package dev.responsive.kafka.api.stores;

import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.utils.TableName;
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nullable;

public final class ResponsiveKeyValueParams {

private final TableName name;
private final KVSchema schema;

@Nullable private Duration timeToLive = null;
private Optional<TtlProvider<?, ?>> ttlProvider = Optional.empty();

private ResponsiveKeyValueParams(
final String name,
Expand All @@ -46,7 +46,16 @@ public static ResponsiveKeyValueParams fact(final String name) {
}

public ResponsiveKeyValueParams withTimeToLive(final Duration timeToLive) {
this.timeToLive = timeToLive;
return withTtlProvider(TtlProvider.withDefault(timeToLive));
}

public ResponsiveKeyValueParams withTtlProvider(final TtlProvider<?, ?> ttlProvider) {
// If ttl is constant and infinite, it's equivalent to having no ttl at all
if (ttlProvider.hasDefaultOnly() && !ttlProvider.defaultTtl().isFinite()) {
this.ttlProvider = Optional.empty();
} else {
this.ttlProvider = Optional.of(ttlProvider);
}
return this;
}

Expand All @@ -58,8 +67,17 @@ public KVSchema schemaType() {
return schema;
}

public Optional<Duration> timeToLive() {
return Optional.ofNullable(timeToLive);
public Optional<TtlProvider<?, ?>> ttlProvider() {
return ttlProvider;
}

public Optional<TtlDuration> defaultTimeToLive() {
if (ttlProvider.isPresent()) {
return Optional.ofNullable(ttlProvider.get().defaultTtl());

} else {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;

public class ResponsiveSessionStoreSupplier implements SessionBytesStoreSupplier {
public class ResponsiveSessionBytesStoreSupplier implements SessionBytesStoreSupplier {

private final ResponsiveSessionParams params;
private final long segmentIntervalMs;

public ResponsiveSessionStoreSupplier(final ResponsiveSessionParams params) {
public ResponsiveSessionBytesStoreSupplier(final ResponsiveSessionParams params) {
this.params = params;
this.segmentIntervalMs = computeSegmentInterval(params.retentionPeriod(), params.numSegments());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
Expand Down Expand Up @@ -56,7 +55,9 @@ public final class ResponsiveStores {
* @return a supplier for a key-value store with the given options
* that uses Responsive's storage for its backend
*/
public static KeyValueBytesStoreSupplier keyValueStore(final ResponsiveKeyValueParams params) {
public static ResponsiveKeyValueBytesStoreSupplier keyValueStore(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agavra FYI all the changes in ResponsiveStores are technically unrelated -- just holdovers from an alternative API approach I abandoned -- but I think it's the right thing to do anyways so I kept it in. Hope you don't mind

final ResponsiveKeyValueParams params
) {
return new ResponsiveKeyValueBytesStoreSupplier(params);
}

Expand All @@ -67,7 +68,7 @@ public static KeyValueBytesStoreSupplier keyValueStore(final ResponsiveKeyValueP
* @return a supplier for a key-value store with the given options
* that uses Responsive's storage for its backend
*/
public static KeyValueBytesStoreSupplier keyValueStore(final String name) {
public static ResponsiveKeyValueBytesStoreSupplier keyValueStore(final String name) {
return keyValueStore(ResponsiveKeyValueParams.keyValue(name));
}

Expand All @@ -93,7 +94,7 @@ public static KeyValueBytesStoreSupplier keyValueStore(final String name) {
* @return a supplier for a key-value store with the given options
* that uses Responsive's storage for its backend
*/
public static KeyValueBytesStoreSupplier factStore(final String name) {
public static ResponsiveKeyValueBytesStoreSupplier factStore(final String name) {
return keyValueStore(ResponsiveKeyValueParams.fact(name));
}

Expand All @@ -116,7 +117,7 @@ public static KeyValueBytesStoreSupplier factStore(final String name) {
* that uses Responsive's storage for its backend
*/
public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
final KeyValueBytesStoreSupplier storeSupplier,
final ResponsiveKeyValueBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
Expand Down Expand Up @@ -148,17 +149,11 @@ public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
* that uses Responsive's storage for its backend
*/
public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(
final KeyValueBytesStoreSupplier storeSupplier,
final ResponsiveKeyValueBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
if (storeSupplier instanceof ResponsiveKeyValueBytesStoreSupplier) {
((ResponsiveKeyValueBytesStoreSupplier) storeSupplier).asTimestamped();
} else {
throw new IllegalArgumentException(
"Must supply a Responsive StoreSupplier via one of the ResponsiveStores APIs"
);
}
storeSupplier.asTimestamped();

return new ResponsiveStoreBuilder<>(
StoreType.TIMESTAMPED_KEY_VALUE,
Expand Down Expand Up @@ -202,8 +197,10 @@ public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> materializ
* @return a supplier for a window store with the given options
* that uses Responsive's storage for its backend
*/
public static WindowBytesStoreSupplier windowStoreSupplier(final ResponsiveWindowParams params) {
return new ResponsiveWindowedStoreSupplier(params);
public static ResponsiveWindowBytesStoreSupplier windowStoreSupplier(
final ResponsiveWindowParams params
) {
return new ResponsiveWindowBytesStoreSupplier(params);
}

/**
Expand All @@ -217,7 +214,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier(final ResponsiveWindo
* @return a supplier for a window store with the given options
* that uses Responsive's storage for its backend
*/
public static WindowBytesStoreSupplier windowStoreSupplier(
public static ResponsiveWindowBytesStoreSupplier windowStoreSupplier(
final String name,
final Duration retentionPeriod,
final Duration windowSize,
Expand All @@ -232,7 +229,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier(
throw new IllegalArgumentException("Retention period cannot be less than window size");
}

return new ResponsiveWindowedStoreSupplier(
return new ResponsiveWindowBytesStoreSupplier(
ResponsiveWindowParams.window(name, windowSize, retentionPeriod, retainDuplicates)
);
} else {
Expand All @@ -242,7 +239,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier(
);
}

return new ResponsiveWindowedStoreSupplier(
return new ResponsiveWindowBytesStoreSupplier(
ResponsiveWindowParams.streamStreamJoin(name, windowSize)
);
}
Expand All @@ -262,7 +259,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier(
* that uses Responsive's storage for its backend
*/
public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(
final WindowBytesStoreSupplier storeSupplier,
final ResponsiveWindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
Expand All @@ -289,7 +286,7 @@ public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(
* that uses Responsive's storage for its backend
*/
public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(
final WindowBytesStoreSupplier storeSupplier,
final ResponsiveWindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
Expand Down Expand Up @@ -318,7 +315,7 @@ public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> windowMateri
final ResponsiveWindowParams params
) {
return new ResponsiveMaterialized<>(
Materialized.as(new ResponsiveWindowedStoreSupplier(params))
Materialized.as(new ResponsiveWindowBytesStoreSupplier(params))
);
}

Expand All @@ -329,10 +326,10 @@ public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> windowMateri
* @return a supplier for a session store with the given options
* that uses Responsive's storage for its backend
*/
public static SessionBytesStoreSupplier sessionStoreSupplier(
public static ResponsiveSessionBytesStoreSupplier sessionStoreSupplier(
final ResponsiveSessionParams params
) {
return new ResponsiveSessionStoreSupplier(params);
return new ResponsiveSessionBytesStoreSupplier(params);
}

/**
Expand All @@ -348,7 +345,7 @@ public static SessionBytesStoreSupplier sessionStoreSupplier(
* that uses Responsive's storage for its backend
*/
public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(
final SessionBytesStoreSupplier storeSupplier,
final ResponsiveSessionBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
Expand All @@ -374,7 +371,7 @@ public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> sessionMate
final ResponsiveSessionParams params
) {
return new ResponsiveMaterialized<>(
Materialized.as(new ResponsiveSessionStoreSupplier(params))
Materialized.as(new ResponsiveSessionBytesStoreSupplier(params))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

public class ResponsiveWindowedStoreSupplier implements WindowBytesStoreSupplier {
public class ResponsiveWindowBytesStoreSupplier implements WindowBytesStoreSupplier {

private final ResponsiveWindowParams params;
private final long segmentIntervalMs;

public ResponsiveWindowedStoreSupplier(final ResponsiveWindowParams params) {
public ResponsiveWindowBytesStoreSupplier(final ResponsiveWindowParams params) {
this.params = params;
this.segmentIntervalMs = computeSegmentInterval(params.retentionPeriod(), params.numSegments());
}
Expand Down
Loading
Loading