diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index fe9440813b94f..40b329f52dad4 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -306,7 +306,7 @@ public Map getLatestLocalFileNameLengthMap() { * @param segmentFiles list of local refreshed segment files * @param fileSizeFunction function is used to determine the file size in bytes */ - public void updateLatestLocalFileNameLengthMap( + public Map updateLatestLocalFileNameLengthMap( Collection segmentFiles, CheckedFunction fileSizeFunction ) { @@ -332,6 +332,7 @@ public void updateLatestLocalFileNameLengthMap( // Remove keys from the fileSizeMap that do not exist in the latest segment files latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); computeBytesLag(); + return Collections.unmodifiableMap(latestLocalFileNameLengthMap); } public void addToLatestUploadedFiles(String file) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d96a7e7c95ecf..c77028cb0d470 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -215,7 +215,7 @@ private boolean syncSegments() { Collection localSegmentsPostRefresh = segmentInfos.files(true); // Create a map of file name to size and update the refresh segment tracker - updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh); CountDownLatch latch = new CountDownLatch(1); ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { @Override @@ -231,6 +231,7 @@ public void onResponse(Void unused) { refreshClockTimeMs, refreshSeqNo, lastRefreshedCheckpoint, + localSegmentsSizeMap, checkpoint ); // At this point since we have uploaded new segments, segment infos and segment metadata file, @@ -251,7 +252,7 @@ public void onFailure(Exception e) { }, latch); // Start the segments files upload - uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener); + uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener); latch.await(); } catch (EngineException e) { logger.warn("Exception while reading SegmentInfosSnapshot", e); @@ -295,10 +296,11 @@ private void onSuccessfulSegmentsSync( long refreshClockTimeMs, long refreshSeqNo, long lastRefreshedCheckpoint, + Map localFileSizeMap, ReplicationCheckpoint checkpoint ) { // Update latest uploaded segment files name in segment tracker - segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet()); + segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet()); // Update the remote refresh time and refresh seq no updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures @@ -371,7 +373,11 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se } } - private void uploadNewSegments(Collection localSegmentsPostRefresh, ActionListener listener) { + private void uploadNewSegments( + Collection localSegmentsPostRefresh, + Map localSegmentsSizeMap, + ActionListener listener + ) { Collection filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList()); if (filteredFiles.size() == 0) { logger.debug("No new segments to upload in uploadNewSegments"); @@ -385,7 +391,7 @@ private void uploadNewSegments(Collection localSegmentsPostRefresh, Acti for (String src : filteredFiles) { // Initializing listener here to ensure that the stats increment operations are thread-safe - UploadListener statsListener = createUploadListener(); + UploadListener statsListener = createUploadListener(localSegmentsSizeMap); ActionListener aggregatedListener = ActionListener.wrap(resp -> { statsListener.onSuccess(src); batchUploadListener.onResponse(resp); @@ -445,8 +451,8 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo * * @param segmentFiles list of segment files that are part of the most recent local refresh. */ - private void updateLocalSizeMapAndTracker(Collection segmentFiles) { - segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); + private Map updateLocalSizeMapAndTracker(Collection segmentFiles) { + return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); } private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { @@ -522,21 +528,21 @@ private boolean isLocalOrSnapshotRecovery() { /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */ - private UploadListener createUploadListener() { + private UploadListener createUploadListener(Map fileSizeMap) { return new UploadListener() { private long uploadStartTime = 0; @Override public void beforeUpload(String file) { // Start tracking the upload bytes started - segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesStarted(fileSizeMap.get(file)); uploadStartTime = System.currentTimeMillis(); } @Override public void onSuccess(String file) { // Track upload success - segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file)); segmentTracker.addToLatestUploadedFiles(file); segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime)); } @@ -544,7 +550,7 @@ public void onSuccess(String file) { @Override public void onFailure(String file) { // Track upload failure - segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesFailed(fileSizeMap.get(file)); segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime)); } };