Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a clone of local segements size map used for Remote Segment Stats until sync to remote completes #11896

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public Map<String, Long> getLatestLocalFileNameLengthMap() {
* @param segmentFiles list of local refreshed segment files
* @param fileSizeFunction function is used to determine the file size in bytes
*/
public void updateLatestLocalFileNameLengthMap(
public Map<String, Long> updateLatestLocalFileNameLengthMap(
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
Expand All @@ -332,6 +332,7 @@ public void updateLatestLocalFileNameLengthMap(
// Remove keys from the fileSizeMap that do not exist in the latest segment files
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
computeBytesLag();
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void addToLatestUploadedFiles(String file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private boolean syncSegments() {
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Map<String, Long> localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -231,6 +231,7 @@ public void onResponse(Void unused) {
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
localSegmentsSizeMap,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
Expand All @@ -251,7 +252,7 @@ public void onFailure(Exception e) {
}, latch);

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
latch.await();
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
Expand Down Expand Up @@ -295,10 +296,11 @@ private void onSuccessfulSegmentsSync(
long refreshClockTimeMs,
long refreshSeqNo,
long lastRefreshedCheckpoint,
Map<String, Long> localFileSizeMap,
ReplicationCheckpoint checkpoint
) {
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet());
segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet());
// Update the remote refresh time and refresh seq no
updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo);
// Reset the backoffDelayIterator for the future failures
Expand Down Expand Up @@ -371,7 +373,11 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
}
}

private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, ActionListener<Void> listener) {
private void uploadNewSegments(
Collection<String> localSegmentsPostRefresh,
Map<String, Long> localSegmentsSizeMap,
ActionListener<Void> listener
) {
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
if (filteredFiles.size() == 0) {
logger.debug("No new segments to upload in uploadNewSegments");
Expand All @@ -385,7 +391,7 @@ private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, Acti

for (String src : filteredFiles) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
UploadListener statsListener = createUploadListener();
UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
Expand Down Expand Up @@ -445,8 +451,8 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo
*
* @param segmentFiles list of segment files that are part of the most recent local refresh.
*/
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
private Map<String, Long> updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
}

private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
Expand Down Expand Up @@ -522,29 +528,29 @@ private boolean isLocalOrSnapshotRecovery() {
/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*/
private UploadListener createUploadListener() {
private UploadListener createUploadListener(Map<String, Long> fileSizeMap) {
return new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesStarted(fileSizeMap.get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file));
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesFailed(fileSizeMap.get(file));
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}
};
Expand Down
Loading