Skip to content

Commit

Permalink
Add ScannerBuilders and ScannerManager to eventually replace the Scan…
Browse files Browse the repository at this point in the history
…nerFactory
  • Loading branch information
apmoriarty committed Oct 10, 2024
1 parent 279c8a0 commit 473fb6a
Show file tree
Hide file tree
Showing 16 changed files with 1,565 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -91,6 +92,8 @@
import datawave.query.planner.QueryPlan;
import datawave.query.tables.RangeStreamScanner;
import datawave.query.tables.ScannerFactory;
import datawave.query.tables.ScannerManager;
import datawave.query.tables.ScannerSessionBuilder;
import datawave.query.tables.SessionOptions;
import datawave.query.util.MetadataHelper;
import datawave.query.util.QueryScannerHelper;
Expand All @@ -114,7 +117,7 @@ public class RangeStream extends BaseVisitor implements CloseableIterable<QueryP
*/

protected final ShardQueryConfiguration config;
protected final ScannerFactory scanners;
protected final AccumuloClient client;
protected final MetadataHelper metadataHelper;
protected Iterator<QueryPlan> itr;
protected StreamContext context;
Expand Down Expand Up @@ -145,12 +148,28 @@ public class RangeStream extends BaseVisitor implements CloseableIterable<QueryP
protected boolean termCounts = false;

protected Set<String> indexOnlyFields = Sets.newHashSet();
protected ScannerManager scannerManager = new ScannerManager();

public RangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper metadataHelper) {
/**
* Legacy constructor retained for backward compatibility. It delegates to
* {@link #RangeStream(ShardQueryConfiguration, AccumuloClient, MetadataHelper)}, taking the client from {@code config.getClient()}.
*
* @param config
* the config, which also supplies the Accumulo client
* @param scannerFactory
* the factory; this constructor does not use it
* @param metadataHelper
* the metadata helper
* @deprecated use {@link #RangeStream(ShardQueryConfiguration, AccumuloClient, MetadataHelper)} instead
*/
@Deprecated
public RangeStream(ShardQueryConfiguration config, ScannerFactory scannerFactory, MetadataHelper metadataHelper) {
this(config, config.getClient(), metadataHelper);
}

public RangeStream(ShardQueryConfiguration config, AccumuloClient client, MetadataHelper metadataHelper) {
this.config = config;
this.scanners = scanners;
this.client = client;
this.metadataHelper = metadataHelper;
int maxLookup = (int) Math.max(config.getNumIndexLookupThreads(), 1);
int maxLookup = Math.max(config.getNumIndexLookupThreads(), 1);
executor = Executors.newFixedThreadPool(maxLookup);
runnables = new LinkedBlockingDeque<>();
int executeLookupMin = Math.max(maxLookup / 2, 1);
Expand Down Expand Up @@ -557,7 +576,15 @@ public ScannerStream visit(ASTEQNode node, Object data) {

int stackStart = config.getBaseIteratorPriority();

RangeStreamScanner scannerSession;
// @formatter:off
RangeStreamScanner scannerSession = new ScannerSessionBuilder(client)
.withWrapper(RangeStreamScanner.class)
.withTableName(config.getIndexTableName())
.withAuths(config.getAuthorizations())
.withQuery(config.getQuery())
.build();
// @formatter:on
scannerManager.addScanner(scannerSession);

SessionOptions options = new SessionOptions();
options.fetchColumnFamily(new Text(fieldName));
Expand All @@ -570,8 +597,6 @@ public ScannerStream visit(ASTEQNode node, Object data) {

if (limitScanners) {
// Setup the CreateUidsIterator
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery());

uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass);
uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(collapseUids));
uidSetting.addOption(CreateUidsIterator.PARSE_TLD_UIDS, Boolean.toString(config.getParseTldUids()));
Expand All @@ -580,8 +605,6 @@ public ScannerStream visit(ASTEQNode node, Object data) {

} else {
// Setup so this is a pass-through
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery());

uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass);
uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(false));
uidSetting.addOption(CreateUidsIterator.PARSE_TLD_UIDS, Boolean.toString(false));
Expand Down Expand Up @@ -989,5 +1012,6 @@ protected boolean containsIndexOnlyFields(JexlNode node) throws TableNotFoundExc
public void close() {
// Stop both executors immediately via shutdownNow() rather than waiting for queued work.
streamExecutor.shutdownNow();
executor.shutdownNow();
// Release the scanners tracked by the manager once the executors have been told to stop.
// NOTE(review): presumably this closes every scanner registered through addScanner -- confirm against ScannerManager.
scannerManager.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
import datawave.query.jexl.visitors.JexlStringBuildingVisitor;
import datawave.query.planner.DefaultQueryPlanner;
import datawave.query.planner.QueryPlan;
import datawave.query.tables.BatchScannerBuilder;
import datawave.query.tables.ScannerFactory;
import datawave.query.util.MetadataHelper;
import datawave.util.time.DateHelper;

public class ShardRangeStream extends RangeStream {

public ShardRangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper helper) {
super(config, scanners, helper);
super(config, config.getClient(), helper);
}

@Override
Expand All @@ -44,8 +45,16 @@ public CloseableIterable<QueryPlan> streamPlans(JexlNode node) {
String queryString = JexlStringBuildingVisitor.buildQuery(node);

int stackStart = config.getBaseIteratorPriority() + 40;
BatchScanner scanner = scanners.newScanner(config.getShardTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery(),
true);
// @formatter:off
BatchScanner scanner = new BatchScannerBuilder(client)
.withTableName(config.getShardTableName())
.withAuths(config.getAuthorizations())
.withNumThreads(config.getNumQueryThreads())
.withQuery(config.getQuery())
.build();
// @formatter:on

scannerManager.addScanner(scanner);

IteratorSetting cfg = new IteratorSetting(stackStart++, "query", FieldIndexOnlyQueryIterator.class);

Expand Down Expand Up @@ -97,7 +106,7 @@ public CloseableIterable<QueryPlan> streamPlans(JexlNode node) {

}

} catch (TableNotFoundException | DatawaveQueryException e) {
} catch (DatawaveQueryException e) {
throw new RuntimeException(e);
} finally {
// shut down the executor as all threads have completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
Expand Down Expand Up @@ -2893,8 +2894,8 @@ private RangeStream initializeRangeStream(ShardQueryConfiguration config, Scanne
try {
rstream = Class.forName(rangeStreamClass).asSubclass(RangeStream.class);

RangeStream stream = rstream.getConstructor(ShardQueryConfiguration.class, ScannerFactory.class, MetadataHelper.class).newInstance(config,
scannerFactory, metadataHelper);
RangeStream stream = rstream.getConstructor(ShardQueryConfiguration.class, AccumuloClient.class, MetadataHelper.class).newInstance(config,
config.getClient(), metadataHelper);

return stream.setUidIntersector(uidIntersector).setLimitScanners(limitScanners).setCreateUidsIteratorClass(createUidsIteratorClass);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package datawave.query.tables;

import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

import datawave.microservice.query.Query;
import datawave.query.iterator.QueryInformationIterator;
import datawave.query.util.QueryInformation;
import datawave.security.util.ScannerHelper;

/**
 * Builder for an Accumulo {@link BatchScanner}.
 * <p>
 * The table name and the authorizations must be supplied before calling {@link #build()}. The number of threads defaults to {@value #DEFAULT_NUM_THREADS}
 * when not set. The query, consistency level and execution hints are optional and are only applied to the scanner when non-null.
 */
public class BatchScannerBuilder {

    private static final Logger log = LoggerFactory.getLogger(BatchScannerBuilder.class);

    private static final int DEFAULT_NUM_THREADS = 8;
    private int numThreads = DEFAULT_NUM_THREADS;

    private String tableName;
    private Set<Authorizations> auths;
    private Query query;
    private ScannerBase.ConsistencyLevel level;
    private Map<String,String> hints;

    private final AccumuloClient client;

    /**
     * Create a builder that will use the provided client to create the scanner.
     *
     * @param client
     *            the {@link AccumuloClient}, must not be null
     * @throws NullPointerException
     *             if the client is null
     */
    public BatchScannerBuilder(AccumuloClient client) {
        Preconditions.checkNotNull(client, "AccumuloClient must be set");
        this.client = client;
    }

    /**
     * Required parameter
     *
     * @param tableName
     *            the table name
     * @return the builder
     */
    public BatchScannerBuilder withTableName(String tableName) {
        this.tableName = tableName;
        return this;
    }

    /**
     * Required parameter
     *
     * @param auths
     *            the authorizations
     * @return the builder
     */
    public BatchScannerBuilder withAuths(Set<Authorizations> auths) {
        this.auths = auths;
        return this;
    }

    /**
     * Optional parameter, defaults to {@value #DEFAULT_NUM_THREADS} when not set
     *
     * @param numThreads
     *            the number of threads
     * @return the builder
     */
    public BatchScannerBuilder withNumThreads(int numThreads) {
        this.numThreads = numThreads;
        return this;
    }

    /**
     * Optional parameter
     *
     * @param query
     *            a {@link Query} instance
     * @return the builder
     */
    public BatchScannerBuilder withQuery(Query query) {
        this.query = query;
        return this;
    }

    /**
     * Optional parameter
     *
     * @param level
     *            the {@link ScannerBase.ConsistencyLevel}
     * @return the builder
     */
    public BatchScannerBuilder withConsistencyLevel(ScannerBase.ConsistencyLevel level) {
        this.level = level;
        return this;
    }

    /**
     * Optional parameter
     *
     * @param hints
     *            a map of execution hints
     * @return the builder
     */
    public BatchScannerBuilder withExecutionHints(Map<String,String> hints) {
        this.hints = hints;
        return this;
    }

    /**
     * Build the {@link BatchScanner}, setting any optional configs if necessary.
     * <p>
     * If applying an optional config fails after the scanner has been created, the scanner is closed before the failure is propagated so that a partially
     * configured scanner is never leaked.
     *
     * @return a {@link BatchScanner}
     * @throws NullPointerException
     *             if the table name or the authorizations were not set
     * @throws RuntimeException
     *             wrapping a {@link TableNotFoundException} if the table does not exist
     */
    public BatchScanner build() {
        Preconditions.checkNotNull(tableName, "TableName must be set");
        Preconditions.checkNotNull(auths, "Authorizations must be set");

        final BatchScanner scanner;
        try {
            scanner = ScannerHelper.createBatchScanner(client, tableName, auths, numThreads);
        } catch (TableNotFoundException e) {
            throw new RuntimeException(e);
        }

        try {
            if (query != null) {
                // attach the query information to the scan via an iterator registered at priority Integer.MAX_VALUE
                QueryInformation info = new QueryInformation(query, query.getQuery());
                IteratorSetting setting = new IteratorSetting(Integer.MAX_VALUE, QueryInformationIterator.class, info.toMap());
                scanner.addScanIterator(setting);
            }

            if (level != null) {
                scanner.setConsistencyLevel(level);
            }

            if (hints != null) {
                scanner.setExecutionHints(hints);
            }

            return scanner;
        } catch (RuntimeException e) {
            // the caller never receives the scanner on this path, so it must be released here
            try {
                scanner.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * @return the number of threads the scanner will be created with
     */
    public int getNumThreads() {
        return numThreads;
    }

    /**
     * @return the table name, or null if not yet set
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * @return the authorizations, or null if not yet set
     */
    public Set<Authorizations> getAuths() {
        return auths;
    }

    /**
     * @return the query, or null if none was set
     */
    public Query getQuery() {
        return query;
    }

    /**
     * @return the consistency level, or null if none was set
     */
    public ScannerBase.ConsistencyLevel getLevel() {
        return level;
    }

    /**
     * @return the execution hints, or null if none were set
     */
    public Map<String,String> getHints() {
        return hints;
    }

}
Loading

0 comments on commit 473fb6a

Please sign in to comment.