diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java index c540ab665..c5af98bc9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java @@ -16,18 +16,18 @@ package dev.responsive.kafka.api.stores; +import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema; import dev.responsive.kafka.internal.utils.TableName; import java.time.Duration; import java.util.Optional; -import javax.annotation.Nullable; public final class ResponsiveKeyValueParams { private final TableName name; private final KVSchema schema; - @Nullable private Duration timeToLive = null; + private Optional> ttlProvider = Optional.empty(); private ResponsiveKeyValueParams( final String name, @@ -46,7 +46,16 @@ public static ResponsiveKeyValueParams fact(final String name) { } public ResponsiveKeyValueParams withTimeToLive(final Duration timeToLive) { - this.timeToLive = timeToLive; + return withTtlProvider(TtlProvider.withDefault(timeToLive)); + } + + public ResponsiveKeyValueParams withTtlProvider(final TtlProvider ttlProvider) { + // If ttl is constant and infinite, it's equivalent to having no ttl at all + if (ttlProvider.hasDefaultOnly() && !ttlProvider.defaultTtl().isFinite()) { + this.ttlProvider = Optional.empty(); + } else { + this.ttlProvider = Optional.of(ttlProvider); + } return this; } @@ -58,8 +67,17 @@ public KVSchema schemaType() { return schema; } - public Optional timeToLive() { - return Optional.ofNullable(timeToLive); + public Optional> ttlProvider() { + return ttlProvider; + } + + public Optional defaultTimeToLive() { + if (ttlProvider.isPresent()) { + return Optional.ofNullable(ttlProvider.get().defaultTtl()); + + } else { + return Optional.empty(); + } } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionBytesStoreSupplier.java similarity index 90% rename from kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionStoreSupplier.java rename to kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionBytesStoreSupplier.java index 984ecc281..c9568f151 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionBytesStoreSupplier.java @@ -24,12 +24,12 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; -public class ResponsiveSessionStoreSupplier implements SessionBytesStoreSupplier { +public class ResponsiveSessionBytesStoreSupplier implements SessionBytesStoreSupplier { private final ResponsiveSessionParams params; private final long segmentIntervalMs; - public ResponsiveSessionStoreSupplier(final ResponsiveSessionParams params) { + public ResponsiveSessionBytesStoreSupplier(final ResponsiveSessionParams params) { this.params = params; this.segmentIntervalMs = computeSegmentInterval(params.retentionPeriod(), params.numSegments()); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java index 6700c0397..aebd0396a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -56,7 +55,9 @@ public final class ResponsiveStores { * @return a supplier for a key-value store with the given options * that uses Responsive's storage for its backend */ - public static KeyValueBytesStoreSupplier keyValueStore(final ResponsiveKeyValueParams params) { + public static ResponsiveKeyValueBytesStoreSupplier keyValueStore( + final ResponsiveKeyValueParams params + ) { return new ResponsiveKeyValueBytesStoreSupplier(params); } @@ -67,7 +68,7 @@ public static KeyValueBytesStoreSupplier keyValueStore(final ResponsiveKeyValueP * @return a supplier for a key-value store with the given options * that uses Responsive's storage for its backend */ - public static KeyValueBytesStoreSupplier keyValueStore(final String name) { + public static ResponsiveKeyValueBytesStoreSupplier keyValueStore(final String name) { return keyValueStore(ResponsiveKeyValueParams.keyValue(name)); } @@ -93,7 +94,7 @@ public static KeyValueBytesStoreSupplier keyValueStore(final String name) { * @return a supplier for a key-value store with the given options * that uses Responsive's storage for its backend */ - public static KeyValueBytesStoreSupplier factStore(final String name) { + public static ResponsiveKeyValueBytesStoreSupplier factStore(final String name) { return keyValueStore(ResponsiveKeyValueParams.fact(name)); } @@ -116,7 +117,7 @@ public static KeyValueBytesStoreSupplier factStore(final String name) { * that uses Responsive's storage for its backend */ public static StoreBuilder> keyValueStoreBuilder( - final KeyValueBytesStoreSupplier storeSupplier, + final ResponsiveKeyValueBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde ) { @@ -148,17 +149,11 @@ public static StoreBuilder> keyValueStoreBuilder( * that uses Responsive's storage for its backend */ public static StoreBuilder> timestampedKeyValueStoreBuilder( - final KeyValueBytesStoreSupplier storeSupplier, + final ResponsiveKeyValueBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde ) { - if (storeSupplier instanceof ResponsiveKeyValueBytesStoreSupplier) { - ((ResponsiveKeyValueBytesStoreSupplier) storeSupplier).asTimestamped(); - } else { - throw new IllegalArgumentException( - "Must supply a Responsive StoreSupplier via one of the ResponsiveStores APIs" - ); - } + storeSupplier.asTimestamped(); return new ResponsiveStoreBuilder<>( StoreType.TIMESTAMPED_KEY_VALUE, @@ -202,8 +197,10 @@ public static Materialized> materializ * @return a supplier for a window store with the given options * that uses Responsive's storage for its backend */ - public static WindowBytesStoreSupplier windowStoreSupplier(final ResponsiveWindowParams params) { - return new ResponsiveWindowedStoreSupplier(params); + public static ResponsiveWindowBytesStoreSupplier windowStoreSupplier( + final ResponsiveWindowParams params + ) { + return new ResponsiveWindowBytesStoreSupplier(params); } /** @@ -217,7 +214,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier(final ResponsiveWindo * @return a supplier for a window store with the given options * that uses Responsive's storage for its backend */ - public static WindowBytesStoreSupplier windowStoreSupplier( + public static ResponsiveWindowBytesStoreSupplier windowStoreSupplier( final String name, final Duration retentionPeriod, final Duration windowSize, @@ -232,7 +229,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier( throw new IllegalArgumentException("Retention period cannot be less than window size"); } - return new ResponsiveWindowedStoreSupplier( + return new ResponsiveWindowBytesStoreSupplier( ResponsiveWindowParams.window(name, windowSize, retentionPeriod, retainDuplicates) ); } else { @@ -242,7 +239,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier( ); } - return new ResponsiveWindowedStoreSupplier( + return new ResponsiveWindowBytesStoreSupplier( ResponsiveWindowParams.streamStreamJoin(name, windowSize) ); } @@ -262,7 +259,7 @@ public static WindowBytesStoreSupplier windowStoreSupplier( * that uses Responsive's storage for its backend */ public static StoreBuilder> windowStoreBuilder( - final WindowBytesStoreSupplier storeSupplier, + final ResponsiveWindowBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde ) { @@ -289,7 +286,7 @@ public static StoreBuilder> windowStoreBuilder( * that uses Responsive's storage for its backend */ public static StoreBuilder> timestampedWindowStoreBuilder( - final WindowBytesStoreSupplier storeSupplier, + final ResponsiveWindowBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde ) { @@ -318,7 +315,7 @@ public static Materialized> windowMateri final ResponsiveWindowParams params ) { return new ResponsiveMaterialized<>( - Materialized.as(new ResponsiveWindowedStoreSupplier(params)) + Materialized.as(new ResponsiveWindowBytesStoreSupplier(params)) ); } @@ -329,10 +326,10 @@ public static Materialized> windowMateri * @return a supplier for a session store with the given options * that uses Responsive's storage for its backend */ - public static SessionBytesStoreSupplier sessionStoreSupplier( + public static ResponsiveSessionBytesStoreSupplier sessionStoreSupplier( final ResponsiveSessionParams params ) { - return new ResponsiveSessionStoreSupplier(params); + return new ResponsiveSessionBytesStoreSupplier(params); } /** @@ -348,7 +345,7 @@ public static SessionBytesStoreSupplier sessionStoreSupplier( * that uses Responsive's storage for its backend */ public static StoreBuilder> sessionStoreBuilder( - final SessionBytesStoreSupplier storeSupplier, + final ResponsiveSessionBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde ) { @@ -374,7 +371,7 @@ public static Materialized> sessionMate final ResponsiveSessionParams params ) { return new ResponsiveMaterialized<>( - Materialized.as(new ResponsiveSessionStoreSupplier(params)) + Materialized.as(new ResponsiveSessionBytesStoreSupplier(params)) ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowedStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java similarity index 91% rename from kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowedStoreSupplier.java rename to kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java index 7885089e7..fa98d0d0c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowedStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java @@ -24,12 +24,12 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; -public class ResponsiveWindowedStoreSupplier implements WindowBytesStoreSupplier { +public class ResponsiveWindowBytesStoreSupplier implements WindowBytesStoreSupplier { private final ResponsiveWindowParams params; private final long segmentIntervalMs; - public ResponsiveWindowedStoreSupplier(final ResponsiveWindowParams params) { + public ResponsiveWindowBytesStoreSupplier(final ResponsiveWindowParams params) { this.params = params; this.segmentIntervalMs = computeSegmentInterval(params.retentionPeriod(), params.numSegments()); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java new file mode 100644 index 000000000..2e58bcb0d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java @@ -0,0 +1,291 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.api.stores; + +import dev.responsive.kafka.internal.utils.StateDeserializer; +import java.time.Duration; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.kafka.common.serialization.Serde; + +public class TtlProvider { + + /** + * Creates a new TtlProvider with the given default duration to retain records for unless + * overridden. To allow ttl overrides for individual records, you can use one of the + * {@link #fromKey(Function, Serde)}, {@link #fromValue(Function, Serde)}, + * or {@link #fromKeyAndValue(BiFunction, Serde, Serde)} methods to define the row-level + * override function. + * + * @return a new TtlProvider that will retain records for the specified default duration + */ + public static TtlProvider withDefault(final Duration defaultTtl) { + return new TtlProvider<>( + TtlType.DEFAULT_ONLY, + TtlDuration.of(defaultTtl), + (ignoredK, ignoredV) -> Optional.empty(), + null, + null + ); + } + + /** + * Creates a new TtlProvider that has no default (equivalent to infinite retention for + * all records unless an override is specified). Must be used in combination with + * exactly one of the {@link #fromKey(Function, Serde)}, {@link #fromValue(Function, Serde)}, + * and {@link #fromKeyAndValue(BiFunction, Serde, Serde)} methods to define the row-level + * override function. + * + * @return a new TtlProvider that will retain records indefinitely by default + */ + public static TtlProvider withNoDefault() { + return new TtlProvider<>( + TtlType.DEFAULT_ONLY, + TtlDuration.noTtl(), + (ignoredK, ignoredV) -> Optional.empty(), + null, + null + ); + } + + /** + * @return the same TtlProvider with a key-based override function + */ + public TtlProvider fromKey( + final Function> computeTtlFromKey, + final Serde keySerde + ) { + if (ttlType.equals(TtlType.VALUE) || ttlType.equals(TtlType.KEY_AND_VALUE)) { + throw new IllegalArgumentException("Must choose only key, value, or key-and-value ttl"); + } + + if (keySerde == null || keySerde.deserializer() == null) { + throw new IllegalArgumentException("The key Serde and Deserializer must not be null"); + } + + return new TtlProvider<>( + TtlType.KEY, + defaultTtl, + (k, ignored) -> computeTtlFromKey.apply(k), + keySerde, + null + ); + } + + /** + * @return the same TtlProvider with a value-based override function + */ + public TtlProvider fromValue( + final Function> computeTtlFromValue, + final Serde valueSerde + ) { + if (ttlType.equals(TtlType.KEY) || ttlType.equals(TtlType.KEY_AND_VALUE)) { + throw new IllegalArgumentException("Must choose only key, value, or key-and-value ttl"); + } + + if (valueSerde == null || valueSerde.deserializer() == null) { + throw new IllegalArgumentException("The value Serde and Deserializer must not be null"); + } + + return new TtlProvider<>( + TtlType.VALUE, + defaultTtl, + (ignored, v) -> computeTtlFromValue.apply(v), + null, + valueSerde); + } + + /** + * @return the same TtlProvider with a key-and-value-based override function + */ + public TtlProvider fromKeyAndValue( + final BiFunction> computeTtlFromKeyAndValue, + final Serde keySerde, + final Serde valueSerde + ) { + if (ttlType.equals(TtlType.KEY) || ttlType.equals(TtlType.VALUE)) { + throw new IllegalArgumentException("Must choose only key, value, or key-and-value ttl"); + } + + if (keySerde == null || keySerde.deserializer() == null) { + throw new IllegalArgumentException("The key Serde and Deserializer must not be null"); + } else if (valueSerde == null || valueSerde.deserializer() == null) { + throw new IllegalArgumentException("The value Serde and Deserializer must not be null"); + } + + return new TtlProvider<>( + TtlType.KEY_AND_VALUE, + defaultTtl, + computeTtlFromKeyAndValue, + keySerde, + valueSerde + ); + } + + public static class TtlDuration { + + public enum Ttl { + INFINITE, + FINITE + } + + public static TtlDuration of(final Duration ttl) { + if (ttl.compareTo(Duration.ZERO) <= 0) { + throw new IllegalArgumentException("ttl duration must be greater than zero"); + } + return new TtlDuration(ttl, Ttl.FINITE); + } + + // No ttl will be applied, in other words infinite retention + public static TtlDuration noTtl() { + return new TtlDuration(Duration.ZERO, Ttl.INFINITE); + } + + private final Duration duration; + private final Ttl ttlType; + + private TtlDuration(final Duration ttlValue, final Ttl ttlType) { + this.duration = ttlValue; + this.ttlType = ttlType; + } + + public Duration duration() { + if (!isFinite()) { + throw new IllegalStateException("Can't convert TtlDuration to Duration unless finite"); + } + return duration; + } + + public boolean isFinite() { + return ttlType.equals(Ttl.FINITE); + } + + public long toSeconds() { + return duration.toSeconds(); + } + + public long toMillis() { + return duration.toMillis(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final TtlDuration that = (TtlDuration) o; + + if (!duration.equals(that.duration)) { + return false; + } + return ttlType == that.ttlType; + } + + @Override + public int hashCode() { + int result = duration.hashCode(); + result = 31 * result + ttlType.hashCode(); + return result; + } + } + + private enum TtlType { + DEFAULT_ONLY, + KEY, + VALUE, + KEY_AND_VALUE + } + + private final TtlType ttlType; + private final TtlDuration defaultTtl; + + // Only non-null for key/value-based ttl providers + private final Serde keySerde; + private final Serde valueSerde; + + private final BiFunction> computeTtl; + + private TtlProvider( + final TtlType ttlType, + final TtlDuration defaultTtl, + final BiFunction> computeTtl, + final Serde keySerde, + final Serde valueSerde + ) { + this.ttlType = ttlType; + this.defaultTtl = defaultTtl; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.computeTtl = computeTtl; + } + + public Serde keySerde() { + return keySerde; + } + + public Serde valueSerde() { + return valueSerde; + } + + public TtlDuration defaultTtl() { + return defaultTtl; + } + + public boolean hasDefaultOnly() { + return ttlType == TtlType.DEFAULT_ONLY; + } + + public boolean needsValueToComputeTtl() { + return ttlType == TtlType.VALUE || ttlType == TtlType.KEY_AND_VALUE; + } + + public Optional computeTtl( + final byte[] keyBytes, + final byte[] valueBytes, + final StateDeserializer stateDeserializer + ) { + final K key; + final V value; + switch (ttlType) { + case DEFAULT_ONLY: + key = null; //ignored + value = null; //ignored + break; + case KEY: + key = stateDeserializer.keyFrom(keyBytes); + value = null; //ignored + break; + case VALUE: + key = null; //ignored + value = stateDeserializer.valueFrom(valueBytes); + break; + case KEY_AND_VALUE: + key = stateDeserializer.keyFrom(keyBytes); + value = stateDeserializer.valueFrom(valueBytes); + break; + default: + throw new IllegalStateException("Unrecognized ttl type: " + ttlType); + } + return computeTtl.apply(key, value); + } + +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java index 6cab3e9af..c094d945c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java @@ -53,8 +53,8 @@ public static RemoteTableSpec fromKVParams( ) { RemoteTableSpec spec = new BaseTableSpec(params.name().tableName(), partitioner); - if (params.timeToLive().isPresent()) { - spec = new TtlTableSpec(spec, params.timeToLive().get()); + if (params.ttlProvider().isPresent()) { + spec = new TtlTableSpec(spec, params.defaultTimeToLive().get().duration()); } if (params.schemaType() == SchemaTypes.KVSchema.FACT) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index d71ae3ee1..1a9666d8b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -40,8 +40,6 @@ import dev.responsive.kafka.internal.utils.Result; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.TableName; -import java.time.Duration; -import java.time.Instant; import java.util.Collection; import java.util.Optional; import java.util.OptionalInt; @@ -151,10 +149,14 @@ public static PartitionedOperations create( storeRegistry.registerStore(registration); final boolean migrationMode = ConfigUtils.responsiveMode(config) == ResponsiveMode.MIGRATE; - long startingTimestamp = -1; - final Optional ttl = params.timeToLive(); - if (migrationMode && ttl.isPresent()) { - startingTimestamp = Instant.now().minus(ttl.get()).toEpochMilli(); + long startTimeMs = -1; + if (migrationMode && params.ttlProvider().isPresent()) { + if (!params.ttlProvider().get().hasDefaultOnly()) { + throw new UnsupportedOperationException("Row-level ttl overrides are not yet supported " + + "with migration mode"); + } + startTimeMs = + System.currentTimeMillis() - params.ttlProvider().get().defaultTtl().toMillis(); } return new PartitionedOperations( @@ -168,7 +170,7 @@ public static PartitionedOperations create( registration, sessionClients.restoreListener(), migrationMode, - startingTimestamp + startTimeMs ); } catch (final RuntimeException e) { if (buffer != null) { @@ -371,11 +373,12 @@ private long currentRecordTimestamp() { } private long minValidTimestamp() { - // TODO: unwrapping the ttl from Duration to millis is somewhat heavy for the hot path - return params - .timeToLive() - .map(ttl -> currentRecordTimestamp() - ttl.toMillis()) - .orElse(-1L); + if (params.ttlProvider().isEmpty()) { + return -1L; + } + + final long ttlMs = params.defaultTimeToLive().get().toMillis(); + return currentRecordTimestamp() - ttlMs; } private boolean migratingAndTimestampTooEarly() { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/StateDeserializer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/StateDeserializer.java new file mode 100644 index 000000000..37fa14a9b --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/StateDeserializer.java @@ -0,0 +1,81 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.internal.utils; + +import java.util.Optional; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StateDeserializer { + private static final Logger LOG = LoggerFactory.getLogger(StateDeserializer.class); + + private final String changelogTopic; + private final Deserializer keyDeserializer; + private final Deserializer valueDeserializer; + private final Optional>> timestampedValueDeserializer; + + public StateDeserializer( + final boolean isTimestamped, + final String changelogTopic, + final Serde keySerde, + final Serde valueSerde + ) { + this.changelogTopic = changelogTopic; + this.keyDeserializer = keySerde == null ? null : keySerde.deserializer(); + this.valueDeserializer = valueSerde == null ? null : valueSerde.deserializer(); + + if (isTimestamped && valueSerde != null) { + timestampedValueDeserializer = + Optional.of(new ValueAndTimestampSerde<>(valueSerde).deserializer()); + } else { + timestampedValueDeserializer = Optional.empty(); + } + } + + public K keyFrom(final byte[] keyBytes) { + if (keyBytes == null || keyDeserializer == null) { + final String errMgs = String.format( + "Tried to deserialize key where keyBytes==null is %s and keyDeserializer==null is %s", + keyBytes == null, keyDeserializer == null); + LOG.error(errMgs); + throw new IllegalStateException(errMgs); + } + + return keyDeserializer.deserialize(changelogTopic, keyBytes); + } + + public V valueFrom(final byte[] valueBytes) { + if (valueBytes == null || valueDeserializer == null) { + final String errMgs = String.format( + "Tried to deserialize value where valueBytes==null is %s " + + "and valueDeserializer==null is %s", + valueBytes == null, valueDeserializer == null); + LOG.error(errMgs); + throw new IllegalStateException(errMgs); + } + + if (timestampedValueDeserializer.isEmpty()) { + return valueDeserializer.deserialize(changelogTopic, valueBytes); + } else { + return timestampedValueDeserializer.get().deserialize(changelogTopic, valueBytes).value(); + } + } +}