Upsert small segment merger task in minions #14477

Status: Open. Wants to merge 1 commit into base: master.
SegmentUtils.java (org.apache.pinot.common.utils): getPartitionIdFromRealtimeSegmentName is widened from private to public so the new minion task can resolve partition ids from segment names.

}

@Nullable
-  private static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
+  public static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
// A fast path to get partition id if the segmentName is in a known format like LLC.
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName != null) {
      ...
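
For illustration, a hedged sketch of calling the now-public helper (the segment name is hypothetical; LLC names follow the tableName__partitionGroupId__sequenceNumber__creationTime pattern):

import org.apache.pinot.common.utils.SegmentUtils;

// Hypothetical LLC-style name: table "myTable", partition group 3, sequence 27.
Integer partitionId =
    SegmentUtils.getPartitionIdFromRealtimeSegmentName("myTable__3__27__20240101T0000Z");
// partitionId == 3 here; the helper returns null for names it cannot parse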

MinionConstants.java (org.apache.pinot.core.common): adds a default for the per-table task cap alongside the existing keys, and a new UpsertCompactMergeTask constants class.

  ...
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1;

  /**
   * Job configs
   */
  ...

  public static class UpsertCompactionTask {
    ...
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";
}

public static class UpsertCompactMergeTask {
public static final String TASK_TYPE = "UpsertCompactMergeTask";

/**
* The time period to wait before picking segments for this task
* e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
*/
public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";

/**
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

    /**
     * Prefix for the newly created merged segment's name.
     * {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} adds "__" as the
     * delimiter, so no "_" suffix is appended here.
     */
public static final String MERGED_SEGMENT_NAME_PREFIX = "compactmerged";

/**
* maximum number of records to process in a single task, sum of all docs in to-be-merged segments
*/
public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";

/**
* default maximum number of records to process in a single task, same as the value in {@link MergeRollupTask}
*/
public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;

/**
* maximum number of records in the output segment
*/
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";

/**
* default maximum number of records in output segment, same as the value in
* {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig}
*/
public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;

/**
* maximum number of segments to process in a single task
*/
public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask";

/**
* default maximum number of segments to process in a single task
*/
public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;

public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
}
}
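
For illustration, a hedged sketch of the task configs a table might set for this task type (the key strings are the constants above; the values are hypothetical):

import java.util.Map;

// Hypothetical per-table task configuration for UpsertCompactMergeTask.
// Key strings mirror the constants defined above; values are examples only.
Map<String, String> taskConfigs = Map.of(
    "bufferTimePeriod", "2d",              // BUFFER_TIME_PERIOD_KEY
    "maxNumRecordsPerTask", "50000000",    // MAX_NUM_RECORDS_PER_TASK_KEY
    "maxNumRecordsPerSegment", "5000000",  // MAX_NUM_RECORDS_PER_SEGMENT_KEY
    "maxNumSegmentsPerTask", "10",         // MAX_NUM_SEGMENTS_PER_TASK_KEY
    "tableMaxNumTasks", "1");              // TABLE_MAX_NUM_TASKS_KEY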

SegmentProcessorConfig.java (org.apache.pinot.core.segment.processing.framework): callers can now supply an optional SegmentNameGenerator and a custom segment creation time.

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
...

public class SegmentProcessorConfig {
  ...
private final Map<String, AggregationFunctionType> _aggregationTypes;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;
private final SegmentNameGenerator _segmentNameGenerator;
private final Long _customCreationTime;

private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig,
List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
Map<String, AggregationFunctionType> aggregationTypes, SegmentConfig segmentConfig,
-      Consumer<Object> progressObserver) {
+      Consumer<Object> progressObserver, @Nullable SegmentNameGenerator segmentNameGenerator,
+      @Nullable Long customCreationTime) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
_tableConfig = tableConfig;
_schema = schema;
    ...
_progressObserver = (progressObserver != null) ? progressObserver : p -> {
// Do nothing.
};
_segmentNameGenerator = segmentNameGenerator;
_customCreationTime = customCreationTime;
}

  ...

  public Consumer<Object> getProgressObserver() {
return _progressObserver;
}

public SegmentNameGenerator getSegmentNameGenerator() {
return _segmentNameGenerator;
}

public long getCustomCreationTime() {
return (_customCreationTime != null ? _customCreationTime : System.currentTimeMillis());
}

@Override
public String toString() {
return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig="
+ _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs + ", _mergeType=" + _mergeType
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + '}';
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig
+ ", _segmentNameGenerator=" + _segmentNameGenerator + ", _customCreationTime=" + _customCreationTime + '}';
}

  /** ... */
  public static class Builder {
private Map<String, AggregationFunctionType> _aggregationTypes;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;
private SegmentNameGenerator _segmentNameGenerator;
private Long _customCreationTime;

public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
      return this;
    }

    ...

    public Builder setProgressObserver(Consumer<Object> progressObserver) {
      _progressObserver = progressObserver;
return this;
}

public Builder setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
_segmentNameGenerator = segmentNameGenerator;
return this;
}

public Builder setCustomCreationTime(Long customCreationTime) {
_customCreationTime = customCreationTime;
return this;
}

public SegmentProcessorConfig build() {
Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig");
Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig");
      ...
      if (_segmentConfig == null) {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType,
-          _aggregationTypes, _segmentConfig, _progressObserver);
+          _aggregationTypes, _segmentConfig, _progressObserver, _segmentNameGenerator, _customCreationTime);
}
}
}
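
A hedged sketch of how a caller can use the two new knobs (tableConfig and schema are assumed to be loaded already; the generator arguments mirror the ones the new task executor below passes in):

// Minimal sketch. Both settings are optional: without them the framework
// falls back to the table-config-driven name generator and to
// System.currentTimeMillis() as the creation time.
SegmentProcessorConfig config = new SegmentProcessorConfig.Builder()
    .setTableConfig(tableConfig)
    .setSchema(schema)
    .setSegmentNameGenerator(new UploadedRealtimeSegmentNameGenerator(
        "myTable", 3 /* partition id */, System.currentTimeMillis(),
        "compactmerged", null /* suffix */))
    .setCustomCreationTime(1700000000000L)  // e.g. max creation time of the inputs
    .build();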

SegmentProcessorFramework.java (org.apache.pinot.core.segment.processing.framework): generateSegment(...) now applies the custom creation time and prefers an explicitly supplied SegmentNameGenerator over the generator type configured on the table.

SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
+    generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));

-    if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
+    if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
+      generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
+    } else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
segmentNamePostfix, fixedSegmentName, false));
    ...

New file: UpsertCompactMergeTaskExecutor.java (org.apache.pinot.plugin.minion.tasks.upsertcompactmerge, 166 lines):

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);

public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
super(minionConf);
}

@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception {
int numInputSegments = segmentDirs.size();
List<SegmentConversionResult> results = new ArrayList<>();
_eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments);
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();

String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);

SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);

// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
int count = 1;
int partitionId = -1;
long maxCreationTimeOfMergingSegments = 0;
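    // Expected CRCs arrive as a comma-separated list, ordered to match segmentDirs.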
List<String> originalSegmentCrcFromTaskGenerator =
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
for (int i = 0; i < numInputSegments; i++) {
File segmentDir = segmentDirs.get(i);
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments));

SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir);
String segmentName = segmentMetadata.getName();
Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
if (segmentPartitionId == null) {
throw new IllegalStateException(String.format("Partition id not found for %s", segmentName));
}
if (partitionId != -1 && partitionId != segmentPartitionId) {
throw new IllegalStateException(String.format("Partition id mismatched for %s, expected partition id: %d",
segmentName, partitionId));
}
partitionId = segmentPartitionId;
maxCreationTimeOfMergingSegments = Math.max(maxCreationTimeOfMergingSegments,
segmentMetadata.getIndexCreationTime());

String crcFromDeepStorageSegment = segmentMetadata.getCrc();
if (!originalSegmentCrcFromTaskGenerator.get(i).equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator.get(i),
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName,
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, crcFromDeepStorageSegment);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s",
"");
LOGGER.error(message);
throw new IllegalStateException(message);
}

recordReaders.add(new CompactedPinotSegmentRecordReader(segmentDir, validDocIds));
}

segmentProcessorConfigBuilder.setSegmentNameGenerator(
new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionId,
System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
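    // Pin the merged segment's creation time to the newest index creation time among
    // the inputs, so the merged segment is not stamped newer than the data it contains.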
if (maxCreationTimeOfMergingSegments != 0) {
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments);
}
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
List<File> outputSegmentDirs;
try {
_eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
} finally {
for (RecordReader recordReader : recordReaders) {
recordReader.close();
}
}

long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));

for (File outputSegmentDir : outputSegmentDirs) {
String outputSegmentName = outputSegmentDir.getName();
results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Map<String, String> updateMap = new TreeMap<>();
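    // Record when this task ran and which input segments were merged into this segment.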
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(System.currentTimeMillis()));
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX,
pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
}
}
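
For orientation, a hedged sketch of the task config this executor expects (normally assembled by the corresponding task generator; the names and CRCs are hypothetical, and the CRC list must line up one-to-one with the segment list):

import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;

// Hypothetical inputs: two segments from the same stream partition (3), with
// their expected CRCs in matching order.
PinotTaskConfig taskConfig = new PinotTaskConfig(
    MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
    Map.of(
        MinionConstants.TABLE_NAME_KEY, "myTable_REALTIME",
        MinionConstants.SEGMENT_NAME_KEY,
        "myTable__3__0__20240101T0000Z,myTable__3__1__20240101T0100Z",
        MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, "123456789,987654321"));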