Skip to content

Commit

Permalink
Support dynamic arbitrary filtering on NoSQL databases (#1682)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnmt authored May 22, 2024
1 parent 9c44045 commit 693388b
Show file tree
Hide file tree
Showing 37 changed files with 1,972 additions and 389 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.scalar.db.storage.cassandra;

import com.scalar.db.api.DistributedStorageCrossPartitionScanIntegrationTestBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class CassandraCrossPartitionScanIntegrationTest
extends DistributedStorageCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return CassandraEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return Collections.singletonMap(CassandraAdmin.REPLICATION_FACTOR, "1");
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in Cassandra")
public void scan_WithOrderingForNonPrimaryColumns_ShouldReturnProperResult() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static Properties getProperties(String testName) {
properties.setProperty(DatabaseConfig.USERNAME, username);
properties.setProperty(DatabaseConfig.PASSWORD, password);
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

// Add testName as a metadata schema suffix
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.scalar.db.storage.cassandra;

import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitCrossPartitionScanIntegrationTestBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class ConsensusCommitCrossPartitionScanIntegrationTestWithCassandra
extends ConsensusCommitCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProps(String testName) {
Properties properties = CassandraEnv.getProperties(testName);
properties.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "SERIALIZABLE");
return properties;
}

@Override
protected Map<String, String> getCreationOptions() {
return Collections.singletonMap(CassandraAdmin.REPLICATION_FACTOR, "1");
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in Cassandra")
public void scan_CrossPartitionScanWithOrderingGivenForCommittedRecord_ShouldReturnRecords() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.scalar.db.storage.cassandra;

import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestWithCassandra
extends TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProps1(String testName) {
Properties properties = CassandraEnv.getProperties(testName);
properties.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "SERIALIZABLE");
return properties;
}

@Override
protected Map<String, String> getCreationOptions() {
return Collections.singletonMap(CassandraAdmin.REPLICATION_FACTOR, "1");
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in Cassandra")
public void scan_ScanWithOrderingGivenForCommittedRecord_ShouldReturnRecords() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.scalar.db.storage.cosmos;

import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitCrossPartitionScanIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class ConsensusCommitCrossPartitionScanIntegrationTestWithCosmos
extends ConsensusCommitCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProps(String testName) {
Properties properties = CosmosEnv.getProperties(testName);
properties.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "SERIALIZABLE");
return properties;
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in Cosmos DB")
public void scan_CrossPartitionScanWithOrderingGivenForCommittedRecord_ShouldReturnRecords() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.scalar.db.storage.cosmos;

import com.scalar.db.api.DistributedStorageCrossPartitionScanIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class CosmosCrossPartitionScanIntegrationTest
extends DistributedStorageCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return CosmosEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in Cosmos DB")
public void scan_WithOrderingForNonPrimaryColumns_ShouldReturnProperResult() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static Properties getProperties(String testName) {
properties.setProperty(DatabaseConfig.PASSWORD, password);
properties.setProperty(DatabaseConfig.STORAGE, "cosmos");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

if (databasePrefix.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.scalar.db.storage.cosmos;

import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestWithCosmos
extends TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProps1(String testName) {
Properties properties = CosmosEnv.getProperties(testName);
properties.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "SERIALIZABLE");
return properties;
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in Cosmos DB")
public void scan_ScanWithOrderingGivenForCommittedRecord_ShouldReturnRecords() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitCrossPartitionScanIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class ConsensusCommitCrossPartitionScanIntegrationTestWithDynamo
extends ConsensusCommitCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProps(String testName) {
Properties properties = DynamoEnv.getProperties(testName);
properties.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "SERIALIZABLE");
return properties;
}

@Override
protected Map<String, String> getCreationOptions() {
return DynamoEnv.getCreationOptions();
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in DynamoDB")
public void scan_CrossPartitionScanWithOrderingGivenForCommittedRecord_ShouldReturnRecords() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.api.DistributedStorageCrossPartitionScanIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class DynamoCrossPartitionScanIntegrationTest
extends DistributedStorageCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return DynamoEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return DynamoEnv.getCreationOptions();
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in DynamoDB")
public void scan_WithOrderingForNonPrimaryColumns_ShouldReturnProperResult() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static Properties getProperties(String testName) {
properties.setProperty(DatabaseConfig.PASSWORD, secretAccessKey);
properties.setProperty(DatabaseConfig.STORAGE, "dynamo");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

// Add testName as a metadata namespace suffix
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestWithDynamo
extends TwoPhaseConsensusCommitCrossPartitionScanIntegrationTestBase {

@Override
protected Properties getProps1(String testName) {
Properties properties = DynamoEnv.getProperties(testName);
properties.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "SERIALIZABLE");
return properties;
}

@Override
protected Map<String, String> getCreationOptions() {
return DynamoEnv.getCreationOptions();
}

@Test
@Override
@Disabled("Cross partition scan with ordering is not supported in DynamoDB")
public void scan_ScanWithOrderingGivenForCommittedRecord_ShouldReturnRecords() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ protected Put copyAndSetTargetToIfNot(Put put) {
protected Delete copyAndSetTargetToIfNot(Delete delete) {
return ScalarDbUtils.copyAndSetTargetToIfNot(delete, namespace, tableName);
}

protected Get copyAndPrepareForDynamicFiltering(Get get) {
return ScalarDbUtils.copyAndPrepareForDynamicFiltering(get);
}

protected Scan copyAndPrepareForDynamicFiltering(Scan scan) {
return ScalarDbUtils.copyAndPrepareForDynamicFiltering(scan);
}
}
88 changes: 88 additions & 0 deletions core/src/main/java/com/scalar/db/common/FilterableScanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.scalar.db.common;

import com.google.errorprone.annotations.concurrent.LazyInit;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.Selection;
import com.scalar.db.api.Selection.Conjunction;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
public class FilterableScanner implements Scanner {

private final Scanner scanner;
private final List<String> projections;
private final Set<Conjunction> conjunctions;
@Nullable private Integer left = null;
@LazyInit private ScannerIterator scannerIterator;

public FilterableScanner(Selection selection, Scanner scanner) {
this.scanner = scanner;
this.projections = selection.getProjections();
this.conjunctions = selection.getConjunctions();
if (selection instanceof Scan) {
Scan scan = (Scan) selection;
this.left = scan.getLimit() > 0 ? scan.getLimit() : null;
}
}

@SuppressFBWarnings("DLS_DEAD_LOCAL_STORE")
@Override
public Optional<Result> one() throws ExecutionException {
if (left != null && left == 0) {
return Optional.empty();
}
while (true) {
Optional<Result> one = scanner.one();
if (one.isPresent()) {
if (ScalarDbUtils.columnsMatchAnyOfConjunctions(one.get().getColumns(), conjunctions)) {
if (left != null) {
left--;
}
return Optional.of(new ProjectedResult(one.get(), projections));
}
} else {
return Optional.empty();
}
}
}

@Override
public List<Result> all() throws ExecutionException {
List<Result> ret = new ArrayList<>();
while (true) {
Optional<Result> one = one();
if (!one.isPresent()) {
break;
}
ret.add(one.get());
}
return ret;
}

@Override
@Nonnull
public Iterator<Result> iterator() {
if (scannerIterator == null) {
scannerIterator = new ScannerIterator(this);
}
return scannerIterator;
}

@Override
public void close() throws IOException {
scanner.close();
}
}
Loading

0 comments on commit 693388b

Please sign in to comment.