Skip to content

Commit

Permalink
onfigure BlobServiceClient to use workloadIdentity
Browse files Browse the repository at this point in the history
  • Loading branch information
c3-amitsalunke committed Feb 24, 2024
1 parent 8489294 commit 05396cd
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 50 deletions.
8 changes: 6 additions & 2 deletions plugins/repository-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ opensearchplugin {
}

dependencies {
api 'com.azure:azure-core:1.39.0'
api 'com.azure:azure-core:1.46.0'
api 'com.azure:azure-json:1.0.1'
api 'com.azure:azure-storage-common:12.21.2'
api 'com.azure:azure-core-http-netty:1.12.8'
Expand All @@ -55,7 +55,11 @@ dependencies {
api "io.netty:netty-resolver-dns:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation project(':modules:transport-netty4')
api 'com.azure:azure-storage-blob:12.23.0'
api 'com.azure:azure-storage-blob:12.25.1'
api 'com.azure:azure-identity:1.11.2'
api 'com.microsoft.azure:msal4j:1.14.2'
api 'com.nimbusds:oauth2-oidc-sdk:11.10'
api 'net.minidev:json-smart:1.0.6.3'
api "io.projectreactor.netty:reactor-netty-core:${versions.reactor_netty}"
api "io.projectreactor.netty:reactor-netty-http:${versions.reactor_netty}"
api "org.slf4j:slf4j-api:${versions.slf4j}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public List<Setting<?>> getSettings() {
AzureStorageSettings.ACCOUNT_SETTING,
AzureStorageSettings.KEY_SETTING,
AzureStorageSettings.SAS_TOKEN_SETTING,
AzureStorageSettings.FEDERATED_TOKEN_FILE_SETTING,
AzureStorageSettings.ENDPOINT_SUFFIX_SETTING,
AzureStorageSettings.TIMEOUT_SETTING,
AzureStorageSettings.MAX_RETRIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories.azure;

import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpPipelinePosition;
Expand All @@ -40,11 +41,15 @@
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.identity.WorkloadIdentityCredential;
import com.azure.identity.WorkloadIdentityCredentialBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString;
Expand All @@ -61,12 +66,17 @@

import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.InvalidKeyException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -216,8 +226,12 @@ protected PasswordAuthentication getPasswordAuthentication() {
* <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md#miscellaneous">migration guide</a> for mode details:
*/
private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilder builder, final AzureStorageSettings settings) {
final StorageConnectionString storageConnectionString = StorageConnectionString.create(settings.getConnectString(), logger);
final StorageEndpoint endpoint = storageConnectionString.getBlobEndpoint();
StorageEndpoint endpoint;
if (settings.useWorkloadIdentity()) {
endpoint = new StorageEndpoint(URI.create(settings.getEndpoint()));
} else {
endpoint = StorageConnectionString.create(settings.getConnectString(), logger).getBlobEndpoint();
}

if (endpoint == null || endpoint.getPrimaryUri() == null) {
throw new IllegalArgumentException("connectionString missing required settings to derive blob service primary endpoint.");
Expand Down Expand Up @@ -247,9 +261,31 @@ private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilde
return builder;
}

private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings) throws InvalidKeyException,
URISyntaxException {
return SocketAccess.doPrivilegedException(() -> new BlobServiceClientBuilder().connectionString(settings.getConnectString()));
private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings)
throws InvalidKeyException,
URISyntaxException {
return SocketAccess.doPrivilegedException(() -> {
PrivilegedExceptionAction<ExecutorService> privilegedAction = Executors::newSingleThreadExecutor;
ExecutorService executorService = AccessController.doPrivileged(privilegedAction);
BlobServiceClientBuilder b =
new BlobServiceClientBuilder()
.credential(new DefaultAzureCredentialBuilder().executorService(executorService).build());

if (settings.useWorkloadIdentity()) {
TokenCredential workloadIdentityCredential =
new WorkloadIdentityCredentialBuilder()
.tokenFilePath(settings.getFederatedTokenFile())
.executorService(executorService)
.build();
b.endpoint(settings.getEndpoint())
.credential(workloadIdentityCredential);
} else if (!settings.getConnectString().isEmpty()) {
b.connectionString(settings.getConnectString());
}

return b;
}
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,27 @@ final class AzureStorageSettings {
key -> SecureSetting.secureString(key, null)
);

/** Azure web identity token */
public static final AffixSetting<String> FEDERATED_TOKEN_FILE_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"federated_token_file",
key -> Setting.simpleString(key, Property.NodeScope),
() -> ACCOUNT_SETTING
);

/** Azure SAS token */
public static final AffixSetting<SecureString> SAS_TOKEN_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"sas_token",
key -> SecureSetting.secureString(key, null)
AZURE_CLIENT_PREFIX_KEY,
"sas_token",
key -> SecureSetting.secureString(key, null)
);

/** max_retries: Number of retries in case of Azure errors. Defaults to 3 (RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT). */
public static final AffixSetting<Integer> MAX_RETRIES_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"max_retries",
(key) -> Setting.intSetting(key, 3, Setting.Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);
/**
* Azure endpoint suffix. Default to core.windows.net (CloudStorageAccount.DEFAULT_DNS).
Expand All @@ -100,61 +107,54 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueMinutes(-1), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_CONNECT_TIMEOUT
public static final AffixSetting<TimeValue> CONNECT_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"connect.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_WRITE_TIMEOUT
public static final AffixSetting<TimeValue> WRITE_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"write.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_READ_TIMEOUT
public static final AffixSetting<TimeValue> READ_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"read.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_RESPONSE_TIMEOUT
public static final AffixSetting<TimeValue> RESPONSE_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"response.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

/** The type of the proxy to connect to azure through. Can be direct (no proxy, default), http or socks */
public static final AffixSetting<ProxySettings.ProxyType> PROXY_TYPE_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"proxy.type",
(key) -> new Setting<>(key, "direct", s -> ProxySettings.ProxyType.valueOf(s.toUpperCase(Locale.ROOT)), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

/** The host name of a proxy to connect to azure through. */
public static final AffixSetting<String> PROXY_HOST_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"proxy.host",
(key) -> Setting.simpleString(key, Property.NodeScope),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING
);
Expand All @@ -164,7 +164,6 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"proxy.port",
(key) -> Setting.intSetting(key, 0, 0, 65535, Setting.Property.NodeScope),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING,
() -> PROXY_HOST_SETTING
Expand All @@ -175,7 +174,6 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"proxy.username",
key -> SecureSetting.secureString(key, null),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING,
() -> PROXY_HOST_SETTING
Expand All @@ -186,7 +184,6 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"proxy.password",
key -> SecureSetting.secureString(key, null),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING,
() -> PROXY_HOST_SETTING,
Expand All @@ -195,6 +192,7 @@ final class AzureStorageSettings {

private final String account;
private final String connectString;
private final String federatedTokenFile;
private final String endpointSuffix;
private final TimeValue timeout;
private final int maxRetries;
Expand All @@ -207,20 +205,22 @@ final class AzureStorageSettings {

// copy-constructor
private AzureStorageSettings(
String account,
String connectString,
String endpointSuffix,
TimeValue timeout,
int maxRetries,
LocationMode locationMode,
TimeValue connectTimeout,
TimeValue writeTimeout,
TimeValue readTimeout,
TimeValue responseTimeout,
ProxySettings proxySettings
String account,
String connectString,
String federatedTokenFile,
String endpointSuffix,
TimeValue timeout,
int maxRetries,
LocationMode locationMode,
TimeValue connectTimeout,
TimeValue writeTimeout,
TimeValue readTimeout,
TimeValue responseTimeout,
ProxySettings proxySettings
) {
this.account = account;
this.connectString = connectString;
this.federatedTokenFile = federatedTokenFile;
this.endpointSuffix = endpointSuffix;
this.timeout = timeout;
this.maxRetries = maxRetries;
Expand All @@ -235,6 +235,7 @@ private AzureStorageSettings(
private AzureStorageSettings(
String account,
String key,
String federatedTokenFile,
String sasToken,
String endpointSuffix,
TimeValue timeout,
Expand All @@ -246,7 +247,8 @@ private AzureStorageSettings(
ProxySettings proxySettings
) {
this.account = account;
this.connectString = buildConnectString(account, key, sasToken, endpointSuffix);
this.connectString = buildConnectString(account, key, sasToken, federatedTokenFile, endpointSuffix);
this.federatedTokenFile = federatedTokenFile;
this.endpointSuffix = endpointSuffix;
this.timeout = timeout;
this.maxRetries = maxRetries;
Expand Down Expand Up @@ -278,20 +280,32 @@ public String getConnectString() {
return connectString;
}

private static String buildConnectString(String account, @Nullable String key, @Nullable String sasToken, String endpointSuffix) {
public String getFederatedTokenFile() {
return federatedTokenFile;
}

public Boolean useWorkloadIdentity() {
return !federatedTokenFile.isEmpty();
}

private static String buildConnectString(String account, @Nullable String key, @Nullable String sasToken, @Nullable String federatedTokenFile, String endpointSuffix) {
final boolean hasSasToken = Strings.hasText(sasToken);
final boolean hasKey = Strings.hasText(key);
if (hasSasToken == false && hasKey == false) {
throw new SettingsException("Neither a secret key nor a shared access token was set.");
}
final boolean hasFederatedTokenFile = Strings.hasText(federatedTokenFile);
if (hasSasToken && hasKey) {
throw new SettingsException("Both a secret as well as a shared access token were set.");
} else if (hasSasToken && hasFederatedTokenFile) {
throw new SettingsException("Both a shared access token as well as an azure federated token file were set.");
} else if (hasKey && hasFederatedTokenFile) {
throw new SettingsException("Both a secret as well as an azure federated token file were set.");
}
final StringBuilder connectionStringBuilder = new StringBuilder();
connectionStringBuilder.append("DefaultEndpointsProtocol=https").append(";AccountName=").append(account);
if (hasSasToken || hasKey) {
connectionStringBuilder.append("DefaultEndpointsProtocol=https").append(";AccountName=").append(account);
}
if (hasKey) {
connectionStringBuilder.append(";AccountKey=").append(key);
} else {
} else if (hasSasToken) {
connectionStringBuilder.append(";SharedAccessSignature=").append(sasToken);
}
if (Strings.hasText(endpointSuffix)) {
Expand All @@ -300,6 +314,10 @@ private static String buildConnectString(String account, @Nullable String key, @
return connectionStringBuilder.toString();
}

public String getEndpoint() {
return "https://" + account + ".blob." + endpointSuffix;
}

public LocationMode getLocationMode() {
return locationMode;
}
Expand Down Expand Up @@ -369,6 +387,7 @@ private static AzureStorageSettings getClientSettings(Settings settings, String
return new AzureStorageSettings(
account.toString(),
key.toString(),
getValue(settings, clientName, FEDERATED_TOKEN_FILE_SETTING),
sasToken.toString(),
getValue(settings, clientName, ENDPOINT_SUFFIX_SETTING),
getValue(settings, clientName, TIMEOUT_SETTING),
Expand All @@ -377,8 +396,7 @@ private static AzureStorageSettings getClientSettings(Settings settings, String
getValue(settings, clientName, WRITE_TIMEOUT_SETTING),
getValue(settings, clientName, READ_TIMEOUT_SETTING),
getValue(settings, clientName, RESPONSE_TIMEOUT_SETTING),
validateAndCreateProxySettings(settings, clientName)
);
validateAndCreateProxySettings(settings, clientName));
}
}

Expand Down Expand Up @@ -431,6 +449,7 @@ static Map<String, AzureStorageSettings> overrideLocationMode(
new AzureStorageSettings(
entry.getValue().account,
entry.getValue().connectString,
entry.getValue().federatedTokenFile,
entry.getValue().endpointSuffix,
entry.getValue().timeout,
entry.getValue().maxRetries,
Expand All @@ -439,8 +458,7 @@ static Map<String, AzureStorageSettings> overrideLocationMode(
entry.getValue().writeTimeout,
entry.getValue().readTimeout,
entry.getValue().responseTimeout,
entry.getValue().getProxySettings()
)
entry.getValue().getProxySettings())
);
}
return mapBuilder.immutableMap();
Expand Down

0 comments on commit 05396cd

Please sign in to comment.