-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[plugin] repository-azure is not working properly hangs on basic operations #1740
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -283,4 +283,23 @@ task azureThirdPartyTest(type: Test) { | |
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }" | ||
} | ||
} | ||
check.dependsOn(azureThirdPartyTest) | ||
|
||
task azureThirdPartyDefaultXmlTest(type: Test) { | ||
SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class); | ||
SourceSet internalTestSourceSet = sourceSets.getByName(InternalClusterTestPlugin.SOURCE_SET_NAME) | ||
setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs()) | ||
setClasspath(internalTestSourceSet.getRuntimeClasspath()) | ||
dependsOn tasks.internalClusterTest | ||
include '**/AzureStorageCleanupThirdPartyTests.class' | ||
systemProperty 'javax.xml.stream.XMLInputFactory', "com.sun.xml.internal.stream.XMLInputFactoryImpl" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn’t this be the default for all tests to match real usage? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We would hopefully do upstream fix and will test with both, this is the only test which actually could run against Azure cloud, I did run it in all configurations. |
||
systemProperty 'test.azure.account', azureAccount ? azureAccount : "" | ||
systemProperty 'test.azure.key', azureKey ? azureKey : "" | ||
systemProperty 'test.azure.sas_token', azureSasToken ? azureSasToken : "" | ||
systemProperty 'test.azure.container', azureContainer ? azureContainer : "" | ||
systemProperty 'test.azure.base', (azureBasePath ? azureBasePath : "") + "_third_party_tests_" + BuildParams.testSeed | ||
if (useFixture) { | ||
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }" | ||
} | ||
} | ||
|
||
check.dependsOn(azureThirdPartyTest, azureThirdPartyDefaultXmlTest) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import com.azure.core.http.HttpMethod; | ||
import com.azure.core.http.HttpRequest; | ||
import com.azure.core.http.HttpResponse; | ||
import com.azure.core.http.rest.PagedResponse; | ||
import com.azure.core.http.rest.Response; | ||
import com.azure.core.util.Context; | ||
import com.azure.storage.blob.BlobClient; | ||
|
@@ -51,6 +52,7 @@ | |
import com.azure.storage.blob.options.BlobParallelUploadOptions; | ||
import com.azure.storage.common.implementation.Constants; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.core.util.Throwables; | ||
|
@@ -82,6 +84,7 @@ | |
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
@@ -217,50 +220,71 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) throws U | |
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path); | ||
|
||
SocketAccess.doPrivilegedVoidException(() -> { | ||
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) { | ||
// Skipping prefixes as those are not deletable and should not be there | ||
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected"; | ||
|
||
outstanding.incrementAndGet(); | ||
executor.execute(new AbstractRunnable() { | ||
@Override | ||
protected void doRun() throws Exception { | ||
final long len = blobItem.getProperties().getContentLength(); | ||
|
||
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName()); | ||
logger.trace( | ||
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName()) | ||
); | ||
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get()); | ||
logger.trace( | ||
() -> new ParameterizedMessage( | ||
"container [{}]: blob [{}] deleted status [{}].", | ||
container, | ||
blobItem.getName(), | ||
response.getStatusCode() | ||
) | ||
); | ||
|
||
blobsDeleted.incrementAndGet(); | ||
if (len >= 0) { | ||
bytesDeleted.addAndGet(len); | ||
String continuationToken = null; | ||
|
||
do { | ||
// Fetch one page at a time, others are going to be fetched by continuation token | ||
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we open an OpenSearch issue as well so we don't forget here? Something like "Revert repository-azure patch once upstream fixes are available" would be fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
// gets addressed. | ||
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout()) | ||
.streamByPage(continuationToken) | ||
.findFirst(); | ||
|
||
if (!pageOpt.isPresent()) { | ||
// No more pages, should never happen | ||
break; | ||
} | ||
|
||
final PagedResponse<BlobItem> page = pageOpt.get(); | ||
for (final BlobItem blobItem : page.getValue()) { | ||
// Skipping prefixes as those are not deletable and should not be there | ||
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected"; | ||
|
||
outstanding.incrementAndGet(); | ||
executor.execute(new AbstractRunnable() { | ||
@Override | ||
protected void doRun() throws Exception { | ||
final long len = blobItem.getProperties().getContentLength(); | ||
|
||
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName()); | ||
logger.trace( | ||
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName()) | ||
); | ||
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get()); | ||
logger.trace( | ||
() -> new ParameterizedMessage( | ||
"container [{}]: blob [{}] deleted status [{}].", | ||
container, | ||
blobItem.getName(), | ||
response.getStatusCode() | ||
) | ||
); | ||
|
||
blobsDeleted.incrementAndGet(); | ||
if (len >= 0) { | ||
bytesDeleted.addAndGet(len); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
exceptions.add(e); | ||
} | ||
@Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this here from spotless formatting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep :( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
public void onFailure(Exception e) { | ||
exceptions.add(e); | ||
} | ||
|
||
@Override | ||
public void onAfter() { | ||
if (outstanding.decrementAndGet() == 0) { | ||
result.onResponse(null); | ||
@Override | ||
public void onAfter() { | ||
if (outstanding.decrementAndGet() == 0) { | ||
result.onResponse(null); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
}); | ||
} | ||
|
||
// Fetch next continuation token | ||
continuationToken = page.getContinuationToken(); | ||
} while (StringUtils.isNotBlank(continuationToken)); | ||
}); | ||
|
||
if (outstanding.decrementAndGet() == 0) { | ||
result.onResponse(null); | ||
} | ||
|
@@ -301,20 +325,39 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix | |
.setPrefix(keyPath + (prefix == null ? "" : prefix)); | ||
|
||
SocketAccess.doPrivilegedVoidException(() -> { | ||
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) { | ||
// Skipping over the prefixes, only look for the blobs | ||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) { | ||
continue; | ||
String continuationToken = null; | ||
|
||
do { | ||
// Fetch one page at a time, others are going to be fetched by continuation token | ||
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064 | ||
// gets addressed | ||
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout()) | ||
.streamByPage(continuationToken) | ||
.findFirst(); | ||
|
||
if (!pageOpt.isPresent()) { | ||
// No more pages, should never happen | ||
break; | ||
} | ||
|
||
final String name = getBlobName(blobItem.getName(), container, keyPath); | ||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name)); | ||
final PagedResponse<BlobItem> page = pageOpt.get(); | ||
for (final BlobItem blobItem : page.getValue()) { | ||
// Skipping over the prefixes, only look for the blobs | ||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) { | ||
continue; | ||
} | ||
|
||
final BlobItemProperties properties = blobItem.getProperties(); | ||
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength())); | ||
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength())); | ||
} | ||
final String name = getBlobName(blobItem.getName(), container, keyPath); | ||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name)); | ||
|
||
final BlobItemProperties properties = blobItem.getProperties(); | ||
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength())); | ||
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength())); | ||
} | ||
|
||
// Fetch next continuation token | ||
continuationToken = page.getContinuationToken(); | ||
} while (StringUtils.isNotBlank(continuationToken)); | ||
}); | ||
|
||
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap(); | ||
|
@@ -330,18 +373,36 @@ public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxExcept | |
.setPrefix(keyPath); | ||
|
||
SocketAccess.doPrivilegedVoidException(() -> { | ||
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) { | ||
// Skipping over the blobs, only look for prefixes | ||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) { | ||
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/ | ||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /. | ||
// Lastly, we add the length of keyPath to the offset to strip this container's path. | ||
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", ""); | ||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name)); | ||
blobsBuilder.add(name); | ||
String continuationToken = null; | ||
|
||
do { | ||
// Fetch one page at a time, others are going to be fetched by continuation token | ||
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064 | ||
// gets addressed | ||
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout()) | ||
.streamByPage(continuationToken) | ||
.findFirst(); | ||
|
||
if (!pageOpt.isPresent()) { | ||
// No more pages, should never happen | ||
break; | ||
} | ||
} | ||
; | ||
|
||
final PagedResponse<BlobItem> page = pageOpt.get(); | ||
for (final BlobItem blobItem : page.getValue()) { | ||
// Skipping over the blobs, only look for prefixes | ||
if (blobItem.isPrefix() != null && blobItem.isPrefix()) { | ||
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/ | ||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /. | ||
// Lastly, we add the length of keyPath to the offset to strip this container's path. | ||
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", ""); | ||
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name)); | ||
blobsBuilder.add(name); | ||
} | ||
} | ||
// Fetch next continuation token | ||
continuationToken = page.getContinuationToken(); | ||
} while (StringUtils.isNotBlank(continuationToken)); | ||
}); | ||
|
||
return Collections.unmodifiableMap( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍