Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Oct 24, 2023
1 parent 89da6db commit 40dc979
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public VeniceRmdTTLFilter(final VeniceProperties props) throws IOException {
ttlPolicy = TTLResolutionPolicy.valueOf(props.getInt(VenicePushJob.REPUSH_TTL_POLICY));
long ttlInMs = TimeUnit.SECONDS.toMillis(props.getLong(VenicePushJob.REPUSH_TTL_IN_SECONDS));
long ttlStartTimestamp = props.getLong(VenicePushJob.REPUSH_TTL_START_TIMESTAMP);
this.filterTimestamp = ttlStartTimestamp - ttlInMs;
this.filterTimestamp = ttlStartTimestamp - ttlInMs - 1;
this.schemaSource = new HDFSSchemaSource(
props.getString(VenicePushJob.VALUE_SCHEMA_DIR),
props.getString(VenicePushJob.RMD_SCHEMA_DIR));
Expand All @@ -65,7 +65,7 @@ public boolean checkAndMaybeFilterValue(final INPUT_VALUE value) {
return false;
}
if (Objects.requireNonNull(ttlPolicy) == TTLResolutionPolicy.RT_WRITE_ONLY) {
return validateByTTLandMaybeUpdateValue(value, filterTimestamp);
return validateByTTLandMaybeUpdateValue(value);
}
throw new UnsupportedOperationException(ttlPolicy + " policy is not supported.");
}
Expand All @@ -75,7 +75,7 @@ public void close() {
schemaSource.close();
}

boolean validateByTTLandMaybeUpdateValue(final INPUT_VALUE value, long filterTimestamp) {
boolean validateByTTLandMaybeUpdateValue(final INPUT_VALUE value) {
ByteBuffer rmdPayload = getRmdPayload(value);
if (rmdPayload == null || !rmdPayload.hasRemaining()) {
throw new IllegalStateException(
Expand All @@ -90,13 +90,13 @@ boolean validateByTTLandMaybeUpdateValue(final INPUT_VALUE value, long filterTim
RmdTimestampType rmdTimestampType = RmdUtils.getRmdTimestampType(rmdTimestampObject);
// For value-level RMD timestamp, just compare the value with the filter TS.
if (rmdTimestampType.equals(RmdTimestampType.VALUE_LEVEL_TIMESTAMP)) {
return (long) rmdTimestampObject < filterTimestamp;
return (long) rmdTimestampObject <= filterTimestamp;
}
RecordDeserializer<GenericRecord> valueDeserializer =
valueDeserializerCache.computeIfAbsent(valueSchemaId, this::generateValueDeserializer);
GenericRecord valueRecord = valueDeserializer.deserialize(getValuePayload(value));
UpdateResultStatus updateResultStatus =
mergeRecordHelper.deleteRecord(valueRecord, (GenericRecord) rmdTimestampObject, filterTimestamp - 1, 0);
mergeRecordHelper.deleteRecord(valueRecord, (GenericRecord) rmdTimestampObject, filterTimestamp, 0);
if (updateResultStatus.equals(UpdateResultStatus.COMPLETELY_UPDATED)) {
// This means the record is fully stale, we should drop it.
return true;
Expand Down

0 comments on commit 40dc979

Please sign in to comment.