From 753618b5f2173ba05b05d1e5218eefa78e564762 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 4 Dec 2024 15:59:38 +0530 Subject: [PATCH 01/20] Util classes for data loader --- data-loader/build.gradle | 11 + .../db/dataloader/core/ErrorMessage.java | 62 +++++ .../core/exception/Base64Exception.java | 14 ++ .../dataloader/core/util/CollectionUtil.java | 19 ++ .../db/dataloader/core/util/CsvUtil.java | 17 ++ .../db/dataloader/core/util/DebugUtil.java | 30 +++ .../db/dataloader/core/util/DecimalUtil.java | 40 ++++ .../db/dataloader/core/util/PathUtil.java | 24 ++ .../db/dataloader/core/util/RuntimeUtil.java | 21 ++ .../db/dataloader/core/UnitTestUtils.java | 226 ++++++++++++++++++ .../core/util/CollectionUtilTest.java | 28 +++ .../db/dataloader/core/util/CsvUtilTest.java | 25 ++ .../dataloader/core/util/DecimalUtilTest.java | 20 ++ .../db/dataloader/core/util/PathUtilTest.java | 45 ++++ .../dataloader/core/util/RuntimeUtilTest.java | 24 ++ 15 files changed, 606 insertions(+) create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/exception/Base64Exception.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CsvUtil.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CollectionUtilTest.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CsvUtilTest.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/DecimalUtilTest.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java diff --git a/data-loader/build.gradle b/data-loader/build.gradle index f633095150..3be6a49e81 100644 --- a/data-loader/build.gradle +++ b/data-loader/build.gradle @@ -1,4 +1,8 @@ subprojects { + + ext { + jacksonVersion = '2.17.0' + } group = "scalardb.dataloader" dependencies { // AssertJ @@ -13,6 +17,7 @@ subprojects { // Apache Commons implementation("org.apache.commons:commons-lang3:${commonsLangVersion}") implementation("commons-io:commons-io:${commonsIoVersion}") + implementation("org.slf4j:slf4j-simple:${slf4jVersion}") // Mockito testImplementation "org.mockito:mockito-core:${mockitoVersion}" @@ -24,5 +29,11 @@ subprojects { annotationProcessor "org.projectlombok:lombok:${lombokVersion}" testCompileOnly "org.projectlombok:lombok:${lombokVersion}" testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}" + + // Jackson + implementation("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}") + implementation("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}") + } } diff --git 
a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java new file mode 100644 index 0000000000..395385467e --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java @@ -0,0 +1,62 @@ +package com.scalar.db.dataloader.core; + +public class ErrorMessage { + public static final String ERROR_MISSING_NAMESPACE_OR_TABLE = + "the provided namespace '%s' and/or table name '%s' is incorrect and could not be found"; + public static final String ERROR_MISSING_COLUMN = "missing field or column mapping for %s"; + public static final String ERROR_MISSING_PARTITION_KEY_COLUMN = + "missing required field or column mapping for partition key %s"; + public static final String ERROR_MISSING_CLUSTERING_KEY_COLUMN = + "missing required field or column mapping for clustering key %s"; + public static final String ERROR_CRUD_EXCEPTION = + "something went wrong while trying to save the data"; + public static final String ERROR_DATA_ALREADY_EXISTS = "record already exists"; + public static final String ERROR_DATA_NOT_FOUND = "record was not found"; + public static final String ERROR_CONTROL_FILE_MISSING_DATA_MAPPINGS = + "the control file is missing data mappings"; + public static final String ERROR_TARGET_COLUMN_NOT_FOUND = + "The target column '%s' for source field '%s' could not be found in table '%s'"; + public static final String ERROR_MISSING_PARTITION_KEY = + "The required partition key '%s' is missing in the control file mapping for table '%s'"; + public static final String ERROR_MISSING_CLUSTERING_KEY = + "The required clustering key '%s' is missing in the control file mapping for table '%s'"; + public static final String ERROR_MISSING_SOURCE_FIELD = + "the data mapping source field '%s' for table '%s' is missing in the json data record"; + public static final String ERROR_DUPLICATE_DATA_MAPPINGS = + "Duplicate data mappings found for table '%s' in the control file"; + public static final String ERROR_MISSING_COLUMN_MAPPING = + "No mapping found for column '%s' in table '%s' in the control file. \nControl file validation set at 'FULL'. All columns need to be mapped."; + public static final String ERROR_MULTIPLE_MAPPINGS_FOR_COLUMN_FOUND = + "Multiple data mappings found for column '%s' in table '%s'"; + public static final String ERROR_METHOD_NULL_ARGUMENT = "Method null argument not allowed"; + public static final String ERROR_COULD_NOT_FIND_PARTITION_KEY = + "could not find the partition key"; + public static final String ERROR_METADATA_OR_DATA_TYPES_NOT_FOUND = + "no table meta data or a data type map was found for %s.%s"; + public static final String ERROR_EMPTY_SOURCE_ROW = + "The source record data was undefined or empty"; + public static final String ERROR_UPSERT_INSERT_MISSING_COLUMNS = + "The source record needs to contain all fields if the UPSERT turns into an INSERT"; + public static final String ERROR_SCAN_FAILED = "Could not complete the scan"; + public static final String ERROR_UNKNOWN_TRANSACTION_STATUS = + "Error : the transaction to retrieve the account is in an unknown state"; + public static final String ERROR_INVALID_PROJECTION = "The column '%s' was not found"; + public static final String ERROR_SCAN = + "Something went wrong while scanning. 
Are you sure you are running in the correct transaction mode?"; + public static final String ERROR_CLUSTERING_KEY_NOT_FOUND = + "The provided clustering key %s was not found"; + public static final String ERROR_KEY_NOT_FOUND = "The key '%s' could not be found"; + public static final String ERROR_KEY_FORMATTING = + "The provided key '%s' is not formatted correctly. Expected format is field=value."; + public static final String ERROR_SORT_FORMATTING = + "The provided sort '%s' is not formatted correctly. Expected format is field=asc|desc."; + public static final String ERROR_VALUE_TO_STRING_CONVERSION_FAILED = + "Something went wrong while converting the ScalarDB values to strings. The table metadata and Value datatype probably do not match."; + + public static final String ERROR_BASE64_ENCODING = + "Invalid base64 encoding for blob value for column %s"; + public static final String ERROR_NUMBER_FORMAT_EXCEPTION = + "Invalid number specified for column %s"; + public static final String ERROR_NULL_POINTER_EXCEPTION = + "The %s column does not support a null value"; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/exception/Base64Exception.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/exception/Base64Exception.java new file mode 100644 index 0000000000..9cf94854c0 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/exception/Base64Exception.java @@ -0,0 +1,14 @@ +package com.scalar.db.dataloader.core.exception; + +/** Exception thrown when an error occurs while trying to encode or decode base64 values. */ +public class Base64Exception extends Exception { + + /** + * Class constructor + * + * @param message Exception message + */ + public Base64Exception(String message) { + super(message); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java new file mode 100644 index 0000000000..184e29b7f3 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java @@ -0,0 +1,19 @@ +package com.scalar.db.dataloader.core.util; + +import java.util.Collection; + +/** Utils for collection classes */ +public class CollectionUtil { + + /** + * Check if lists are of same length + * + * @param collections List of collections + * @return collections are same length or not + */ + public static boolean areSameLength(Collection... 
collections) { + int N = collections[0].size(); + for (Collection a : collections) if (a.size() != N) return false; + return true; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CsvUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CsvUtil.java new file mode 100644 index 0000000000..9979ce58ca --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CsvUtil.java @@ -0,0 +1,17 @@ +package com.scalar.db.dataloader.core.util; + +/** Utils for csv data manipulation */ +public class CsvUtil { + + /** + * Remove the last character in the string builder if it's a delimiter + * + * @param stringBuilder String builder instance + * @param delimiter Delimiter character used in the CSV content + */ + public static void removeTrailingDelimiter(StringBuilder stringBuilder, String delimiter) { + if (stringBuilder.substring(stringBuilder.length() - 1).equals(delimiter)) { + stringBuilder.setLength(stringBuilder.length() - 1); + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java new file mode 100644 index 0000000000..cde28ca33e --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java @@ -0,0 +1,30 @@ +package com.scalar.db.dataloader.core.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DebugUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(DebugUtil.class); + + /** + * log memory usage + * + * @param stage stage of process + */ + public static void logMemoryUsage(String stage) { + Runtime runtime = Runtime.getRuntime(); + long usedMemory = runtime.totalMemory() - runtime.freeMemory(); + long maxMemory = runtime.maxMemory(); + + LOGGER.info( + "Memory usage at {}: Used Memory = {} MB, Max Memory = {} MB", + stage, + formatMemorySize(usedMemory), + formatMemorySize(maxMemory)); + } + + private static String formatMemorySize(long size) { + return String.format("%.2f", size / (1024.0 * 1024.0)); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java new file mode 100644 index 0000000000..8372dc8aac --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java @@ -0,0 +1,40 @@ +package com.scalar.db.dataloader.core.util; + +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.util.Locale; + +/** Utils for decimal handling */ +public class DecimalUtil { + + /** + * Convert a Double to a non-scientific formatted string + * + * @param doubleValue Double value + * @return formatted double as a string + */ + public static String convertToNonScientific(Double doubleValue) { + return createFormatter().format(doubleValue); + } + + /** + * Convert a Float to a non-scientific formatted string + * + * @param floatValue Float value + * @return formatted float as a string + */ + public static String convertToNonScientific(Float floatValue) { + return createFormatter().format(floatValue); + } + + /** + * Create a Decimal formatter + * + * @return decimal formatter instance + */ + private static DecimalFormat createFormatter() { + DecimalFormat df = new DecimalFormat("0", DecimalFormatSymbols.getInstance(Locale.ENGLISH)); + df.setMaximumFractionDigits(340); // 340 = 
DecimalFormat.DOUBLE_FRACTION_DIGITS + return df; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java new file mode 100644 index 0000000000..c0bd226e45 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java @@ -0,0 +1,24 @@ +package com.scalar.db.dataloader.core.util; + +public class PathUtil { + + /** + * Ensures the specified path has a trailing slash. + * + *
<p>
java.nio.file.Path is not used because this is also used for virtual paths. + * + * @param path the path + * @return the path with a trailing slash + */ + public static String ensureTrailingSlash(String path) { + if (path == null || path.isEmpty()) { + return ""; + } + + if (!path.endsWith("/")) { + return path + "/"; + } + + return path; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java new file mode 100644 index 0000000000..a5de36d34c --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java @@ -0,0 +1,21 @@ +package com.scalar.db.dataloader.core.util; + +import static com.scalar.db.dataloader.core.ErrorMessage.ERROR_METHOD_NULL_ARGUMENT; + +/** Utils for runtime checks */ +public class RuntimeUtil { + + /** + * Argument null check + * + * @param values List of arguments + * @throws NullPointerException when one of the arguments is null + */ + public static void checkNotNull(Object... values) { + for (Object value : values) { + if (value == null) { + throw new NullPointerException(ERROR_METHOD_NULL_ARGUMENT); + } + } + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java new file mode 100644 index 0000000000..b915b64af8 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java @@ -0,0 +1,226 @@ +package com.scalar.db.dataloader.core; + +import static com.scalar.db.io.DataType.BIGINT; +import static com.scalar.db.io.DataType.BLOB; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.util.DecimalUtil; +import com.scalar.db.io.BigIntColumn; +import com.scalar.db.io.BlobColumn; +import com.scalar.db.io.BooleanColumn; +import com.scalar.db.io.Column; +import com.scalar.db.io.DataType; +import com.scalar.db.io.DoubleColumn; +import com.scalar.db.io.FloatColumn; +import com.scalar.db.io.IntColumn; +import com.scalar.db.io.TextColumn; +import com.scalar.db.transaction.consensuscommit.Attribute; +import java.util.*; + +/** Utils for the service unit tests */ +public class UnitTestUtils { + public static final String TEST_NAMESPACE = "namespace"; + public static final String TEST_TABLE_NAME = "table"; + public static final String TEST_COLUMN_1_PK = "col1"; + public static final String TEST_COLUMN_2_CK = "col2"; + public static final String TEST_COLUMN_3_CK = "col3"; + public static final String TEST_COLUMN_4 = "col4"; + public static final String TEST_COLUMN_5 = "col5"; + public static final String TEST_COLUMN_6 = "col6"; + public static final String TEST_COLUMN_7 = "col7"; + + public static final String TEST_VALUE_TEXT = "test value"; + + public static final String TEST_VALUE_BLOB_STRING = "blob test value"; + public static final byte[] TEST_VALUE_BLOB = TEST_VALUE_BLOB_STRING.getBytes(); + public static final String TEST_VALUE_BLOB_BASE64 = + new String(Base64.getEncoder().encode(TEST_VALUE_BLOB)); + public static final String TEST_VALUE_TX_ID = "txt value 464654654"; + public static final Float TEST_VALUE_FLOAT = Float.MIN_VALUE; + public static final int TEST_VALUE_INT = Integer.MAX_VALUE; + public static final Long TEST_VALUE_LONG = BigIntColumn.MAX_VALUE; + public static final boolean TEST_VALUE_BOOLEAN = true; 
+ public static final double TEST_VALUE_DOUBLE = Double.MIN_VALUE; + public static final String TEST_CSV_DELIMITER = ";"; + + public static TableMetadata createTestTableMetadata() { + return TableMetadata.newBuilder() + .addColumn(TEST_COLUMN_1_PK, BIGINT) + .addColumn(TEST_COLUMN_2_CK, DataType.INT) + .addColumn(TEST_COLUMN_3_CK, DataType.BOOLEAN) + .addColumn(TEST_COLUMN_4, DataType.FLOAT) + .addColumn(TEST_COLUMN_5, DataType.DOUBLE) + .addColumn(TEST_COLUMN_6, DataType.TEXT) + .addColumn(TEST_COLUMN_7, BLOB) + .addColumn(Attribute.BEFORE_PREFIX + TEST_COLUMN_4, DataType.FLOAT) + .addColumn(Attribute.BEFORE_PREFIX + TEST_COLUMN_5, DataType.DOUBLE) + .addColumn(Attribute.BEFORE_PREFIX + TEST_COLUMN_6, DataType.TEXT) + .addColumn(Attribute.BEFORE_PREFIX + TEST_COLUMN_7, BLOB) + .addColumn(Attribute.ID, DataType.TEXT) + .addColumn(Attribute.STATE, DataType.INT) + .addColumn(Attribute.VERSION, DataType.INT) + .addColumn(Attribute.PREPARED_AT, BIGINT) + .addColumn(Attribute.COMMITTED_AT, BIGINT) + .addColumn(Attribute.BEFORE_ID, DataType.TEXT) + .addColumn(Attribute.BEFORE_STATE, DataType.INT) + .addColumn(Attribute.BEFORE_VERSION, DataType.INT) + .addColumn(Attribute.BEFORE_PREPARED_AT, BIGINT) + .addColumn(Attribute.BEFORE_COMMITTED_AT, BIGINT) + .addPartitionKey(TEST_COLUMN_1_PK) + .addClusteringKey(TEST_COLUMN_2_CK) + .addClusteringKey(TEST_COLUMN_3_CK) + .build(); + } + + public static ObjectNode getOutputDataWithMetadata() { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode rootNode = mapper.createObjectNode(); + rootNode.put(TEST_COLUMN_1_PK, TEST_VALUE_LONG); + rootNode.put(TEST_COLUMN_2_CK, TEST_VALUE_INT); + rootNode.put(TEST_COLUMN_3_CK, TEST_VALUE_BOOLEAN); + rootNode.put(TEST_COLUMN_4, TEST_VALUE_FLOAT); + rootNode.put(TEST_COLUMN_5, TEST_VALUE_DOUBLE); + rootNode.put(TEST_COLUMN_6, TEST_VALUE_TEXT); + rootNode.put(TEST_COLUMN_7, TEST_VALUE_BLOB); + rootNode.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_4, TEST_VALUE_FLOAT); + rootNode.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_5, TEST_VALUE_DOUBLE); + rootNode.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_6, TEST_VALUE_TEXT); + rootNode.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_7, TEST_VALUE_BLOB); + rootNode.put(Attribute.ID, TEST_VALUE_TX_ID); + rootNode.put(Attribute.STATE, TEST_VALUE_INT); + rootNode.put(Attribute.VERSION, TEST_VALUE_INT); + rootNode.put(Attribute.PREPARED_AT, TEST_VALUE_LONG); + rootNode.put(Attribute.COMMITTED_AT, TEST_VALUE_LONG); + rootNode.put(Attribute.BEFORE_ID, TEST_VALUE_TEXT); + rootNode.put(Attribute.BEFORE_STATE, TEST_VALUE_INT); + rootNode.put(Attribute.BEFORE_VERSION, TEST_VALUE_INT); + rootNode.put(Attribute.BEFORE_PREPARED_AT, TEST_VALUE_LONG); + rootNode.put(Attribute.BEFORE_COMMITTED_AT, TEST_VALUE_LONG); + return rootNode; + } + + public static ObjectNode getOutputDataWithoutMetadata() { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode rootNode = mapper.createObjectNode(); + rootNode.put(TEST_COLUMN_1_PK, TEST_VALUE_LONG); + rootNode.put(TEST_COLUMN_2_CK, TEST_VALUE_INT); + rootNode.put(TEST_COLUMN_3_CK, TEST_VALUE_BOOLEAN); + rootNode.put(TEST_COLUMN_4, TEST_VALUE_FLOAT); + rootNode.put(TEST_COLUMN_5, TEST_VALUE_DOUBLE); + rootNode.put(TEST_COLUMN_6, TEST_VALUE_TEXT); + rootNode.put(TEST_COLUMN_7, TEST_VALUE_BLOB); + return rootNode; + } + + public static List getColumnsListOfMetadata() { + List projectedColumns = new ArrayList<>(); + projectedColumns.add(TEST_COLUMN_1_PK); + projectedColumns.add(TEST_COLUMN_2_CK); + projectedColumns.add(TEST_COLUMN_3_CK); + 
projectedColumns.add(TEST_COLUMN_4); + projectedColumns.add(TEST_COLUMN_5); + projectedColumns.add(TEST_COLUMN_6); + projectedColumns.add(TEST_COLUMN_7); + projectedColumns.add(Attribute.BEFORE_PREFIX + TEST_COLUMN_4); + projectedColumns.add(Attribute.BEFORE_PREFIX + TEST_COLUMN_5); + projectedColumns.add(Attribute.BEFORE_PREFIX + TEST_COLUMN_6); + projectedColumns.add(Attribute.BEFORE_PREFIX + TEST_COLUMN_7); + projectedColumns.add(Attribute.ID); + projectedColumns.add(Attribute.STATE); + projectedColumns.add(Attribute.VERSION); + projectedColumns.add(Attribute.PREPARED_AT); + projectedColumns.add(Attribute.COMMITTED_AT); + projectedColumns.add(Attribute.BEFORE_ID); + projectedColumns.add(Attribute.BEFORE_STATE); + projectedColumns.add(Attribute.BEFORE_VERSION); + projectedColumns.add(Attribute.BEFORE_PREPARED_AT); + projectedColumns.add(Attribute.BEFORE_COMMITTED_AT); + return projectedColumns; + } + + public static Map getColumnData() { + Map columnData = new HashMap<>(); + columnData.put(TEST_COLUMN_1_PK, BIGINT); + columnData.put(TEST_COLUMN_2_CK, DataType.INT); + columnData.put(TEST_COLUMN_3_CK, DataType.BOOLEAN); + columnData.put(TEST_COLUMN_4, DataType.FLOAT); + columnData.put(TEST_COLUMN_5, DataType.DOUBLE); + columnData.put(TEST_COLUMN_6, DataType.TEXT); + columnData.put(TEST_COLUMN_7, BLOB); + columnData.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_4, DataType.FLOAT); + columnData.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_5, DataType.DOUBLE); + columnData.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_6, DataType.TEXT); + columnData.put(Attribute.BEFORE_PREFIX + TEST_COLUMN_7, BLOB); + columnData.put(Attribute.ID, DataType.TEXT); + columnData.put(Attribute.STATE, DataType.INT); + columnData.put(Attribute.VERSION, DataType.INT); + columnData.put(Attribute.PREPARED_AT, BIGINT); + columnData.put(Attribute.COMMITTED_AT, BIGINT); + columnData.put(Attribute.BEFORE_ID, DataType.TEXT); + columnData.put(Attribute.BEFORE_STATE, DataType.INT); + columnData.put(Attribute.BEFORE_VERSION, DataType.INT); + columnData.put(Attribute.BEFORE_PREPARED_AT, BIGINT); + columnData.put(Attribute.BEFORE_COMMITTED_AT, BIGINT); + return columnData; + } + + public static Map> createTestValues() { + Map> values = new HashMap<>(); + values.put(TEST_COLUMN_1_PK, BigIntColumn.of(TEST_COLUMN_1_PK, TEST_VALUE_LONG)); + values.put(TEST_COLUMN_2_CK, IntColumn.of(TEST_COLUMN_2_CK, TEST_VALUE_INT)); + values.put(TEST_COLUMN_3_CK, BooleanColumn.of(TEST_COLUMN_3_CK, TEST_VALUE_BOOLEAN)); + values.put(TEST_COLUMN_4, FloatColumn.of(TEST_COLUMN_4, TEST_VALUE_FLOAT)); + values.put(TEST_COLUMN_5, DoubleColumn.of(TEST_COLUMN_5, TEST_VALUE_DOUBLE)); + values.put(TEST_COLUMN_6, TextColumn.of(TEST_COLUMN_6, TEST_VALUE_TEXT)); + values.put(TEST_COLUMN_7, BlobColumn.of(TEST_COLUMN_7, TEST_VALUE_BLOB)); + values.put( + Attribute.BEFORE_PREFIX + TEST_COLUMN_4, + FloatColumn.of(Attribute.BEFORE_PREFIX + TEST_COLUMN_4, TEST_VALUE_FLOAT)); + values.put( + Attribute.BEFORE_PREFIX + TEST_COLUMN_5, + DoubleColumn.of(Attribute.BEFORE_PREFIX + TEST_COLUMN_5, TEST_VALUE_DOUBLE)); + values.put( + Attribute.BEFORE_PREFIX + TEST_COLUMN_6, + TextColumn.of(Attribute.BEFORE_PREFIX + TEST_COLUMN_6, TEST_VALUE_TEXT)); + values.put( + Attribute.BEFORE_PREFIX + TEST_COLUMN_7, + BlobColumn.of(Attribute.BEFORE_PREFIX + TEST_COLUMN_7, TEST_VALUE_BLOB)); + values.put(Attribute.ID, TextColumn.of(Attribute.ID, TEST_VALUE_TX_ID)); + values.put(Attribute.STATE, IntColumn.of(Attribute.STATE, TEST_VALUE_INT)); + values.put(Attribute.VERSION, 
IntColumn.of(Attribute.VERSION, TEST_VALUE_INT)); + values.put(Attribute.PREPARED_AT, BigIntColumn.of(Attribute.PREPARED_AT, TEST_VALUE_LONG)); + values.put(Attribute.COMMITTED_AT, BigIntColumn.of(Attribute.COMMITTED_AT, TEST_VALUE_LONG)); + values.put(Attribute.BEFORE_ID, TextColumn.of(Attribute.BEFORE_ID, TEST_VALUE_TEXT)); + values.put(Attribute.BEFORE_STATE, IntColumn.of(Attribute.BEFORE_STATE, TEST_VALUE_INT)); + values.put(Attribute.BEFORE_VERSION, IntColumn.of(Attribute.BEFORE_VERSION, TEST_VALUE_INT)); + values.put( + Attribute.BEFORE_PREPARED_AT, + BigIntColumn.of(Attribute.BEFORE_PREPARED_AT, TEST_VALUE_LONG)); + values.put( + Attribute.BEFORE_COMMITTED_AT, + BigIntColumn.of(Attribute.BEFORE_COMMITTED_AT, TEST_VALUE_LONG)); + return values; + } + + public static String getSourceTestValue(DataType dataType) { + switch (dataType) { + case INT: + return Integer.toString(TEST_VALUE_INT); + case BIGINT: + return Long.toString(TEST_VALUE_LONG); + case FLOAT: + return DecimalUtil.convertToNonScientific(TEST_VALUE_FLOAT); + case DOUBLE: + return DecimalUtil.convertToNonScientific(TEST_VALUE_DOUBLE); + case BLOB: + return TEST_VALUE_BLOB_BASE64; + case BOOLEAN: + return Boolean.toString(TEST_VALUE_BOOLEAN); + case TEXT: + default: + return TEST_VALUE_TEXT; + } + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CollectionUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CollectionUtilTest.java new file mode 100644 index 0000000000..b054a55cef --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CollectionUtilTest.java @@ -0,0 +1,28 @@ +package com.scalar.db.dataloader.core.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +/** Unit tests for CollectionUtils */ +class CollectionUtilTest { + + @Test + void areSameLength_CollectionsAllSameLength_ShouldReturnTrue() { + List listOne = new ArrayList<>(); + List listTwo = new ArrayList<>(); + boolean actual = CollectionUtil.areSameLength(listOne, listTwo); + assertThat(actual).isTrue(); + } + + @Test + void areSameLength_CollectionsDifferentLength_ShouldReturnFalse() { + List listOne = new ArrayList<>(); + List listTwo = new ArrayList<>(); + listTwo.add(5); + boolean actual = CollectionUtil.areSameLength(listOne, listTwo); + assertThat(actual).isFalse(); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CsvUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CsvUtilTest.java new file mode 100644 index 0000000000..2afcfbcbe8 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/CsvUtilTest.java @@ -0,0 +1,25 @@ +package com.scalar.db.dataloader.core.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +/** Unit tests for CsvUtils */ +class CsvUtilTest { + + @Test + void removeTrailingDelimiter_HasTrailingDelimiter_ShouldRemoveDelimiter() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("testing;"); + CsvUtil.removeTrailingDelimiter(stringBuilder, ";"); + assertThat(stringBuilder.toString()).isEqualTo("testing"); + } + + @Test + void removeTrailingDelimiter_DoesNotHaveTrailingDelimiter_ShouldNotRemoveAnything() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("testing"); + CsvUtil.removeTrailingDelimiter(stringBuilder, ";"); + 
assertThat(stringBuilder.toString()).isEqualTo("testing"); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/DecimalUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/DecimalUtilTest.java new file mode 100644 index 0000000000..c99ad0866d --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/DecimalUtilTest.java @@ -0,0 +1,20 @@ +package com.scalar.db.dataloader.core.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class DecimalUtilTest { + @Test + void convertToNonScientific_withValidDoubleValue_shouldReturnProperStringValue() { + String expectedValue = "340.55"; + Double value = 340.55; + Assertions.assertEquals(expectedValue, DecimalUtil.convertToNonScientific(value)); + } + + @Test + void convertToNonScientific_withValidFloatValue_shouldReturnProperStringValue() { + String expectedValue = "356"; + Float value = 356F; + Assertions.assertEquals(expectedValue, DecimalUtil.convertToNonScientific(value)); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java new file mode 100644 index 0000000000..0db7c4edff --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java @@ -0,0 +1,45 @@ +package com.scalar.db.dataloader.core.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class PathUtilTest { + + @Test + void ensureTrailingSlash_nullPath_returnsEmptyString() { + String path = null; + String result = PathUtil.ensureTrailingSlash(path); + Assertions.assertEquals("", result); + } + + @Test + void ensureTrailingSlash_emptyPath_returnsEmptyString() { + String path = ""; + String result = PathUtil.ensureTrailingSlash(path); + Assertions.assertEquals("", result); + } + + @Test + void ensureTrailingSlash_pathWithoutTrailingSlash_addsTrailingSlash() { + String path = "/path/to/directory"; + String expectedResult = "/path/to/directory/"; + String result = PathUtil.ensureTrailingSlash(path); + Assertions.assertEquals(expectedResult, result); + } + + @Test + void ensureTrailingSlash_pathWithTrailingSlash_returnsOriginalPath() { + String path = "/path/to/directory/"; + String expectedResult = "/path/to/directory/"; + String result = PathUtil.ensureTrailingSlash(path); + Assertions.assertEquals(expectedResult, result); + } + + @Test + void ensureTrailingSlash_virtualPath_addsTrailingSlash() { + String path = "s3://bucket/path"; + String expectedResult = "s3://bucket/path/"; + String result = PathUtil.ensureTrailingSlash(path); + Assertions.assertEquals(expectedResult, result); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java new file mode 100644 index 0000000000..32659fe1cf --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java @@ -0,0 +1,24 @@ +package com.scalar.db.dataloader.core.util; + +import static com.scalar.db.dataloader.core.ErrorMessage.ERROR_METHOD_NULL_ARGUMENT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +/** RuntimeUtils unit tests */ +public class RuntimeUtilTest { + + @Test + public void checkNotNull_HasNullValues_ShouldThrowException() { + assertThatThrownBy(() -> 
RuntimeUtil.checkNotNull(null, null)) + .isExactlyInstanceOf(NullPointerException.class) + .hasMessage(ERROR_METHOD_NULL_ARGUMENT); + } + + @Test + public void checkNotNull_HasNoNullValues_ShouldNotThrowException() { + String string = "1"; + Object object = new Object(); + RuntimeUtil.checkNotNull(string, object); + } +} From 8d39d02677c53fe747492f6f3ddcd343bcbb05a6 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 4 Dec 2024 17:55:22 +0530 Subject: [PATCH 02/20] Fix spotbug issue --- .../java/com/scalar/db/dataloader/core/UnitTestUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java index b915b64af8..bf4b4414af 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java @@ -17,6 +17,7 @@ import com.scalar.db.io.IntColumn; import com.scalar.db.io.TextColumn; import com.scalar.db.transaction.consensuscommit.Attribute; +import java.nio.charset.StandardCharsets; import java.util.*; /** Utils for the service unit tests */ @@ -34,9 +35,9 @@ public class UnitTestUtils { public static final String TEST_VALUE_TEXT = "test value"; public static final String TEST_VALUE_BLOB_STRING = "blob test value"; - public static final byte[] TEST_VALUE_BLOB = TEST_VALUE_BLOB_STRING.getBytes(); + static final byte[] TEST_VALUE_BLOB = TEST_VALUE_BLOB_STRING.getBytes(StandardCharsets.UTF_8); public static final String TEST_VALUE_BLOB_BASE64 = - new String(Base64.getEncoder().encode(TEST_VALUE_BLOB)); + new String(Base64.getEncoder().encode(TEST_VALUE_BLOB), StandardCharsets.UTF_8); public static final String TEST_VALUE_TX_ID = "txt value 464654654"; public static final Float TEST_VALUE_FLOAT = Float.MIN_VALUE; public static final int TEST_VALUE_INT = Integer.MAX_VALUE; From bf94c495a8f5b4834d31aca6f15aa42dabcf9df1 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 6 Dec 2024 16:00:01 +0530 Subject: [PATCH 03/20] Removed error message and added core error --- .../com/scalar/db/common/error/CoreError.java | 6 ++ .../db/dataloader/core/ErrorMessage.java | 62 ------------------- .../db/dataloader/core/util/RuntimeUtil.java | 4 +- 3 files changed, 8 insertions(+), 64 deletions(-) delete mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 4325ef0090..bb54ba6a9f 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -688,6 +688,12 @@ public enum CoreError implements ScalarDbError { "Invalid number specified for column %s in table %s in namespace %s", "", ""), + DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT( + Category.USER_ERROR, + "0151", + "Method null argument not allowed", + "", + ""), // // Errors for the concurrency error category diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java deleted file mode 100644 index 395385467e..0000000000 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.scalar.db.dataloader.core; - -public class ErrorMessage { - public static 
final String ERROR_MISSING_NAMESPACE_OR_TABLE = - "the provided namespace '%s' and/or table name '%s' is incorrect and could not be found"; - public static final String ERROR_MISSING_COLUMN = "missing field or column mapping for %s"; - public static final String ERROR_MISSING_PARTITION_KEY_COLUMN = - "missing required field or column mapping for partition key %s"; - public static final String ERROR_MISSING_CLUSTERING_KEY_COLUMN = - "missing required field or column mapping for clustering key %s"; - public static final String ERROR_CRUD_EXCEPTION = - "something went wrong while trying to save the data"; - public static final String ERROR_DATA_ALREADY_EXISTS = "record already exists"; - public static final String ERROR_DATA_NOT_FOUND = "record was not found"; - public static final String ERROR_CONTROL_FILE_MISSING_DATA_MAPPINGS = - "the control file is missing data mappings"; - public static final String ERROR_TARGET_COLUMN_NOT_FOUND = - "The target column '%s' for source field '%s' could not be found in table '%s'"; - public static final String ERROR_MISSING_PARTITION_KEY = - "The required partition key '%s' is missing in the control file mapping for table '%s'"; - public static final String ERROR_MISSING_CLUSTERING_KEY = - "The required clustering key '%s' is missing in the control file mapping for table '%s'"; - public static final String ERROR_MISSING_SOURCE_FIELD = - "the data mapping source field '%s' for table '%s' is missing in the json data record"; - public static final String ERROR_DUPLICATE_DATA_MAPPINGS = - "Duplicate data mappings found for table '%s' in the control file"; - public static final String ERROR_MISSING_COLUMN_MAPPING = - "No mapping found for column '%s' in table '%s' in the control file. \nControl file validation set at 'FULL'. All columns need to be mapped."; - public static final String ERROR_MULTIPLE_MAPPINGS_FOR_COLUMN_FOUND = - "Multiple data mappings found for column '%s' in table '%s'"; - public static final String ERROR_METHOD_NULL_ARGUMENT = "Method null argument not allowed"; - public static final String ERROR_COULD_NOT_FIND_PARTITION_KEY = - "could not find the partition key"; - public static final String ERROR_METADATA_OR_DATA_TYPES_NOT_FOUND = - "no table meta data or a data type map was found for %s.%s"; - public static final String ERROR_EMPTY_SOURCE_ROW = - "The source record data was undefined or empty"; - public static final String ERROR_UPSERT_INSERT_MISSING_COLUMNS = - "The source record needs to contain all fields if the UPSERT turns into an INSERT"; - public static final String ERROR_SCAN_FAILED = "Could not complete the scan"; - public static final String ERROR_UNKNOWN_TRANSACTION_STATUS = - "Error : the transaction to retrieve the account is in an unknown state"; - public static final String ERROR_INVALID_PROJECTION = "The column '%s' was not found"; - public static final String ERROR_SCAN = - "Something went wrong while scanning. Are you sure you are running in the correct transaction mode?"; - public static final String ERROR_CLUSTERING_KEY_NOT_FOUND = - "The provided clustering key %s was not found"; - public static final String ERROR_KEY_NOT_FOUND = "The key '%s' could not be found"; - public static final String ERROR_KEY_FORMATTING = - "The provided key '%s' is not formatted correctly. Expected format is field=value."; - public static final String ERROR_SORT_FORMATTING = - "The provided sort '%s' is not formatted correctly. 
Expected format is field=asc|desc."; - public static final String ERROR_VALUE_TO_STRING_CONVERSION_FAILED = - "Something went wrong while converting the ScalarDB values to strings. The table metadata and Value datatype probably do not match."; - - public static final String ERROR_BASE64_ENCODING = - "Invalid base64 encoding for blob value for column %s"; - public static final String ERROR_NUMBER_FORMAT_EXCEPTION = - "Invalid number specified for column %s"; - public static final String ERROR_NULL_POINTER_EXCEPTION = - "The %s column does not support a null value"; -} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java index a5de36d34c..0d967cf719 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java @@ -1,6 +1,6 @@ package com.scalar.db.dataloader.core.util; -import static com.scalar.db.dataloader.core.ErrorMessage.ERROR_METHOD_NULL_ARGUMENT; +import static com.scalar.db.common.error.CoreError.DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT; /** Utils for runtime checks */ public class RuntimeUtil { @@ -14,7 +14,7 @@ public class RuntimeUtil { public static void checkNotNull(Object... values) { for (Object value : values) { if (value == null) { - throw new NullPointerException(ERROR_METHOD_NULL_ARGUMENT); + throw new NullPointerException(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.getMessage()); } } } From 47be388a02af373d08bc373f35d4743836f8e709 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 6 Dec 2024 16:09:57 +0530 Subject: [PATCH 04/20] Applied spotless --- .../src/main/java/com/scalar/db/common/error/CoreError.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index bb54ba6a9f..b02b3c45a6 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -689,11 +689,7 @@ public enum CoreError implements ScalarDbError { "", ""), DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT( - Category.USER_ERROR, - "0151", - "Method null argument not allowed", - "", - ""), + Category.USER_ERROR, "0151", "Method null argument not allowed", "", ""), // // Errors for the concurrency error category From 913eb1c069e452ff5da752fb35c8a6b50e49d758 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 6 Dec 2024 16:19:21 +0530 Subject: [PATCH 05/20] Fixed unit test failures --- .../com/scalar/db/dataloader/core/util/RuntimeUtilTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java index 32659fe1cf..fc8d281cb9 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java @@ -1,6 +1,6 @@ package com.scalar.db.dataloader.core.util; -import static com.scalar.db.dataloader.core.ErrorMessage.ERROR_METHOD_NULL_ARGUMENT; +import static com.scalar.db.common.error.CoreError.DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import org.junit.jupiter.api.Test; @@ -12,7 
+12,7 @@ public class RuntimeUtilTest { public void checkNotNull_HasNullValues_ShouldThrowException() { assertThatThrownBy(() -> RuntimeUtil.checkNotNull(null, null)) .isExactlyInstanceOf(NullPointerException.class) - .hasMessage(ERROR_METHOD_NULL_ARGUMENT); + .hasMessage(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.getMessage()); } @Test From 6cfa83aa2f9d83d1afcb2f34ea102c4e10212b0f Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 11 Dec 2024 16:26:17 +0530 Subject: [PATCH 06/20] Basic data import enum and exception --- .../db/dataloader/core/dataimport/ImportMode.java | 8 ++++++++ .../ControlFileValidationException.java | 14 ++++++++++++++ .../controlfile/ControlFileValidationLevel.java | 11 +++++++++++ 3 files changed, 33 insertions(+) create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java new file mode 100644 index 0000000000..7f2a805e75 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java @@ -0,0 +1,8 @@ +package com.scalar.db.dataloader.core.dataimport; + +/** Represents how the data to be imported is handled */ +public enum ImportMode { + INSERT, + UPDATE, + UPSERT +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java new file mode 100644 index 0000000000..e4e032a4c8 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java @@ -0,0 +1,14 @@ +package com.scalar.db.dataloader.core.dataimport.controlfile; + +/** Represents the control file */ +public class ControlFileValidationException extends Exception { + + /** + * Class constructor + * + * @param message error message + */ + public ControlFileValidationException(String message) { + super(message); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java new file mode 100644 index 0000000000..3753d0ba65 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java @@ -0,0 +1,11 @@ +package com.scalar.db.dataloader.core.dataimport.controlfile; + +/** Control file validation level */ +public enum ControlFileValidationLevel { + /* All columns need to be mapped */ + FULL, + /* All partition key and clustering key columns need to be mapped */ + KEYS, + /* Only validate the columns that are mapped */ + MAPPED +} From d381b2b6ae77657f42e7972625cf9355a9c2518c Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 11 Dec 2024 17:17:21 +0530 Subject: [PATCH 07/20] Removed exception class for now --- .../ControlFileValidationException.java | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 
data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java deleted file mode 100644 index e4e032a4c8..0000000000 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationException.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.scalar.db.dataloader.core.dataimport.controlfile; - -/** Represents the control file */ -public class ControlFileValidationException extends Exception { - - /** - * Class constructor - * - * @param message error message - */ - public ControlFileValidationException(String message) { - super(message); - } -} From 67f24744048d9650f3ce461fff873b08414d4631 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 12 Dec 2024 11:30:59 +0530 Subject: [PATCH 08/20] Added DECIMAL_FORMAT --- .../com/scalar/db/dataloader/core/util/DecimalUtil.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java index 8372dc8aac..b1c23e50de 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java @@ -7,6 +7,8 @@ /** Utils for decimal handling */ public class DecimalUtil { + private static final DecimalFormat DECIMAL_FORMAT = createFormatter(); + /** * Convert a Double to a non-scientific formatted string * @@ -14,7 +16,7 @@ public class DecimalUtil { * @return formatted double as a string */ public static String convertToNonScientific(Double doubleValue) { - return createFormatter().format(doubleValue); + return DECIMAL_FORMAT.format(doubleValue); } /** @@ -24,7 +26,7 @@ public static String convertToNonScientific(Double doubleValue) { * @return formatted float as a string */ public static String convertToNonScientific(Float floatValue) { - return createFormatter().format(floatValue); + return DECIMAL_FORMAT.format(floatValue); } /** From 14e359379469f1b22f27ae3b12683d071c585f62 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 12 Dec 2024 14:35:31 +0530 Subject: [PATCH 09/20] Path util class updated --- .../db/dataloader/core/util/PathUtil.java | 14 +++++----- .../db/dataloader/core/util/PathUtilTest.java | 26 ++++++------------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java index c0bd226e45..c307ea961f 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/PathUtil.java @@ -1,22 +1,22 @@ package com.scalar.db.dataloader.core.util; +import java.io.File; + public class PathUtil { /** - * Ensures the specified path has a trailing slash. - * - *
<p>
java.nio.file.Path is not used because this is also used for virtual paths. + * Ensures the specified path has a trailing path separator. * * @param path the path - * @return the path with a trailing slash + * @return the path with a trailing path separator. */ - public static String ensureTrailingSlash(String path) { + public static String ensureTrailingSeparator(String path) { if (path == null || path.isEmpty()) { return ""; } - if (!path.endsWith("/")) { - return path + "/"; + if (!path.endsWith(File.separator)) { + return path + File.separator; } return path; diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java index 0db7c4edff..85d3ed1ce7 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/PathUtilTest.java @@ -6,40 +6,30 @@ class PathUtilTest { @Test - void ensureTrailingSlash_nullPath_returnsEmptyString() { - String path = null; - String result = PathUtil.ensureTrailingSlash(path); + void ensureTrailingSeparator_nullPath_returnsEmptyString() { + String result = PathUtil.ensureTrailingSeparator(null); Assertions.assertEquals("", result); } @Test - void ensureTrailingSlash_emptyPath_returnsEmptyString() { - String path = ""; - String result = PathUtil.ensureTrailingSlash(path); + void ensureTrailingSeparator_emptyPath_returnsEmptyString() { + String result = PathUtil.ensureTrailingSeparator(""); Assertions.assertEquals("", result); } @Test - void ensureTrailingSlash_pathWithoutTrailingSlash_addsTrailingSlash() { + void ensureTrailingSlash_pathWithoutTrailingSlash_addsTrailingSeparator() { String path = "/path/to/directory"; String expectedResult = "/path/to/directory/"; - String result = PathUtil.ensureTrailingSlash(path); + String result = PathUtil.ensureTrailingSeparator(path); Assertions.assertEquals(expectedResult, result); } @Test - void ensureTrailingSlash_pathWithTrailingSlash_returnsOriginalPath() { + void ensureTrailingSlash_pathWithTrailingSeparator_returnsOriginalPath() { String path = "/path/to/directory/"; String expectedResult = "/path/to/directory/"; - String result = PathUtil.ensureTrailingSlash(path); - Assertions.assertEquals(expectedResult, result); - } - - @Test - void ensureTrailingSlash_virtualPath_addsTrailingSlash() { - String path = "s3://bucket/path"; - String expectedResult = "s3://bucket/path/"; - String result = PathUtil.ensureTrailingSlash(path); + String result = PathUtil.ensureTrailingSeparator(path); Assertions.assertEquals(expectedResult, result); } } From a096d51e4bd29c478e5f75709901e380ff855efe Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 13 Dec 2024 11:10:38 +0530 Subject: [PATCH 10/20] Feedback changes --- .../scalar/db/dataloader/core/util/CollectionUtil.java | 8 ++++++-- .../com/scalar/db/dataloader/core/util/DebugUtil.java | 4 ++-- .../com/scalar/db/dataloader/core/util/DecimalUtil.java | 6 ++---- .../com/scalar/db/dataloader/core/util/RuntimeUtil.java | 2 +- .../scalar/db/dataloader/core/util/RuntimeUtilTest.java | 2 +- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java index 184e29b7f3..e98f4beef7 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java +++ 
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/CollectionUtil.java @@ -12,8 +12,12 @@ public class CollectionUtil { * @return collections are same length or not */ public static boolean areSameLength(Collection... collections) { - int N = collections[0].size(); - for (Collection a : collections) if (a.size() != N) return false; + int n = collections[0].size(); + for (Collection c : collections) { + if (c.size() != n) { + return false; + } + } return true; } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java index cde28ca33e..a16e2fae02 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DebugUtil.java @@ -5,7 +5,7 @@ public class DebugUtil { - private static final Logger LOGGER = LoggerFactory.getLogger(DebugUtil.class); + private static final Logger logger = LoggerFactory.getLogger(DebugUtil.class); /** * log memory usage @@ -17,7 +17,7 @@ public static void logMemoryUsage(String stage) { long usedMemory = runtime.totalMemory() - runtime.freeMemory(); long maxMemory = runtime.maxMemory(); - LOGGER.info( + logger.info( "Memory usage at {}: Used Memory = {} MB, Max Memory = {} MB", stage, formatMemorySize(usedMemory), diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java index b1c23e50de..8372dc8aac 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/DecimalUtil.java @@ -7,8 +7,6 @@ /** Utils for decimal handling */ public class DecimalUtil { - private static final DecimalFormat DECIMAL_FORMAT = createFormatter(); - /** * Convert a Double to a non-scientific formatted string * @@ -16,7 +14,7 @@ public class DecimalUtil { * @return formatted double as a string */ public static String convertToNonScientific(Double doubleValue) { - return DECIMAL_FORMAT.format(doubleValue); + return createFormatter().format(doubleValue); } /** @@ -26,7 +24,7 @@ public static String convertToNonScientific(Double doubleValue) { * @return formatted float as a string */ public static String convertToNonScientific(Float floatValue) { - return DECIMAL_FORMAT.format(floatValue); + return createFormatter().format(floatValue); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java index 0d967cf719..870e70285a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/RuntimeUtil.java @@ -14,7 +14,7 @@ public class RuntimeUtil { public static void checkNotNull(Object... 
values) { for (Object value : values) { if (value == null) { - throw new NullPointerException(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.getMessage()); + throw new NullPointerException(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.buildMessage()); } } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java index fc8d281cb9..6a46c6c716 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java @@ -12,7 +12,7 @@ public class RuntimeUtilTest { public void checkNotNull_HasNullValues_ShouldThrowException() { assertThatThrownBy(() -> RuntimeUtil.checkNotNull(null, null)) .isExactlyInstanceOf(NullPointerException.class) - .hasMessage(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.getMessage()); + .hasMessage(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.buildMessage()); } @Test From 52890c8d8992f3a2f7c437b2e6df6456ffe646b8 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 16 Dec 2024 11:07:50 +0530 Subject: [PATCH 11/20] Changes --- data-loader/build.gradle | 1 - .../com/scalar/db/dataloader/core/util/RuntimeUtilTest.java | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/data-loader/build.gradle b/data-loader/build.gradle index 3be6a49e81..87a057933b 100644 --- a/data-loader/build.gradle +++ b/data-loader/build.gradle @@ -1,5 +1,4 @@ subprojects { - ext { jacksonVersion = '2.17.0' } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java index 6a46c6c716..8b03c0c0ab 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/RuntimeUtilTest.java @@ -6,17 +6,17 @@ import org.junit.jupiter.api.Test; /** RuntimeUtils unit tests */ -public class RuntimeUtilTest { +class RuntimeUtilTest { @Test - public void checkNotNull_HasNullValues_ShouldThrowException() { + void checkNotNull_HasNullValues_ShouldThrowException() { assertThatThrownBy(() -> RuntimeUtil.checkNotNull(null, null)) .isExactlyInstanceOf(NullPointerException.class) .hasMessage(DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT.buildMessage()); } @Test - public void checkNotNull_HasNoNullValues_ShouldNotThrowException() { + void checkNotNull_HasNoNullValues_ShouldNotThrowException() { String string = "1"; Object object = new Object(); RuntimeUtil.checkNotNull(string, object); From 1997eb87fcae398495ced54ad724d41daceacb4b Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 17 Dec 2024 12:26:58 +0530 Subject: [PATCH 12/20] Added ScalarDB Dao --- .../com/scalar/db/common/error/CoreError.java | 6 + .../core/dataimport/dao/ScalarDBDao.java | 426 ++++++++++++++++++ .../dataimport/dao/ScalarDBDaoException.java | 15 + .../core/dataimport/dao/ScalarDBManager.java | 68 +++ .../core/dataimport/dao/ScalarDBDaoTest.java | 225 +++++++++ 5 files changed, 740 insertions(+) create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoException.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java create mode 100644 
data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index b02b3c45a6..474b8f7b80 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -941,6 +941,12 @@ public enum CoreError implements ScalarDbError { "Handling the before-preparation snapshot hook failed. Details: %s", "", ""), + DATA_LOADER_ERROR_CRUD_EXCEPTION( + Category.INTERNAL_ERROR, "0047", "something went wrong while trying to save the data", "", "" + ), + DATA_LOADER_ERROR_SCAN( + Category.INTERNAL_ERROR, "0048", "Something went wrong while scanning. Are you sure you are running in the correct transaction mode?", "", "" + ), // // Errors for the unknown transaction status error category diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java new file mode 100644 index 0000000000..9016d38d8c --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java @@ -0,0 +1,426 @@ +package com.scalar.db.dataloader.core.dataimport.dao; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.Get; +import com.scalar.db.api.Put; +import com.scalar.db.api.PutBuilder.Buildable; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; +import com.scalar.db.api.ScanBuilder; +import com.scalar.db.api.Scanner; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.ScanRange; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.io.Column; +import com.scalar.db.io.Key; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** The generic DAO that is used to scan ScalarDB data */ +public class ScalarDBDao { + + /* Class logger */ + private static final Logger LOGGER = LoggerFactory.getLogger(ScalarDBDao.class); + + /** + * Retrieve record from ScalarDB instance in storage mode + * + * @param namespace Namespace name + * @param tableName Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key for get + * @param storage Distributed storage for ScalarDB connection that is running in storage mode. 
+ * @return Optional get result + * @throws ScalarDBDaoException if something goes wrong while reading the data + */ + public Optional get( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + DistributedStorage storage) + throws ScalarDBDaoException { + + String printKey = keysToString(partitionKey, clusteringKey); + + try { + Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); + Optional result = storage.get(get); + LOGGER.info("GET completed for " + printKey); + return result; + } catch (ExecutionException e) { + throw new ScalarDBDaoException("error GET " + printKey, e); + } + } + + /** + * Retrieve record from ScalarDB instance in transaction mode + * + * @param namespace Namespace name + * @param tableName Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key for get + * @param transaction ScalarDB transaction instance + * @return Optional get result + * @throws ScalarDBDaoException if something goes wrong while reading the data + */ + public Optional get( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + DistributedTransaction transaction) + throws ScalarDBDaoException { + + Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); + String printKey = keysToString(partitionKey, clusteringKey); + try { + Optional result = transaction.get(get); + LOGGER.info("GET completed for " + printKey); + return result; + } catch (CrudException e) { + throw new ScalarDBDaoException("error GET " + printKey, e.getCause()); + } + } + + /** + * Save record in ScalarDB instance + * + * @param namespace Namespace name + * @param tableName Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key + * @param columns List of column values to be inserted or updated + * @param transaction ScalarDB transaction instance + * @throws ScalarDBDaoException if something goes wrong while executing the transaction + */ + public void put( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + List> columns, + DistributedTransaction transaction) + throws ScalarDBDaoException { + + Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns); + try { + transaction.put(put); + } catch (CrudException e) { + throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); + } + LOGGER.info("PUT completed for " + keysToString(partitionKey, clusteringKey)); + } + + /** + * Save record in ScalarDB instance + * + * @param namespace Namespace name + * @param tableName Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key + * @param columns List of column values to be inserted or updated + * @param storage Distributed storage for ScalarDB connection that is running in storage mode + * @throws ScalarDBDaoException if something goes wrong while executing the transaction + */ + public void put( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + List> columns, + DistributedStorage storage) + throws ScalarDBDaoException { + Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns); + try { + storage.put(put); + } catch (ExecutionException e) { + throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); + } + LOGGER.info("PUT completed for " + keysToString(partitionKey, clusteringKey)); + } + + /** + * Scan a ScalarDB table + * + * 
@param namespace ScalarDB namespace + * @param tableName ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param range Optional range to set ScalarDB scan start and end values + * @param sorts Optional scan clustering key sorting values + * @param projections List of column projection to use during scan + * @param limit Scan limit value + * @param storage Distributed storage for ScalarDB connection that is running in storage mode + * @return List of ScalarDB scan results + * @throws ScalarDBDaoException if scan fails + */ + public List scan( + String namespace, + String tableName, + Key partitionKey, + ScanRange range, + List sorts, + List projections, + int limit, + DistributedStorage storage) + throws ScalarDBDaoException { + // Create scan + Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit); + + // scan data + try { + LOGGER.info("SCAN started..."); + Scanner scanner = storage.scan(scan); + List allResults = scanner.all(); + scanner.close(); + LOGGER.info("SCAN completed"); + return allResults; + } catch (ExecutionException | IOException e) { + throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + } + } + + /** + * Scan a ScalarDB table + * + * @param namespace ScalarDB namespace + * @param tableName ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param range Optional range to set ScalarDB scan start and end values + * @param sorts Optional scan clustering key sorting values + * @param projections List of column projection to use during scan + * @param limit Scan limit value + * @param transaction Distributed Transaction manager for ScalarDB connection that is * running in + * transaction mode + * @return List of ScalarDB scan results + * @throws ScalarDBDaoException if scan fails + */ + public List scan( + String namespace, + String tableName, + Key partitionKey, + ScanRange range, + List sorts, + List projections, + int limit, + DistributedTransaction transaction) + throws ScalarDBDaoException { + + // Create scan + Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit); + + // scan data + try { + LOGGER.info("SCAN started..."); + List results = transaction.scan(scan); + LOGGER.info("SCAN completed"); + return results; + } catch (CrudException | NoSuchElementException e) { + // No such element Exception is thrown when the scan is done in transaction mode but + // ScalarDB is running in storage mode + throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + } + } + + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param tableName ScalarDB table name + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param storage Distributed storage for ScalarDB connection that is running in storage mode + * @return ScalarDB Scanner object + * @throws ScalarDBDaoException if scan fails + */ + public Scanner createScanner( + String namespace, + String tableName, + List projectionColumns, + int limit, + DistributedStorage storage) + throws ScalarDBDaoException { + Scan scan = + createScan(namespace, tableName, null, null, new ArrayList<>(), projectionColumns, limit); + try { + return storage.scan(scan); + } catch (ExecutionException e) { + throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + } + } + + /** + * Create a ScalarDB scanner instance + * + * @param 
namespace ScalarDB namespace + * @param tableName ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param scanRange Optional range to set ScalarDB scan start and end values + * @param sortOrders Optional scan clustering key sorting values + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param storage Distributed storage for ScalarDB connection that is running in storage mode + * @return ScalarDB Scanner object + * @throws ScalarDBDaoException if scan fails + */ + public Scanner createScanner( + String namespace, + String tableName, + Key partitionKey, + ScanRange scanRange, + List sortOrders, + List projectionColumns, + int limit, + DistributedStorage storage) + throws ScalarDBDaoException { + Scan scan = + createScan( + namespace, tableName, partitionKey, scanRange, sortOrders, projectionColumns, limit); + try { + return storage.scan(scan); + } catch (ExecutionException e) { + throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + } + } + + /** + * Create ScalarDB scan instance + * + * @param namespace ScalarDB namespace + * @param tableName ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param scanRange Optional range to set ScalarDB scan start and end values + * @param sortOrders Optional scan clustering key sorting values + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @return ScalarDB scan instance + */ + Scan createScan( + String namespace, + String tableName, + Key partitionKey, + ScanRange scanRange, + List sortOrders, + List projectionColumns, + int limit) { + // If no partition key is provided a scan all is created + if (partitionKey == null) { + ScanBuilder.BuildableScanAll buildableScanAll = + Scan.newBuilder().namespace(namespace).table(tableName).all(); + + // projection columns + if (projectionColumns != null && !projectionColumns.isEmpty()) { + buildableScanAll.projections(projectionColumns); + } + + // limit + if (limit > 0) { + buildableScanAll.limit(limit); + } + return buildableScanAll.build(); + } + + // Create a scan with partition key (not a scan all) + + ScanBuilder.BuildableScan buildableScan = + Scan.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); + + // Set the scan boundary + if (scanRange != null) { + // Set boundary start + if (scanRange.getScanStartKey() != null) { + buildableScan.start(scanRange.getScanStartKey(), scanRange.isStartInclusive()); + } + + // with end + if (scanRange.getScanEndKey() != null) { + buildableScan.end(scanRange.getScanEndKey(), scanRange.isEndInclusive()); + } + } + + // clustering order + for (Scan.Ordering sort : sortOrders) { + buildableScan.ordering(sort); + } + + // projections + if (projectionColumns != null && !projectionColumns.isEmpty()) { + buildableScan.projections(projectionColumns); + } + + // limit + if (limit > 0) { + buildableScan.limit(limit); + } + return buildableScan.build(); + } + + /** + * Return a ScalarDB get based on provided parameters + * + * @param namespace Namespace name + * @param tableName Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key for get + * @return ScalarDB Get instance + */ + private Get createGetWith( + String namespace, String tableName, Key partitionKey, Key clusteringKey) { + if (clusteringKey != null) { + return Get.newBuilder() + .namespace(namespace) + .table(tableName) + 
.partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build(); + } + return Get.newBuilder() + .namespace(namespace) + .table(tableName) + .partitionKey(partitionKey) + .build(); + } + + /** + * Return a ScalarDB put based on provided parameters + * + * @param namespace Namespace name + * @param tableName Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key + * @param columns List of column values + * @return ScalarDB Put Instance + */ + private Put createPutWith( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + List> columns) { + Buildable buildable = + Put.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); + if (clusteringKey != null) { + buildable.clusteringKey(clusteringKey); + } + + for (Column column : columns) { + buildable.value(column); + } + return buildable.build(); + } + + private String keysToString(Key partitionKey, Key clusteringKey) { + if (clusteringKey != null) { + return partitionKey.toString() + "," + clusteringKey; + } else { + return partitionKey.toString(); + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoException.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoException.java new file mode 100644 index 0000000000..1e50affb07 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoException.java @@ -0,0 +1,15 @@ +package com.scalar.db.dataloader.core.dataimport.dao; + +/** A custom DAO exception that encapsulates errors thrown by ScalarDB operations */ +public class ScalarDBDaoException extends Exception { + + /** + * Class constructor + * + * @param message error message + * @param cause reason for exception + */ + public ScalarDBDaoException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java new file mode 100644 index 0000000000..6ab2a4f4e6 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java @@ -0,0 +1,68 @@ +package com.scalar.db.dataloader.core.dataimport.dao; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.DistributedTransactionAdmin; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.service.StorageFactory; +import com.scalar.db.service.TransactionFactory; +import java.io.IOException; + +/** + * A manager to retrieve the various ScalarDB managers based on the running mode + * + * @author Yves Peckstadt + */ +public class ScalarDBManager { + + /* Distributed storage for ScalarDB connection that is running in storage mode. 
*/ + private final DistributedStorage storage; + /* Distributed Transaction manager for ScalarDB connection that is running in transaction mode */ + private final DistributedTransactionManager transactionManager; + /* Distributed storage admin for ScalarDB admin operations */ + private final DistributedStorageAdmin storageAdmin; + private final DistributedTransactionAdmin transactionAdmin; + + /** + * Class constructor + * + * @param storageFactory Factory to create all the necessary ScalarDB data managers + */ + public ScalarDBManager(StorageFactory storageFactory) throws IOException { + storage = storageFactory.getStorage(); + storageAdmin = storageFactory.getStorageAdmin(); + transactionManager = null; + transactionAdmin = null; + } + + /** + * Class constructor + * + * @param transactionFactory Factory to create all the necessary ScalarDB data managers + */ + public ScalarDBManager(TransactionFactory transactionFactory) throws IOException { + + transactionManager = transactionFactory.getTransactionManager(); + transactionAdmin = transactionFactory.getTransactionAdmin(); + storageAdmin = null; + storage = null; + } + + /** @return storage for ScalarDB connection that is running in storage mode */ + public DistributedStorage getDistributedStorage() { + return storage; + } + + /** + * @return Distributed Transaction manager for ScalarDB connection that is running in transaction + * mode + */ + public DistributedTransactionManager getDistributedTransactionManager() { + return transactionManager; + } + + /** @return Distributed storage admin for ScalarDB admin operations */ + public DistributedStorageAdmin getDistributedStorageAdmin() { + return storageAdmin; + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java new file mode 100644 index 0000000000..ab9de219b3 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java @@ -0,0 +1,225 @@ +package com.scalar.db.dataloader.core.dataimport.dao; + +import static com.scalar.db.dataloader.core.UnitTestUtils.*; +import static org.assertj.core.api.Assertions.assertThat; + +import com.scalar.db.api.Scan; +import com.scalar.db.api.ScanBuilder; +import com.scalar.db.dataloader.core.ScanRange; +import com.scalar.db.io.Key; +import java.util.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ScalarDBDaoTest { + + private static final int TEST_VALUE_INT_MIN = 1; + private ScalarDBDao dao; + + @BeforeEach + public void setUp() { + this.dao = new ScalarDBDao(); + } + + @Test + void createScan_scanWithPartitionKey_shouldCreateScanObjectWithPartitionKey() { + + // Create Scan Object + Scan scan = + this.dao.createScan( + TEST_NAMESPACE, + TEST_TABLE_NAME, + Key.newBuilder().addBigInt(TEST_COLUMN_1_PK, TEST_VALUE_LONG).build(), + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0); + + // Create expected result + Scan expectedResult = + generateScanResult( + Key.newBuilder().addBigInt(TEST_COLUMN_1_PK, TEST_VALUE_LONG).build(), + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0); + + // Compare Scan object + assertThat(scan.toString()).isEqualTo(expectedResult.toString()); + } + + @Test + void createScan_scanWithLimitAndProjection_shouldCreateScanObjectWithLimitAndProjection() { + + // Create Scan Object + Scan scan = + 
this.dao.createScan( + TEST_NAMESPACE, + TEST_TABLE_NAME, + Key.newBuilder().addBigInt(TEST_COLUMN_1_PK, TEST_VALUE_LONG).build(), + new ScanRange(null, null, false, false), + new ArrayList<>(), + Arrays.asList(TEST_COLUMN_4, TEST_COLUMN_5, TEST_COLUMN_6), + 5); + + // Create expected result + Scan expectedResult = + generateScanResult( + Key.newBuilder().addBigInt(TEST_COLUMN_1_PK, TEST_VALUE_LONG).build(), + new ScanRange(null, null, false, false), + new ArrayList<>(), + Arrays.asList(TEST_COLUMN_4, TEST_COLUMN_5, TEST_COLUMN_6), + 5); + + // Compare Scan object + assertThat(scan.toString()).isEqualTo(expectedResult.toString()); + } + + @Test + void createScan_scanWithScanRangeAndOrder_shouldCreateScanObjectWithSortAndRange() { + + // Create Scan Object + Scan scan = + this.dao.createScan( + TEST_NAMESPACE, + TEST_TABLE_NAME, + Key.newBuilder().addBigInt(TEST_COLUMN_1_PK, TEST_VALUE_LONG).build(), + new ScanRange( + Key.newBuilder().addInt(TEST_COLUMN_2_CK, TEST_VALUE_INT_MIN).build(), + Key.newBuilder().addInt(TEST_COLUMN_2_CK, TEST_VALUE_INT).build(), + true, + false), + List.of(Scan.Ordering.asc(TEST_COLUMN_2_CK)), + new ArrayList<>(), + 0); + // Create expected result + Scan expectedResult = + generateScanResult( + Key.newBuilder().addBigInt(TEST_COLUMN_1_PK, TEST_VALUE_LONG).build(), + new ScanRange( + Key.newBuilder().addInt(TEST_COLUMN_2_CK, TEST_VALUE_INT_MIN).build(), + Key.newBuilder().addInt(TEST_COLUMN_2_CK, TEST_VALUE_INT).build(), + true, + false), + List.of(Scan.Ordering.asc(TEST_COLUMN_2_CK)), + new ArrayList<>(), + 0); + // Compare Scan object + assertThat(scan.toString()).isEqualTo(expectedResult.toString()); + } + + @Test + void createScan_scanWithoutPartitionKey_shouldCreateScanAllObject() { + + // Create Scan Object + Scan scan = + this.dao.createScan( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0); + + // Create expected result + Scan expectedResult = generateScanAllResult(new ArrayList<>(), 0); + + // Compare ScanAll object + assertThat(scan.toString()).isEqualTo(expectedResult.toString()); + } + + @Test + void createScan_scanAllWithLimitAndProjection_shouldCreateScanAllObjectWithLimitAndProjection() { + + // Create Scan Object + Scan scan = + this.dao.createScan( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + Arrays.asList(TEST_COLUMN_4, TEST_COLUMN_5, TEST_COLUMN_6), + 5); + + // Create expected result + Scan expectedResult = + generateScanAllResult(Arrays.asList(TEST_COLUMN_4, TEST_COLUMN_5, TEST_COLUMN_6), 5); + + // Compare ScanAll object + assertThat(scan.toString()).isEqualTo(expectedResult.toString()); + } + + /** + * Create Scan Object + * + * @param partitionKey Partition key used in ScalarDB scan + * @param range Optional range to set ScalarDB scan start and end values + * @param sorts Optional scan clustering key sorting values + * @param projections List of column projection to use during scan + * @param limit Scan limit value + * @return ScalarDB scan instance + */ + private Scan generateScanResult( + Key partitionKey, + ScanRange range, + List sorts, + List projections, + int limit) { + ScanBuilder.BuildableScan scan = + Scan.newBuilder() + .namespace(TEST_NAMESPACE) + .table(TEST_TABLE_NAME) + .partitionKey(partitionKey); + + // Set boundary start + if (range.getScanStartKey() != null) { + scan.start(range.getScanStartKey(), range.isStartInclusive()); + } + + // with end + if (range.getScanEndKey() 
!= null) { + scan.end(range.getScanEndKey(), range.isEndInclusive()); + } + + // clustering order + for (Scan.Ordering sort : sorts) { + scan.ordering(sort); + } + + // projections + if (projections != null && !projections.isEmpty()) { + scan.projections(projections); + } + + // limit + if (limit > 0) { + scan.limit(limit); + } + return scan.build(); + } + + /** + * Create ScanAll Object + * + * @param projections List of column projection to use during scan + * @param limit Scan limit value + * @return ScalarDB scan instance + */ + private Scan generateScanAllResult(List projections, int limit) { + ScanBuilder.BuildableScanAll scan = + Scan.newBuilder().namespace(TEST_NAMESPACE).table(TEST_TABLE_NAME).all(); + + // projections + if (projections != null && !projections.isEmpty()) { + scan.projections(projections); + } + + // limit + if (limit > 0) { + scan.limit(limit); + } + return scan.build(); + } +} From 8a7338be9fbf3d698dd0b864b467cb95ec42f403 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 17 Dec 2024 17:15:16 +0530 Subject: [PATCH 13/20] Remove unnecessary files --- .../db/dataloader/core/dataimport/ImportMode.java | 8 -------- .../controlfile/ControlFileValidationLevel.java | 11 ----------- 2 files changed, 19 deletions(-) delete mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java delete mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java deleted file mode 100644 index 7f2a805e75..0000000000 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportMode.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.scalar.db.dataloader.core.dataimport; - -/** Represents the way to be imported data is handled */ -public enum ImportMode { - INSERT, - UPDATE, - UPSERT -} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java deleted file mode 100644 index 3753d0ba65..0000000000 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileValidationLevel.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.scalar.db.dataloader.core.dataimport.controlfile; - -/** Control file validation level */ -public enum ControlFileValidationLevel { - /* All columns need to be mapped */ - FULL, - /* All partition key and clustering key columns need to be mapped */ - KEYS, - /* Only validate the columns that are mapped */ - MAPPED -} From e20607392270c21cfe64e2c0023e6dba96a47990 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 17 Dec 2024 18:15:18 +0530 Subject: [PATCH 14/20] Changes --- .../core/dataimport/dao/ScalarDBDao.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java index 9016d38d8c..9a96401d55 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java @@ -27,7 +27,11 @@ public 
class ScalarDBDao { /* Class logger */ - private static final Logger LOGGER = LoggerFactory.getLogger(ScalarDBDao.class); + private static final Logger logger = LoggerFactory.getLogger(ScalarDBDao.class); + private static final String GET_COMPLETED_MSG = "GET completed for %s"; + private static final String PUT_COMPLETED_MSG = "PUT completed for %s"; + private static final String SCAN_START_MSG = "SCAN started..."; + private static final String SCAN_END_MSG = "SCAN completed"; /** * Retrieve record from ScalarDB instance in storage mode @@ -53,7 +57,7 @@ public Optional get( try { Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); Optional result = storage.get(get); - LOGGER.info("GET completed for " + printKey); + logger.info(String.format(GET_COMPLETED_MSG, printKey)); return result; } catch (ExecutionException e) { throw new ScalarDBDaoException("error GET " + printKey, e); @@ -83,7 +87,7 @@ public Optional get( String printKey = keysToString(partitionKey, clusteringKey); try { Optional result = transaction.get(get); - LOGGER.info("GET completed for " + printKey); + logger.info(String.format(GET_COMPLETED_MSG, printKey)); return result; } catch (CrudException e) { throw new ScalarDBDaoException("error GET " + printKey, e.getCause()); @@ -116,7 +120,7 @@ public void put( } catch (CrudException e) { throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); } - LOGGER.info("PUT completed for " + keysToString(partitionKey, clusteringKey)); + logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey))); } /** @@ -144,7 +148,7 @@ public void put( } catch (ExecutionException e) { throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); } - LOGGER.info("PUT completed for " + keysToString(partitionKey, clusteringKey)); + logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey))); } /** @@ -176,11 +180,11 @@ public List scan( // scan data try { - LOGGER.info("SCAN started..."); + logger.info(SCAN_START_MSG); Scanner scanner = storage.scan(scan); List allResults = scanner.all(); scanner.close(); - LOGGER.info("SCAN completed"); + logger.info(SCAN_END_MSG); return allResults; } catch (ExecutionException | IOException e) { throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); @@ -218,9 +222,9 @@ public List scan( // scan data try { - LOGGER.info("SCAN started..."); + logger.info(SCAN_START_MSG); List results = transaction.scan(scan); - LOGGER.info("SCAN completed"); + logger.info(SCAN_END_MSG); return results; } catch (CrudException | NoSuchElementException e) { // No such element Exception is thrown when the scan is done in transaction mode but From 26d3144e1f77788e002a64b245971a15d575be00 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 18 Dec 2024 12:12:25 +0530 Subject: [PATCH 15/20] Changes --- .../com/scalar/db/common/error/CoreError.java | 14 ++++++++++---- .../core/dataimport/dao/ScalarDBDao.java | 15 ++++++++------- .../core/dataimport/dao/ScalarDBManager.java | 1 - 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 474b8f7b80..637a549dad 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -942,11 +942,17 @@ public enum CoreError implements ScalarDbError { "", ""), 
DATA_LOADER_ERROR_CRUD_EXCEPTION( - Category.INTERNAL_ERROR, "0047", "something went wrong while trying to save the data", "", "" - ), + Category.INTERNAL_ERROR, + "0047", + "something went wrong while trying to save the data", + "", + ""), DATA_LOADER_ERROR_SCAN( - Category.INTERNAL_ERROR, "0048", "Something went wrong while scanning. Are you sure you are running in the correct transaction mode?", "", "" - ), + Category.INTERNAL_ERROR, + "0048", + "Something went wrong while scanning. Are you sure you are running in the correct transaction mode?", + "", + ""), // // Errors for the unknown transaction status error category diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java index 9a96401d55..8f3556818b 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java @@ -52,15 +52,16 @@ public Optional get( DistributedStorage storage) throws ScalarDBDaoException { - String printKey = keysToString(partitionKey, clusteringKey); + // Retrieving the key data for logging + String loggingKey = keysToString(partitionKey, clusteringKey); try { Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); Optional result = storage.get(get); - logger.info(String.format(GET_COMPLETED_MSG, printKey)); + logger.info(String.format(GET_COMPLETED_MSG, loggingKey)); return result; } catch (ExecutionException e) { - throw new ScalarDBDaoException("error GET " + printKey, e); + throw new ScalarDBDaoException("error GET " + loggingKey, e); } } @@ -84,13 +85,14 @@ public Optional get( throws ScalarDBDaoException { Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); - String printKey = keysToString(partitionKey, clusteringKey); + // Retrieving the key data for logging + String loggingKey = keysToString(partitionKey, clusteringKey); try { Optional result = transaction.get(get); - logger.info(String.format(GET_COMPLETED_MSG, printKey)); + logger.info(String.format(GET_COMPLETED_MSG, loggingKey)); return result; } catch (CrudException e) { - throw new ScalarDBDaoException("error GET " + printKey, e.getCause()); + throw new ScalarDBDaoException("error GET " + loggingKey, e.getCause()); } } @@ -332,7 +334,6 @@ Scan createScan( } // Create a scan with partition key (not a scan all) - ScanBuilder.BuildableScan buildableScan = Scan.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java index 6ab2a4f4e6..ac246d8354 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java @@ -41,7 +41,6 @@ public ScalarDBManager(StorageFactory storageFactory) throws IOException { * @param transactionFactory Factory to create all the necessary ScalarDB data managers */ public ScalarDBManager(TransactionFactory transactionFactory) throws IOException { - transactionManager = transactionFactory.getTransactionManager(); transactionAdmin = transactionFactory.getTransactionAdmin(); storageAdmin = null; From 
b86487d68fd7690e0becb336b4ef6c5d48a113ab Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 18 Dec 2024 12:27:36 +0530 Subject: [PATCH 16/20] spotbugs exclude --- gradle/spotbugs-exclude.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 05571f3fdb..1724740470 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -34,4 +34,9 @@ + + + + + From 818a2b40660214e2a715a0cde34afca117a37813 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 18 Dec 2024 15:12:21 +0530 Subject: [PATCH 17/20] spotbugs exclude -2 --- gradle/spotbugs-exclude.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 1724740470..0479b8fa24 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -36,7 +36,7 @@ - + From 90abd9ede55b94e2f8a44a84ceac57b478a3cf3c Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 19 Dec 2024 13:46:45 +0530 Subject: [PATCH 18/20] Removed use of List.of to fix CI error --- .../db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java index ab9de219b3..d0571ddddf 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java @@ -89,7 +89,7 @@ void createScan_scanWithScanRangeAndOrder_shouldCreateScanObjectWithSortAndRange Key.newBuilder().addInt(TEST_COLUMN_2_CK, TEST_VALUE_INT).build(), true, false), - List.of(Scan.Ordering.asc(TEST_COLUMN_2_CK)), + Arrays.asList(Scan.Ordering.asc(TEST_COLUMN_2_CK)), new ArrayList<>(), 0); // Create expected result @@ -101,7 +101,7 @@ void createScan_scanWithScanRangeAndOrder_shouldCreateScanObjectWithSortAndRange Key.newBuilder().addInt(TEST_COLUMN_2_CK, TEST_VALUE_INT).build(), true, false), - List.of(Scan.Ordering.asc(TEST_COLUMN_2_CK)), + Arrays.asList(Scan.Ordering.asc(TEST_COLUMN_2_CK)), new ArrayList<>(), 0); // Compare Scan object From 03324e1837ca2b80880571e2fe592f1e0a7fcd18 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 19 Dec 2024 17:22:56 +0530 Subject: [PATCH 19/20] Minor change in test --- .../db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java index d0571ddddf..c46843156f 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDaoTest.java @@ -17,7 +17,7 @@ class ScalarDBDaoTest { private ScalarDBDao dao; @BeforeEach - public void setUp() { + void setUp() { this.dao = new ScalarDBDao(); } From acedabe6f596e27a14d480cab73e10ee456e32bf Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 24 Dec 2024 13:47:27 +0530 Subject: [PATCH 20/20] Partial feedback changes --- .../com/scalar/db/common/error/CoreError.java | 4 +- .../core/dataimport/dao/ScalarDBDao.java | 110 ++++++++---------- .../core/dataimport/dao/ScalarDBManager.java | 3 +- 3 files changed, 
54 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 637a549dad..d6a4492264 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -944,13 +944,13 @@ public enum CoreError implements ScalarDbError { DATA_LOADER_ERROR_CRUD_EXCEPTION( Category.INTERNAL_ERROR, "0047", - "something went wrong while trying to save the data", + "Something went wrong while trying to save the data. Details %s", "", ""), DATA_LOADER_ERROR_SCAN( Category.INTERNAL_ERROR, "0048", - "Something went wrong while scanning. Are you sure you are running in the correct transaction mode?", + "Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details %s", "", ""), diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java index 8f3556818b..e7270de8eb 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java @@ -1,14 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.dao; -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.DistributedTransaction; -import com.scalar.db.api.Get; -import com.scalar.db.api.Put; +import com.scalar.db.api.*; import com.scalar.db.api.PutBuilder.Buildable; -import com.scalar.db.api.Result; -import com.scalar.db.api.Scan; -import com.scalar.db.api.ScanBuilder; -import com.scalar.db.api.Scanner; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; @@ -37,7 +30,7 @@ public class ScalarDBDao { * Retrieve record from ScalarDB instance in storage mode * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key for get * @param storage Distributed storage for ScalarDB connection that is running in storage mode. 
@@ -46,7 +39,7 @@ public class ScalarDBDao { */ public Optional get( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, DistributedStorage storage) @@ -56,7 +49,7 @@ public Optional get( String loggingKey = keysToString(partitionKey, clusteringKey); try { - Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); + Get get = createGetWith(namespace, table, partitionKey, clusteringKey); Optional result = storage.get(get); logger.info(String.format(GET_COMPLETED_MSG, loggingKey)); return result; @@ -69,7 +62,7 @@ public Optional get( * Retrieve record from ScalarDB instance in transaction mode * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key for get * @param transaction ScalarDB transaction instance @@ -78,13 +71,13 @@ public Optional get( */ public Optional get( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, DistributedTransaction transaction) throws ScalarDBDaoException { - Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); + Get get = createGetWith(namespace, table, partitionKey, clusteringKey); // Retrieving the key data for logging String loggingKey = keysToString(partitionKey, clusteringKey); try { @@ -100,7 +93,7 @@ public Optional get( * Save record in ScalarDB instance * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key * @param columns List of column values to be inserted or updated @@ -109,18 +102,19 @@ public Optional get( */ public void put( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, List> columns, DistributedTransaction transaction) throws ScalarDBDaoException { - Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns); + Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns); try { transaction.put(put); } catch (CrudException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e); } logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey))); } @@ -129,7 +123,7 @@ public void put( * Save record in ScalarDB instance * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key * @param columns List of column values to be inserted or updated @@ -138,17 +132,18 @@ public void put( */ public void put( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, List> columns, DistributedStorage storage) throws ScalarDBDaoException { - Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns); + Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns); try { storage.put(put); } catch (ExecutionException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e); } logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey))); } @@ -157,7 +152,7 @@ 
public void put( * Scan a ScalarDB table * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param range Optional range to set ScalarDB scan start and end values * @param sorts Optional scan clustering key sorting values @@ -169,7 +164,7 @@ public void put( */ public List scan( String namespace, - String tableName, + String table, Key partitionKey, ScanRange range, List sorts, @@ -178,7 +173,7 @@ public List scan( DistributedStorage storage) throws ScalarDBDaoException { // Create scan - Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit); + Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit); // scan data try { @@ -189,7 +184,8 @@ public List scan( logger.info(SCAN_END_MSG); return allResults; } catch (ExecutionException | IOException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -197,7 +193,7 @@ public List scan( * Scan a ScalarDB table * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param range Optional range to set ScalarDB scan start and end values * @param sorts Optional scan clustering key sorting values @@ -210,7 +206,7 @@ public List scan( */ public List scan( String namespace, - String tableName, + String table, Key partitionKey, ScanRange range, List sorts, @@ -220,7 +216,7 @@ public List scan( throws ScalarDBDaoException { // Create scan - Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit); + Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit); // scan data try { @@ -231,7 +227,8 @@ public List scan( } catch (CrudException | NoSuchElementException e) { // No such element Exception is thrown when the scan is done in transaction mode but // ScalarDB is running in storage mode - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -239,7 +236,7 @@ public List scan( * Create a ScalarDB scanner instance * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param projectionColumns List of column projection to use during scan * @param limit Scan limit value * @param storage Distributed storage for ScalarDB connection that is running in storage mode @@ -248,17 +245,18 @@ public List scan( */ public Scanner createScanner( String namespace, - String tableName, + String table, List projectionColumns, int limit, DistributedStorage storage) throws ScalarDBDaoException { Scan scan = - createScan(namespace, tableName, null, null, new ArrayList<>(), projectionColumns, limit); + createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); try { return storage.scan(scan); } catch (ExecutionException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -266,7 +264,7 @@ public Scanner createScanner( * Create a ScalarDB scanner instance * * @param namespace 
ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param scanRange Optional range to set ScalarDB scan start and end values * @param sortOrders Optional scan clustering key sorting values @@ -278,7 +276,7 @@ public Scanner createScanner( */ public Scanner createScanner( String namespace, - String tableName, + String table, Key partitionKey, ScanRange scanRange, List sortOrders, @@ -287,12 +285,12 @@ public Scanner createScanner( DistributedStorage storage) throws ScalarDBDaoException { Scan scan = - createScan( - namespace, tableName, partitionKey, scanRange, sortOrders, projectionColumns, limit); + createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); try { return storage.scan(scan); } catch (ExecutionException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -300,7 +298,7 @@ public Scanner createScanner( * Create ScalarDB scan instance * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param scanRange Optional range to set ScalarDB scan start and end values * @param sortOrders Optional scan clustering key sorting values @@ -310,7 +308,7 @@ public Scanner createScanner( */ Scan createScan( String namespace, - String tableName, + String table, Key partitionKey, ScanRange scanRange, List sortOrders, @@ -319,7 +317,7 @@ Scan createScan( // If no partition key is provided a scan all is created if (partitionKey == null) { ScanBuilder.BuildableScanAll buildableScanAll = - Scan.newBuilder().namespace(namespace).table(tableName).all(); + Scan.newBuilder().namespace(namespace).table(table).all(); // projection columns if (projectionColumns != null && !projectionColumns.isEmpty()) { @@ -335,7 +333,7 @@ Scan createScan( // Create a scan with partition key (not a scan all) ScanBuilder.BuildableScan buildableScan = - Scan.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); + Scan.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey); // Set the scan boundary if (scanRange != null) { @@ -371,33 +369,25 @@ Scan createScan( * Return a ScalarDB get based on provided parameters * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key for get * @return ScalarDB Get instance */ - private Get createGetWith( - String namespace, String tableName, Key partitionKey, Key clusteringKey) { + private Get createGetWith(String namespace, String table, Key partitionKey, Key clusteringKey) { + GetBuilder.BuildableGetWithPartitionKey buildable = + Get.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey); if (clusteringKey != null) { - return Get.newBuilder() - .namespace(namespace) - .table(tableName) - .partitionKey(partitionKey) - .clusteringKey(clusteringKey) - .build(); + buildable.clusteringKey(clusteringKey); } - return Get.newBuilder() - .namespace(namespace) - .table(tableName) - .partitionKey(partitionKey) - .build(); + return buildable.build(); } /** * Return a ScalarDB put based on provided parameters * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param 
partitionKey Partition key * @param clusteringKey Optional clustering key * @param columns List of column values @@ -405,12 +395,12 @@ private Get createGetWith( */ private Put createPutWith( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, List> columns) { Buildable buildable = - Put.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); + Put.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey); if (clusteringKey != null) { buildable.clusteringKey(clusteringKey); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java index ac246d8354..1016eaaba4 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java @@ -7,6 +7,7 @@ import com.scalar.db.service.StorageFactory; import com.scalar.db.service.TransactionFactory; import java.io.IOException; +import javax.annotation.Nullable; /** * A manager to retrieve the various ScalarDB managers based on the running mode @@ -16,7 +17,7 @@ public class ScalarDBManager { /* Distributed storage for ScalarDB connection that is running in storage mode. */ - private final DistributedStorage storage; + @Nullable private final DistributedStorage storage; /* Distributed Transaction manager for ScalarDB connection that is running in transaction mode */ private final DistributedTransactionManager transactionManager; /* Distributed storage admin for ScalarDB admin operations */
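
For context, here is a minimal usage sketch (not part of the patches above) showing how the ScalarDBManager and ScalarDBDao introduced in this series might be wired together in storage mode. The class name ScalarDBDaoUsageSketch, the configuration file path, and the namespace, table, and column names are hypothetical, and StorageFactory.create(...) is assumed to be the usual ScalarDB factory entry point rather than something defined in these patches.

    package com.scalar.db.dataloader.core.dataimport.dao;

    import com.scalar.db.api.DistributedStorage;
    import com.scalar.db.api.Result;
    import com.scalar.db.io.Key;
    import com.scalar.db.service.StorageFactory;
    import java.util.Optional;

    public class ScalarDBDaoUsageSketch {

      public static void main(String[] args) throws Exception {
        // Storage mode: a ScalarDBManager built from a StorageFactory exposes a
        // DistributedStorage, while the transaction manager it would otherwise
        // expose is left null (which is why those fields are marked nullable).
        StorageFactory storageFactory = StorageFactory.create("scalardb.properties");
        ScalarDBManager manager = new ScalarDBManager(storageFactory);
        DistributedStorage storage = manager.getDistributedStorage();

        // Read a single record through the DAO; a null clustering key means the
        // Get is built with the partition key only.
        ScalarDBDao dao = new ScalarDBDao();
        Key partitionKey = Key.newBuilder().addBigInt("id", 1L).build();
        Optional<Result> result = dao.get("sample_ns", "sample_table", partitionKey, null, storage);
        result.ifPresent(r -> System.out.println("Fetched: " + r));

        storage.close();
      }
    }

In transaction mode the flow is analogous: the manager is constructed from a TransactionFactory, and the DAO's transaction-mode overloads take a DistributedTransaction instead of the DistributedStorage handle.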