From 473fb6a0228091e0cc7fd473f1a32951127af11f Mon Sep 17 00:00:00 2001 From: Moriarty <22225248+apmoriarty@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:35:39 +0000 Subject: [PATCH] Add ScannerBuilders and ScannerManager to eventually replace the ScannerFactory --- .../query/index/lookup/RangeStream.java | 42 ++- .../query/index/lookup/ShardRangeStream.java | 17 +- .../query/planner/DefaultQueryPlanner.java | 5 +- .../query/tables/BatchScannerBuilder.java | 173 ++++++++++ .../query/tables/RangeStreamScanner.java | 28 +- .../datawave/query/tables/ScannerBuilder.java | 155 +++++++++ .../datawave/query/tables/ScannerFactory.java | 2 +- .../datawave/query/tables/ScannerManager.java | 92 +++++ .../query/tables/ScannerSessionBuilder.java | 243 ++++++++++++++ .../index/lookup/RangeStreamQueryTest.java | 4 +- .../query/index/lookup/RangeStreamTest.java | 5 +- .../query/index/lookup/RangeStreamTestX.java | 6 +- .../query/tables/BatchScannerBuilderTest.java | 156 +++++++++ .../query/tables/ScannerBuilderTest.java | 157 +++++++++ .../query/tables/ScannerManagerTest.java | 313 ++++++++++++++++++ .../tables/ScannerSessionBuilderTest.java | 203 ++++++++++++ 16 files changed, 1565 insertions(+), 36 deletions(-) create mode 100644 warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerBuilder.java create mode 100644 warehouse/query-core/src/main/java/datawave/query/tables/ScannerBuilder.java create mode 100644 warehouse/query-core/src/main/java/datawave/query/tables/ScannerManager.java create mode 100644 warehouse/query-core/src/main/java/datawave/query/tables/ScannerSessionBuilder.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/tables/BatchScannerBuilderTest.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/tables/ScannerBuilderTest.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/tables/ScannerManagerTest.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/tables/ScannerSessionBuilderTest.java diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java index 0eb3fe6b144..f0df4ee4674 100644 --- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; @@ -91,6 +92,8 @@ import datawave.query.planner.QueryPlan; import datawave.query.tables.RangeStreamScanner; import datawave.query.tables.ScannerFactory; +import datawave.query.tables.ScannerManager; +import datawave.query.tables.ScannerSessionBuilder; import datawave.query.tables.SessionOptions; import datawave.query.util.MetadataHelper; import datawave.query.util.QueryScannerHelper; @@ -114,7 +117,7 @@ public class RangeStream extends BaseVisitor implements CloseableIterable itr; protected StreamContext context; @@ -145,12 +148,28 @@ public class RangeStream extends BaseVisitor implements CloseableIterable indexOnlyFields = Sets.newHashSet(); + protected ScannerManager scannerManager = new ScannerManager(); - public RangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper metadataHelper) { + /** + * Deprecated, left in for reverse compatibility + * + * @param config + * the config + * @param scannerFactory + * the factory + * @param metadataHelper + * the metadata helper + */ + @Deprecated + public RangeStream(ShardQueryConfiguration config, ScannerFactory scannerFactory, MetadataHelper metadataHelper) { + this(config, config.getClient(), metadataHelper); + } + + public RangeStream(ShardQueryConfiguration config, AccumuloClient client, MetadataHelper metadataHelper) { this.config = config; - this.scanners = scanners; + this.client = client; this.metadataHelper = metadataHelper; - int maxLookup = (int) Math.max(config.getNumIndexLookupThreads(), 1); + int maxLookup = Math.max(config.getNumIndexLookupThreads(), 1); executor = Executors.newFixedThreadPool(maxLookup); runnables = new LinkedBlockingDeque<>(); int executeLookupMin = Math.max(maxLookup / 2, 1); @@ -557,7 +576,15 @@ public ScannerStream visit(ASTEQNode node, Object data) { int stackStart = config.getBaseIteratorPriority(); - RangeStreamScanner scannerSession; + // @formatter:off + RangeStreamScanner scannerSession = new ScannerSessionBuilder(client) + .withWrapper(RangeStreamScanner.class) + .withTableName(config.getIndexTableName()) + .withAuths(config.getAuthorizations()) + .withQuery(config.getQuery()) + .build(); + // @formatter:on + scannerManager.addScanner(scannerSession); SessionOptions options = new SessionOptions(); options.fetchColumnFamily(new Text(fieldName)); @@ -570,8 +597,6 @@ public ScannerStream visit(ASTEQNode node, Object data) { if (limitScanners) { // Setup the CreateUidsIterator - scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery()); - uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass); uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(collapseUids)); uidSetting.addOption(CreateUidsIterator.PARSE_TLD_UIDS, Boolean.toString(config.getParseTldUids())); @@ -580,8 +605,6 @@ public ScannerStream visit(ASTEQNode node, Object data) { } else { // Setup so this is a pass-through - scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery()); - uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass); uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(false)); uidSetting.addOption(CreateUidsIterator.PARSE_TLD_UIDS, Boolean.toString(false)); @@ -989,5 +1012,6 @@ protected boolean containsIndexOnlyFields(JexlNode node) throws TableNotFoundExc public void close() { streamExecutor.shutdownNow(); executor.shutdownNow(); + scannerManager.close(); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java index 2b437ea61c5..a4a617b35ae 100644 --- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java @@ -28,6 +28,7 @@ import datawave.query.jexl.visitors.JexlStringBuildingVisitor; import datawave.query.planner.DefaultQueryPlanner; import datawave.query.planner.QueryPlan; +import datawave.query.tables.BatchScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.query.util.MetadataHelper; import datawave.util.time.DateHelper; @@ -35,7 +36,7 @@ public class ShardRangeStream extends RangeStream { public ShardRangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper helper) { - super(config, scanners, helper); + super(config, config.getClient(), helper); } @Override @@ -44,8 +45,16 @@ public CloseableIterable streamPlans(JexlNode node) { String queryString = JexlStringBuildingVisitor.buildQuery(node); int stackStart = config.getBaseIteratorPriority() + 40; - BatchScanner scanner = scanners.newScanner(config.getShardTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery(), - true); + // @formatter:off + BatchScanner scanner = new BatchScannerBuilder(client) + .withTableName(config.getShardTableName()) + .withAuths(config.getAuthorizations()) + .withNumThreads(config.getNumQueryThreads()) + .withQuery(config.getQuery()) + .build(); + // @formatter:on + + scannerManager.addScanner(scanner); IteratorSetting cfg = new IteratorSetting(stackStart++, "query", FieldIndexOnlyQueryIterator.class); @@ -97,7 +106,7 @@ public CloseableIterable streamPlans(JexlNode node) { } - } catch (TableNotFoundException | DatawaveQueryException e) { + } catch (DatawaveQueryException e) { throw new RuntimeException(e); } finally { // shut down the executor as all threads have completed diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java index 6df09d7646c..54cc57442ea 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java @@ -31,6 +31,7 @@ import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; +import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; @@ -2893,8 +2894,8 @@ private RangeStream initializeRangeStream(ShardQueryConfiguration config, Scanne try { rstream = Class.forName(rangeStreamClass).asSubclass(RangeStream.class); - RangeStream stream = rstream.getConstructor(ShardQueryConfiguration.class, ScannerFactory.class, MetadataHelper.class).newInstance(config, - scannerFactory, metadataHelper); + RangeStream stream = rstream.getConstructor(ShardQueryConfiguration.class, AccumuloClient.class, MetadataHelper.class).newInstance(config, + config.getClient(), metadataHelper); return stream.setUidIntersector(uidIntersector).setLimitScanners(limitScanners).setCreateUidsIteratorClass(createUidsIteratorClass); diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerBuilder.java b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerBuilder.java new file mode 100644 index 00000000000..e54255b43ca --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerBuilder.java @@ -0,0 +1,173 @@ +package datawave.query.tables; + +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.security.Authorizations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import datawave.microservice.query.Query; +import datawave.query.iterator.QueryInformationIterator; +import datawave.query.util.QueryInformation; +import datawave.security.util.ScannerHelper; + +/** + * Builder for an Accumulo {@link BatchScanner} + */ +public class BatchScannerBuilder { + + private static final Logger log = LoggerFactory.getLogger(BatchScannerBuilder.class); + + private static final int DEFAULT_NUM_THREADS = 8; + private int numThreads = DEFAULT_NUM_THREADS; + + private String tableName; + private Set auths; + private Query query; + private ScannerBase.ConsistencyLevel level; + private Map hints; + + private final AccumuloClient client; + + public BatchScannerBuilder(AccumuloClient client) { + Preconditions.checkNotNull(client); + this.client = client; + } + + /** + * Required parameter + * + * @param tableName + * the table name + * @return the builder + */ + public BatchScannerBuilder withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + /** + * Required parameter + * + * @param auths + * the authorizations + * @return the builder + */ + public BatchScannerBuilder withAuths(Set auths) { + this.auths = auths; + return this; + } + + /** + * Required parameter + * + * @param numThreads + * the number of threads + * @return the builder + */ + public BatchScannerBuilder withNumThreads(int numThreads) { + this.numThreads = numThreads; + return this; + } + + /** + * Optional parameter + * + * @param query + * a {@link Query} instance + * @return the builder + */ + public BatchScannerBuilder withQuery(Query query) { + this.query = query; + return this; + } + + /** + * Optional parameter + * + * @param level + * the {@link ScannerBase.ConsistencyLevel} + * @return the builder + */ + public BatchScannerBuilder withConsistencyLevel(ScannerBase.ConsistencyLevel level) { + this.level = level; + return this; + } + + /** + * Optional parameter + * + * @param hints + * a map of execution hints + * @return the builder + */ + public BatchScannerBuilder withExecutionHints(Map hints) { + this.hints = hints; + return this; + } + + /** + * Build the {@link BatchScanner}, setting any optional configs if necessary + * + * @return a Scanner + */ + public BatchScanner build() { + Preconditions.checkNotNull(tableName, "TableName must be set"); + Preconditions.checkNotNull(auths, "Authorizations must be set"); + + try { + BatchScanner scanner = ScannerHelper.createBatchScanner(client, tableName, auths, numThreads); + + if (query != null) { + QueryInformation info = new QueryInformation(query, query.getQuery()); + IteratorSetting setting = new IteratorSetting(Integer.MAX_VALUE, QueryInformationIterator.class, info.toMap()); + scanner.addScanIterator(setting); + } + + if (level != null) { + scanner.setConsistencyLevel(level); + } + + if (hints != null) { + scanner.setExecutionHints(hints); + } + + return scanner; + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + } + + public int getNumThreads() { + return numThreads; + } + + public String getTableName() { + return tableName; + } + + public Set getAuths() { + return auths; + } + + public Query getQuery() { + return query; + } + + public ScannerBase.ConsistencyLevel getLevel() { + return level; + } + + public Map getHints() { + return hints; + } + +} diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java b/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java index 94a332e9772..bd7fa22187b 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java @@ -19,6 +19,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; @@ -88,7 +89,8 @@ public class RangeStreamScanner extends ScannerSession implements Callable auths, ResourceQ readLock = queueLock.readLock(); writeLock = queueLock.writeLock(); myExecutor = MoreExecutors.newDirectExecutorService(); - if (null != stats) + if (null != stats) { initializeTimers(); + } } public RangeStreamScanner(String tableName, Set auths, ResourceQueue delegator, int maxResults, Query settings, SessionOptions options, @@ -118,8 +121,9 @@ public RangeStreamScanner(String tableName, Set auths, ResourceQ readLock = queueLock.readLock(); writeLock = queueLock.writeLock(); myExecutor = MoreExecutors.newDirectExecutorService(); - if (null != stats) + if (null != stats) { initializeTimers(); + } } public RangeStreamScanner(ScannerSession other) { @@ -130,8 +134,8 @@ public void setExecutor(ExecutorService service) { myExecutor = service; } - public RangeStreamScanner setScannerFactory(ScannerFactory factory) { - this.scannerFactory = factory; + public RangeStreamScanner setAccumuloClient(AccumuloClient client) { + this.client = client; return this; } @@ -684,7 +688,8 @@ protected void findTop() throws Exception { if (null != stats) stats.getTimer(TIMERS.SCANNER_START).resume(); - baseScanner = scannerFactory.newSingleScanner(tableName, auths, settings); + baseScanner = new ScannerBuilder(client).withTableName(tableName).withAuths(auths).build(); + scannerManager.addScanner(baseScanner); if (baseScanner instanceof Scanner) ((Scanner) baseScanner).setReadaheadThreshold(Long.MAX_VALUE); @@ -768,8 +773,11 @@ else if (baseScanner instanceof RfileScanner) * we've finished with this range. As a result, we set lastSeenKey to null, so that on our next pass through, we pop the next range from the queue * and continue or finish. We're going to timeslice and come back as know this range is likely finished. */ - if (log.isTraceEnabled()) + if (log.isTraceEnabled()) { log.trace(lastSeenKey + " is lastSeenKey, previous range is " + currentRange, e); + } + + log.error(e); // adding the same iterator twice will also cause an IllegalArgumentException lastSeenKey = null; @@ -780,10 +788,12 @@ else if (baseScanner instanceof RfileScanner) } finally { - if (null != stats) + if (null != stats) { stats.getTimer(TIMERS.SCANNER_START).suspend(); + } + + scannerManager.close(baseScanner); - scannerFactory.close(baseScanner); // no point in running again if (ranges.isEmpty() && lastSeenKey == null) { finished = true; diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerBuilder.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerBuilder.java new file mode 100644 index 00000000000..905fc9e74c2 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerBuilder.java @@ -0,0 +1,155 @@ +package datawave.query.tables; + +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.security.Authorizations; + +import com.google.common.base.Preconditions; + +import datawave.microservice.query.Query; +import datawave.query.iterator.QueryInformationIterator; +import datawave.query.util.QueryInformation; +import datawave.security.util.ScannerHelper; + +/** + * Builder for an Accumulo {@link Scanner} + */ +public class ScannerBuilder { + + private String tableName; + private Set auths; + private Query query; + private ConsistencyLevel level; + private Map hints; + + private final AccumuloClient client; + + /** + * Instantiates the builder using the provided accumulo client + * + * @param client + * the accumulo client + */ + public ScannerBuilder(AccumuloClient client) { + Preconditions.checkNotNull(client, "AccumuloClient must be set"); + this.client = client; + } + + /** + * Required parameter + * + * @param tableName + * the table name + * @return the builder + */ + public ScannerBuilder withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + /** + * Required parameter + * + * @param auths + * the authorizations + * @return the builder + */ + public ScannerBuilder withAuths(Set auths) { + this.auths = auths; + return this; + } + + /** + * Optional parameter + * + * @param query + * a {@link Query} instance + * @return the builder + */ + public ScannerBuilder withQuery(Query query) { + this.query = query; + return this; + } + + /** + * Optional parameter + * + * @param level + * the {@link ConsistencyLevel} + * @return the builder + */ + public ScannerBuilder withConsistencyLevel(ConsistencyLevel level) { + this.level = level; + return this; + } + + /** + * Optional parameter + * + * @param hints + * a map of execution hints + * @return the builder + */ + public ScannerBuilder withExecutionHints(Map hints) { + this.hints = hints; + return this; + } + + /** + * Build the {@link Scanner}, setting any optional configs if necessary + * + * @return a Scanner + */ + public Scanner build() { + Preconditions.checkNotNull(tableName, "TableName must be set"); + Preconditions.checkNotNull(auths, "Authorizations must be set"); + + try { + Scanner scanner = ScannerHelper.createScanner(client, tableName, auths); + + if (query != null) { + QueryInformation info = new QueryInformation(query, query.getQuery()); + IteratorSetting setting = new IteratorSetting(Integer.MAX_VALUE, QueryInformationIterator.class, info.toMap()); + scanner.addScanIterator(setting); + } + + if (level != null) { + scanner.setConsistencyLevel(level); + } + + if (hints != null) { + scanner.setExecutionHints(hints); + } + + return scanner; + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + } + + public String getTableName() { + return tableName; + } + + public Set getAuths() { + return auths; + } + + public Query getQuery() { + return query; + } + + public ConsistencyLevel getLevel() { + return level; + } + + public Map getHints() { + return hints; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java index 5366418ec48..6bb3d2f20f9 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java @@ -310,7 +310,7 @@ public RangeStreamScanner newRangeScanner(final String tableName, final Set auths, Query query, int shardsPerDayThreshold) throws Exception { - return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setScannerFactory(this); + return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setAccumuloClient(client); } public boolean close(ScannerBase bs) { diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerManager.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerManager.java new file mode 100644 index 00000000000..a22e357193f --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerManager.java @@ -0,0 +1,92 @@ +package datawave.query.tables; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.client.ScannerBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages instances of both {@link ScannerSession} and {@link ScannerBase}. + */ +public class ScannerManager { + + private static final Logger log = LoggerFactory.getLogger(ScannerManager.class); + + protected final Set baseInstances = Collections.synchronizedSet(new HashSet<>()); + protected final Set sessionInstances = Collections.synchronizedSet(new HashSet<>()); + + public ScannerManager() { + // empty constructor + } + + /** + * Adds a {@link ScannerBase} + * + * @param scanner + * a ScannerBase instance + */ + public void addScanner(ScannerBase scanner) { + log.trace("Adding scanner {}", scanner); + this.baseInstances.add(scanner); + } + + /** + * Adds a {@link ScannerSession} + * + * @param scanner + * a ScannerSession instance + */ + public void addScanner(ScannerSession scanner) { + log.trace("Adding scanner {}", scanner); + this.sessionInstances.add(scanner); + } + + /** + * Closes a {@link ScannerBase} if it is tracked + * + * @param scanner + * a ScannerBase + */ + public void close(ScannerBase scanner) { + if (baseInstances.remove(scanner)) { + scanner.close(); + log.debug("Closed scanner {}", scanner); + } else { + log.warn("ScannerManager was asked to close untracked scanner base: {}", scanner); + } + } + + /** + * Closes a {@link ScannerSession} if it is tracked + * + * @param scanner + * a ScannerSession + */ + public void close(ScannerSession scanner) { + if (sessionInstances.remove(scanner)) { + scanner.close(); + log.debug("Closed scanner: {}", scanner); + } else { + log.warn("ScannerManager was asked to close untracked scanner session: {}", scanner); + } + } + + /** + * Closes all scanners tracked by the {@link ScannerManager} + */ + public void close() { + log.trace("ScannerManager asked to close all tracked scanners"); + + for (ScannerBase scanner : new HashSet<>(baseInstances)) { + close(scanner); + } + + for (ScannerSession scanner : new HashSet<>(sessionInstances)) { + close(scanner); + } + } + +} diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSessionBuilder.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSessionBuilder.java new file mode 100644 index 00000000000..202801cd253 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSessionBuilder.java @@ -0,0 +1,243 @@ +package datawave.query.tables; + +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.security.Authorizations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import datawave.microservice.query.Query; +import datawave.query.tables.stats.ScanSessionStats; + +/** + * Builder for a Datawave {@link ScannerSession} + */ +public class ScannerSessionBuilder { + + private static final Logger log = LoggerFactory.getLogger(ScannerSessionBuilder.class); + + private static final int DEFAULT_SCAN_QUEUE_SIZE = 100; + private static final int DEFAULT_RESULT_QUEUE_SIZE = 1_000; + + private int numScanResources = DEFAULT_SCAN_QUEUE_SIZE; + private int resultQueueSize = DEFAULT_RESULT_QUEUE_SIZE; + + private Class wrapper; + private boolean statsEnabled = false; + + private String tableName; + private Set auths; + private Query query; + private ScannerBase.ConsistencyLevel level; + private Map hints; + + private final AccumuloClient client; + + public ScannerSessionBuilder(AccumuloClient client) { + Preconditions.checkNotNull(client, "AccumuloClient must be set"); + this.client = client; + } + + /** + * Optional Parameter, sets the number of threads used by the scanner session + * + * @param numThreads + * the number of threads + * @return the builder + */ + public ScannerSessionBuilder withNumThreads(int numThreads) { + this.numScanResources = numThreads; + return this; + } + + /** + * Optional Parameter, sets the result queue size + * + * @param resultQueueSize + * the result queue size + * @return the builder + */ + public ScannerSessionBuilder withResultQueueSize(int resultQueueSize) { + this.resultQueueSize = resultQueueSize; + return this; + } + + /** + * Required Parameter, the type of ScannerSession to build + * + * @param wrapper + * the scanner session class + * @return the builder + */ + public ScannerSessionBuilder withWrapper(Class wrapper) { + this.wrapper = wrapper; + return this; + } + + /** + * Optional Parameter, should the scanner session record stats + * + * @param statsEnabled + * flag that determines if stats are recorded + * @return the builder + */ + public ScannerSessionBuilder withStats(boolean statsEnabled) { + this.statsEnabled = statsEnabled; + return this; + } + + /** + * Required parameter + * + * @param tableName + * the table name + * @return the builder + */ + public ScannerSessionBuilder withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + /** + * Required parameter + * + * @param auths + * the authorizations + * @return the builder + */ + public ScannerSessionBuilder withAuths(Set auths) { + this.auths = auths; + return this; + } + + /** + * Optional parameter + * + * @param query + * a {@link Query} instance + * @return the builder + */ + public ScannerSessionBuilder withQuery(Query query) { + this.query = query; + return this; + } + + /** + * Optional parameter + * + * @param level + * the {@link ScannerBase.ConsistencyLevel} + * @return the builder + */ + public ScannerSessionBuilder withConsistencyLevel(ScannerBase.ConsistencyLevel level) { + this.level = level; + return this; + } + + /** + * Optional parameter + * + * @param hints + * a map of execution hints + * @return the builder + */ + public ScannerSessionBuilder withExecutionHints(Map hints) { + this.hints = hints; + return this; + } + + /** + * Build the {@link Scanner}, setting any optional configs if necessary + * + * @return a Scanner + */ + public T build() { + ResourceQueue resourceQueue; + Preconditions.checkNotNull(tableName, "TableName must be set"); + Preconditions.checkNotNull(auths, "Authorizations must be set"); + Preconditions.checkNotNull(wrapper, "ScannerSession type must be set"); + + try { + + resourceQueue = new ResourceQueue(numScanResources, client); + + log.debug("Creating ScannerSession with {} threads", resourceQueue.getCapacity()); + + T session; + if (wrapper == ScannerSession.class) { + session = (T) new ScannerSession(tableName, auths, resourceQueue, resultQueueSize, query); + } else { + session = (T) wrapper.getConstructor(ScannerSession.class) + .newInstance(new ScannerSession(tableName, auths, resourceQueue, resultQueueSize, query)); + } + + if (session instanceof RangeStreamScanner) { + // deal with the funkitude + ((RangeStreamScanner) session).setAccumuloClient(client); + } + + if (statsEnabled) { + session.applyStats(new ScanSessionStats()); + } + + if (level != null) { + SessionOptions options = session.getOptions(); + options.setConsistencyLevel(level); + session.setOptions(options); + } + + if (hints != null) { + SessionOptions options = session.getOptions(); + options.setExecutionHints(hints); + session.setOptions(options); + } + + return session; + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public int getNumScanResources() { + return numScanResources; + } + + public int getResultQueueSize() { + return resultQueueSize; + } + + public Class getWrapper() { + return wrapper; + } + + public boolean isStatsEnabled() { + return statsEnabled; + } + + public String getTableName() { + return tableName; + } + + public Set getAuths() { + return auths; + } + + public Query getQuery() { + return query; + } + + public ScannerBase.ConsistencyLevel getLevel() { + return level; + } + + public Map getHints() { + return hints; + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java index 6e0a43fbd3e..ffbcd6dfeaf 100644 --- a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java @@ -49,7 +49,6 @@ import datawave.query.jexl.JexlNodeFactory; import datawave.query.jexl.visitors.TreeEqualityVisitor; import datawave.query.planner.QueryPlan; -import datawave.query.tables.ScannerFactory; import datawave.query.util.MockMetadataHelper; /** @@ -382,8 +381,7 @@ private void test(String query, String expected, QUERY_CONTEXT queryContext, TER // Run a standard limited-scanner range stream. count++; - ScannerFactory scannerFactory = new ScannerFactory(config); - rangeStream = new RangeStream(config, scannerFactory, helper); + rangeStream = new RangeStream(config, config.getClient(), helper); rangeStream.setLimitScanners(true); script = JexlASTHelper.parseAndFlattenJexlQuery(query); diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java index 1e262bf188f..149dee079fb 100644 --- a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java @@ -50,7 +50,6 @@ import datawave.query.jexl.JexlASTHelper; import datawave.query.jexl.visitors.JexlStringBuildingVisitor; import datawave.query.planner.QueryPlan; -import datawave.query.tables.ScannerFactory; import datawave.query.util.MetadataHelper; import datawave.query.util.MockMetadataHelper; import datawave.query.util.Tuple2; @@ -1266,7 +1265,6 @@ public void testIntersection_HighAndLowCardinality_withSeek() throws Exception { Range range3 = makeTestRange("20190315_49", "datatype1\u0000a.b.c"); Set expectedRanges = Sets.newHashSet(range1, range2, range3); - ScannerFactory scannerFactory = new ScannerFactory(config); RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); @@ -1533,7 +1531,6 @@ public void testIntersection_ofDayRangesAndShardRange() throws Exception { } private RangeStream getRangeStream(MetadataHelper helper) { - ScannerFactory scannerFactory = new ScannerFactory(config); - return new RangeStream(config, scannerFactory, helper); + return new RangeStream(config, config.getClient(), helper); } } diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java index ef440984a4d..a4d186d188e 100644 --- a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java @@ -42,7 +42,6 @@ import datawave.query.jexl.visitors.JexlStringBuildingVisitor; import datawave.query.jexl.visitors.TreeEqualityVisitor; import datawave.query.planner.QueryPlan; -import datawave.query.tables.ScannerFactory; import datawave.query.util.MockMetadataHelper; /** @@ -3430,13 +3429,12 @@ private void runTest(String query, List expectedRanges, List expe helper.setIndexedFields(dataTypes.keySet()); // Run a standard limited-scanner range stream. - ScannerFactory scannerFactory = new ScannerFactory(config); - RangeStream rangeStream = new RangeStream(config, scannerFactory, helper); + RangeStream rangeStream = new RangeStream(config, config.getClient(), helper); rangeStream.setLimitScanners(true); runTest(rangeStream, script, expectedRanges, expectedQueries); // Run a default range stream. - rangeStream = new RangeStream(config, scannerFactory, helper); + rangeStream = new RangeStream(config, config.getClient(), helper); rangeStream.setLimitScanners(false); runTest(rangeStream, script, expectedRanges, expectedQueries); diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/BatchScannerBuilderTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/BatchScannerBuilderTest.java new file mode 100644 index 00000000000..4818d4cf604 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/BatchScannerBuilderTest.java @@ -0,0 +1,156 @@ +package datawave.query.tables; + +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.security.Authorizations; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.microservice.query.Query; +import datawave.microservice.query.QueryImpl; +import datawave.util.TableName; + +public class BatchScannerBuilderTest { + + private static final InMemoryInstance instance = new InMemoryInstance(BatchScannerBuilderTest.class.getSimpleName()); + private static AccumuloClient client; + + private final String tableName = TableName.SHARD; + private final Set auths = Collections.singleton(new Authorizations("a", "b", "c")); + private final ConsistencyLevel level = ConsistencyLevel.IMMEDIATE; + + private BatchScanner scanner; + private BatchScannerBuilder builder; + + @BeforeAll + public static void setup() throws AccumuloSecurityException, AccumuloException, TableExistsException { + client = new InMemoryAccumuloClient("user", instance); + client.tableOperations().create(TableName.SHARD); + } + + @BeforeEach + public void beforeEach() { + scanner = null; + builder = null; + } + + @AfterEach + public void afterEach() { + if (scanner != null) { + scanner.close(); + } + } + + @Test + public void testBuilderWithNullClient() { + assertThrows(NullPointerException.class, () -> new BatchScannerBuilder(null)); + } + + @Test + public void testBuilderWithNullTableName() { + builder = new BatchScannerBuilder(client).withTableName(null); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderForTableThatDoesNotExist() { + builder = new BatchScannerBuilder(client).withTableName("whereiswaldo"); + assertThrows(RuntimeException.class, builder::build); + } + + @Test + public void testBuilderWithNullAuths() { + builder = new BatchScannerBuilder(client).withTableName(tableName).withAuths(null); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderWithMinimalArgs() { + builder = new BatchScannerBuilder(client).withTableName(tableName).withAuths(auths); + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(8, builder.getNumThreads()); + assertNull(builder.getQuery()); + assertNull(builder.getLevel()); + assertNull(builder.getHints()); + + scanner = builder.build(); + assertEquals(level, scanner.getConsistencyLevel()); + } + + @Test + public void testBuilderWithQuery() { + builder = new BatchScannerBuilder(client).withTableName(tableName).withAuths(auths).withQuery(getQuery()); + + assertEquals(getQuery(), builder.getQuery()); + + scanner = builder.build(); + } + + @Test + public void testBuilderWithConsistencyLevel() { + builder = new BatchScannerBuilder(client).withTableName(tableName).withAuths(auths).withConsistencyLevel(ConsistencyLevel.EVENTUAL); + + assertEquals(ConsistencyLevel.EVENTUAL, builder.getLevel()); + + scanner = builder.build(); + } + + @Test + public void testBuilderWithExecutionHints() { + builder = new BatchScannerBuilder(client).withTableName(tableName).withAuths(auths).withExecutionHints(getHints()); + + assertEquals(getHints(), builder.getHints()); + + scanner = builder.build(); + } + + @Test + public void testFullConfiguration() { + // @formatter:off + builder = new BatchScannerBuilder(client) + .withTableName(tableName) + .withAuths(auths) + .withNumThreads(8) + .withConsistencyLevel(ConsistencyLevel.EVENTUAL) + .withExecutionHints(getHints()); + // @formatter:on + + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(8, builder.getNumThreads()); + assertEquals(ConsistencyLevel.EVENTUAL, builder.getLevel()); + assertEquals(getHints(), builder.getHints()); + + scanner = builder.build(); + } + + private Query getQuery() { + QueryImpl query = new QueryImpl(); + query.setQuery("FOO == 'bar'"); + return query; + } + + private Map getHints() { + Map hints = new HashMap<>(); + hints.put("tableName", "executor-pool"); + return hints; + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/ScannerBuilderTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerBuilderTest.java new file mode 100644 index 00000000000..589cbae4926 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerBuilderTest.java @@ -0,0 +1,157 @@ +package datawave.query.tables; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.security.Authorizations; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.microservice.query.Query; +import datawave.microservice.query.QueryImpl; +import datawave.util.TableName; + +/** + * Test for the {@link ScannerBuilder} + */ +public class ScannerBuilderTest { + + private static final InMemoryInstance instance = new InMemoryInstance(ScannerBuilderTest.class.getSimpleName()); + private static AccumuloClient client; + + private final String tableName = TableName.SHARD; + private final Set auths = Collections.singleton(new Authorizations("a", "b", "c")); + private final ConsistencyLevel level = ConsistencyLevel.IMMEDIATE; + + private Scanner scanner; + private ScannerBuilder builder; + + @BeforeAll + public static void setup() throws AccumuloSecurityException, AccumuloException, TableExistsException { + client = new InMemoryAccumuloClient("user", instance); + client.tableOperations().create(TableName.SHARD); + } + + @BeforeEach + public void beforeEach() { + scanner = null; + builder = null; + } + + @AfterEach + public void afterEach() { + if (scanner != null) { + scanner.close(); + } + } + + @Test + public void testBuilderWithNullClient() { + assertThrows(NullPointerException.class, () -> new ScannerBuilder(null)); + } + + @Test + public void testBuilderWithNullTableName() { + builder = new ScannerBuilder(client).withTableName(null); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderForTableThatDoesNotExist() { + builder = new ScannerBuilder(client).withTableName("MrMcDoesntExist"); + assertThrows(RuntimeException.class, builder::build); + } + + @Test + public void testBuilderWithNullAuths() { + builder = new ScannerBuilder(client).withTableName(tableName).withAuths(null); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderWithMinimalArgs() { + builder = new ScannerBuilder(client).withTableName(tableName).withAuths(auths); + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertNull(builder.getLevel()); + + scanner = builder.build(); + assertEquals(level, scanner.getConsistencyLevel()); + } + + @Test + public void testBuilderWithQuery() { + Query query = getQuery(); + builder = new ScannerBuilder(client).withTableName(tableName).withAuths(auths).withQuery(query); + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(query, builder.getQuery()); + scanner = builder.build(); + } + + @Test + public void testBuilderWithConsistencyLevel() { + builder = new ScannerBuilder(client).withTableName(tableName).withAuths(auths).withConsistencyLevel(level); + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(level, builder.getLevel()); + scanner = builder.build(); + } + + @Test + public void testBuilderWithExecutionHints() { + builder = new ScannerBuilder(client).withTableName(tableName).withAuths(auths).withExecutionHints(getHints()); + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(getHints(), builder.getHints()); + scanner = builder.build(); + } + + @Test + public void testBuilderWithAllOptions() { + // @formatter:off + builder = new ScannerBuilder(client) + .withTableName(tableName) + .withAuths(auths) + .withQuery(getQuery()) + .withConsistencyLevel(level) + .withExecutionHints(getHints()); + // @formatter:on + + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(getQuery(), builder.getQuery()); + assertEquals(level, builder.getLevel()); + assertEquals(getHints(), builder.getHints()); + + scanner = builder.build(); + } + + private Query getQuery() { + QueryImpl query = new QueryImpl(); + query.setQuery("FOO == 'bar'"); + return query; + } + + private Map getHints() { + Map hints = new HashMap<>(); + hints.put("tableName", "executor-pool"); + return hints; + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/ScannerManagerTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerManagerTest.java new file mode 100644 index 00000000000..f43ee7ac2d9 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerManagerTest.java @@ -0,0 +1,313 @@ +package datawave.query.tables; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.microservice.query.Query; +import datawave.microservice.query.QueryImpl; +import datawave.util.TableName; + +public class ScannerManagerTest { + + private static final InMemoryInstance instance = new InMemoryInstance(ScannerManagerTest.class.getSimpleName()); + private static AccumuloClient client; + + private final String tableName = TableName.SHARD; + private final Set auths = Collections.singleton(new Authorizations("a", "b", "c")); + private final ScannerBase.ConsistencyLevel level = ScannerBase.ConsistencyLevel.IMMEDIATE; + + private ScannerManagerForTests manager; + + @BeforeAll + public static void setup() throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException { + client = new InMemoryAccumuloClient("user", instance); + client.tableOperations().create(TableName.SHARD); + + try (BatchWriter writer = client.createBatchWriter(TableName.SHARD)) { + Mutation m = new Mutation("row"); + m.put("cf", "cq", new Value()); + m.put("cf2", "cq2", new Value()); + m.put("cf3", "cq3", new Value()); + writer.addMutation(m); + } + } + + @BeforeEach + public void beforeEach() { + manager = new ScannerManagerForTests(); + } + + @Test + public void testManageScanner() { + Scanner scanner = createScanner(); + manager.addScanner(scanner); + advanceScanner(scanner); + + manager.close(scanner); + assertManagerState(); + } + + @Test + public void testManageBatchScanner() { + BatchScanner scanner = createBatchScanner(); + manager.addScanner(scanner); + advanceScanner(scanner); + + manager.close(scanner); + assertManagerState(); + } + + @Test + public void testManageScannerSession() { + ScannerSession scanner = createScannerSession(); + manager.addScanner(scanner); + advanceScanner(scanner); + + manager.close(scanner); + assertFalse(scanner.isRunning()); + assertManagerState(); + } + + @Test + public void testManageAnyFieldScanner() { + ScannerSession scanner = createAnyFieldScanner(); + manager.addScanner(scanner); + advanceScanner(scanner); + + manager.close(scanner); + assertFalse(scanner.isRunning()); + assertManagerState(); + } + + @Test + public void testManageBatchScannerSession() { + ScannerSession scanner = createBatchScannerSession(); + manager.addScanner(scanner); + advanceScanner(scanner); + + manager.close(scanner); + assertFalse(scanner.isRunning()); + assertManagerState(); + } + + @Test + public void testManageRangeStreamScanner() { + ScannerSession scanner = createRangeStreamScanner(); + manager.addScanner(scanner); + advanceScanner(scanner); + + manager.close(scanner); + assertFalse(scanner.isRunning()); + assertManagerState(); + } + + @Test + public void testManageMultipleScanners() { + Scanner scanner = createScanner(); + BatchScanner batchScanner = createBatchScanner(); + ScannerSession scannerSession = createScannerSession(); + + manager.addScanner(scanner); + manager.addScanner(batchScanner); + manager.addScanner(scannerSession); + + advanceScanner(scanner); + advanceScanner(batchScanner); + advanceScanner(scannerSession); + + manager.close(scanner); + manager.close(batchScanner); + manager.close(scannerSession); + + assertManagerState(); + } + + @Test + public void testCloseMultipleScannersOneAtATime() { + Scanner scanner = createScanner(); + BatchScanner batchScanner = createBatchScanner(); + ScannerSession scannerSession = createScannerSession(); + ScannerSession anyFieldScanner = createAnyFieldScanner(); + ScannerSession batchScannerSession = createBatchScannerSession(); + ScannerSession rangeStreamScanner = createRangeStreamScanner(); + + manager.addScanner(scanner); + manager.addScanner(batchScanner); + manager.addScanner(scannerSession); + manager.addScanner(anyFieldScanner); + manager.addScanner(batchScannerSession); + manager.addScanner(rangeStreamScanner); + + advanceScanner(scanner); + advanceScanner(batchScanner); + advanceScanner(scannerSession); + advanceScanner(anyFieldScanner); + advanceScanner(batchScannerSession); + advanceScanner(rangeStreamScanner); + + manager.close(scanner); + manager.close(batchScanner); + manager.close(scannerSession); + manager.close(anyFieldScanner); + manager.close(batchScannerSession); + manager.close(rangeStreamScanner); + + assertManagerState(); + } + + @Test + public void testCloseMultipleScannersAllAtOnce() { + Scanner scanner = createScanner(); + BatchScanner batchScanner = createBatchScanner(); + ScannerSession scannerSession = createScannerSession(); + ScannerSession anyFieldScanner = createAnyFieldScanner(); + ScannerSession batchScannerSession = createBatchScannerSession(); + ScannerSession rangeStreamScanner = createRangeStreamScanner(); + + manager.addScanner(scanner); + manager.addScanner(batchScanner); + manager.addScanner(scannerSession); + manager.addScanner(anyFieldScanner); + manager.addScanner(batchScannerSession); + manager.addScanner(rangeStreamScanner); + + advanceScanner(scanner); + advanceScanner(batchScanner); + advanceScanner(scannerSession); + advanceScanner(anyFieldScanner); + advanceScanner(batchScannerSession); + advanceScanner(rangeStreamScanner); + + // close all scanners at once + manager.close(); + + assertManagerState(); + } + + private void advanceScanner(Scanner scanner) { + scanner.setRange(new Range()); + var iter = scanner.iterator(); + assertTrue(iter.hasNext()); + assertNotNull(iter.next()); + } + + private void advanceScanner(BatchScanner scanner) { + scanner.setRanges(Collections.singleton(new Range())); + var iter = scanner.iterator(); + assertTrue(iter.hasNext()); + assertNotNull(iter.next()); + } + + private void advanceScanner(ScannerSession scanner) { + scanner.setRanges(Collections.singleton(new Range())); + scanner.hasNext(); + scanner.next(); + } + + private void assertManagerState() { + assertNotNull(manager); + assertEquals(manager.getAdded(), manager.getClosed()); + } + + private Scanner createScanner() { + return new ScannerBuilder(client).withTableName(tableName).withAuths(auths).build(); + } + + private BatchScanner createBatchScanner() { + return new BatchScannerBuilder(client).withTableName(tableName).withAuths(auths).build(); + } + + private ScannerSession createScannerSession() { + return new ScannerSessionBuilder(client).withWrapper(ScannerSession.class).withTableName(tableName).withAuths(auths).build(); + } + + private ScannerSession createAnyFieldScanner() { + return new ScannerSessionBuilder(client).withWrapper(AnyFieldScanner.class).withTableName(tableName).withAuths(auths).build(); + } + + private BatchScannerSession createBatchScannerSession() { + return new ScannerSessionBuilder(client).withWrapper(BatchScannerSession.class).withTableName(tableName).withAuths(auths).build(); + } + + private RangeStreamScanner createRangeStreamScanner() { + return new ScannerSessionBuilder(client).withWrapper(RangeStreamScanner.class).withTableName(tableName).withAuths(auths).build(); + } + + private Query getQuery() { + QueryImpl query = new QueryImpl(); + query.setQuery("FOO == 'bar'"); + return query; + } + + private Map getHints() { + Map hints = new HashMap<>(); + hints.put("tableName", "executor-pool"); + return hints; + } + + private class ScannerManagerForTests extends ScannerManager { + private int added = 0; + private int closed = 0; + + public void addScanner(ScannerBase scanner) { + added++; + super.addScanner(scanner); + } + + public void addScanner(ScannerSession scanner) { + added++; + super.addScanner(scanner); + } + + public void close(ScannerBase scanner) { + if (baseInstances.contains(scanner)) { + closed++; + super.close(scanner); + } + } + + public void close(ScannerSession scanner) { + if (sessionInstances.contains(scanner)) { + closed++; + super.close(scanner); + } + } + + public int getAdded() { + return added; + } + + public int getClosed() { + return closed; + } + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/ScannerSessionBuilderTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerSessionBuilderTest.java new file mode 100644 index 00000000000..8d1317bf82a --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerSessionBuilderTest.java @@ -0,0 +1,203 @@ +package datawave.query.tables; + +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.security.Authorizations; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.util.TableName; + +public class ScannerSessionBuilderTest { + + private static final InMemoryInstance instance = new InMemoryInstance(ScannerSessionBuilderTest.class.getSimpleName()); + private static AccumuloClient client; + + private final String tableName = TableName.SHARD; + private final Set auths = Collections.singleton(new Authorizations("a", "b", "c")); + private final ConsistencyLevel level = ConsistencyLevel.IMMEDIATE; + + private ScannerSession session; + private ScannerSessionBuilder builder; + + @BeforeAll + public static void setup() throws AccumuloSecurityException, AccumuloException, TableExistsException { + client = new InMemoryAccumuloClient("user", instance); + client.tableOperations().create(TableName.SHARD); + } + + @BeforeEach + public void beforeEach() { + session = null; + builder = null; + } + + @AfterEach + public void afterEach() { + if (session != null) { + session.close(); + } + } + + @Test + public void testBuilderWithNullClient() { + assertThrows(NullPointerException.class, () -> new ScannerSessionBuilder(null)); + } + + @Test + public void testBuilderWithNullTableName() { + builder = new ScannerSessionBuilder(client); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderWithTableThatDoesNotExist() { + builder = new ScannerSessionBuilder(client).withTableName("404"); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderWithNullAuths() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(null); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderWithNullWrapper() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(null); + assertThrows(NullPointerException.class, builder::build); + } + + @Test + public void testBuilderWithMinimalArgs() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(ScannerSession.class); + + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(ScannerSession.class, builder.getWrapper()); + assertEquals(100, builder.getNumScanResources()); + assertEquals(1_000, builder.getResultQueueSize()); + assertFalse(builder.isStatsEnabled()); + assertNull(builder.getLevel()); + assertNull(builder.getHints()); + + session = builder.build(); + assertEquals(level, session.getOptions().getConsistencyLevel()); + } + + @Test + public void testBuilderWithNumScanResource() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(ScannerSession.class); + builder.withNumThreads(1); + + assertEquals(1, builder.getNumScanResources()); + session = builder.build(); + } + + @Test + public void testBuilderWithResultQueueSize() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(ScannerSession.class); + builder.withResultQueueSize(25); + + assertEquals(25, builder.getResultQueueSize()); + session = builder.build(); + } + + @Test + public void testBuilderWithStatsEnabled() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(ScannerSession.class); + builder.withStats(true); + + assertTrue(builder.isStatsEnabled()); + session = builder.build(); + } + + @Test + public void testBuilderWithConsistencyLevel() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(ScannerSession.class); + builder.withConsistencyLevel(ConsistencyLevel.EVENTUAL); + + assertEquals(ConsistencyLevel.EVENTUAL, builder.getLevel()); + session = builder.build(); + } + + @Test + public void testBuilderWithExecutionHints() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(ScannerSession.class); + builder.withExecutionHints(getHints()); + + assertEquals(getHints(), builder.getHints()); + session = builder.build(); + } + + @Test + public void testBuilderForAnyFieldScanner() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(AnyFieldScanner.class); + assertEquals(AnyFieldScanner.class, builder.getWrapper()); + session = builder.build(); + } + + @Test + public void testBuilderForBatchScannerSession() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(BatchScannerSession.class); + assertEquals(BatchScannerSession.class, builder.getWrapper()); + session = builder.build(); + } + + @Test + public void testBuilderForRangeStreamScanner() { + builder = new ScannerSessionBuilder(client).withTableName(tableName).withAuths(auths).withWrapper(RangeStreamScanner.class); + assertEquals(RangeStreamScanner.class, builder.getWrapper()); + session = builder.build(); + } + + @Test + public void testFullConfiguration() { + // @formatter:off + builder = new ScannerSessionBuilder(client) + .withTableName(tableName) + .withAuths(auths) + .withWrapper(BatchScannerSession.class) + .withNumThreads(25) + .withResultQueueSize(150) + .withExecutionHints(getHints()) + .withConsistencyLevel(ConsistencyLevel.EVENTUAL) + .withStats(true); + // @formatter:on + + assertEquals(tableName, builder.getTableName()); + assertEquals(auths, builder.getAuths()); + assertEquals(BatchScannerSession.class, builder.getWrapper()); + assertEquals(25, builder.getNumScanResources()); + assertEquals(150, builder.getResultQueueSize()); + assertTrue(builder.isStatsEnabled()); + assertEquals(ConsistencyLevel.EVENTUAL, builder.getLevel()); + assertEquals(getHints(), builder.getHints()); + + session = builder.build(); + } + + private Map getHints() { + Map hints = new HashMap<>(); + hints.put("tableName", "executor-pool"); + return hints; + } +}