Skip to content

Commit

Permalink
Partial feedback changes
Browse files Browse the repository at this point in the history
  • Loading branch information
inv-jishnu committed Dec 24, 2024
1 parent 03324e1 commit acedabe
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 63 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,13 +944,13 @@ public enum CoreError implements ScalarDbError {
DATA_LOADER_ERROR_CRUD_EXCEPTION(
Category.INTERNAL_ERROR,
"0047",
"something went wrong while trying to save the data",
"Something went wrong while trying to save the data. Details %s",
"",
""),
DATA_LOADER_ERROR_SCAN(
Category.INTERNAL_ERROR,
"0048",
"Something went wrong while scanning. Are you sure you are running in the correct transaction mode?",
"Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details %s",
"",
""),

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
package com.scalar.db.dataloader.core.dataimport.dao;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.api.*;
import com.scalar.db.api.PutBuilder.Buildable;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.Scanner;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.exception.storage.ExecutionException;
Expand Down Expand Up @@ -37,7 +30,7 @@ public class ScalarDBDao {
* Retrieve record from ScalarDB instance in storage mode
*
* @param namespace Namespace name
* @param tableName Table name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key for get
* @param storage Distributed storage for ScalarDB connection that is running in storage mode.
Expand All @@ -46,7 +39,7 @@ public class ScalarDBDao {
*/
public Optional<Result> get(
String namespace,
String tableName,
String table,
Key partitionKey,
Key clusteringKey,
DistributedStorage storage)
Expand All @@ -56,7 +49,7 @@ public Optional<Result> get(
String loggingKey = keysToString(partitionKey, clusteringKey);

try {
Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey);
Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
Optional<Result> result = storage.get(get);
logger.info(String.format(GET_COMPLETED_MSG, loggingKey));
return result;
Expand All @@ -69,7 +62,7 @@ public Optional<Result> get(
* Retrieve record from ScalarDB instance in transaction mode
*
* @param namespace Namespace name
* @param tableName Table name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key for get
* @param transaction ScalarDB transaction instance
Expand All @@ -78,13 +71,13 @@ public Optional<Result> get(
*/
public Optional<Result> get(
String namespace,
String tableName,
String table,
Key partitionKey,
Key clusteringKey,
DistributedTransaction transaction)
throws ScalarDBDaoException {

Get get = createGetWith(namespace, tableName, partitionKey, clusteringKey);
Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
// Retrieving the key data for logging
String loggingKey = keysToString(partitionKey, clusteringKey);
try {
Expand All @@ -100,7 +93,7 @@ public Optional<Result> get(
* Save record in ScalarDB instance
*
* @param namespace Namespace name
* @param tableName Table name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key
* @param columns List of column values to be inserted or updated
Expand All @@ -109,18 +102,19 @@ public Optional<Result> get(
*/
public void put(
String namespace,
String tableName,
String table,
Key partitionKey,
Key clusteringKey,
List<Column<?>> columns,
DistributedTransaction transaction)
throws ScalarDBDaoException {

Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns);
Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
try {
transaction.put(put);
} catch (CrudException e) {
throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e);
throw new ScalarDBDaoException(
CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
}
logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey)));
}
Expand All @@ -129,7 +123,7 @@ public void put(
* Save record in ScalarDB instance
*
* @param namespace Namespace name
* @param tableName Table name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key
* @param columns List of column values to be inserted or updated
Expand All @@ -138,17 +132,18 @@ public void put(
*/
public void put(
String namespace,
String tableName,
String table,
Key partitionKey,
Key clusteringKey,
List<Column<?>> columns,
DistributedStorage storage)
throws ScalarDBDaoException {
Put put = createPutWith(namespace, tableName, partitionKey, clusteringKey, columns);
Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
try {
storage.put(put);
} catch (ExecutionException e) {
throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(), e);
throw new ScalarDBDaoException(
CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
}
logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey)));
}
Expand All @@ -157,7 +152,7 @@ public void put(
* Scan a ScalarDB table
*
* @param namespace ScalarDB namespace
* @param tableName ScalarDB table name
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param range Optional range to set ScalarDB scan start and end values
* @param sorts Optional scan clustering key sorting values
Expand All @@ -169,7 +164,7 @@ public void put(
*/
public List<Result> scan(
String namespace,
String tableName,
String table,
Key partitionKey,
ScanRange range,
List<Scan.Ordering> sorts,
Expand All @@ -178,7 +173,7 @@ public List<Result> scan(
DistributedStorage storage)
throws ScalarDBDaoException {
// Create scan
Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit);
Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit);

// scan data
try {
Expand All @@ -189,15 +184,16 @@ public List<Result> scan(
logger.info(SCAN_END_MSG);
return allResults;
} catch (ExecutionException | IOException e) {
throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e);
throw new ScalarDBDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Scan a ScalarDB table
*
* @param namespace ScalarDB namespace
* @param tableName ScalarDB table name
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param range Optional range to set ScalarDB scan start and end values
* @param sorts Optional scan clustering key sorting values
Expand All @@ -210,7 +206,7 @@ public List<Result> scan(
*/
public List<Result> scan(
String namespace,
String tableName,
String table,
Key partitionKey,
ScanRange range,
List<Scan.Ordering> sorts,
Expand All @@ -220,7 +216,7 @@ public List<Result> scan(
throws ScalarDBDaoException {

// Create scan
Scan scan = createScan(namespace, tableName, partitionKey, range, sorts, projections, limit);
Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit);

// scan data
try {
Expand All @@ -231,15 +227,16 @@ public List<Result> scan(
} catch (CrudException | NoSuchElementException e) {
// No such element Exception is thrown when the scan is done in transaction mode but
// ScalarDB is running in storage mode
throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e);
throw new ScalarDBDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create a ScalarDB scanner instance
*
* @param namespace ScalarDB namespace
* @param tableName ScalarDB table name
* @param table ScalarDB table name
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
Expand All @@ -248,25 +245,26 @@ public List<Result> scan(
*/
public Scanner createScanner(
String namespace,
String tableName,
String table,
List<String> projectionColumns,
int limit,
DistributedStorage storage)
throws ScalarDBDaoException {
Scan scan =
createScan(namespace, tableName, null, null, new ArrayList<>(), projectionColumns, limit);
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
try {
return storage.scan(scan);
} catch (ExecutionException e) {
throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e);
throw new ScalarDBDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create a ScalarDB scanner instance
*
* @param namespace ScalarDB namespace
* @param tableName ScalarDB table name
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param scanRange Optional range to set ScalarDB scan start and end values
* @param sortOrders Optional scan clustering key sorting values
Expand All @@ -278,7 +276,7 @@ public Scanner createScanner(
*/
public Scanner createScanner(
String namespace,
String tableName,
String table,
Key partitionKey,
ScanRange scanRange,
List<Scan.Ordering> sortOrders,
Expand All @@ -287,20 +285,20 @@ public Scanner createScanner(
DistributedStorage storage)
throws ScalarDBDaoException {
Scan scan =
createScan(
namespace, tableName, partitionKey, scanRange, sortOrders, projectionColumns, limit);
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
try {
return storage.scan(scan);
} catch (ExecutionException e) {
throw new ScalarDBDaoException(CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(), e);
throw new ScalarDBDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create ScalarDB scan instance
*
* @param namespace ScalarDB namespace
* @param tableName ScalarDB table name
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param scanRange Optional range to set ScalarDB scan start and end values
* @param sortOrders Optional scan clustering key sorting values
Expand All @@ -310,7 +308,7 @@ public Scanner createScanner(
*/
Scan createScan(
String namespace,
String tableName,
String table,
Key partitionKey,
ScanRange scanRange,
List<Scan.Ordering> sortOrders,
Expand All @@ -319,7 +317,7 @@ Scan createScan(
// If no partition key is provided a scan all is created
if (partitionKey == null) {
ScanBuilder.BuildableScanAll buildableScanAll =
Scan.newBuilder().namespace(namespace).table(tableName).all();
Scan.newBuilder().namespace(namespace).table(table).all();

// projection columns
if (projectionColumns != null && !projectionColumns.isEmpty()) {
Expand All @@ -335,7 +333,7 @@ Scan createScan(

// Create a scan with partition key (not a scan all)
ScanBuilder.BuildableScan buildableScan =
Scan.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey);
Scan.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey);

// Set the scan boundary
if (scanRange != null) {
Expand Down Expand Up @@ -371,46 +369,38 @@ Scan createScan(
* Return a ScalarDB get based on provided parameters
*
* @param namespace Namespace name
* @param tableName Table name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key for get
* @return ScalarDB Get instance
*/
private Get createGetWith(
String namespace, String tableName, Key partitionKey, Key clusteringKey) {
private Get createGetWith(String namespace, String table, Key partitionKey, Key clusteringKey) {
GetBuilder.BuildableGetWithPartitionKey buildable =
Get.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey);
if (clusteringKey != null) {
return Get.newBuilder()
.namespace(namespace)
.table(tableName)
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.build();
buildable.clusteringKey(clusteringKey);
}
return Get.newBuilder()
.namespace(namespace)
.table(tableName)
.partitionKey(partitionKey)
.build();
return buildable.build();
}

/**
* Return a ScalarDB put based on provided parameters
*
* @param namespace Namespace name
* @param tableName Table name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key
* @param columns List of column values
* @return ScalarDB Put Instance
*/
private Put createPutWith(
String namespace,
String tableName,
String table,
Key partitionKey,
Key clusteringKey,
List<Column<?>> columns) {
Buildable buildable =
Put.newBuilder().namespace(namespace).table(tableName).partitionKey(partitionKey);
Put.newBuilder().namespace(namespace).table(table).partitionKey(partitionKey);
if (clusteringKey != null) {
buildable.clusteringKey(clusteringKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.IOException;
import javax.annotation.Nullable;

/**
* A manager to retrieve the various ScalarDB managers based on the running mode
Expand All @@ -16,7 +17,7 @@
public class ScalarDBManager {

/* Distributed storage for ScalarDB connection that is running in storage mode. */
private final DistributedStorage storage;
@Nullable private final DistributedStorage storage;
/* Distributed Transaction manager for ScalarDB connection that is running in transaction mode */
private final DistributedTransactionManager transactionManager;
/* Distributed storage admin for ScalarDB admin operations */
Expand Down

0 comments on commit acedabe

Please sign in to comment.