Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport] [1.2] [plugin] repository-azure is not working properly hangs on basic operations #1745

Merged
merged 2 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion plugins/repository-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,23 @@ task azureThirdPartyTest(type: Test) {
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
}
}
check.dependsOn(azureThirdPartyTest)

task azureThirdPartyDefaultXmlTest(type: Test) {
SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class);
SourceSet internalTestSourceSet = sourceSets.getByName(InternalClusterTestPlugin.SOURCE_SET_NAME)
setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs())
setClasspath(internalTestSourceSet.getRuntimeClasspath())
dependsOn tasks.internalClusterTest
include '**/AzureStorageCleanupThirdPartyTests.class'
systemProperty 'javax.xml.stream.XMLInputFactory', "com.sun.xml.internal.stream.XMLInputFactoryImpl"
systemProperty 'test.azure.account', azureAccount ? azureAccount : ""
systemProperty 'test.azure.key', azureKey ? azureKey : ""
systemProperty 'test.azure.sas_token', azureSasToken ? azureSasToken : ""
systemProperty 'test.azure.container', azureContainer ? azureContainer : ""
systemProperty 'test.azure.base', (azureBasePath ? azureBasePath : "") + "_third_party_tests_" + BuildParams.testSeed
if (useFixture) {
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
}
}

check.dependsOn(azureThirdPartyTest, azureThirdPartyDefaultXmlTest)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
Expand All @@ -51,6 +52,7 @@
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.common.implementation.Constants;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
Expand Down Expand Up @@ -82,6 +84,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -217,50 +220,71 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) throws U
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);

SocketAccess.doPrivilegedVoidException(() -> {
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) {
// Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";

outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len = blobItem.getProperties().getContentLength();

final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
logger.trace(
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
);
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(
() -> new ParameterizedMessage(
"container [{}]: blob [{}] deleted status [{}].",
container,
blobItem.getName(),
response.getStatusCode()
)
);

blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
String continuationToken = null;

do {
// Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed.
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();

if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
}

final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";

outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len = blobItem.getProperties().getContentLength();

final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
logger.trace(
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
);
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(
() -> new ParameterizedMessage(
"container [{}]: blob [{}] deleted status [{}].",
container,
blobItem.getName(),
response.getStatusCode()
)
);

blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}
}

@Override
public void onFailure(Exception e) {
exceptions.add(e);
}
@Override
public void onFailure(Exception e) {
exceptions.add(e);
}

@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
}
}
});
}
});
}

// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
});

if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
Expand Down Expand Up @@ -301,20 +325,39 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix
.setPrefix(keyPath + (prefix == null ? "" : prefix));

SocketAccess.doPrivilegedVoidException(() -> {
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
continue;
String continuationToken = null;

do {
// Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();

if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
}

final String name = getBlobName(blobItem.getName(), container, keyPath);
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
continue;
}

final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}
final String name = getBlobName(blobItem.getName(), container, keyPath);
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));

final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}

// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
});

return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
Expand All @@ -330,18 +373,36 @@ public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxExcept
.setPrefix(keyPath);

SocketAccess.doPrivilegedVoidException(() -> {
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.add(name);
String continuationToken = null;

do {
// Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();

if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
}
}
;

final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.add(name);
}
}
// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
});

return Collections.unmodifiableMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public void handle(final HttpExchange exchange) throws IOException {

}
list.append("</Blobs>");
list.append("<NextMarker />");
list.append("</EnumerationResults>");

byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
Expand Down