Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a clone of local segements size map used for Remote Segment Stats until sync to remote completes #11896

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,17 @@ public Map<String, Long> getLatestLocalFileNameLengthMap() {
}

/**
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map.
* The method is given a function as an argument which is used for determining the file size (length in bytes).
* This method is also provided the collection of segment files which are the latest refresh local segment files.
* This method also removes the stale segment files from the map that are not part of the input segment files.
*
* @param segmentFiles list of local refreshed segment files
* @param fileSizeFunction function is used to determine the file size in bytes
*
* @return updated map of local segment files and filesize
*/
public void updateLatestLocalFileNameLengthMap(
public Map<String, Long> updateLatestLocalFileNameLengthMap(
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
Expand All @@ -332,6 +337,7 @@ public void updateLatestLocalFileNameLengthMap(
// Remove keys from the fileSizeMap that do not exist in the latest segment files
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
computeBytesLag();
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void addToLatestUploadedFiles(String file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Map<String, Long> localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -231,6 +233,7 @@
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
localSegmentsSizeMap,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
Expand All @@ -251,7 +254,7 @@
}, latch);

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
latch.await();
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
Expand Down Expand Up @@ -295,10 +298,11 @@
long refreshClockTimeMs,
long refreshSeqNo,
long lastRefreshedCheckpoint,
Map<String, Long> localFileSizeMap,
ReplicationCheckpoint checkpoint
) {
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet());
segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet());
// Update the remote refresh time and refresh seq no
updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo);
// Reset the backoffDelayIterator for the future failures
Expand Down Expand Up @@ -371,7 +375,11 @@
}
}

private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, ActionListener<Void> listener) {
private void uploadNewSegments(
Collection<String> localSegmentsPostRefresh,
Map<String, Long> localSegmentsSizeMap,
ActionListener<Void> listener
) {
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
if (filteredFiles.size() == 0) {
logger.debug("No new segments to upload in uploadNewSegments");
Expand All @@ -385,7 +393,7 @@

for (String src : filteredFiles) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
UploadListener statsListener = createUploadListener();
UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
Expand Down Expand Up @@ -444,9 +452,11 @@
* Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size.
*
* @param segmentFiles list of segment files that are part of the most recent local refresh.
*
* @return updated map of local segment files and filesize
*/
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
private Map<String, Long> updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
}

private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
Expand Down Expand Up @@ -521,30 +531,32 @@

/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*
* @param fileSizeMap updated map of current snapshot of local segments to their sizes
*/
private UploadListener createUploadListener() {
private UploadListener createUploadListener(Map<String, Long> fileSizeMap) {
return new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesStarted(fileSizeMap.get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file));
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesFailed(fileSizeMap.get(file));

Check warning on line 559 in server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java#L559

Added line #L559 was not covered by tests
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}
};
Expand Down
Loading