[server][controller][vpj] Support TTL repush for A/A + partial update #704

Merged · 9 commits · Oct 25, 2023
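To situate the diff, here is a minimal, hypothetical sketch of how the TTL repush properties added in this PR (repush.ttl.enable, repush.ttl.seconds, repush.ttl.start.timestamp) might be wired into a push job. The job id, the two-argument VenicePushJob constructor, and the omission of the other required push-job properties are assumptions for illustration, not something this PR prescribes.

import java.util.Properties;

import com.linkedin.venice.hadoop.VenicePushJob;

public class TtlRepushConfigSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Keys match the constants added in this PR; all values are illustrative.
    props.setProperty("repush.ttl.enable", "true");                  // REPUSH_TTL_ENABLE
    props.setProperty("repush.ttl.seconds", "86400");                // REPUSH_TTL_IN_SECONDS: keep one day of writes
    props.setProperty("repush.ttl.start.timestamp",                  // REPUSH_TTL_START_TIMESTAMP;
        String.valueOf(System.currentTimeMillis()));                 // defaults to the job's start time when unset

    // Assuming the common (jobId, Properties) constructor; VenicePushJob is AutoCloseable.
    try (VenicePushJob pushJob = new VenicePushJob("ttl-repush-demo", props)) {
      pushJob.run();
    }
  }
}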
1 change: 1 addition & 0 deletions clients/venice-push-job/build.gradle
@@ -3,6 +3,7 @@ plugins {
}

dependencies {
implementation project(':clients:da-vinci-client')
implementation (project(':internal:venice-common')) {
exclude module: ':internal:alpini'
}
@@ -1,20 +1,20 @@
package com.linkedin.venice.hadoop;

import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;


/**
* An abstraction to filter a given data type. It can be used in conjunction with {@link FilterChain}.
*/
public abstract class AbstractVeniceFilter<INPUT_VALUE> implements Closeable {
public AbstractVeniceFilter(final VeniceProperties props) {
public AbstractVeniceFilter() {
}

/**
* This function implements how to parse the value and determine if filtering is needed.
* @param value
* For certain values from Active/Active partial update enabled stores, it may filter out part of the input value and
* only keep the remaining fresh part, based on the filter timestamp.
* @return true if the value should be filtered out, otherwise false.
*/
public abstract boolean apply(final INPUT_VALUE value);
public abstract boolean checkAndMaybeFilterValue(final INPUT_VALUE value);
}
@@ -40,7 +40,7 @@ public void add(AbstractVeniceFilter<INPUT_VALUE> filter) {
public boolean apply(final INPUT_VALUE value) {
if (!filterList.isEmpty()) {
for (AbstractVeniceFilter<INPUT_VALUE> filter: filterList) {
if (filter.apply(value)) {
if (filter.checkAndMaybeFilterValue(value)) {
return true;
}
}
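For orientation, here is a small hypothetical filter wired into a FilterChain, showing the renamed checkAndMaybeFilterValue hook; the byte[] value type and the helper class are assumptions for illustration, not part of this change.

import com.linkedin.venice.hadoop.AbstractVeniceFilter;
import com.linkedin.venice.hadoop.FilterChain;

// Hypothetical filter that drops empty payloads. The hook is no longer a pure predicate:
// implementations such as VeniceRmdTTLFilter below may also rewrite the value in place,
// hence the rename from apply to checkAndMaybeFilterValue.
class EmptyValueFilter extends AbstractVeniceFilter<byte[]> {
  @Override
  public boolean checkAndMaybeFilterValue(byte[] value) {
    return value == null || value.length == 0; // true means "filter this record out"
  }

  @Override
  public void close() {
    // nothing to release
  }
}

class FilterChainUsageSketch {
  static boolean shouldDrop(FilterChain<byte[]> chain, byte[] recordValue) {
    chain.add(new EmptyValueFilter());
    return chain.apply(recordValue); // FilterChain.apply still short-circuits on the first filter hit
  }
}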
@@ -52,7 +52,7 @@
import com.linkedin.venice.hadoop.input.kafka.VeniceKafkaInputReducer;
import com.linkedin.venice.hadoop.input.kafka.ttl.TTLResolutionPolicy;
import com.linkedin.venice.hadoop.output.avro.ValidateSchemaAndBuildDictMapperOutput;
import com.linkedin.venice.hadoop.schema.HDFSRmdSchemaSource;
import com.linkedin.venice.hadoop.schema.HDFSSchemaSource;
import com.linkedin.venice.hadoop.ssl.TempFileSSLConfigurator;
import com.linkedin.venice.hadoop.utils.HadoopUtils;
import com.linkedin.venice.hadoop.utils.VPJSSLUtils;
@@ -397,8 +397,10 @@ public class VenicePushJob implements AutoCloseable {
public static final String REPUSH_TTL_ENABLE = "repush.ttl.enable";
public static final String REPUSH_TTL_IN_SECONDS = "repush.ttl.seconds";
public static final String REPUSH_TTL_POLICY = "repush.ttl.policy";
public static final String REPUSH_TTL_START_TIMESTAMP = "repush.ttl.start.timestamp";
public static final String RMD_SCHEMA_DIR = "rmd.schema.dir";
private static final String TEMP_DIR_PREFIX = "/tmp/veniceRmdSchemas/";
public static final String VALUE_SCHEMA_DIR = "value.schema.dir";
private static final String TEMP_DIR_PREFIX = "/tmp/veniceSchemas/";
public static final int NOT_SET = -1;
private static final Logger LOGGER = LogManager.getLogger(VenicePushJob.class);

@@ -523,8 +525,10 @@ protected static class PushJobSetting {
boolean repushTTLEnabled;
// specify ttl time to drop stale records.
long repushTTLInSeconds;
long repushTTLStartTimeMs;
// HDFS directory to cache RMD schemas
String rmdSchemaDir;
String valueSchemaDir;
String controllerD2ServiceName;
String parentControllerRegionD2ZkHosts;
String childControllerRegionD2ZkHosts;
@@ -708,6 +712,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
pushJobSettingToReturn.deferVersionSwap = props.getBoolean(DEFER_VERSION_SWAP, false);
pushJobSettingToReturn.repushTTLEnabled = props.getBoolean(REPUSH_TTL_ENABLE, false);
pushJobSettingToReturn.repushTTLInSeconds = NOT_SET;
pushJobSettingToReturn.repushTTLStartTimeMs = props.getLong(REPUSH_TTL_START_TIMESTAMP, System.currentTimeMillis());
pushJobSettingToReturn.isTargetedRegionPushEnabled = props.getBoolean(TARGETED_REGION_PUSH_ENABLED, false);
pushJobSettingToReturn.postValidationConsumption = props.getBoolean(POST_VALIDATION_CONSUMPTION_ENABLED, true);
if (pushJobSettingToReturn.isIncrementalPush && pushJobSettingToReturn.isTargetedRegionPushEnabled) {
@@ -963,10 +968,6 @@ public void run() {
validateStoreSettingAndPopulate(controllerClient, pushJobSetting);
inputStorageQuotaTracker = new InputStorageQuotaTracker(storeSetting.storeStorageQuota);

if (pushJobSetting.repushTTLEnabled && storeSetting.isWriteComputeEnabled) {
throw new VeniceException("Repush TTL is not supported when the store has write compute enabled.");
}

if (pushJobSetting.isSourceETL) {
MultiSchemaResponse allValueSchemaResponses = controllerClient.getAllValueSchema(pushJobSetting.storeName);
MultiSchemaResponse.Schema[] allValueSchemas = allValueSchemaResponses.getSchemas();
@@ -1062,15 +1063,13 @@ public void run() {

// build the full paths for HDFSSchemaSource: each schema path is suffixed
// with the store name and a timestamp, like: <TEMP_DIR_PREFIX>/<store_name>/rmd_<timestamp>
StringBuilder schemaDirBuilder = new StringBuilder();
schemaDirBuilder.append(TEMP_DIR_PREFIX)
.append(pushJobSetting.storeName)
.append("/")
.append(System.currentTimeMillis());
try (HDFSRmdSchemaSource rmdSchemaSource =
new HDFSRmdSchemaSource(schemaDirBuilder.toString(), pushJobSetting.storeName)) {
rmdSchemaSource.loadRmdSchemasOnDisk(controllerClient);
pushJobSetting.rmdSchemaDir = rmdSchemaSource.getPath();
String rmdSchemaDir = TEMP_DIR_PREFIX + pushJobSetting.storeName + "/rmd_" + System.currentTimeMillis();
String valueSchemaDir = TEMP_DIR_PREFIX + pushJobSetting.storeName + "/value_" + System.currentTimeMillis();
try (HDFSSchemaSource schemaSource =
new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir, pushJobSetting.storeName)) {
schemaSource.saveSchemasOnDisk(controllerClient);
pushJobSetting.rmdSchemaDir = schemaSource.getRmdSchemaPath();
pushJobSetting.valueSchemaDir = schemaSource.getValueSchemaPath();
}
}
}
@@ -3000,11 +2999,13 @@ protected void setupDefaultJobConf(
conf.set(KAFKA_INPUT_TOPIC, pushJobSetting.kafkaInputTopic);
conf.set(KAFKA_INPUT_BROKER_URL, pushJobSetting.kafkaInputBrokerUrl);
conf.setLong(REPUSH_TTL_IN_SECONDS, pushJobSetting.repushTTLInSeconds);
conf.setLong(REPUSH_TTL_START_TIMESTAMP, pushJobSetting.repushTTLStartTimeMs);
if (pushJobSetting.repushTTLEnabled) {
conf.setInt(REPUSH_TTL_POLICY, TTLResolutionPolicy.RT_WRITE_ONLY.getValue()); // only one policy is supported, so no user-provided value is honored
conf.set(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir);
conf.set(VALUE_SCHEMA_DIR, pushJobSetting.valueSchemaDir);
}
// Pass the compression strategy of source version to repush MR job
conf.set(
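A hedged sketch of the two halves of the schema handoff introduced above: the job driver persists value and RMD schemas to HDFS via HDFSSchemaSource, and the MR-side TTL filter later reads them back from the directories passed through VALUE_SCHEMA_DIR / RMD_SCHEMA_DIR in the job conf. The wrapper class, method names, and the ControllerClient import path are assumptions for illustration.

import java.util.Map;

import org.apache.avro.Schema;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.hadoop.schema.HDFSSchemaSource;
import com.linkedin.venice.schema.rmd.RmdVersionId;

class SchemaHandoffSketch {
  // Driver side (mirrors the run() change above): persist schemas for the MR tasks.
  static void saveSchemas(ControllerClient controllerClient, String storeName) throws Exception {
    String rmdSchemaDir = "/tmp/veniceSchemas/" + storeName + "/rmd_" + System.currentTimeMillis();
    String valueSchemaDir = "/tmp/veniceSchemas/" + storeName + "/value_" + System.currentTimeMillis();
    try (HDFSSchemaSource schemaSource = new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir, storeName)) {
      schemaSource.saveSchemasOnDisk(controllerClient);
    }
  }

  // Task side (mirrors the VeniceRmdTTLFilter constructor below): load the cached schemas.
  static void loadSchemas(String valueSchemaDir, String rmdSchemaDir) throws Exception {
    try (HDFSSchemaSource schemaSource = new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir)) {
      Map<RmdVersionId, Schema> rmdSchemas = schemaSource.fetchRmdSchemas();
      Map<Integer, Schema> valueSchemas = schemaSource.fetchValueSchemas();
      System.out.println(rmdSchemas.size() + " RMD schemas, " + valueSchemas.size() + " value schemas cached");
    }
  }
}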
@@ -250,11 +250,10 @@ private byte[] concatenateAllChunks(final byte[][] chunks, final int totalByteCo
}

public static class ValueBytesAndSchemaId {
private final byte[] bytes;
private final int schemaID;

private final int replicationMetadataVersionId;
private final ByteBuffer replicationMetadataPayload;
private byte[] bytes;
private ByteBuffer replicationMetadataPayload;

ValueBytesAndSchemaId(byte[] bytes, int schemaID, int rmdId, ByteBuffer rmdPayload) {
this.bytes = bytes;
@@ -278,5 +277,13 @@ public int getReplicationMetadataVersionId() {
public ByteBuffer getReplicationMetadataPayload() {
return replicationMetadataPayload;
}

public void setReplicationMetadataPayload(ByteBuffer replicationMetadataPayload) {
this.replicationMetadataPayload = replicationMetadataPayload;
}

public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
}
}
@@ -20,12 +20,27 @@ protected int getSchemaId(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSche
}

@Override
protected int getRmdId(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSchemaId) {
protected int getRmdProtocolId(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSchemaId) {
return valueBytesAndSchemaId.getReplicationMetadataVersionId();
}

@Override
protected ByteBuffer getRmdPayload(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSchemaId) {
return valueBytesAndSchemaId.getReplicationMetadataPayload();
}

@Override
protected ByteBuffer getValuePayload(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSchemaId) {
return ByteBuffer.wrap(valueBytesAndSchemaId.getBytes());
}

@Override
protected void updateRmdPayload(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSchemaId, ByteBuffer payload) {
valueBytesAndSchemaId.setReplicationMetadataPayload(payload);
}

@Override
protected void updateValuePayload(ChunkAssembler.ValueBytesAndSchemaId valueBytesAndSchemaId, byte[] payload) {
valueBytesAndSchemaId.setBytes(payload);
}
}
@@ -19,7 +19,7 @@ protected int getSchemaId(final KafkaInputMapperValue kafkaInputMapperValue) {
return kafkaInputMapperValue.schemaId;
}

protected int getRmdId(final KafkaInputMapperValue kafkaInputMapperValue) {
protected int getRmdProtocolId(final KafkaInputMapperValue kafkaInputMapperValue) {
return kafkaInputMapperValue.replicationMetadataVersionId;
}

@@ -28,6 +28,21 @@ protected ByteBuffer getRmdPayload(final KafkaInputMapperValue kafkaInputMapperV
return kafkaInputMapperValue.replicationMetadataPayload;
}

@Override
protected ByteBuffer getValuePayload(final KafkaInputMapperValue kafkaInputMapperValue) {
return kafkaInputMapperValue.value;
}

@Override
protected void updateRmdPayload(KafkaInputMapperValue kafkaInputMapperValue, ByteBuffer payload) {
kafkaInputMapperValue.replicationMetadataPayload = payload;
}

@Override
protected void updateValuePayload(KafkaInputMapperValue kafkaInputMapperValue, byte[] payload) {
kafkaInputMapperValue.value = ByteBuffer.wrap(payload);
}

/**
* When schemaId is negative, it indicates a chunked record.
* Skip it and pass it to the Reducer, as chunks are only re-assembled at the Reducer.
@@ -1,20 +1,25 @@
package com.linkedin.venice.hadoop.input.kafka.ttl;

import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_POS;

import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
import com.linkedin.davinci.schema.merge.MergeRecordHelper;
import com.linkedin.davinci.schema.merge.UpdateResultStatus;
import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory;
import com.linkedin.venice.hadoop.AbstractVeniceFilter;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.schema.HDFSRmdSchemaSource;
import com.linkedin.venice.hadoop.schema.HDFSSchemaSource;
import com.linkedin.venice.schema.rmd.RmdTimestampType;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.schema.rmd.RmdVersionId;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -27,68 +32,121 @@
*/
public abstract class VeniceRmdTTLFilter<INPUT_VALUE> extends AbstractVeniceFilter<INPUT_VALUE> {
private final TTLResolutionPolicy ttlPolicy;
private final long ttlInMs;
private final HDFSRmdSchemaSource schemaSource;
protected final Map<RmdVersionId, Schema> rmdMapping;
private final long filterTimestamp;
private final HDFSSchemaSource schemaSource;
protected final Map<RmdVersionId, Schema> rmdSchemaMap;
protected final Map<Integer, Schema> valueSchemaMap;
private final Map<RmdVersionId, RecordDeserializer<GenericRecord>> rmdDeserializerCache;
private final Map<Integer, RecordDeserializer<GenericRecord>> valueDeserializerCache;
private final Map<RmdVersionId, RecordSerializer<GenericRecord>> rmdSerializerCache;
private final Map<Integer, RecordSerializer<GenericRecord>> valueSerializerCache;
private final MergeRecordHelper mergeRecordHelper = new CollectionTimestampMergeRecordHelper();

public VeniceRmdTTLFilter(final VeniceProperties props) throws IOException {
super(props);
super();
ttlPolicy = TTLResolutionPolicy.valueOf(props.getInt(VenicePushJob.REPUSH_TTL_POLICY));
ttlInMs = TimeUnit.SECONDS.toMillis(props.getLong(VenicePushJob.REPUSH_TTL_IN_SECONDS));
schemaSource = new HDFSRmdSchemaSource(props.getString(VenicePushJob.RMD_SCHEMA_DIR));
rmdMapping = schemaSource.fetchSchemas();
long ttlInMs = TimeUnit.SECONDS.toMillis(props.getLong(VenicePushJob.REPUSH_TTL_IN_SECONDS));
long ttlStartTimestamp = props.getLong(VenicePushJob.REPUSH_TTL_START_TIMESTAMP);
this.filterTimestamp = ttlStartTimestamp - ttlInMs - 1;
this.schemaSource = new HDFSSchemaSource(
props.getString(VenicePushJob.VALUE_SCHEMA_DIR),
props.getString(VenicePushJob.RMD_SCHEMA_DIR));
this.rmdSchemaMap = schemaSource.fetchRmdSchemas();
this.valueSchemaMap = schemaSource.fetchValueSchemas();
this.rmdDeserializerCache = new VeniceConcurrentHashMap<>();
this.valueDeserializerCache = new VeniceConcurrentHashMap<>();
this.rmdSerializerCache = new VeniceConcurrentHashMap<>();
this.valueSerializerCache = new VeniceConcurrentHashMap<>();
}

@Override
public boolean apply(final INPUT_VALUE value) {
public boolean checkAndMaybeFilterValue(final INPUT_VALUE value) {
if (skipRmdRecord(value)) {
return false;
}
Instant curTime = Instant.now();
switch (ttlPolicy) {
case RT_WRITE_ONLY:
Instant timestamp = Instant.ofEpochMilli(getTimeStampFromRmdRecord(value));
return ChronoUnit.MILLIS.between(timestamp, curTime) > ttlInMs;
default:
throw new UnsupportedOperationException(ttlPolicy + " policy is not supported.");
if (Objects.requireNonNull(ttlPolicy) == TTLResolutionPolicy.RT_WRITE_ONLY) {
return filterByTTLandMaybeUpdateValue(value);
}
throw new UnsupportedOperationException(ttlPolicy + " policy is not supported.");
}

@Override
public void close() {
schemaSource.close();
}

public long getTimeStampFromRmdRecord(final INPUT_VALUE value) {
boolean filterByTTLandMaybeUpdateValue(final INPUT_VALUE value) {
ByteBuffer rmdPayload = getRmdPayload(value);
if (rmdPayload == null || !rmdPayload.hasRemaining()) {
throw new IllegalStateException(
"The record doesn't contain required RMD field. Please check if your store has A/A enabled");
}
int id = getRmdId(value), valueSchemaId = getSchemaId(value);
int valueSchemaId = getSchemaId(value);
int id = getRmdProtocolId(value);
RmdVersionId rmdVersionId = new RmdVersionId(valueSchemaId, id);
GenericRecord record =
this.rmdDeserializerCache.computeIfAbsent(rmdVersionId, this::generateDeserializer).deserialize(rmdPayload);
return RmdUtils.extractTimestampFromRmd(record)
.stream()
.mapToLong(v -> v)
.max()
.orElseThrow(NoSuchElementException::new);
GenericRecord rmdRecord =
rmdDeserializerCache.computeIfAbsent(rmdVersionId, this::generateRmdDeserializer).deserialize(rmdPayload);
Object rmdTimestampObject = rmdRecord.get(TIMESTAMP_FIELD_POS);
RmdTimestampType rmdTimestampType = RmdUtils.getRmdTimestampType(rmdTimestampObject);
// For value-level RMD timestamp, just compare the value with the filter TS.
if (rmdTimestampType.equals(RmdTimestampType.VALUE_LEVEL_TIMESTAMP)) {
return (long) rmdTimestampObject <= filterTimestamp;
}
RecordDeserializer<GenericRecord> valueDeserializer =
valueDeserializerCache.computeIfAbsent(valueSchemaId, this::generateValueDeserializer);
GenericRecord valueRecord = valueDeserializer.deserialize(getValuePayload(value));
UpdateResultStatus updateResultStatus =
mergeRecordHelper.deleteRecord(valueRecord, (GenericRecord) rmdTimestampObject, filterTimestamp, 0);
if (updateResultStatus.equals(UpdateResultStatus.COMPLETELY_UPDATED)) {
// This means the record is fully stale, we should drop it.
return true;
}
if (updateResultStatus.equals(UpdateResultStatus.NOT_UPDATED_AT_ALL)) {
// This means the whole record is newer than TTL filter threshold timestamp, and we should keep it.
return false;
}
// Part of the data has been wiped out by DELETE operation, and we should update the input's value and RMD payload.
RecordSerializer<GenericRecord> valueSerializer =
valueSerializerCache.computeIfAbsent(valueSchemaId, this::generateValueSerializer);
RecordSerializer<GenericRecord> rmdSerializer =
rmdSerializerCache.computeIfAbsent(rmdVersionId, this::generateRmdSerializer);
updateValuePayload(value, valueSerializer.serialize(valueRecord));
updateRmdPayload(value, ByteBuffer.wrap(rmdSerializer.serialize(rmdRecord)));
return false;
}

RecordDeserializer<GenericRecord> generateRmdDeserializer(RmdVersionId rmdVersionId) {
Schema schema = rmdSchemaMap.get(rmdVersionId);
return MapOrderingPreservingSerDeFactory.getDeserializer(schema, schema);
}

private RecordDeserializer<GenericRecord> generateDeserializer(RmdVersionId rmdVersionId) {
Schema schema = rmdMapping.get(rmdVersionId);
return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(schema, schema);
RecordDeserializer<GenericRecord> generateValueDeserializer(int valueSchemaId) {
Schema schema = valueSchemaMap.get(valueSchemaId);
return MapOrderingPreservingSerDeFactory.getDeserializer(schema, schema);
}

RecordSerializer<GenericRecord> generateRmdSerializer(RmdVersionId rmdVersionId) {
Schema schema = rmdSchemaMap.get(rmdVersionId);
return MapOrderingPreservingSerDeFactory.getSerializer(schema);
}

RecordSerializer<GenericRecord> generateValueSerializer(int valueSchemaId) {
Schema schema = valueSchemaMap.get(valueSchemaId);
return MapOrderingPreservingSerDeFactory.getSerializer(schema);
}

protected abstract int getSchemaId(final INPUT_VALUE value);

protected abstract int getRmdId(final INPUT_VALUE value);
protected abstract int getRmdProtocolId(final INPUT_VALUE value);

protected abstract ByteBuffer getRmdPayload(final INPUT_VALUE value);

protected abstract ByteBuffer getValuePayload(final INPUT_VALUE value);

protected abstract void updateRmdPayload(final INPUT_VALUE value, ByteBuffer payload);

protected abstract void updateValuePayload(final INPUT_VALUE value, byte[] payload);

/**
* Define how records could be skipped if certain conditions are met.
* Do not skip by default.
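Finally, a worked example (with hypothetical numbers) of the cutoff computed in the VeniceRmdTTLFilter constructor above: a record whose value-level RMD timestamp is at or below filterTimestamp falls outside the TTL window and is dropped, while field-level timestamps instead go through MergeRecordHelper.deleteRecord as shown in filterByTTLandMaybeUpdateValue.

import java.util.concurrent.TimeUnit;

class TtlCutoffSketch {
  public static void main(String[] args) {
    // Hypothetical inputs corresponding to repush.ttl.start.timestamp and repush.ttl.seconds.
    long ttlStartTimestampMs = 1_698_192_000_000L;             // Oct 25, 2023 00:00:00 UTC
    long ttlInMs = TimeUnit.SECONDS.toMillis(86_400);          // keep the last 24 hours of writes

    // Same formula as the constructor: anything written at or before this instant is stale.
    long filterTimestamp = ttlStartTimestampMs - ttlInMs - 1;  // = 1_698_105_599_999

    long valueLevelRmdTimestampMs = 1_698_100_000_000L;        // written ~1.5 hours before the window opens
    boolean dropped = valueLevelRmdTimestampMs <= filterTimestamp;
    System.out.println(dropped);                               // true: the whole record is filtered out
  }
}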