diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index fa5b6a2aee5d3..296dd81b14ce0 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -580,111 +580,20 @@ private class CompareAndExchangeOperation { this.threadPool = threadPool; } - private List listMultipartUploads() { - final var listRequest = new ListMultipartUploadsRequest(bucket); - listRequest.setPrefix(blobKey); - listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); - try { - return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads(); - } catch (AmazonS3Exception e) { - if (e.getStatusCode() == 404) { - return List.of(); - } - throw e; - } - } - - private int getUploadIndex(String targetUploadId, List multipartUploads) { - var uploadIndex = 0; - var found = false; - for (MultipartUpload multipartUpload : multipartUploads) { - final var observedUploadId = multipartUpload.getUploadId(); - if (observedUploadId.equals(targetUploadId)) { - final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis(); - final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli(); - final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis(); - if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) { - logger.warn( - """ - compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \ - which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""", - bucket, - blobKey, - multipartUpload.getInitiated(), - multipartUpload.getInitiated().toInstant().toEpochMilli(), - currentTimeMillis, - expectedAgeRangeMillis - ); - } - found = true; - } else if (observedUploadId.compareTo(targetUploadId) < 0) { - uploadIndex += 1; - } - } - - return found ? uploadIndex : -1; - } - - /** - * @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation - */ - private boolean hasPreexistingUploads() { - final var uploads = listMultipartUploads(); - if (uploads.isEmpty()) { - return false; - } - - final var expiryDate = Date.from( - Instant.ofEpochMilli( - blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis() - ) - ); - if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) { - return true; - } - - // there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare) - for (final var upload : uploads) { - logger.warn( - "cleaning up stale compare-and-swap upload [{}] initiated at [{}]", - upload.getUploadId(), - upload.getInitiated() - ); - safeAbortMultipartUpload(upload.getUploadId()); - } - - return false; - } - void run(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { BlobContainerUtils.ensureValidRegisterContent(updated); if (hasPreexistingUploads()) { - // This is a small optimization to improve the liveness properties of this algorithm. // // We can safely proceed even if there are other uploads in progress, but that would add to the potential for collisions and // delays. Thus in this case we prefer avoid disturbing the ongoing attempts and just fail up front. - listener.onResponse(OptionalBytesReference.MISSING); return; } - final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey); - initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); - final var uploadId = SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId(); - - final var uploadPartRequest = new UploadPartRequest(); - uploadPartRequest.setBucketName(bucket); - uploadPartRequest.setKey(blobKey); - uploadPartRequest.setUploadId(uploadId); - uploadPartRequest.setPartNumber(1); - uploadPartRequest.setLastPart(true); - uploadPartRequest.setInputStream(updated.streamInput()); - uploadPartRequest.setPartSize(updated.length()); - uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); - final var partETag = SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag(); - + final var uploadId = initiateMultipartUpload(); + final var partETag = uploadPart(updated, uploadId); final var currentUploads = listMultipartUploads(); final var uploadIndex = getUploadIndex(uploadId, currentUploads); @@ -710,16 +619,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener ActionListener.completeWith(delegate2, () -> { if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) { - final var completeMultipartUploadRequest = new CompleteMultipartUploadRequest( - bucket, - blobKey, - uploadId, - List.of(partETag) - ); - completeMultipartUploadRequest.setRequestMetricCollector( - blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose) - ); - SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest)); + completeMultipartUpload(uploadId, partETag); isComplete.set(true); } return currentValue; @@ -740,15 +640,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener { try { - for (MultipartUpload currentUpload : currentUploads) { - final var currentUploadId = currentUpload.getUploadId(); - if (uploadId.equals(currentUploadId) == false) { - blobStore.getSnapshotExecutor() - .execute( - ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(currentUploadId)) - ); - } - } + cancelOtherUploads(uploadId, currentUploads, listeners); } finally { delayListener.onResponse(null); } @@ -769,6 +661,111 @@ void run(BytesReference expected, BytesReference updated, ActionListener upload.getInitiated().after(expiryDate))) { + return true; + } + + // there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare) + for (final var upload : uploads) { + logger.warn( + "cleaning up stale compare-and-swap upload [{}] initiated at [{}]", + upload.getUploadId(), + upload.getInitiated() + ); + safeAbortMultipartUpload(upload.getUploadId()); + } + + return false; + } + + private List listMultipartUploads() { + final var listRequest = new ListMultipartUploadsRequest(bucket); + listRequest.setPrefix(blobKey); + listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); + try { + return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads(); + } catch (AmazonS3Exception e) { + if (e.getStatusCode() == 404) { + return List.of(); + } + throw e; + } + } + + private String initiateMultipartUpload() { + final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey); + initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); + return SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId(); + } + + private PartETag uploadPart(BytesReference updated, String uploadId) throws IOException { + final var uploadPartRequest = new UploadPartRequest(); + uploadPartRequest.setBucketName(bucket); + uploadPartRequest.setKey(blobKey); + uploadPartRequest.setUploadId(uploadId); + uploadPartRequest.setPartNumber(1); + uploadPartRequest.setLastPart(true); + uploadPartRequest.setInputStream(updated.streamInput()); + uploadPartRequest.setPartSize(updated.length()); + uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); + return SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag(); + } + + private int getUploadIndex(String targetUploadId, List multipartUploads) { + var uploadIndex = 0; + var found = false; + for (MultipartUpload multipartUpload : multipartUploads) { + final var observedUploadId = multipartUpload.getUploadId(); + if (observedUploadId.equals(targetUploadId)) { + final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis(); + final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli(); + final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis(); + if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) { + logger.warn( + """ + compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \ + which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""", + bucket, + blobKey, + multipartUpload.getInitiated(), + multipartUpload.getInitiated().toInstant().toEpochMilli(), + currentTimeMillis, + expectedAgeRangeMillis + ); + } + found = true; + } else if (observedUploadId.compareTo(targetUploadId) < 0) { + uploadIndex += 1; + } + } + + return found ? uploadIndex : -1; + } + + private void cancelOtherUploads(String uploadId, List currentUploads, RefCountingListener listeners) { + for (final var currentUpload : currentUploads) { + final var currentUploadId = currentUpload.getUploadId(); + if (uploadId.equals(currentUploadId) == false) { + blobStore.getSnapshotExecutor() + .execute(ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(currentUploadId))); + } + } + } + private void safeAbortMultipartUpload(String uploadId) { try { abortMultipartUploadIfExists(uploadId); @@ -791,6 +788,11 @@ private void abortMultipartUploadIfExists(String uploadId) { } } + private void completeMultipartUpload(String uploadId, PartETag partETag) { + final var completeMultipartUploadRequest = new CompleteMultipartUploadRequest(bucket, blobKey, uploadId, List.of(partETag)); + completeMultipartUploadRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); + SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest)); + } } @Override