Skip to content

Commit

Permalink
Backport to branch(3.13) : Add export options validator (#2455)
Browse files Browse the repository at this point in the history
Co-authored-by: inv-jishnu <[email protected]>
Co-authored-by: Peckstadt Yves <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent ab373e2 commit 227ec48
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 0 deletions.
28 changes: 28 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,34 @@ public enum CoreError implements ScalarDbError {
""),
DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT(
Category.USER_ERROR, "0151", "Method null argument not allowed", "", ""),
ABAC_NOT_ENABLED(
Category.USER_ERROR,
"0152",
"The attribute-based access control feature is not enabled. To use this feature, you must enable it. Note that this feature is supported only in the ScalarDB Enterprise edition",
"",
""),
DATA_LOADER_CLUSTERING_KEY_NOT_FOUND(
Category.USER_ERROR, "0153", "The provided clustering key %s was not found", "", ""),
DATA_LOADER_INVALID_PROJECTION(
Category.USER_ERROR, "0154", "The column '%s' was not found", "", ""),
DATA_LOADER_INCOMPLETE_PARTITION_KEY(
Category.USER_ERROR,
"0155",
"The provided partition key is incomplete. Required key: %s",
"",
""),
DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH(
Category.USER_ERROR,
"0156",
"The provided clustering key order does not match the table schema. Required order: %s",
"",
""),
DATA_LOADER_PARTITION_KEY_ORDER_MISMATCH(
Category.USER_ERROR,
"0157",
"The provided partition key order does not match the table schema. Required order: %s",
"",
""),

//
// Errors for the concurrency error category
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.scalar.db.dataloader.core.dataexport.validation;

/** A custom exception for export options validation errors */
public class ExportOptionsValidationException extends Exception {

/**
* Class constructor
*
* @param message error message
*/
public ExportOptionsValidationException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package com.scalar.db.dataloader.core.dataexport.validation;

import com.scalar.db.api.Scan;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* A validator for ensuring that export options are consistent with the ScalarDB table metadata and
* follow the defined constraints.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ExportOptionsValidator {

/**
* Validates the export request.
*
* @param exportOptions The export options provided by the user.
* @param tableMetadata The metadata of the ScalarDB table to validate against.
* @throws ExportOptionsValidationException If the export options are invalid.
*/
public static void validate(ExportOptions exportOptions, TableMetadata tableMetadata)
throws ExportOptionsValidationException {
LinkedHashSet<String> partitionKeyNames = tableMetadata.getPartitionKeyNames();
LinkedHashSet<String> clusteringKeyNames = tableMetadata.getClusteringKeyNames();
ScanRange scanRange = exportOptions.getScanRange();

validatePartitionKey(partitionKeyNames, exportOptions.getScanPartitionKey());
validateProjectionColumns(tableMetadata.getColumnNames(), exportOptions.getProjectionColumns());
validateSortOrders(clusteringKeyNames, exportOptions.getSortOrders());

if (scanRange.getScanStartKey() != null) {
validateClusteringKey(clusteringKeyNames, scanRange.getScanStartKey());
}
if (scanRange.getScanEndKey() != null) {
validateClusteringKey(clusteringKeyNames, scanRange.getScanEndKey());
}
}

/*
* Check if the provided partition key is available in the ScalarDB table
* @param partitionKeyNames List of partition key names available in a
* @param key To be validated ScalarDB key
* @throws ExportOptionsValidationException if the key could not be found or is not a partition
*/
private static void validatePartitionKey(LinkedHashSet<String> partitionKeyNames, Key key)
throws ExportOptionsValidationException {
if (partitionKeyNames == null || key == null) {
return;
}

// Make sure that all partition key columns are provided
if (partitionKeyNames.size() != key.getColumns().size()) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_INCOMPLETE_PARTITION_KEY.buildMessage(partitionKeyNames));
}

// Check if the order of columns in key.getColumns() matches the order in partitionKeyNames
Iterator<String> partitionKeyIterator = partitionKeyNames.iterator();
for (Column<?> column : key.getColumns()) {
// Check if the column names match in order
if (!partitionKeyIterator.hasNext()
|| !partitionKeyIterator.next().equals(column.getName())) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_PARTITION_KEY_ORDER_MISMATCH.buildMessage(partitionKeyNames));
}
}
}

private static void validateSortOrders(
LinkedHashSet<String> clusteringKeyNames, List<Scan.Ordering> sortOrders)
throws ExportOptionsValidationException {
if (sortOrders == null || sortOrders.isEmpty()) {
return;
}

for (Scan.Ordering sortOrder : sortOrders) {
checkIfColumnExistsAsClusteringKey(clusteringKeyNames, sortOrder.getColumnName());
}
}

/**
* Validates that the clustering key columns in the given Key object match the expected order
* defined in the clusteringKeyNames. The Key can be a prefix of the clusteringKeyNames, but the
* order must remain consistent.
*
* @param clusteringKeyNames the expected ordered set of clustering key names
* @param key the Key object containing the actual clustering key columns
* @throws ExportOptionsValidationException if the order or names of clustering keys do not match
*/
private static void validateClusteringKey(LinkedHashSet<String> clusteringKeyNames, Key key)
throws ExportOptionsValidationException {
// If either clusteringKeyNames or key is null, no validation is needed
if (clusteringKeyNames == null || key == null) {
return;
}

// Create an iterator to traverse the clusteringKeyNames in order
Iterator<String> clusteringKeyIterator = clusteringKeyNames.iterator();

// Iterate through the columns in the given Key
for (Column<?> column : key.getColumns()) {
// If clusteringKeyNames have been exhausted but columns still exist in the Key,
// it indicates a mismatch
if (!clusteringKeyIterator.hasNext()) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH.buildMessage(clusteringKeyNames));
}

// Get the next expected clustering key name
String expectedKey = clusteringKeyIterator.next();

// Check if the current column name matches the expected clustering key name
if (!column.getName().equals(expectedKey)) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH.buildMessage(clusteringKeyNames));
}
}
}

private static void checkIfColumnExistsAsClusteringKey(
LinkedHashSet<String> clusteringKeyNames, String columnName)
throws ExportOptionsValidationException {
if (clusteringKeyNames == null || columnName == null) {
return;
}

if (!clusteringKeyNames.contains(columnName)) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_CLUSTERING_KEY_NOT_FOUND.buildMessage(columnName));
}
}

private static void validateProjectionColumns(
LinkedHashSet<String> columnNames, List<String> columns)
throws ExportOptionsValidationException {
if (columns == null || columns.isEmpty()) {
return;
}

for (String column : columns) {
if (!columnNames.contains(column)) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_INVALID_PROJECTION.buildMessage(column));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package com.scalar.db.dataloader.core.dataexport.validation;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.io.DataType;
import com.scalar.db.io.IntColumn;
import com.scalar.db.io.Key;
import com.scalar.db.io.TextColumn;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ExportOptionsValidatorTest {

private TableMetadata singlePkCkMetadata;
private TableMetadata multiplePkCkMetadata;
private List<String> projectedColumns;

@BeforeEach
void setup() {
singlePkCkMetadata = createMockMetadata(1, 1);
multiplePkCkMetadata = createMockMetadata(2, 2);
projectedColumns = createProjectedColumns();
}

private TableMetadata createMockMetadata(int pkCount, int ckCount) {
TableMetadata.Builder builder = TableMetadata.newBuilder();

// Add partition keys
for (int i = 1; i <= pkCount; i++) {
builder.addColumn("pk" + i, DataType.INT);
builder.addPartitionKey("pk" + i);
}

// Add clustering keys
for (int i = 1; i <= ckCount; i++) {
builder.addColumn("ck" + i, DataType.TEXT);
builder.addClusteringKey("ck" + i);
}

return builder.build();
}

private List<String> createProjectedColumns() {
List<String> columns = new ArrayList<>();
columns.add("pk1");
columns.add("ck1");
return columns;
}

@Test
void validate_withValidExportOptionsForSinglePkCk_ShouldNotThrowException()
throws ExportOptionsValidationException {

Key partitionKey = Key.newBuilder().add(IntColumn.of("pk1", 1)).build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", partitionKey, FileFormat.JSON)
.projectionColumns(projectedColumns)
.scanRange(new ScanRange(null, null, false, false))
.build();

ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata);
}

@Test
void validate_withValidExportOptionsForMultiplePkCk_ShouldNotThrowException()
throws ExportOptionsValidationException {

Key partitionKey =
Key.newBuilder().add(IntColumn.of("pk1", 1)).add(IntColumn.of("pk2", 2)).build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", partitionKey, FileFormat.JSON)
.projectionColumns(projectedColumns)
.scanRange(new ScanRange(null, null, false, false))
.build();

ExportOptionsValidator.validate(exportOptions, multiplePkCkMetadata);
}

@Test
void validate_withIncompletePartitionKeyForSinglePk_ShouldThrowException() {
Key incompletePartitionKey = Key.newBuilder().build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", incompletePartitionKey, FileFormat.JSON).build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(
CoreError.DATA_LOADER_INCOMPLETE_PARTITION_KEY.buildMessage(
singlePkCkMetadata.getPartitionKeyNames()));
}

@Test
void validate_withIncompletePartitionKeyForMultiplePks_ShouldThrowException() {
Key incompletePartitionKey = Key.newBuilder().add(IntColumn.of("pk1", 1)).build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", incompletePartitionKey, FileFormat.JSON).build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, multiplePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(
CoreError.DATA_LOADER_INCOMPLETE_PARTITION_KEY.buildMessage(
multiplePkCkMetadata.getPartitionKeyNames()));
}

@Test
void validate_withInvalidProjectionColumn_ShouldThrowException() {
ExportOptions exportOptions =
ExportOptions.builder(
"test",
"sample",
Key.newBuilder().add(IntColumn.of("pk1", 1)).build(),
FileFormat.JSON)
.projectionColumns(Collections.singletonList("invalid_column"))
.build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(CoreError.DATA_LOADER_INVALID_PROJECTION.buildMessage("invalid_column"));
}

@Test
void validate_withInvalidClusteringKeyInScanRange_ShouldThrowException() {
ScanRange scanRange =
new ScanRange(
Key.newBuilder().add(TextColumn.of("invalid_ck", "value")).build(),
Key.newBuilder().add(TextColumn.of("ck1", "value")).build(),
false,
false);

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", createValidPartitionKey(), FileFormat.JSON)
.scanRange(scanRange)
.build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(CoreError.DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH.buildMessage("[ck1]"));
}

@Test
void validate_withInvalidPartitionKeyOrder_ShouldThrowException() {
// Partition key names are expected to be "pk1", "pk2"
LinkedHashSet<String> partitionKeyNames = new LinkedHashSet<>();
partitionKeyNames.add("pk1");
partitionKeyNames.add("pk2");

// Create a partition key with reversed order, expecting an error
Key invalidPartitionKey =
Key.newBuilder()
.add(IntColumn.of("pk2", 2)) // Incorrect order
.add(IntColumn.of("pk1", 1)) // Incorrect order
.build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", invalidPartitionKey, FileFormat.JSON)
.projectionColumns(projectedColumns)
.scanRange(new ScanRange(null, null, false, false))
.build();

// Verify that the validator throws the correct exception
assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, multiplePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(
CoreError.DATA_LOADER_PARTITION_KEY_ORDER_MISMATCH.buildMessage(partitionKeyNames));
}

private Key createValidPartitionKey() {
return Key.newBuilder().add(IntColumn.of("pk1", 1)).build();
}
}

0 comments on commit 227ec48

Please sign in to comment.