Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Deletion protection config #1260

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ public class LeaseManagementConfig {

private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

/**
* Whether to enabled deletion protection on the DyanmoDB lease table created by KCL.
*
* <p>Default value: false
*/
private boolean deletionProtectionEnabled = false;
lucienlu-aws marked this conversation as resolved.
Show resolved Hide resolved

/**
* The list of tags to be applied to the DynamoDB table created for lease management.
*
Expand Down Expand Up @@ -382,6 +389,7 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
deletionProtectionEnabled(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean deletionProtectionEnabled;
private final Collection<Tag> tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
Expand Down Expand Up @@ -450,7 +451,7 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, false,
DefaultSdkAutoConstructList.getInstance(), leaseSerializer);
}

Expand Down Expand Up @@ -483,6 +484,7 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param deletionProtectionEnabled
* @param tags
*/
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
Expand All @@ -495,15 +497,17 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags, LeaseSerializer leaseSerializer) {
Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean deletionProtectionEnabled,
Collection<Tag> tags, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags, leaseSerializer,
null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
deletionProtectionEnabled, tags, leaseSerializer, null, false,
LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
this.streamConfig = streamConfig;
}

Expand Down Expand Up @@ -534,6 +538,7 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param deletionProtectionEnabled
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
Expand All @@ -549,7 +554,8 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags, LeaseSerializer leaseSerializer,
Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean deletionProtectionEnabled,
lucienlu-aws marked this conversation as resolved.
Show resolved Hide resolved
Collection<Tag> tags, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
Expand Down Expand Up @@ -577,6 +583,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.deletionProtectionEnabled = deletionProtectionEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
Expand Down Expand Up @@ -648,7 +655,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,
tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags);
tableCreatorCallback, dynamoDbRequestTimeout, billingMode, deletionProtectionEnabled, tags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {

private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean deletionProtectionEnabled;
private final Collection<Tag> tags;

private boolean newTableCreated = false;
Expand Down Expand Up @@ -134,7 +135,7 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST);
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST, false);
}

/**
Expand All @@ -146,14 +147,15 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param deletionProtectionEnabled
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode) {
final BillingMode billingMode, final boolean deletionProtectionEnabled) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout,
billingMode, DefaultSdkAutoConstructList.getInstance());
billingMode, deletionProtectionEnabled, DefaultSdkAutoConstructList.getInstance());
}

/**
Expand All @@ -165,19 +167,22 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param deletionProtectionEnabled
* @param tags
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode, final Collection<Tag> tags) {
final BillingMode billingMode, final boolean deletionProtectionEnabled,
final Collection<Tag> tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
this.consistentReads = consistentReads;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.deletionProtectionEnabled = deletionProtectionEnabled;
this.tags = tags;
}

Expand Down Expand Up @@ -806,6 +811,7 @@ protected DependencyException convertAndRethrowExceptions(String operation, Stri
private CreateTableRequest.Builder createTableRequestBuilder() {
final CreateTableRequest.Builder builder = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.deletionProtectionEnabled(deletionProtectionEnabled)
.tags(tags);
if (BillingMode.PAY_PER_REQUEST.equals(billingMode)) {
builder.billingMode(billingMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrati
@Override
protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected void starting(Description description) {

protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST, false);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class DynamoDBLeaseRefresherTest {

private static final String TABLE_NAME = "test";
private static final boolean CONSISTENT_READS = true;
private static final boolean DELETION_PROTECTION_ENABLED = false;

@Mock
private DynamoDbAsyncClient dynamoDbClient;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void setup() throws Exception {
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.billingMode(BillingMode.PAY_PER_REQUEST)
.deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.build();
}

Expand Down Expand Up @@ -286,7 +288,7 @@ public void testLeaseTableExistsTimesOut() throws Exception {
@Test
public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, DELETION_PROTECTION_ENABLED);

when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
Expand All @@ -299,6 +301,7 @@ public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Excep
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
.deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
Expand All @@ -319,7 +322,7 @@ public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Excep
public void testCreateLeaseTableWithTagsIfNotExists() throws Exception {
tags = Collections.singletonList(Tag.builder().key("foo").value("bar").build());
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, tags);
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, DELETION_PROTECTION_ENABLED, tags);

when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))
Expand All @@ -332,6 +335,7 @@ public void testCreateLeaseTableWithTagsIfNotExists() throws Exception {
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
.deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.tags(tags)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
Expand Down Expand Up @@ -369,6 +373,39 @@ public void testCreateLeaseTableIfNotExists() throws Exception {
Assert.assertTrue(result);
}

@Test
public void testCreateLeaseTableProvisionedWithDeletionProtectionIfNotExists() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, true);

when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());

final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L)
.writeCapacityUnits(10L).build();
final CreateTableRequest createTableRequest = CreateTableRequest.builder()
.tableName(TABLE_NAME)
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
.deletionProtectionEnabled(true)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
.thenReturn(null);

final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L);

verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
verify(mockDescribeTableFuture, times(1))
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
verify(mockCreateTableFuture, times(1))
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
Assert.assertTrue(result);
}

@Test
public void testCreateLeaseTableIfNotExists_throwsDependencyException() throws Exception {
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
Expand Down Expand Up @@ -462,7 +499,7 @@ public void testCreateLeaseTableIfNotExists_throwsTimeoutException_expectDepende
@Test
public void testCreateLeaseTableProvisionedBillingModeTimesOut() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, false);
TimeoutException te = setRuleForDependencyTimeout();

when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
Expand Down
Loading