Skip to content

Commit

Permalink
Improve async temp segment delete in validation manager (apache#14339)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Oct 31, 2024
1 parent ebd7509 commit 65f658d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URI;
import java.sql.Timestamp;
Expand Down Expand Up @@ -1592,67 +1593,71 @@ boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {

/**
* Delete tmp segments for realtime table with low level consumer, split commit and async deletion is enabled.
* @param tableNameWithType
* @param segmentsZKMetadata
* @return number of deleted orphan temporary segments
*
*/
public long deleteTmpSegments(String tableNameWithType, List<SegmentZKMetadata> segmentsZKMetadata) {
public int deleteTmpSegments(String realtimeTableName, List<SegmentZKMetadata> segmentsZKMetadata)
throws IOException {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");

if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
return 0L;
}

TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType);
return 0L;
}

if (!isTmpSegmentAsyncDeletionEnabled()) {
return 0L;
// NOTE: Do not delete the file if it is used as download URL. This could happen when user uses temporary file to
// backfill segment.
Set<String> downloadUrls = Sets.newHashSetWithExpectedSize(segmentsZKMetadata.size());
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
if (segmentZKMetadata.getStatus() == Status.DONE) {
downloadUrls.add(segmentZKMetadata.getDownloadUrl());
}
}

Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta -> meta.getStatus() == Status.DONE
&& !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
SegmentZKMetadata::getDownloadUrl).collect(
Collectors.toSet());

String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
long deletedTmpSegments = 0;
try {
for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
// prepend scheme
int numDeletedTmpSegments = 0;
for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
if (isTmpAndCanDelete(filePath, downloadUrls, pinotFS)) {
URI uri = URIUtils.getUri(filePath);
if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
LOGGER.info("Deleting temporary segment file: {}", uri);
String canonicalPath = uri.toString();
LOGGER.info("Deleting temporary segment file: {}", canonicalPath);
try {
if (pinotFS.delete(uri, true)) {
LOGGER.info("Succeed to delete file: {}", uri);
deletedTmpSegments++;
LOGGER.info("Deleted temporary segment file: {}", canonicalPath);
numDeletedTmpSegments++;
} else {
LOGGER.warn("Failed to delete file: {}", uri);
LOGGER.warn("Failed to delete temporary segment file: {}", canonicalPath);
}
} catch (Exception e) {
LOGGER.error("Caught exception while deleting temporary segment file: {}", canonicalPath, e);
}
}
} catch (Exception e) {
LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e);
}
return deletedTmpSegments;
return numDeletedTmpSegments;
}

private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS pinotFS)
throws Exception {
long lastModified = pinotFS.lastModified(uri);
private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, PinotFS pinotFS) {
if (!SegmentCompletionUtils.isTmpFile(filePath)) {
return false;
}
// Prepend scheme
URI uri = URIUtils.getUri(filePath);
String canonicalPath = uri.toString();
// NOTE: Do not delete the file if it is used as download URL. This could happen when user uses temporary file to
// backfill segment.
if (downloadUrls.contains(canonicalPath)) {
return false;
}
long lastModified;
try {
lastModified = pinotFS.lastModified(uri);
} catch (Exception e) {
LOGGER.error("Caught exception while getting last modified time for file: {}, ineligible for delete",
canonicalPath, e);
return false;
}
if (lastModified <= 0) {
LOGGER.warn("file {} modification time {} is not positive, ineligible for delete", uri.toString(), lastModified);
LOGGER.warn("Last modified time for file: {} is not positive: {}, ineligible for delete", canonicalPath,
lastModified);
return false;
}
String uriString = uri.toString();
return SegmentCompletionUtils.isTmpFile(uriString) && !deepURIs.contains(uriString)
&& getCurrentTimeMs() - lastModified > _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
return getCurrentTimeMs() - lastModified > _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,17 @@ private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig str
List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);

// Delete tmp segments
try {
long numDeleteTmpSegments = _llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, segmentsZKMetadata);
LOGGER.info("Deleted {} tmp segments for table: {}", numDeleteTmpSegments, realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
numDeleteTmpSegments);
} catch (Exception e) {
LOGGER.error("Failed to delete tmp segments for table: {}", realtimeTableName, e);
if (_llcRealtimeSegmentManager.isTmpSegmentAsyncDeletionEnabled()) {
try {
long startTimeMs = System.currentTimeMillis();
int numDeletedTmpSegments = _llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, segmentsZKMetadata);
LOGGER.info("Deleted {} tmp segments for table: {} in {}ms", numDeletedTmpSegments, realtimeTableName,
System.currentTimeMillis() - startTimeMs);
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
numDeletedTmpSegments);
} catch (Exception e) {
LOGGER.error("Failed to delete tmp segments for table: {}", realtimeTableName, e);
}
}

// Update the total document count gauge
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,8 @@ public void testUploadToSegmentStore()
}

@Test
public void testDeleteTmpSegmentFiles() throws Exception {
public void testDeleteTmpSegmentFiles()
throws Exception {
// turn on knobs for async deletion of tmp files
ControllerConf config = new ControllerConf();
config.setDataDir(TEMP_DIR.toString());
Expand All @@ -1069,19 +1070,19 @@ public void testDeleteTmpSegmentFiles() throws Exception {
PinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(
helixResourceManager, config);

long deletedTmpSegCount;
int numDeletedTmpSegments;
// case 1: the segmentMetadata download uri is identical to the uri of the tmp segment. Should not delete
when(segZKMeta.getStatus()).thenReturn(Status.DONE);
when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" + segmentFileName);
deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta));
numDeletedTmpSegments = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta));
assertTrue(segmentFile.exists());
assertEquals(0L, deletedTmpSegCount);
assertEquals(numDeletedTmpSegments, 0);

// case 2: download url is empty, indicating the tmp segment is absolutely orphan. Delete the file
when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD);
deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta));
numDeletedTmpSegments = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta));
assertFalse(segmentFile.exists());
assertEquals(1L, deletedTmpSegCount);
assertEquals(numDeletedTmpSegments, 1);
}

//////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@

import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class SegmentCompletionUtils {
private SegmentCompletionUtils() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionUtils.class);
// Used to create temporary segment file names
private static final String TMP = ".tmp.";

Expand Down

0 comments on commit 65f658d

Please sign in to comment.