Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cross-partition scan options #1294

Merged
merged 6 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public static Properties getProperties(@SuppressWarnings("unused") String testNa
props.setProperty(DatabaseConfig.CONTACT_POINTS, contactPoints);
props.setProperty(DatabaseConfig.USERNAME, username);
props.setProperty(DatabaseConfig.PASSWORD, password);
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public static Properties getProperties(String testName) {
props.setProperty(DatabaseConfig.CONTACT_POINTS, contactPoint);
props.setProperty(DatabaseConfig.PASSWORD, password);
props.setProperty(DatabaseConfig.STORAGE, "cosmos");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

if (databasePrefix.isPresent()) {
// Add the prefix and testName as a metadata database suffix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public static Properties getProperties(String testName) {
props.setProperty(DatabaseConfig.USERNAME, accessKeyId);
props.setProperty(DatabaseConfig.PASSWORD, secretAccessKey);
props.setProperty(DatabaseConfig.STORAGE, "dynamo");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

// Add testName as a metadata namespace suffix
props.setProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public static Properties getProperties(String testName) {
props.setProperty(DatabaseConfig.USERNAME, username);
props.setProperty(DatabaseConfig.PASSWORD, password);
props.setProperty(DatabaseConfig.STORAGE, "jdbc");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "true");

// Add testName as a metadata schema suffix
props.setProperty(JdbcConfig.METADATA_SCHEMA, JdbcAdmin.METADATA_SCHEMA + "_" + testName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public static Properties getPropertiesForCassandra(@SuppressWarnings("unused") S
properties.setProperty(DatabaseConfig.USERNAME, username);
properties.setProperty(DatabaseConfig.PASSWORD, password);
properties.setProperty(DatabaseConfig.STORAGE, "cassandra");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");
return properties;
}

Expand All @@ -54,6 +57,9 @@ public static Properties getPropertiesForJdbc(String testName) {
properties.setProperty(DatabaseConfig.USERNAME, username);
properties.setProperty(DatabaseConfig.PASSWORD, password);
properties.setProperty(DatabaseConfig.STORAGE, "jdbc");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "true");

// Add testName as a metadata schema suffix
properties.setProperty(JdbcConfig.METADATA_SCHEMA, JdbcAdmin.METADATA_SCHEMA + "_" + testName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.TableMetadataManager;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
Expand All @@ -27,9 +28,11 @@
@ThreadSafe
public class OperationChecker {

private final DatabaseConfig config;
private final TableMetadataManager tableMetadataManager;

public OperationChecker(TableMetadataManager tableMetadataManager) {
public OperationChecker(DatabaseConfig config, TableMetadataManager tableMetadataManager) {
this.config = config;
this.tableMetadataManager = tableMetadataManager;
}

Expand Down Expand Up @@ -122,6 +125,11 @@ public void check(Scan scan) throws ExecutionException {
}

private void check(ScanAll scanAll) throws ExecutionException {
if (!config.isCrossPartitionScanEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan is not enabled. Operation: " + scanAll);
}

TableMetadata metadata = getTableMetadata(scanAll);

checkProjections(scanAll, metadata);
Expand All @@ -130,8 +138,16 @@ private void check(ScanAll scanAll) throws ExecutionException {
throw new IllegalArgumentException("The limit cannot be negative. Operation: " + scanAll);
}

if (!config.isCrossPartitionScanOrderingEnabled() && !scanAll.getOrderings().isEmpty()) {
throw new IllegalArgumentException(
"Cross-partition scan ordering is not enabled. Operation: " + scanAll);
}
checkOrderings(scanAll, metadata);

if (!config.isCrossPartitionScanFilteringEnabled() && !scanAll.getConjunctions().isEmpty()) {
throw new IllegalArgumentException(
"Cross-partition scan filtering is not enabled. Operation: " + scanAll);
}
checkConjunctions(scanAll, metadata);
}

Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/com/scalar/db/config/DatabaseConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.config;

import static com.google.common.base.Preconditions.checkArgument;
import static com.scalar.db.config.ConfigUtils.getBoolean;
import static com.scalar.db.config.ConfigUtils.getInt;
import static com.scalar.db.config.ConfigUtils.getLong;
import static com.scalar.db.config.ConfigUtils.getString;
Expand Down Expand Up @@ -32,6 +33,9 @@ public class DatabaseConfig {
private long metadataCacheExpirationTimeSecs;
private long activeTransactionManagementExpirationTimeMillis;
@Nullable private String defaultNamespaceName;
private boolean crossPartitionScanEnabled;
private boolean crossPartitionScanFilteringEnabled;
private boolean crossPartitionScanOrderingEnabled;

public static final String PREFIX = "scalar.db.";
public static final String CONTACT_POINTS = PREFIX + "contact_points";
Expand All @@ -45,6 +49,10 @@ public class DatabaseConfig {
public static final String ACTIVE_TRANSACTION_MANAGEMENT_EXPIRATION_TIME_MILLIS =
PREFIX + "active_transaction_management.expiration_time_millis";
public static final String DEFAULT_NAMESPACE_NAME = PREFIX + "default_namespace_name";
public static final String SCAN_PREFIX = PREFIX + "cross_partition_scan.";
public static final String CROSS_PARTITION_SCAN = SCAN_PREFIX + "enabled";
public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled";
public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled";

public DatabaseConfig(File propertiesFile) throws IOException {
try (FileInputStream stream = new FileInputStream(propertiesFile)) {
Expand Down Expand Up @@ -90,6 +98,18 @@ protected void load() {
activeTransactionManagementExpirationTimeMillis =
getLong(getProperties(), ACTIVE_TRANSACTION_MANAGEMENT_EXPIRATION_TIME_MILLIS, -1);
defaultNamespaceName = getString(getProperties(), DEFAULT_NAMESPACE_NAME, null);
crossPartitionScanEnabled = getBoolean(getProperties(), CROSS_PARTITION_SCAN, false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value is false in v4.0.0 or later, but it should be true in v3.x for backward compatibility and warned when using it with SERIALIZABLE. I will handle this after this PR is merged to the master.

crossPartitionScanFilteringEnabled =
getBoolean(getProperties(), CROSS_PARTITION_SCAN_FILTERING, false);
crossPartitionScanOrderingEnabled =
getBoolean(getProperties(), CROSS_PARTITION_SCAN_ORDERING, false);

if (!crossPartitionScanEnabled
&& (crossPartitionScanFilteringEnabled || crossPartitionScanOrderingEnabled)) {
throw new IllegalArgumentException(
CROSS_PARTITION_SCAN
+ " must be enabled to use cross-partition scan with filtering or ordering");
}
}

public List<String> getContactPoints() {
Expand Down Expand Up @@ -127,4 +147,16 @@ public long getActiveTransactionManagementExpirationTimeMillis() {
public Optional<String> getDefaultNamespaceName() {
return Optional.ofNullable(defaultNamespaceName);
}

public boolean isCrossPartitionScanEnabled() {
return crossPartitionScanEnabled;
}

public boolean isCrossPartitionScanFilteringEnabled() {
return crossPartitionScanFilteringEnabled;
}

public boolean isCrossPartitionScanOrderingEnabled() {
return crossPartitionScanOrderingEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
Expand All @@ -45,6 +44,13 @@ public class Cassandra extends AbstractDistributedStorage {
@Inject
public Cassandra(DatabaseConfig config) {
super(config);

if (config.isCrossPartitionScanFilteringEnabled()
|| config.isCrossPartitionScanOrderingEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan with filtering or ordering is not supported in Cassandra");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is related to https://github.com/scalar-labs/scalardb/pull/1294/files#diff-660ca76c9b64d94000e0854e2a18fdc5e6a69c028a74b8cacfa4ef733d4604c2L111, but sorry, I forgot why these types of scan were prohibitted. Could you tell me that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the question. Basically, it's hard to push down all kinds of filtering and ordering to NoSQL databases. But for filtering, we can support ScalarDB-side filtering in the near future since we already have a logic for evaluating all types of our predicates.

}

clusterManager = new ClusterManager(config);
Session session = clusterManager.getSession();

Expand All @@ -63,7 +69,7 @@ public Cassandra(DatabaseConfig config) {
new TableMetadataManager(
new CassandraAdmin(clusterManager, config),
config.getMetadataCacheExpirationTimeSecs());
operationChecker = new OperationChecker(metadataManager);
operationChecker = new OperationChecker(config, metadataManager);
}

@VisibleForTesting
Expand Down Expand Up @@ -108,11 +114,6 @@ public Scanner scan(Scan scan) throws ExecutionException {
scan = copyAndSetTargetToIfNot(scan);
operationChecker.check(scan);

if (ScalarDbUtils.isRelational(scan)) {
throw new UnsupportedOperationException(
"Scanning all records with orderings or conditions is not supported in Cassandra");
}

ResultSet results = handlers.select().handle(scan);

return new ScannerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
import javax.annotation.Nullable;

public class CassandraConfig {
public static final String PREFIX = DatabaseConfig.PREFIX + "cassandra.";
public static final String STORAGE_NAME = "cassandra";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
public static final String METADATA_KEYSPACE = PREFIX + "metadata.keyspace";
@Nullable private final String metadataKeyspace;

public CassandraConfig(DatabaseConfig databaseConfig) {
String storage = databaseConfig.getStorage();
if (!storage.equals("cassandra")) {
throw new IllegalArgumentException(DatabaseConfig.STORAGE + " should be 'cassandra'");
if (!storage.equals(STORAGE_NAME)) {
throw new IllegalArgumentException(
DatabaseConfig.STORAGE + " should be '" + STORAGE_NAME + "'");
}
metadataKeyspace = getString(databaseConfig.getProperties(), METADATA_KEYSPACE, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class CassandraProvider implements DistributedStorageProvider {
@Override
public String getName() {
return "cassandra";
return CassandraConfig.STORAGE_NAME;
}

@Override
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
Expand All @@ -47,6 +46,13 @@ public class Cosmos extends AbstractDistributedStorage {
@Inject
public Cosmos(DatabaseConfig databaseConfig) {
super(databaseConfig);

if (databaseConfig.isCrossPartitionScanFilteringEnabled()
|| databaseConfig.isCrossPartitionScanOrderingEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan with filtering or ordering is not supported in Cosmos DB");
}

CosmosConfig config = new CosmosConfig(databaseConfig);

client =
Expand All @@ -60,7 +66,7 @@ public Cosmos(DatabaseConfig databaseConfig) {
TableMetadataManager metadataManager =
new TableMetadataManager(
new CosmosAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs());
operationChecker = new CosmosOperationChecker(metadataManager);
operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager);

selectStatementHandler = new SelectStatementHandler(client, metadataManager);
putStatementHandler = new PutStatementHandler(client, metadataManager);
Expand Down Expand Up @@ -108,11 +114,6 @@ public Scanner scan(Scan scan) throws ExecutionException {
scan = copyAndSetTargetToIfNot(scan);
operationChecker.check(scan);

if (ScalarDbUtils.isRelational(scan)) {
throw new UnsupportedOperationException(
"Scanning all records with orderings or conditions is not supported in Cosmos DB");
}

return selectStatementHandler.handle(scan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
@Immutable
public class CosmosConfig {
private static final Logger logger = LoggerFactory.getLogger(CosmosConfig.class);
public static final String PREFIX = DatabaseConfig.PREFIX + "cosmos.";

public static final String STORAGE_NAME = "cosmos";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
/** @deprecated As of 5.0, will be removed. Use {@link #METADATA_DATABASE} instead. */
@Deprecated
public static final String TABLE_METADATA_DATABASE = PREFIX + "table_metadata.database";
Expand All @@ -23,9 +25,10 @@ public class CosmosConfig {
@Nullable private final String metadataDatabase;

public CosmosConfig(DatabaseConfig databaseConfig) {
String storage = databaseConfig.getProperties().getProperty(DatabaseConfig.STORAGE);
if (!"cosmos".equals(storage)) {
throw new IllegalArgumentException(DatabaseConfig.STORAGE + " should be 'cosmos'");
String storage = databaseConfig.getStorage();
if (!storage.equals(STORAGE_NAME)) {
throw new IllegalArgumentException(
DatabaseConfig.STORAGE + " should be '" + STORAGE_NAME + "'");
}

if (databaseConfig.getContactPoints().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.TableMetadataManager;
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.DataType;

public class CosmosOperationChecker extends OperationChecker {
public CosmosOperationChecker(TableMetadataManager metadataManager) {
super(metadataManager);
public CosmosOperationChecker(
DatabaseConfig databaseConfig, TableMetadataManager metadataManager) {
super(databaseConfig, metadataManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class CosmosProvider implements DistributedStorageProvider {
@Override
public String getName() {
return "cosmos";
return CosmosConfig.STORAGE_NAME;
}

@Override
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.io.IOException;
import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -51,6 +50,13 @@ public class Dynamo extends AbstractDistributedStorage {
@Inject
public Dynamo(DatabaseConfig databaseConfig) {
super(databaseConfig);

if (databaseConfig.isCrossPartitionScanFilteringEnabled()
|| databaseConfig.isCrossPartitionScanOrderingEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan with filtering or ordering is not supported in DynamoDB");
}

DynamoConfig config = new DynamoConfig(databaseConfig);

DynamoDbClientBuilder builder = DynamoDbClient.builder();
Expand All @@ -67,7 +73,7 @@ public Dynamo(DatabaseConfig databaseConfig) {
TableMetadataManager metadataManager =
new TableMetadataManager(
new DynamoAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs());
operationChecker = new DynamoOperationChecker(metadataManager);
operationChecker = new DynamoOperationChecker(databaseConfig, metadataManager);

selectStatementHandler =
new SelectStatementHandler(client, metadataManager, config.getNamespacePrefix());
Expand Down Expand Up @@ -128,11 +134,6 @@ public Scanner scan(Scan scan) throws ExecutionException {
scan = copyAndSetTargetToIfNot(scan);
operationChecker.check(scan);

if (ScalarDbUtils.isRelational(scan)) {
throw new UnsupportedOperationException(
"Scanning all records with orderings or conditions is not supported in DynamoDB");
}

return selectStatementHandler.handle(scan);
}

Expand Down
Loading
Loading