Scaffold small segment merger task in minions
tibrewalpratik17 committed Nov 19, 2024
1 parent 447c518 commit 09ecdfe
Showing 9 changed files with 965 additions and 5 deletions.
SegmentUtils.java
@@ -63,7 +63,7 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, SegmentZ
}

@Nullable
private static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
public static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
// A fast path to get partition id if the segmentName is in a known format like LLC.
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName != null) {
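
With the method now public, callers outside SegmentUtils (such as the new task executor later in this diff) can resolve a partition id straight from a realtime segment name. A minimal sketch, assuming an LLC-style name of the form {table}__{partitionId}__{sequence}__{creationTime}; the wrapper class and segment name are illustrative only:

import org.apache.pinot.common.utils.SegmentUtils;

public class PartitionIdLookupExample {
  public static void main(String[] args) {
    // Fast path: parse the partition id out of the segment name itself;
    // returns null if the name does not follow a known realtime format.
    Integer partitionId =
        SegmentUtils.getPartitionIdFromRealtimeSegmentName("myTable__3__42__20241119T0000Z");
    System.out.println(partitionId); // prints 3
  }
}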
MinionConstants.java
@@ -65,6 +65,7 @@ private MinionConstants() {
*/
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1;

/**
* Job configs
@@ -199,4 +200,59 @@ public static class UpsertCompactionTask {
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";
}

public static class UpsertCompactMergeTask {
public static final String TASK_TYPE = "UpsertCompactMergeTask";

/**
 * The time period to wait before picking segments for this task,
 * e.g. if set to "2d", no task will be scheduled on time windows younger than 2 days.
 */
public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";

/**
 * Number of segments to query in one batch to fetch valid doc id metadata. Defaults to 500.
 */
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
 * Prefix for the new merged segment name.
 * {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} adds __ as the
 * delimiter, so no _ suffix is added here.
 */
public static final String MERGED_SEGMENT_NAME_PREFIX = "compactmerged";

/**
 * Maximum number of records to process in a single task, i.e. the sum of docs across all to-be-merged segments.
 */
public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";

/**
 * Default maximum number of records to process in a single task; same as the value in {@link MergeRollupTask}.
 */
public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;

/**
 * Maximum number of records in the output segment.
 */
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";

/**
 * Default maximum number of records in the output segment; same as the value in
 * {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig}.
 */
public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;

/**
 * Maximum number of segments to process in a single task.
 */
public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask";

/**
 * Default maximum number of segments to process in a single task.
 */
public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;

public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
}
}
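
A hypothetical helper sketching how these task-level keys and defaults might be read from a task's config map; only the key and default constants come from this commit, the helper itself is illustrative:

import java.util.Map;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactMergeTask;

public class UpsertCompactMergeTaskLimits {
  // Per-task record cap, falling back to DEFAULT_MAX_NUM_RECORDS_PER_TASK (50M).
  public static long maxNumRecordsPerTask(Map<String, String> taskConfigs) {
    String value = taskConfigs.get(UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY);
    return value != null ? Long.parseLong(value) : UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK;
  }

  // Per-task segment cap, falling back to DEFAULT_MAX_NUM_SEGMENTS_PER_TASK (10).
  public static long maxNumSegmentsPerTask(Map<String, String> taskConfigs) {
    String value = taskConfigs.get(UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY);
    return value != null ? Long.parseLong(value) : UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK;
  }
}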
SegmentProcessorConfig.java
@@ -23,10 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -46,11 +48,14 @@ public class SegmentProcessorConfig {
private final Map<String, AggregationFunctionType> _aggregationTypes;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;
private final SegmentNameGenerator _segmentNameGenerator;
private final Long _customCreationTime;

private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig,
List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
Map<String, AggregationFunctionType> aggregationTypes, SegmentConfig segmentConfig,
Consumer<Object> progressObserver) {
Consumer<Object> progressObserver, @Nullable SegmentNameGenerator segmentNameGenerator,
@Nullable Long customCreationTime) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
_tableConfig = tableConfig;
_schema = schema;
@@ -62,6 +67,8 @@ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandl
_progressObserver = (progressObserver != null) ? progressObserver : p -> {
// Do nothing.
};
_segmentNameGenerator = segmentNameGenerator;
_customCreationTime = customCreationTime;
}

/**
@@ -117,11 +124,20 @@ public Consumer<Object> getProgressObserver() {
return _progressObserver;
}

public SegmentNameGenerator getSegmentNameGenerator() {
return _segmentNameGenerator;
}

public long getCustomCreationTime() {
return (_customCreationTime != null ? _customCreationTime : System.currentTimeMillis());
}

@Override
public String toString() {
return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig="
+ _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs + ", _mergeType=" + _mergeType
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + '}';
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig
+ ", _segmentNameGenerator=" + _segmentNameGenerator + ", _customCreationTime=" + _customCreationTime + '}';
}

/**
@@ -136,6 +152,8 @@ public static class Builder {
private Map<String, AggregationFunctionType> _aggregationTypes;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;
private SegmentNameGenerator _segmentNameGenerator;
private Long _customCreationTime;

public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
@@ -177,6 +195,16 @@ public Builder setProgressObserver(Consumer<Object> progressObserver) {
return this;
}

public Builder setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
_segmentNameGenerator = segmentNameGenerator;
return this;
}

public Builder setCustomCreationTime(Long customCreationTime) {
_customCreationTime = customCreationTime;
return this;
}

public SegmentProcessorConfig build() {
Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig");
Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig");
@@ -197,7 +225,7 @@ public SegmentProcessorConfig build() {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType,
_aggregationTypes, _segmentConfig, _progressObserver);
_aggregationTypes, _segmentConfig, _progressObserver, _segmentNameGenerator, _customCreationTime);
}
}
}
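
A sketch of how the two new builder hooks are meant to be used together, mirroring the executor further down in this diff; the wrapper class, method, and its parameters are assumptions for illustration, not part of the commit:

import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

public class CompactMergeProcessorConfigExample {
  public static SegmentProcessorConfig buildConfig(TableConfig tableConfig, Schema schema,
      String tableNameWithType, int partitionId, long maxCreationTimeOfMergingSegments) {
    return new SegmentProcessorConfig.Builder()
        .setTableConfig(tableConfig)
        .setSchema(schema)
        // Name merged segments with the uploaded-realtime convention and the "compactmerged" prefix.
        .setSegmentNameGenerator(new UploadedRealtimeSegmentNameGenerator(
            TableNameBuilder.extractRawTableName(tableNameWithType), partitionId,
            System.currentTimeMillis(), "compactmerged", null))
        // Stamp the output segment with the newest creation time among the merged inputs.
        .setCustomCreationTime(maxCreationTimeOfMergingSegments)
        .build();
  }
}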
SegmentProcessorFramework.java
@@ -256,8 +256,11 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));

if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
} else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
segmentNamePostfix, fixedSegmentName, false));
UpsertCompactMergeTaskExecutor.java
@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);

public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
super(minionConf);
}

@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception {
int numInputSegments = segmentDirs.size();
List<SegmentConversionResult> results = new ArrayList<>();
_eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments);
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();

String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);

SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);

// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
int count = 1;
int partitionId = -1;
long maxCreationTimeOfMergingSegments = 0;
List<String> originalSegmentCrcFromTaskGenerator =
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
for (int i = 0; i < numInputSegments; i++) {
File segmentDir = segmentDirs.get(i);
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments));

SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir);
String segmentName = segmentMetadata.getName();
Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
if (segmentPartitionId == null) {
throw new IllegalStateException(String.format("Partition id not found for %s", segmentName));
}
if (partitionId != -1 && partitionId != segmentPartitionId) {
throw new IllegalStateException(String.format("Partition id mismatched for %s, expected partition id: %d",
segmentName, partitionId));
}
partitionId = segmentPartitionId;
maxCreationTimeOfMergingSegments = Math.max(maxCreationTimeOfMergingSegments,
segmentMetadata.getIndexCreationTime());

String crcFromDeepStorageSegment = segmentMetadata.getCrc();
if (!originalSegmentCrcFromTaskGenerator.get(i).equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator.get(i),
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName,
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, crcFromDeepStorageSegment);
if (validDocIds == null) {
// No valid CRC match found or no validDocIds obtained from all servers.
// Error out the task instead of silently failing so that we can track it via task-error metrics.
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match the CRC from the segment copy obtained from deepstore / servers. Expected CRC: %s",
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}

recordReaders.add(new CompactedPinotSegmentRecordReader(segmentDir, validDocIds));
}

segmentProcessorConfigBuilder.setSegmentNameGenerator(
new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionId,
System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
if (maxCreationTimeOfMergingSegments != 0) {
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments);
}
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
List<File> outputSegmentDirs;
try {
_eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
} finally {
for (RecordReader recordReader : recordReaders) {
recordReader.close();
}
}

long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));

for (File outputSegmentDir : outputSegmentDirs) {
String outputSegmentName = outputSegmentDir.getName();
results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Map<String, String> updateMap = new TreeMap<>();
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(System.currentTimeMillis()));
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX,
pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
}
}
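
For reference, a rough sketch of the custom-map entries the modifier above writes to the merged segment's ZK metadata, assuming MinionConstants.TASK_TIME_SUFFIX resolves to ".time" and that SEGMENT_NAME_KEY carries the comma-separated input segment names of the task; the segment names and timestamp below are made up:

import java.util.Map;

public class CustomMapExample {
  // Roughly what the UPDATE modifier adds to the merged segment's ZK metadata custom map.
  static final Map<String, String> EXPECTED_ENTRIES = Map.of(
      "UpsertCompactMergeTask.time", "1732003200000",
      "UpsertCompactMergeTask.mergedSegments",
      "myTable__3__42__20241119T0000Z,myTable__3__43__20241119T0100Z");
}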