Incorporate PR review feedback - 1
Signed-off-by: Bhumika Saini <[email protected]>
Bhumika Saini committed Oct 3, 2023
1 parent 1717027 commit 2020117
Showing 16 changed files with 91 additions and 208 deletions.
@@ -113,7 +113,7 @@ public URLRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
super(metadata, false, namedXContentRegistry, clusterService, recoverySettings);
super(metadata, namedXContentRegistry, clusterService, recoverySettings);

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
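
Note: throughout this commit the boolean compress argument disappears from the super(...) calls of URLRepository, AzureRepository, GoogleCloudStorageRepository, HdfsRepository and S3Repository, presumably because the base class now derives compression from the repository settings it already receives (that part of the change is not visible in this excerpt). A minimal, self-contained sketch of the pattern, with illustrative names (RepoMetadata, BaseRepository) rather than the real OpenSearch types:

    import java.util.Map;

    // Illustrative stand-in for RepositoryMetadata: a name plus flat string settings.
    class RepoMetadata {
        final String name;
        final Map<String, String> settings;
        RepoMetadata(String name, Map<String, String> settings) {
            this.name = name;
            this.settings = settings;
        }
        boolean getAsBoolean(String key, boolean defaultValue) {
            String value = settings.get(key);
            return value == null ? defaultValue : Boolean.parseBoolean(value);
        }
    }

    // Before: subclasses passed `compress` explicitly, e.g. BaseRepository(metadata, compress, ...).
    // After: the base constructor reads the flag from the metadata it already holds.
    abstract class BaseRepository {
        protected final RepoMetadata metadata;
        protected final boolean compress;
        protected BaseRepository(RepoMetadata metadata) {
            this.metadata = metadata;
            this.compress = metadata.getAsBoolean("compress", false);
        }
    }

    class UrlLikeRepository extends BaseRepository {
        UrlLikeRepository(RepoMetadata metadata) {
            super(metadata); // no separate compress argument any more
            if (!metadata.settings.containsKey("url")) {
                throw new IllegalArgumentException("[" + metadata.name + "] missing url");
            }
        }
    }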
@@ -114,14 +114,7 @@ public AzureRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(
metadata,
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;

@@ -92,14 +92,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(
metadata,
getSetting(COMPRESS_SETTING, metadata),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
@@ -83,7 +83,7 @@ public HdfsRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);
super(metadata, namedXContentRegistry, clusterService, recoverySettings);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
@@ -68,7 +68,7 @@ class S3BlobStore implements BlobStore {

private final StorageClass storageClass;

private RepositoryMetadata repositoryMetadata;
private volatile RepositoryMetadata repositoryMetadata;

private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();

@@ -105,20 +105,11 @@ class S3BlobStore implements BlobStore {
this.priorityExecutorBuilder = priorityExecutorBuilder;
}

@Override
public boolean isReloadable() {
return true;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata) {
this.repositoryMetadata = repositoryMetadata;
}

public boolean isMultipartUploadEnabled() {
return multipartUploadEnabled;
}

@Override
public String toString() {
return bucket;
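
Note: repositoryMetadata is now volatile because reload(...) can swap it in from a cluster-state applier thread while request threads keep reading it. A generic sketch of that safe-publication concern (ConfigHolder and Config are hypothetical stand-ins, not OpenSearch classes):

    // Generic sketch: a config object that is replaced wholesale at runtime must be
    // published safely. A volatile reference guarantees that readers observe the new
    // object after reload(), without any locking on the read path.
    final class ConfigHolder {
        static final class Config {
            final String bucket;
            Config(String bucket) { this.bucket = bucket; }
        }

        private volatile Config current;

        ConfigHolder(Config initial) { this.current = initial; }

        // Called rarely, e.g. when repository settings change in the cluster state.
        void reload(Config updated) { this.current = updated; }

        // Called on every request; sees either the old or the new Config, never a torn mix.
        String bucket() { return current.bucket; }
    }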
@@ -44,6 +44,7 @@
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.settings.SecureString;
@@ -78,7 +79,6 @@
* <dt>{@code concurrent_streams}</dt><dd>Number of concurrent read/write streams (per repository on each node). Defaults to 5.</dd>
* <dt>{@code chunk_size}</dt>
* <dd>Large files can be divided into chunks. This parameter specifies the chunk size. Defaults to not chunked.</dd>
* <dt>{@code compress}</dt><dd>If set to true metadata files will be stored compressed. Defaults to false.</dd>
* </dl>
*/
class S3Repository extends MeteredBlobStoreRepository {
@@ -204,29 +204,27 @@ class S3Repository extends MeteredBlobStoreRepository {

private final S3Service service;

private String bucket;
private volatile String bucket;

private ByteSizeValue bufferSize;
private volatile ByteSizeValue bufferSize;

private ByteSizeValue chunkSize;
private volatile ByteSizeValue chunkSize;

private BlobPath basePath;
private volatile BlobPath basePath;

private boolean serverSideEncryption;
private volatile boolean serverSideEncryption;

private String storageClass;

private String cannedACL;

private RepositoryMetadata repositoryMetadata;
private volatile String storageClass;

private volatile String cannedACL;
private final AsyncTransferManager asyncUploadUtils;
private final S3AsyncService s3AsyncService;
private final boolean multipartUploadEnabled;
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;

// Used by test classes
S3Repository(
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
@@ -270,14 +268,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final boolean multipartUploadEnabled,
Path pluginConfigPath
) {
super(
metadata,
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.service = service;
this.s3AsyncService = s3AsyncService;
this.multipartUploadEnabled = multipartUploadEnabled;
@@ -286,7 +277,7 @@ class S3Repository extends MeteredBlobStoreRepository {
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.normalExecutorBuilder = normalExecutorBuilder;

readRepositoryMetadata(metadata);
readRepositoryMetadata();
}

private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
@@ -341,14 +332,14 @@ protected S3BlobStore createBlobStore() {
bufferSize,
cannedACL,
storageClass,
repositoryMetadata,
metadata,
asyncUploadUtils,
priorityExecutorBuilder,
normalExecutorBuilder
);
}

// only use for testing
// only used for testing (S3RepositoryTests)
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();
@@ -368,51 +359,27 @@ public boolean isReloadable() {
public void reload(RepositoryMetadata newRepositoryMetadata) {
// Reload configs for S3Repository
super.reload(newRepositoryMetadata);
repositoryMetadata = newRepositoryMetadata;
readRepositoryMetadata(repositoryMetadata);
readRepositoryMetadata();

// Reload configs for S3RepositoryPlugin
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(repositoryMetadata.settings(), pluginConfigPath);
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(metadata.settings(), pluginConfigPath);
service.refreshAndClearCache(clientsSettings);
s3AsyncService.refreshAndClearCache(clientsSettings);

// Reload configs for S3BlobStore
BlobStore blobStore = getBlobStore();
blobStore.reload(repositoryMetadata);
blobStore.reload(metadata);
}
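
Note: the rewritten reload(...) above updates configuration in layers: the base class takes the new metadata, the repository re-derives its fields, both S3 client caches are refreshed, and finally the blob store is told to re-read the metadata. A rough, generic sketch of that ordering, assuming illustrative types (ReloadableRepository, ReloadableStore) rather than the real plugin classes:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Generic sketch of the layered reload: swap the source of truth, drop derived
    // caches, then notify the downstream component. All names are illustrative.
    final class ReloadableRepository {
        interface ReloadableStore {
            void reload(Map<String, String> settings);
        }

        private volatile Map<String, String> settings;
        private final Map<String, String> derivedClientCache = new ConcurrentHashMap<>();
        private final ReloadableStore store;

        ReloadableRepository(Map<String, String> settings, ReloadableStore store) {
            this.settings = settings;
            this.store = store;
        }

        void reload(Map<String, String> newSettings) {
            this.settings = newSettings;   // 1. swap the source of truth first
            derivedClientCache.clear();    // 2. invalidate caches derived from the old settings
            store.reload(newSettings);     // 3. let the downstream store re-read its config
        }
    }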

/**
* Reloads the values derived from the Repository Metadata
*
* @param repositoryMetadata RepositoryMetadata instance to derive the values from
*/
private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
this.repositoryMetadata = metadata;
private void readRepositoryMetadata() {
validateRepositoryMetadata();

// Parse and validate the user's S3 Storage Class setting
this.bucket = BUCKET_SETTING.get(metadata.settings());
if (bucket == null) {
throw new RepositoryException(metadata.name(), "No bucket defined for s3 repository");
}

this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings());
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());

// We make sure that chunkSize is bigger or equal than/to bufferSize
if (this.chunkSize.getBytes() < bufferSize.getBytes()) {
throw new RepositoryException(
metadata.name(),
CHUNK_SIZE_SETTING.getKey()
+ " ("
+ this.chunkSize
+ ") can't be lower than "
+ BUFFER_SIZE_SETTING.getKey()
+ " ("
+ bufferSize
+ ")."
);
}

final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
this.basePath = new BlobPath().add(basePath);
@@ -421,10 +388,8 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
}

this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings());

this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());

if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
// provided repository settings
deprecationLogger.deprecate(
Expand All @@ -445,6 +410,29 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
);
}

private void validateRepositoryMetadata() {
// Validate the bucket setting and the chunk/buffer size relationship
Settings settings = metadata.settings();
if (BUCKET_SETTING.get(settings) == null) {
throw new RepositoryException(metadata.name(), "No bucket defined for s3 repository");
}

// We make sure that chunkSize is greater than or equal to bufferSize
if (CHUNK_SIZE_SETTING.get(settings).getBytes() < BUFFER_SIZE_SETTING.get(settings).getBytes()) {
throw new RepositoryException(
metadata.name(),
CHUNK_SIZE_SETTING.getKey()
+ " ("
+ CHUNK_SIZE_SETTING.get(settings)
+ ") can't be lower than "
+ BUFFER_SIZE_SETTING.getKey()
+ " ("
+ BUFFER_SIZE_SETTING.get(settings)
+ ")."
);
}
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
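
Note: validateRepositoryMetadata() now concentrates the checks that previously sat inline in readRepositoryMetadata: a bucket must be set, and chunk_size may not be smaller than buffer_size (for example, chunk_size: 50mb with buffer_size: 100mb would be rejected). A tiny self-contained sketch of that rule, with plain longs standing in for the real ByteSizeValue settings:

    // Mirrors the chunk_size >= buffer_size rule above, with plain longs standing in
    // for the real ByteSizeValue settings.
    final class RepositorySettingsChecks {
        static void validateChunkAndBufferSize(String repoName, long chunkSizeBytes, long bufferSizeBytes) {
            if (chunkSizeBytes < bufferSizeBytes) {
                throw new IllegalArgumentException(
                    "[" + repoName + "] chunk_size (" + chunkSizeBytes
                        + "b) can't be lower than buffer_size (" + bufferSizeBytes + "b)");
            }
        }

        public static void main(String[] args) {
            // chunk_size 50mb with buffer_size 100mb violates the rule and throws.
            validateChunkAndBufferSize("my-s3-repo", 50L * 1024 * 1024, 100L * 1024 * 1024);
        }
    }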
@@ -90,6 +90,7 @@
import java.security.SecureRandom;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Collections.emptyMap;

@@ -100,7 +101,7 @@ class S3Service implements Closeable {

private static final String DEFAULT_S3_ENDPOINT = "s3.amazonaws.com";

private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();
private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = new ConcurrentHashMap<>();

/**
* Client settings calculated from static configuration and settings in the keystore.
@@ -111,7 +112,7 @@ class S3Service implements Closeable {
* Client settings derived from those in {@link #staticClientSettings} by combining them with settings
* in the {@link RepositoryMetadata}.
*/
private volatile Map<Settings, S3ClientSettings> derivedClientSettings = emptyMap();
private volatile Map<Settings, S3ClientSettings> derivedClientSettings = new ConcurrentHashMap<>();

S3Service(final Path configPath) {
staticClientSettings = MapBuilder.<String, S3ClientSettings>newMapBuilder()
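
Note: clientsCache and derivedClientSettings move from Collections.emptyMap() to ConcurrentHashMap, presumably so the caches can also be mutated in place during reload rather than only replaced wholesale behind the volatile reference. A generic sketch contrasting the two styles (String keys and values are illustrative; the real caches key on S3ClientSettings and Settings):

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Two ways to keep a settings-derived cache safe under concurrent access.
    final class ClientCaches {
        // Copy-on-write style: the map itself stays immutable and the reference is swapped.
        private volatile Map<String, String> copyOnWriteCache = Collections.emptyMap();

        // In-place style: a ConcurrentHashMap tolerates concurrent put/clear without swapping.
        private final Map<String, String> concurrentCache = new ConcurrentHashMap<>();

        void replaceAll(Map<String, String> fresh) {
            copyOnWriteCache = Map.copyOf(fresh);   // one atomic reference swap
        }

        void refreshInPlace(Map<String, String> fresh) {
            concurrentCache.clear();                // safe to interleave with readers
            concurrentCache.putAll(fresh);
        }
    }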
@@ -56,14 +56,6 @@ default Map<String, Long> stats() {
return Collections.emptyMap();
}

/**
* Checks if the blob store can be reloaded inplace or not
* @return true if the blob store can be reloaded inplace, false otherwise
*/
default boolean isReloadable() {
return false;
}

/**
* Reload the blob store inplace
*/
@@ -456,10 +456,11 @@ public void applyClusterState(ClusterChangedEvent event) {
if (previousMetadata.type().equals(repositoryMetadata.type()) == false
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetadata.name());
if (repository.isSystemRepository() && repository.isReloadable()) {
logger.debug("updating repository [{}] in-place", repositoryMetadata.name());
repository.reload(repositoryMetadata);
} else {
logger.debug("creating repository [{}] again", repositoryMetadata.name());
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
repository = null;
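
Note: the updated applyClusterState branch reloads a repository in place only when it is a system repository that reports itself as reloadable; otherwise it keeps the old close-and-recreate path. A compact sketch of that decision, with toy interfaces standing in for the real Repository API:

    // Simplified decision flow; ToyRepository stands in for the real Repository interface.
    interface ToyRepository {
        boolean isSystemRepository();
        boolean isReloadable();
        void reload(Object newMetadata);
        void close();
    }

    final class RepositoryUpdater {
        // Returns the repository to keep using after a settings change: the same instance
        // (reloaded in place) or null, signalling that a fresh instance must be created.
        static ToyRepository applySettingsChange(ToyRepository repository, Object newMetadata) {
            if (repository.isSystemRepository() && repository.isReloadable()) {
                repository.reload(newMetadata);   // in-place update, no close/reopen
                return repository;
            }
            repository.close();                   // previous behaviour: tear down and recreate later
            return null;
        }
    }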
@@ -296,21 +296,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.NodeScope
);

protected boolean supportURLRepo;
protected volatile boolean supportURLRepo;

private int maxShardBlobDeleteBatch;
private volatile int maxShardBlobDeleteBatch;

private Compressor compressor;
private volatile Compressor compressor;

private boolean cacheRepositoryData;
private volatile boolean cacheRepositoryData;

private RateLimiter snapshotRateLimiter;
private volatile RateLimiter snapshotRateLimiter;

private RateLimiter restoreRateLimiter;
private volatile RateLimiter restoreRateLimiter;

private RateLimiter remoteUploadRateLimiter;
private volatile RateLimiter remoteUploadRateLimiter;

private RateLimiter remoteDownloadRateLimiter;
private volatile RateLimiter remoteDownloadRateLimiter;

private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();

@@ -355,9 +355,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
BlobStoreIndexShardSnapshots::fromXContent
);

private boolean readOnly;
private volatile boolean readOnly;

private boolean isSystemRepository;
private final boolean isSystemRepository;

private final Object lock = new Object();

@@ -399,7 +399,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
/**
* IO buffer size hint for reading and writing to the underlying blob store.
*/
protected int bufferSize;
protected volatile int bufferSize;

/**
* Constructs new BlobStoreRepository
@@ -408,7 +408,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
protected BlobStoreRepository(
final RepositoryMetadata repositoryMetadata,
final boolean compress,
final NamedXContentRegistry namedXContentRegistry,
final ClusterService clusterService,
final RecoverySettings recoverySettings