Skip to content

Commit

Permalink
In SchemaLoader --import command, pass options down to Admin.import(...)
Browse files Browse the repository at this point in the history
  • Loading branch information
Torch3333 committed Nov 15, 2023
1 parent 8f1495f commit b6e16cf
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.scalar.db.schemaloader;

import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
Expand All @@ -16,28 +17,33 @@
@ThreadSafe
public class ImportSchemaParser {
private final JsonObject schemaJson;
private final Map<String, String> options;

public ImportSchemaParser(Path jsonFilePath) throws SchemaLoaderException {
public ImportSchemaParser(Path jsonFilePath, Map<String, String> options)
throws SchemaLoaderException {
try (Reader reader = Files.newBufferedReader(jsonFilePath)) {
schemaJson = JsonParser.parseReader(reader).getAsJsonObject();
} catch (IOException | JsonParseException e) {
throw new SchemaLoaderException("Parsing the schema JSON failed", e);
}
this.options = ImmutableMap.copyOf(options);
}

public ImportSchemaParser(String serializedSchemaJson) throws SchemaLoaderException {
public ImportSchemaParser(String serializedSchemaJson, Map<String, String> options)
throws SchemaLoaderException {
try {
schemaJson = JsonParser.parseString(serializedSchemaJson).getAsJsonObject();
} catch (JsonParseException e) {
throw new SchemaLoaderException("Parsing the schema JSON failed", e);
}
this.options = ImmutableMap.copyOf(options);
}

public List<ImportTableSchema> parse() throws SchemaLoaderException {
List<ImportTableSchema> tableSchemaList = new ArrayList<>();
for (Map.Entry<String, JsonElement> entry : schemaJson.entrySet()) {
tableSchemaList.add(
new ImportTableSchema(entry.getKey(), entry.getValue().getAsJsonObject()));
new ImportTableSchema(entry.getKey(), entry.getValue().getAsJsonObject(), options));
}
return tableSchemaList;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.scalar.db.schemaloader;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.JsonObject;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.Immutable;

@Immutable
public class ImportTableSchema {
private static final String TRANSACTION = "transaction";
private final String namespace;
private final String tableName;
private final boolean isTransactionTable;
private final ImmutableMap<String, String> options;

public ImportTableSchema(String tableFullName, JsonObject tableDefinition)
public ImportTableSchema(
String tableFullName, JsonObject tableDefinition, Map<String, String> options)
throws SchemaLoaderException {
String[] fullName = tableFullName.split("\\.", -1);
if (fullName.length != 2) {
Expand All @@ -20,11 +25,30 @@ public ImportTableSchema(String tableFullName, JsonObject tableDefinition)
}
namespace = fullName[0];
tableName = fullName[1];
if (tableDefinition.keySet().contains(TRANSACTION)) {
isTransactionTable = tableDefinition.get(TRANSACTION).getAsBoolean();
if (tableDefinition.keySet().contains(TableSchema.TRANSACTION)) {
isTransactionTable = tableDefinition.get(TableSchema.TRANSACTION).getAsBoolean();
} else {
isTransactionTable = true;
}
this.options = buildOptions(tableDefinition, options);
}

private ImmutableMap<String, String> buildOptions(
JsonObject tableDefinition, Map<String, String> globalOptions) {
ImmutableMap.Builder<String, String> optionsBuilder = ImmutableMap.builder();
optionsBuilder.putAll(globalOptions);
Set<String> keysToIgnore =
ImmutableSet.of(
TableSchema.PARTITION_KEY,
TableSchema.CLUSTERING_KEY,
TableSchema.TRANSACTION,
TableSchema.COLUMNS,
TableSchema.SECONDARY_INDEX);
tableDefinition.entrySet().stream()
.filter(entry -> !keysToIgnore.contains(entry.getKey()))
.forEach(entry -> optionsBuilder.put(entry.getKey(), entry.getValue().getAsString()));
// If an option is defined globally and in the JSON file, the JSON file value is used
return optionsBuilder.buildKeepingLast();
}

public String getNamespace() {
Expand All @@ -38,4 +62,8 @@ public String getTable() {
public boolean isTransactionTable() {
return isTransactionTable;
}

public Map<String, String> getOptions() {
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,64 +514,73 @@ private static void alterTables(
*
* @param configProperties ScalarDB config properties
* @param serializedSchemaJson serialized json string schema.
* @param options specific options for importing.
* @throws SchemaLoaderException thrown when importing tables fails.
*/
public static void importTables(Properties configProperties, String serializedSchemaJson)
public static void importTables(
Properties configProperties, String serializedSchemaJson, Map<String, String> options)
throws SchemaLoaderException {
Either<Path, Properties> config = new Right<>(configProperties);
Either<Path, String> schema = new Right<>(serializedSchemaJson);
importTables(config, schema);
importTables(config, schema, options);
}

/**
* Import tables defined in the schema file.
*
* @param configProperties ScalarDB properties.
* @param schemaPath path to the schema file.
* @param options specific options for importing.
* @throws SchemaLoaderException thrown when importing tables fails.
*/
public static void importTables(Properties configProperties, Path schemaPath)
public static void importTables(
Properties configProperties, Path schemaPath, Map<String, String> options)
throws SchemaLoaderException {
Either<Path, Properties> config = new Right<>(configProperties);
Either<Path, String> schema = new Left<>(schemaPath);
importTables(config, schema);
importTables(config, schema, options);
}

/**
* Import tables defined in the schema.
*
* @param configPath path to the ScalarDB config.
* @param serializedSchemaJson serialized json string schema.
* @param options specific options for importing.
* @throws SchemaLoaderException thrown when importing tables fails.
*/
public static void importTables(Path configPath, String serializedSchemaJson)
public static void importTables(
Path configPath, String serializedSchemaJson, Map<String, String> options)
throws SchemaLoaderException {
Either<Path, Properties> config = new Left<>(configPath);
Either<Path, String> schema = new Right<>(serializedSchemaJson);
importTables(config, schema);
importTables(config, schema, options);
}

/**
* Import tables defined in the schema file.
*
* @param configPath path to the ScalarDB config.
* @param schemaPath path to the schema file.
* @param options specific options for importing.
* @throws SchemaLoaderException thrown when importing tables fails.
*/
public static void importTables(Path configPath, Path schemaPath) throws SchemaLoaderException {
public static void importTables(Path configPath, Path schemaPath, Map<String, String> options)
throws SchemaLoaderException {
Either<Path, Properties> config = new Left<>(configPath);
Either<Path, String> schema = new Left<>(schemaPath);
importTables(config, schema);
importTables(config, schema, options);
}

private static void importTables(Either<Path, Properties> config, Either<Path, String> schema)
private static void importTables(
Either<Path, Properties> config, Either<Path, String> schema, Map<String, String> options)
throws SchemaLoaderException {
// Parse the schema
List<ImportTableSchema> tableSchemaList = getImportTableSchemaList(schema);
List<ImportTableSchema> tableSchemaList = getImportTableSchemaList(schema, options);

// Import tables
try (SchemaOperator operator = getSchemaOperator(config)) {
operator.importTables(tableSchemaList);
operator.importTables(tableSchemaList, options);
}
}

Expand Down Expand Up @@ -613,25 +622,25 @@ static SchemaParser getSchemaParser(Either<Path, String> schema, Map<String, Str
}
}

private static List<ImportTableSchema> getImportTableSchemaList(Either<Path, String> schema)
throws SchemaLoaderException {
private static List<ImportTableSchema> getImportTableSchemaList(
Either<Path, String> schema, Map<String, String> options) throws SchemaLoaderException {
if ((schema.isLeft() && schema.getLeft() != null)
|| (schema.isRight() && schema.getRight() != null)) {
ImportSchemaParser schemaParser = getImportSchemaParser(schema);
ImportSchemaParser schemaParser = getImportSchemaParser(schema, options);
return schemaParser.parse();
}
return Collections.emptyList();
}

@VisibleForTesting
static ImportSchemaParser getImportSchemaParser(Either<Path, String> schema)
throws SchemaLoaderException {
static ImportSchemaParser getImportSchemaParser(
Either<Path, String> schema, Map<String, String> options) throws SchemaLoaderException {
assert (schema.isLeft() && schema.getLeft() != null)
|| (schema.isRight() && schema.getRight() != null);
if (schema.isLeft()) {
return new ImportSchemaParser(schema.getLeft());
return new ImportSchemaParser(schema.getLeft(), options);
} else {
return new ImportSchemaParser(schema.getRight());
return new ImportSchemaParser(schema.getRight(), options);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.scalar.db.service.TransactionFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -408,15 +407,16 @@ private void addNewColumnsToTable(
}
}

public void importTables(List<ImportTableSchema> tableSchemaList) throws SchemaLoaderException {
public void importTables(List<ImportTableSchema> tableSchemaList, Map<String, String> options)
throws SchemaLoaderException {
for (ImportTableSchema tableSchema : tableSchemaList) {
String namespace = tableSchema.getNamespace();
String table = tableSchema.getTable();
try {
if (tableSchema.isTransactionTable()) {
transactionAdmin.get().importTable(namespace, table, Collections.emptyMap());
transactionAdmin.get().importTable(namespace, table, options);
} else {
storageAdmin.get().importTable(namespace, table, Collections.emptyMap());
storageAdmin.get().importTable(namespace, table, options);
}
logger.info("Importing the table {} in the namespace {} succeeded", table, namespace);
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
@Immutable
public class TableSchema {

private static final String COLUMNS = "columns";
private static final String TRANSACTION = "transaction";
private static final String PARTITION_KEY = "partition-key";
private static final String CLUSTERING_KEY = "clustering-key";
private static final String SECONDARY_INDEX = "secondary-index";
static final String COLUMNS = "columns";
static final String TRANSACTION = "transaction";
static final String PARTITION_KEY = "partition-key";
static final String CLUSTERING_KEY = "clustering-key";
static final String SECONDARY_INDEX = "secondary-index";
private static final ImmutableMap<String, DataType> DATA_MAP_TYPE =
ImmutableMap.<String, DataType>builder()
.put("BOOLEAN", DataType.BOOLEAN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ private void importTables() throws SchemaLoaderException {
"Specifying the '--coordinator' option with the '--import' option is not allowed."
+ " Create coordinator tables separately");
}

SchemaLoader.importTables(configPath, schemaFile);
Map<String, String> options = prepareAllOptions();
SchemaLoader.importTables(configPath, schemaFile, options);
}

private Map<String, String> prepareAllOptions() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.scalar.db.schemaloader;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;

import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;

public class ImportSchemaParserTest {
Expand All @@ -11,6 +14,7 @@ public class ImportSchemaParserTest {
public void parse_ProperSerializedSchemaJsonAndOptionsGiven_ShouldParseCorrectly()
throws SchemaLoaderException {
// Arrange
Map<String, String> globalOptions = ImmutableMap.of("ru", "4000", "replication-factor", "1");
String serializedSchemaJson =
"{"
+ " \"sample_db.sample_table1\": {"
Expand Down Expand Up @@ -45,7 +49,7 @@ public void parse_ProperSerializedSchemaJsonAndOptionsGiven_ShouldParseCorrectly
+ " \"compaction-strategy\": \"LCS\""
+ " }"
+ "}";
ImportSchemaParser parser = new ImportSchemaParser(serializedSchemaJson);
ImportSchemaParser parser = new ImportSchemaParser(serializedSchemaJson, globalOptions);

// Act
List<ImportTableSchema> actual = parser.parse();
Expand All @@ -56,13 +60,22 @@ public void parse_ProperSerializedSchemaJsonAndOptionsGiven_ShouldParseCorrectly
assertThat(actual.get(0).getNamespace()).isEqualTo("sample_db");
assertThat(actual.get(0).getTable()).isEqualTo("sample_table1");
assertThat(actual.get(0).isTransactionTable()).isTrue();
assertThat(actual.get(0).getOptions())
.containsOnly(entry("ru", "4000"), entry("replication-factor", "1"));

assertThat(actual.get(1).getNamespace()).isEqualTo("sample_db");
assertThat(actual.get(1).getTable()).isEqualTo("sample_table2");
assertThat(actual.get(1).isTransactionTable()).isFalse();
assertThat(actual.get(1).getOptions())
.containsOnly(entry("ru", "4000"), entry("replication-factor", "1"));

assertThat(actual.get(2).getNamespace()).isEqualTo("sample_db");
assertThat(actual.get(2).getTable()).isEqualTo("sample_table3");
assertThat(actual.get(2).isTransactionTable()).isTrue();
assertThat(actual.get(2).getOptions())
.containsOnly(
entry("ru", "5000"),
entry("compaction-strategy", "LCS"),
entry("replication-factor", "1"));
}
}
Loading

0 comments on commit b6e16cf

Please sign in to comment.