Skip to content

Commit

Permalink
Backport to branch(3.10): Reduce the number of concurrent DDLs from 1…
Browse files Browse the repository at this point in the history
…0 to 1 for Cosmos DB in the integration tests (#1932)

Co-authored-by: Toshihiro Suzuki <[email protected]>
  • Loading branch information
komamitsu and brfrn169 authored Jun 19, 2024
1 parent ce74998 commit d0e5431
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ protected int getThreadNum() {
return 3;
}

@Override
protected boolean isParallelDdlSupported() {
return false;
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ protected int getThreadNum() {
return 3;
}

@Override
protected boolean isParallelDdlSupported() {
return false;
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ protected int getThreadNum() {
return THREAD_NUM;
}

protected boolean isParallelDdlSupported() {
return true;
}

private void createTables() throws java.util.concurrent.ExecutionException, InterruptedException {
List<Callable<Void>> testCallables = new ArrayList<>();

Expand Down Expand Up @@ -139,8 +143,8 @@ private void createTables() throws java.util.concurrent.ExecutionException, Inte
// We firstly execute the first one and then the rest. This is because the first table creation
// creates the metadata table, and this process can't be handled in multiple threads/processes
// at the same time.
executeInParallel(testCallables.subList(0, 1));
executeInParallel(testCallables.subList(1, testCallables.size()));
executeDdls(testCallables.subList(0, 1));
executeDdls(testCallables.subList(1, testCallables.size()));
}

protected Map<String, String> getCreationOptions() {
Expand Down Expand Up @@ -210,8 +214,8 @@ private void dropTables() throws java.util.concurrent.ExecutionException, Interr
// We firstly execute the callables without the last one. And then we execute the last one. This
// is because the last table deletion deletes the metadata table, and this process can't be
// handled in multiple threads/processes at the same time.
executeInParallel(testCallables.subList(0, testCallables.size() - 1));
executeInParallel(testCallables.subList(testCallables.size() - 1, testCallables.size()));
executeDdls(testCallables.subList(0, testCallables.size() - 1));
executeDdls(testCallables.subList(testCallables.size() - 1, testCallables.size()));
}

private void truncateTable(
Expand Down Expand Up @@ -2025,6 +2029,22 @@ private void executeInParallel(TestForSecondClusteringKeyScan test)
executeInParallel(testCallables);
}

private void executeDdls(List<Callable<Void>> ddls)
throws InterruptedException, java.util.concurrent.ExecutionException {
if (isParallelDdlSupported()) {
executeInParallel(ddls);
} else {
ddls.forEach(
ddl -> {
try {
ddl.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

private void executeInParallel(List<Callable<Void>> testCallables)
throws InterruptedException, java.util.concurrent.ExecutionException {
List<Future<Void>> futures = executorService.invokeAll(testCallables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ protected int getThreadNum() {
return THREAD_NUM;
}

protected boolean isParallelDdlSupported() {
return true;
}

private void createTables() throws java.util.concurrent.ExecutionException, InterruptedException {
List<Callable<Void>> testCallables = new ArrayList<>();

Expand All @@ -111,8 +115,8 @@ private void createTables() throws java.util.concurrent.ExecutionException, Inte
// We firstly execute the first one and then the rest. This is because the first table creation
// creates the metadata table, and this process can't be handled in multiple threads/processes
// at the same time.
executeInParallel(testCallables.subList(0, 1));
executeInParallel(testCallables.subList(1, testCallables.size()));
executeDdls(testCallables.subList(0, 1));
executeDdls(testCallables.subList(1, testCallables.size()));
}

protected Map<String, String> getCreationOptions() {
Expand Down Expand Up @@ -162,8 +166,8 @@ private void dropTables() throws java.util.concurrent.ExecutionException, Interr
// We firstly execute the callables without the last one. And then we execute the last one. This
// is because the last table deletion deletes the metadata table, and this process can't be
// handled in multiple threads/processes at the same time.
executeInParallel(testCallables.subList(0, testCallables.size() - 1));
executeInParallel(testCallables.subList(testCallables.size() - 1, testCallables.size()));
executeDdls(testCallables.subList(0, testCallables.size() - 1));
executeDdls(testCallables.subList(testCallables.size() - 1, testCallables.size()));
}

private void truncateTable(DataType firstPartitionKeyType, DataType secondPartitionKeyType)
Expand All @@ -181,6 +185,22 @@ private String getNamespaceName(DataType firstPartitionKeyType) {
return namespaceBaseName + firstPartitionKeyType;
}

private void executeDdls(List<Callable<Void>> ddls)
throws InterruptedException, java.util.concurrent.ExecutionException {
if (isParallelDdlSupported()) {
executeInParallel(ddls);
} else {
ddls.forEach(
ddl -> {
try {
ddl.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

private void executeInParallel(List<Callable<Void>> testCallables)
throws InterruptedException, java.util.concurrent.ExecutionException {
List<Future<Void>> futures = executorService.invokeAll(testCallables);
Expand Down

0 comments on commit d0e5431

Please sign in to comment.