diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 637a549da..d6a449226 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -944,13 +944,13 @@ public enum CoreError implements ScalarDbError { DATA_LOADER_ERROR_CRUD_EXCEPTION( Category.INTERNAL_ERROR, "0047", - "something went wrong while trying to save the data", + "Something went wrong while trying to save the data. Details %s", "", ""), DATA_LOADER_ERROR_SCAN( Category.INTERNAL_ERROR, "0048", - "Something went wrong while scanning. Are you sure you are running in the correct transaction mode?", + "Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details %s", "", ""), diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java index 8f3556818..e7270de8e 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBDao.java @@ -1,14 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.dao; -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.DistributedTransaction; -import com.scalar.db.api.Get; -import com.scalar.db.api.Put; +import com.scalar.db.api.*; import com.scalar.db.api.PutBuilder.Buildable; -import com.scalar.db.api.Result; -import com.scalar.db.api.Scan; -import com.scalar.db.api.ScanBuilder; -import com.scalar.db.api.Scanner; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; @@ -37,7 +30,7 @@ public class ScalarDBDao { * Retrieve record from ScalarDB instance in storage mode * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key for get * @param storage Distributed storage for ScalarDB connection that is running in storage mode. @@ -46,7 +39,7 @@ public class ScalarDBDao { */ public Optional get( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, DistributedStorage storage) @@ -56,7 +49,7 @@ public Optional get( String loggingKey = keysToString(partitionKey, clusteringKey); try { - Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); + Get get = createGetWith(namespace, table, partitionKey, clusteringKey); Optional result = storage.get(get); logger.info(String.format(GET_COMPLETED_MSG, loggingKey)); return result; @@ -69,7 +62,7 @@ public Optional get( * Retrieve record from ScalarDB instance in transaction mode * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key for get * @param transaction ScalarDB transaction instance @@ -78,13 +71,13 @@ public Optional get( */ public Optional get( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, DistributedTransaction transaction) throws ScalarDBDaoException { - Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey); + Get get = createGetWith(namespace, table, partitionKey, clusteringKey); // Retrieving the key data for logging String loggingKey = keysToString(partitionKey, clusteringKey); try { @@ -100,7 +93,7 @@ public Optional get( * Save record in ScalarDB instance * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key * @param columns List of column values to be inserted or updated @@ -109,18 +102,19 @@ public Optional get( */ public void put( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, List> columns, DistributedTransaction transaction) throws ScalarDBDaoException { - Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns); + Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns); try { transaction.put(put); } catch (CrudException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e); } logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey))); } @@ -129,7 +123,7 @@ public void put( * Save record in ScalarDB instance * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key * @param columns List of column values to be inserted or updated @@ -138,17 +132,18 @@ public void put( */ public void put( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, List> columns, DistributedStorage storage) throws ScalarDBDaoException { - Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns); + Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns); try { storage.put(put); } catch (ExecutionException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e); } logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey))); } @@ -157,7 +152,7 @@ public void put( * Scan a ScalarDB table * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param range Optional range to set ScalarDB scan start and end values * @param sorts Optional scan clustering key sorting values @@ -169,7 +164,7 @@ public void put( */ public List scan( String namespace, - String tableName, + String table, Key partitionKey, ScanRange range, List sorts, @@ -178,7 +173,7 @@ public List scan( DistributedStorage storage) throws ScalarDBDaoException { // Create scan - Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit); + Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit); // scan data try { @@ -189,7 +184,8 @@ public List scan( logger.info(SCAN_END_MSG); return allResults; } catch (ExecutionException | IOException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -197,7 +193,7 @@ public List scan( * Scan a ScalarDB table * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param range Optional range to set ScalarDB scan start and end values * @param sorts Optional scan clustering key sorting values @@ -210,7 +206,7 @@ public List scan( */ public List scan( String namespace, - String tableName, + String table, Key partitionKey, ScanRange range, List sorts, @@ -220,7 +216,7 @@ public List scan( throws ScalarDBDaoException { // Create scan - Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit); + Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit); // scan data try { @@ -231,7 +227,8 @@ public List scan( } catch (CrudException | NoSuchElementException e) { // No such element Exception is thrown when the scan is done in transaction mode but // ScalarDB is running in storage mode - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -239,7 +236,7 @@ public List scan( * Create a ScalarDB scanner instance * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param projectionColumns List of column projection to use during scan * @param limit Scan limit value * @param storage Distributed storage for ScalarDB connection that is running in storage mode @@ -248,17 +245,18 @@ public List scan( */ public Scanner createScanner( String namespace, - String tableName, + String table, List projectionColumns, int limit, DistributedStorage storage) throws ScalarDBDaoException { Scan scan = - createScan(namespace, tableName, null, null, new ArrayList<>(), projectionColumns, limit); + createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); try { return storage.scan(scan); } catch (ExecutionException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -266,7 +264,7 @@ public Scanner createScanner( * Create a ScalarDB scanner instance * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param scanRange Optional range to set ScalarDB scan start and end values * @param sortOrders Optional scan clustering key sorting values @@ -278,7 +276,7 @@ public Scanner createScanner( */ public Scanner createScanner( String namespace, - String tableName, + String table, Key partitionKey, ScanRange scanRange, List sortOrders, @@ -287,12 +285,12 @@ public Scanner createScanner( DistributedStorage storage) throws ScalarDBDaoException { Scan scan = - createScan( - namespace, tableName, partitionKey, scanRange, sortOrders, projectionColumns, limit); + createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); try { return storage.scan(scan); } catch (ExecutionException e) { - throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e); + throw new ScalarDBDaoException( + CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e); } } @@ -300,7 +298,7 @@ public Scanner createScanner( * Create ScalarDB scan instance * * @param namespace ScalarDB namespace - * @param tableName ScalarDB table name + * @param table ScalarDB table name * @param partitionKey Partition key used in ScalarDB scan * @param scanRange Optional range to set ScalarDB scan start and end values * @param sortOrders Optional scan clustering key sorting values @@ -310,7 +308,7 @@ public Scanner createScanner( */ Scan createScan( String namespace, - String tableName, + String table, Key partitionKey, ScanRange scanRange, List sortOrders, @@ -319,7 +317,7 @@ Scan createScan( // If no partition key is provided a scan all is created if (partitionKey == null) { ScanBuilder.BuildableScanAll buildableScanAll = - Scan.newBuilder().namespace(namespace).table(tableName).all(); + Scan.newBuilder().namespace(namespace).table(table).all(); // projection columns if (projectionColumns != null && !projectionColumns.isEmpty()) { @@ -335,7 +333,7 @@ Scan createScan( // Create a scan with partition key (not a scan all) ScanBuilder.BuildableScan buildableScan = - Scan.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); + Scan.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey); // Set the scan boundary if (scanRange != null) { @@ -371,33 +369,25 @@ Scan createScan( * Return a ScalarDB get based on provided parameters * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key for get * @return ScalarDB Get instance */ - private Get createGetWith( - String namespace, String tableName, Key partitionKey, Key clusteringKey) { + private Get createGetWith(String namespace, String table, Key partitionKey, Key clusteringKey) { + GetBuilder.BuildableGetWithPartitionKey buildable = + Get.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey); if (clusteringKey != null) { - return Get.newBuilder() - .namespace(namespace) - .table(tableName) - .partitionKey(partitionKey) - .clusteringKey(clusteringKey) - .build(); + buildable.clusteringKey(clusteringKey); } - return Get.newBuilder() - .namespace(namespace) - .table(tableName) - .partitionKey(partitionKey) - .build(); + return buildable.build(); } /** * Return a ScalarDB put based on provided parameters * * @param namespace Namespace name - * @param tableName Table name + * @param table Table name * @param partitionKey Partition key * @param clusteringKey Optional clustering key * @param columns List of column values @@ -405,12 +395,12 @@ private Get createGetWith( */ private Put createPutWith( String namespace, - String tableName, + String table, Key partitionKey, Key clusteringKey, List> columns) { Buildable buildable = - Put.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey); + Put.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey); if (clusteringKey != null) { buildable.clusteringKey(clusteringKey); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java index ac246d835..1016eaaba 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDBManager.java @@ -7,6 +7,7 @@ import com.scalar.db.service.StorageFactory; import com.scalar.db.service.TransactionFactory; import java.io.IOException; +import javax.annotation.Nullable; /** * A manager to retrieve the various ScalarDB managers based on the running mode @@ -16,7 +17,7 @@ public class ScalarDBManager { /* Distributed storage for ScalarDB connection that is running in storage mode. */ - private final DistributedStorage storage; + @Nullable private final DistributedStorage storage; /* Distributed Transaction manager for ScalarDB connection that is running in transaction mode */ private final DistributedTransactionManager transactionManager; /* Distributed storage admin for ScalarDB admin operations */