Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Backport to branch(3.12): Reduce the number of concurrent DDLs from 1…
Browse files Browse the repository at this point in the history
…0 to 1 for Cosmos DB in the integration tests (#1930)

Co-authored-by: Toshihiro Suzuki <[email protected]>
komamitsu and brfrn169 committed Jun 19, 2024
1 parent d9e2695 commit 2e8bafa
Showing 4 changed files with 58 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -46,6 +46,11 @@ protected int getThreadNum() {
return 3;
}

@Override
protected boolean isParallelDdlSupported() {
return false;
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
Original file line number Diff line number Diff line change
@@ -24,6 +24,11 @@ protected int getThreadNum() {
return 3;
}

@Override
protected boolean isParallelDdlSupported() {
return false;
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
Original file line number Diff line number Diff line change
@@ -110,6 +110,10 @@ protected int getThreadNum() {
return THREAD_NUM;
}

protected boolean isParallelDdlSupported() {
return true;
}

private void createTables() throws java.util.concurrent.ExecutionException, InterruptedException {
List<Callable<Void>> testCallables = new ArrayList<>();

@@ -139,8 +143,8 @@ private void createTables() throws java.util.concurrent.ExecutionException, Inte
// We firstly execute the first one and then the rest. This is because the first table creation
// creates the metadata table, and this process can't be handled in multiple threads/processes
// at the same time.
executeInParallel(testCallables.subList(0, 1));
executeInParallel(testCallables.subList(1, testCallables.size()));
executeDdls(testCallables.subList(0, 1));
executeDdls(testCallables.subList(1, testCallables.size()));
}

protected Map<String, String> getCreationOptions() {
@@ -210,8 +214,8 @@ private void dropTables() throws java.util.concurrent.ExecutionException, Interr
// We firstly execute the callables without the last one. And then we execute the last one. This
// is because the last table deletion deletes the metadata table, and this process can't be
// handled in multiple threads/processes at the same time.
executeInParallel(testCallables.subList(0, testCallables.size() - 1));
executeInParallel(testCallables.subList(testCallables.size() - 1, testCallables.size()));
executeDdls(testCallables.subList(0, testCallables.size() - 1));
executeDdls(testCallables.subList(testCallables.size() - 1, testCallables.size()));
}

private void truncateTable(
@@ -2025,6 +2029,22 @@ private void executeInParallel(TestForSecondClusteringKeyScan test)
executeInParallel(testCallables);
}

private void executeDdls(List<Callable<Void>> ddls)
throws InterruptedException, java.util.concurrent.ExecutionException {
if (isParallelDdlSupported()) {
executeInParallel(ddls);
} else {
ddls.forEach(
ddl -> {
try {
ddl.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

private void executeInParallel(List<Callable<Void>> testCallables)
throws InterruptedException, java.util.concurrent.ExecutionException {
List<Future<Void>> futures = executorService.invokeAll(testCallables);
Original file line number Diff line number Diff line change
@@ -92,6 +92,10 @@ protected int getThreadNum() {
return THREAD_NUM;
}

protected boolean isParallelDdlSupported() {
return true;
}

private void createTables() throws java.util.concurrent.ExecutionException, InterruptedException {
List<Callable<Void>> testCallables = new ArrayList<>();

@@ -111,8 +115,8 @@ private void createTables() throws java.util.concurrent.ExecutionException, Inte
// We firstly execute the first one and then the rest. This is because the first table creation
// creates the metadata table, and this process can't be handled in multiple threads/processes
// at the same time.
executeInParallel(testCallables.subList(0, 1));
executeInParallel(testCallables.subList(1, testCallables.size()));
executeDdls(testCallables.subList(0, 1));
executeDdls(testCallables.subList(1, testCallables.size()));
}

protected Map<String, String> getCreationOptions() {
@@ -162,8 +166,8 @@ private void dropTables() throws java.util.concurrent.ExecutionException, Interr
// We firstly execute the callables without the last one. And then we execute the last one. This
// is because the last table deletion deletes the metadata table, and this process can't be
// handled in multiple threads/processes at the same time.
executeInParallel(testCallables.subList(0, testCallables.size() - 1));
executeInParallel(testCallables.subList(testCallables.size() - 1, testCallables.size()));
executeDdls(testCallables.subList(0, testCallables.size() - 1));
executeDdls(testCallables.subList(testCallables.size() - 1, testCallables.size()));
}

private void truncateTable(DataType firstPartitionKeyType, DataType secondPartitionKeyType)
@@ -181,6 +185,22 @@ private String getNamespaceName(DataType firstPartitionKeyType) {
return namespaceBaseName + firstPartitionKeyType;
}

private void executeDdls(List<Callable<Void>> ddls)
throws InterruptedException, java.util.concurrent.ExecutionException {
if (isParallelDdlSupported()) {
executeInParallel(ddls);
} else {
ddls.forEach(
ddl -> {
try {
ddl.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

private void executeInParallel(List<Callable<Void>> testCallables)
throws InterruptedException, java.util.concurrent.ExecutionException {
List<Future<Void>> futures = executorService.invokeAll(testCallables);

0 comments on commit 2e8bafa

Please sign in to comment.