diff --git a/warehouse/ops-tools/index-validation/pom.xml b/warehouse/ops-tools/index-validation/pom.xml
index bcc768ba93f..8039271e3e1 100644
--- a/warehouse/ops-tools/index-validation/pom.xml
+++ b/warehouse/ops-tools/index-validation/pom.xml
@@ -4,7 +4,7 @@
    <parent>
        <groupId>gov.nsa.datawave</groupId>
        <artifactId>datawave-ops-tools-parent</artifactId>
-       <version>7.7.0-SNAPSHOT</version>
+       <version>7.11.0-SNAPSHOT</version>
    </parent>
    <artifactId>datawave-ops-tools-index-validation</artifactId>
    <packaging>jar</packaging>
diff --git a/warehouse/ops-tools/pom.xml b/warehouse/ops-tools/pom.xml
index c90e3790ce5..f142105b8e6 100644
--- a/warehouse/ops-tools/pom.xml
+++ b/warehouse/ops-tools/pom.xml
@@ -4,7 +4,7 @@
    <parent>
        <groupId>gov.nsa.datawave</groupId>
        <artifactId>datawave-warehouse-parent</artifactId>
-       <version>7.7.0-SNAPSHOT</version>
+       <version>7.11.0-SNAPSHOT</version>
    </parent>
    <artifactId>datawave-ops-tools-parent</artifactId>
    <packaging>pom</packaging>
diff --git a/warehouse/pom.xml b/warehouse/pom.xml
index eadfffc45e4..2cdbeb122f5 100644
--- a/warehouse/pom.xml
+++ b/warehouse/pom.xml
@@ -4,7 +4,7 @@
    <parent>
        <groupId>gov.nsa.datawave</groupId>
        <artifactId>datawave-parent</artifactId>
-       <version>7.7.0-SNAPSHOT</version>
+       <version>7.11.0-SNAPSHOT</version>
    </parent>
    <artifactId>datawave-warehouse-parent</artifactId>
    <packaging>pom</packaging>
diff --git a/warehouse/query-core/pom.xml b/warehouse/query-core/pom.xml
index eecc22e554e..9b484e98b60 100644
--- a/warehouse/query-core/pom.xml
+++ b/warehouse/query-core/pom.xml
@@ -4,7 +4,7 @@
    <parent>
        <groupId>gov.nsa.datawave</groupId>
        <artifactId>datawave-warehouse-parent</artifactId>
-       <version>7.7.0-SNAPSHOT</version>
+       <version>7.11.0-SNAPSHOT</version>
    </parent>
    <artifactId>datawave-query-core</artifactId>
    <packaging>jar</packaging>
diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/BoundedRangeExpansionIterator.java b/warehouse/query-core/src/main/java/datawave/core/iterators/BoundedRangeExpansionIterator.java
new file mode 100644
index 00000000000..f863a4d0675
--- /dev/null
+++ b/warehouse/query-core/src/main/java/datawave/core/iterators/BoundedRangeExpansionIterator.java
@@ -0,0 +1,171 @@
+package datawave.core.iterators;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.SeekingFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Splitter;
+
+import datawave.query.Constants;
+import datawave.query.jexl.LiteralRange;
+
+/**
+ * A {@link SeekingFilter} that attempts to expand bounded ranges using the global index.
+ *
+ * The caller is responsible for fetching the appropriate column families. The range is constructed from a {@link LiteralRange}.
+ *
+ * This iterator does only three things: advance through datatypes when a filter is supplied, advance to the start date, and advance to the next row within
+ * the range.
+ */
+public class BoundedRangeExpansionIterator extends SeekingFilter implements OptionDescriber {
+
+ private static final Logger log = LoggerFactory.getLogger(BoundedRangeExpansionIterator.class);
+
+ public static final String START_DATE = "start.date";
+ public static final String END_DATE = "end.date";
+ public static final String DATATYPES_OPT = "dts";
+
+    private TreeSet<String> datatypes;
+ private String startDate;
+ private String endDate;
+
+ private Text prevRow;
+
+ @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ if (!validateOptions(options)) {
+ throw new IllegalArgumentException("BoundedRangeExpansionIterator not configured with correct options");
+ }
+
+ String opt = options.get(DATATYPES_OPT);
+ if (StringUtils.isBlank(opt)) {
+ datatypes = new TreeSet<>();
+ } else {
+ datatypes = new TreeSet<>(Splitter.on(',').splitToList(opt));
+ }
+
+ startDate = options.get(START_DATE);
+ endDate = options.get(END_DATE) + Constants.MAX_UNICODE_STRING;
+
+ super.init(source, options, env);
+ }
+
+ @Override
+ public IteratorOptions describeOptions() {
+ IteratorOptions opts = new IteratorOptions(getClass().getName(), "Expands bounded ranges using the global index", null, null);
+ opts.addNamedOption(START_DATE, "The start date");
+ opts.addNamedOption(END_DATE, "The end date");
+ opts.addNamedOption(DATATYPES_OPT, "The set of datatypes used to filter keys (optional)");
+ return opts;
+ }
+
+ @Override
+    public boolean validateOptions(Map<String,String> options) {
+ return options.containsKey(START_DATE) && options.containsKey(END_DATE);
+ }
+
+ @Override
+ public FilterResult filter(Key k, Value v) {
+ log.trace("filter key: {}", k.toStringNoTime());
+
+ // shard + null + datatype
+ String cq = k.getColumnQualifier().toString();
+ int index = cq.indexOf('\u0000');
+ String date = cq.substring(0, index);
+
+ if (date.compareTo(startDate) < 0) {
+ log.trace("{} is before the start date {}, advancing to start date", date, startDate);
+ return new FilterResult(false, AdvanceResult.USE_HINT);
+ }
+
+ if (date.compareTo(endDate) > 0) {
+ log.trace("{} is past the end date {}, advancing to next row", date, endDate);
+ return new FilterResult(false, AdvanceResult.NEXT_ROW);
+ }
+
+ String datatype = cq.substring(index + 1);
+ if (!datatypes.isEmpty() && !datatypes.contains(datatype)) {
+ log.trace("datatype {} was filtered out, advancing to next key", datatype);
+ return new FilterResult(false, AdvanceResult.NEXT);
+ }
+
+ if (prevRow != null && prevRow.equals(k.getRow())) {
+ // this iterator should only return a single key per unique row, thus the previous row should never match the current row.
+ log.warn("should never see a duplicate row -- skip to next row");
+ return new FilterResult(false, AdvanceResult.NEXT_ROW);
+ }
+
+ prevRow = k.getRow();
+ return new FilterResult(true, AdvanceResult.NEXT_ROW);
+ }
+
+ /**
+ * Hint is only used to seek to the start date
+ *
+ * @param k
+ * a key
+ * @param v
+ * a value
+ * @return the key used to seek
+ */
+ @Override
+ public Key getNextKeyHint(Key k, Value v) {
+ log.trace("get next key hint: {}", k.toStringNoTime());
+
+ // shard + null + datatype
+ String cq = k.getColumnQualifier().toString();
+ int index = cq.indexOf('\u0000');
+ String date = cq.substring(0, index);
+
+ if (date.compareTo(startDate) < 0) {
+ Text columnQualifier;
+
+ if (datatypes.isEmpty()) {
+ log.trace("seek to start date");
+ columnQualifier = new Text(startDate + '\u0000');
+ } else {
+ log.trace("seek to start date and datatype");
+ columnQualifier = new Text(startDate + '\u0000' + datatypes.first());
+ }
+
+ return new Key(k.getRow(), k.getColumnFamily(), columnQualifier);
+ }
+
+ log.trace("next hint key was called in a bad state, reverting to no-op");
+ return k;
+ }
+
+ @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (!range.isStartKeyInclusive()) {
+ // need to skip to next row
+ Key skip = new Key(range.getStartKey().getRow().toString() + '\u0000');
+ if (skip.compareTo(range.getEndKey()) > 0) {
+ // handles case of bounded range against single value
+ // filter key: +cE1 NUM:20150808_0%00;generic [NA]
+ // skip key would be +cE1 but then the start key is greater than the end key. so we cheat accumulo.
+ Range skipRange = new Range(range.getEndKey(), true, range.getEndKey(), range.isEndKeyInclusive());
+ super.seek(skipRange, columnFamilies, inclusive);
+ } else {
+ Range skipRange = new Range(skip, true, range.getEndKey(), range.isEndKeyInclusive());
+ super.seek(skipRange, columnFamilies, inclusive);
+ }
+ } else {
+ super.seek(range, columnFamilies, inclusive);
+ }
+ }
+}
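A minimal sketch of how this iterator might be configured against the global index. The priority, iterator name, scanner, and the date/datatype values below are illustrative assumptions; only the option names come from the class above.

    // Assumed: 'scanner' is an open Scanner over the global index table.
    IteratorSetting setting = new IteratorSetting(50, "boundedRangeExpansion", BoundedRangeExpansionIterator.class);
    setting.addOption(BoundedRangeExpansionIterator.START_DATE, "20240101");    // yyyyMMdd start date
    setting.addOption(BoundedRangeExpansionIterator.END_DATE, "20240131");      // yyyyMMdd end date
    setting.addOption(BoundedRangeExpansionIterator.DATATYPES_OPT, "csv,json"); // optional comma-delimited datatype filter
    scanner.addScanIterator(setting);
    scanner.fetchColumnFamily(new Text("FIELD_NAME"));                          // caller is responsible for fetching the appropriate column families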
diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java b/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java
index 1dcc248d6f4..f9a7ce19cb8 100644
--- a/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java
+++ b/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java
@@ -769,8 +769,8 @@ protected void findTop() throws IOException {
// no need to check containership if not returning sorted uids
if (!sortedUIDs || this.lastRangeSeeked.contains(key)) {
this.topKey = key;
- if (log.isDebugEnabled()) {
- log.debug("setting as topKey " + topKey);
+ if (log.isTraceEnabled()) {
+ log.trace("setting as topKey " + topKey);
}
break;
}
@@ -879,6 +879,7 @@ private void fillSortedSets() throws IOException {
if (log.isDebugEnabled()) {
log.debug("Processing " + boundingFiRanges + " for " + this);
}
+ long startFillSets = System.currentTimeMillis();
TotalResults totalResults = new TotalResults(maxResults);
@@ -916,8 +917,11 @@ private void fillSortedSets() throws IOException {
}
}
+ long fillSetTiming = System.currentTimeMillis() - startFillSets;
+ log.info("Filled ivarator sets for " + boundingFiRanges.size() + " ranges took " + fillSetTiming + "ms for " + this);
+
if (failed) {
- log.error("Failed to complete ivarator cache: " + result, exception);
+ log.error("Failed to complete ivarator cache: " + result + " for " + this, exception);
throw new IvaratorException("Failed to complete ivarator cache: " + result, exception);
}
@@ -1102,6 +1106,7 @@ protected Future<?> fillSet(final Range boundingFiRange, final TotalResults tota
// create runnable
Runnable runnable = () -> {
+ long startFillSet = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("Starting fillSet(" + boundingFiRange + ')');
}
@@ -1210,6 +1215,8 @@ protected Future> fillSet(final Range boundingFiRange, final TotalResults tota
log.error("Failed to complete fillSet(" + boundingFiRange + ")", e);
throw new RuntimeException(e);
} finally {
+ long timing = System.currentTimeMillis() - startFillSet;
+ log.info("Completed " + boundingFiRange + " ivarator in " + timing + "ms");
// return the ivarator source back to the pool.
returnPoolSource(source);
if (log.isDebugEnabled()) {
@@ -1644,4 +1651,13 @@ public void setCollectTimingDetails(boolean collectTimingDetails) {
public void setQuerySpanCollector(QuerySpanCollector querySpanCollector) {
this.querySpanCollector = querySpanCollector;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("DatawaveFieldIndexCachingIteratorJexl (").append(queryId).append(") fName=").append(getFieldName()).append(", fValue=")
+                        .append(getFieldValue()).append(", negated=").append(isNegated());
+ return builder.toString();
+ }
+
}
diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/MultiRfileInputformat.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/MultiRfileInputformat.java
index 4b7f1011730..1614fe2991f 100644
--- a/warehouse/query-core/src/main/java/datawave/mr/bulk/MultiRfileInputformat.java
+++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/MultiRfileInputformat.java
@@ -66,6 +66,7 @@ public class MultiRfileInputformat extends RFileInputFormat {
private static LoadingCache>>> locationMap = null;
    protected static final Map<String,String> dfsUriMap = new ConcurrentHashMap<>();
+   protected static final Map<String,String> dfsDirMap = new ConcurrentHashMap<>();
@Override
    public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
@@ -147,11 +148,12 @@ public static List<InputSplit> computeSplitPoints(AccumuloClient client, Configu
/**
* Attempt the following 1) try to get the default namespace from accumulo 2) Use the custom config option 3) use default name in the hdfs configuration
*/
- if (dfsUriMap.get(tableId) == null) {
+ if (dfsUriMap.get(tableId) == null || dfsDirMap.get(tableId) == null) {
synchronized (MultiRfileInputformat.class) {
final InstanceOperations instOps = client.instanceOperations();
- dfsUriMap.put(tableId, instOps.getSystemConfiguration().get(Property.INSTANCE_VOLUMES.getKey()));
+ dfsUriMap.put(tableId, instOps.getSystemConfiguration().get(Property.INSTANCE_DFS_URI.getKey()));
+ dfsDirMap.put(tableId, instOps.getSystemConfiguration().get(Property.INSTANCE_DFS_DIR.getKey()));
}
}
@@ -165,7 +167,7 @@ public static List computeSplitPoints(AccumuloClient client, Configu
}
}
- basePath = dfsUriMap.get(tableId);
+ basePath = dfsDirMap.get(tableId);
if (StringUtils.isEmpty(basePath)) {
basePath = ACCUMULO_BASE_PATH;
diff --git a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorQueryIterator.java b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorQueryIterator.java
index 61009d31fba..7528a24e9f6 100644
--- a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorQueryIterator.java
+++ b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorQueryIterator.java
@@ -40,6 +40,7 @@
import datawave.query.function.RangeProvider;
import datawave.query.iterator.NestedQueryIterator;
import datawave.query.iterator.QueryIterator;
+import datawave.query.iterator.QueryOptions;
import datawave.query.iterator.SourcedOptions;
import datawave.query.iterator.logic.IndexIterator;
import datawave.query.jexl.DatawaveJexlContext;
@@ -138,6 +139,31 @@ public EventDataQueryFilter getEvaluationFilter() {
return evaluationFilter != null ? evaluationFilter.clone() : null;
}
+ /**
+ * In the Ancestor case replace the {@link QueryOptions#eventFilter} with an evaluation filter
+ *
+ * @return an evaluation filter
+ */
+ public EventDataQueryFilter getEventFilter() {
+ return getEvaluationFilter();
+ }
+
+ @Override
+ public EventDataQueryFilter getFiEvaluationFilter() {
+ if (fiEvaluationFilter == null) {
+ fiEvaluationFilter = getEvaluationFilter();
+ }
+ return fiEvaluationFilter.clone();
+ }
+
+ @Override
+ public EventDataQueryFilter getEventEvaluationFilter() {
+ if (eventEvaluationFilter == null) {
+ eventEvaluationFilter = getEvaluationFilter();
+ }
+ return eventEvaluationFilter.clone();
+ }
+
@Override
    protected JexlEvaluation getJexlEvaluation(NestedQueryIterator<Key> documentSource) {
return new JexlEvaluation(query, getArithmetic()) {
diff --git a/warehouse/query-core/src/main/java/datawave/query/attributes/Document.java b/warehouse/query-core/src/main/java/datawave/query/attributes/Document.java
index 2edaa426da2..8bb3d146589 100644
--- a/warehouse/query-core/src/main/java/datawave/query/attributes/Document.java
+++ b/warehouse/query-core/src/main/java/datawave/query/attributes/Document.java
@@ -210,27 +210,8 @@ public Attribute<? extends Comparable<?>> toDocKeyAttributes(Set<Key> docKeys, boolean keepRecordId) {
}
public void debugDocumentSize(Key docKey) {
- long bytes = sizeInBytes();
- // if more than 100M, then error
- if (bytes > (ONE_HUNDRED_M)) {
- log.error("Document " + docKey + "; size = " + size() + "; bytes = " + bytes);
- }
- // if more than 10M, then warn
- // else if (bytes > (1024l * 1000 * 10)) {
- // log.warn("Document " + docKey + "; size = " + size() + "; bytes = " + bytes);
- // }
-
- // if more than 1M, then info
- else if (bytes > (ONE_M)) {
- log.info("Document " + docKey + "; size = " + size() + "; bytes = " + bytes);
- }
- // if more than 500K, then debug
- else if (bytes > (FIVE_HUNDRED_K) && log.isDebugEnabled()) {
- log.debug("Document " + docKey + "; size = " + size() + "; bytes = " + bytes);
- }
- // trace everything
- else if (log.isTraceEnabled()) {
- log.trace("Document " + docKey + "; size = " + size() + "; bytes = " + bytes);
+ if (log.isDebugEnabled()) {
+ log.debug("Document " + docKey + "; size = " + size() + "; bytes = " + sizeInBytes());
}
}
diff --git a/warehouse/query-core/src/main/java/datawave/query/config/LookupUUIDTune.java b/warehouse/query-core/src/main/java/datawave/query/config/LookupUUIDTune.java
index 7e5dd61b73a..91f084abcf8 100644
--- a/warehouse/query-core/src/main/java/datawave/query/config/LookupUUIDTune.java
+++ b/warehouse/query-core/src/main/java/datawave/query/config/LookupUUIDTune.java
@@ -46,6 +46,7 @@ public class LookupUUIDTune implements Profile {
protected boolean reduceQuery = false;
private boolean enforceUniqueTermsWithinExpressions = false;
private boolean reduceQueryFields = false;
+ private boolean seekingEventAggregation;
protected List transforms = null;
protected Map querySyntaxParsers = null;
@@ -64,6 +65,7 @@ public void configure(BaseQueryLogic<Entry<Key,Value>> logic) {
rsq.setFiNextSeek(getFiNextSeek());
rsq.setEventFieldSeek(getEventFieldSeek());
rsq.setEventNextSeek(getEventNextSeek());
+ rsq.setSeekingEventAggregation(isSeekingEventAggregation());
if (querySyntaxParsers != null) {
rsq.setQuerySyntaxParsers(querySyntaxParsers);
@@ -136,6 +138,7 @@ public void configure(GenericQueryConfiguration configuration) {
rsqc.setFiNextSeek(getFiNextSeek());
rsqc.setEventFieldSeek(getEventFieldSeek());
rsqc.setEventNextSeek(getEventNextSeek());
+ rsqc.setSeekingEventAggregation(isSeekingEventAggregation());
// we need this since we've finished the deep copy already
rsqc.setSpeculativeScanning(speculativeScanning);
@@ -354,4 +357,12 @@ public Map<String,QueryParser> getQuerySyntaxParsers() {
    public void setQuerySyntaxParsers(Map<String,QueryParser> querySyntaxParsers) {
this.querySyntaxParsers = querySyntaxParsers;
}
+
+ public boolean isSeekingEventAggregation() {
+ return seekingEventAggregation;
+ }
+
+ public void setSeekingEventAggregation(boolean seekingEventAggregation) {
+ this.seekingEventAggregation = seekingEventAggregation;
+ }
}
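A minimal sketch of how the new profile option might be applied, assuming the default constructor and a query logic instance to configure; setEventFieldSeek and the values shown are assumptions, while setSeekingEventAggregation comes from this diff.

    LookupUUIDTune profile = new LookupUUIDTune();
    profile.setSeekingEventAggregation(true); // enable the field-based seeking aggregation added above
    profile.setEventFieldSeek(10);            // assumed setter; the flag is documented to work in conjunction with eventFieldSeek
    profile.configure(queryLogic);            // queryLogic: the ShardQueryLogic being tuned (assumed)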
diff --git a/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java b/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java
index 0c29817d43d..94ddc4d5ba7 100644
--- a/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java
+++ b/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java
@@ -84,7 +84,7 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
public static final String QUERY_LOGIC_NAME_SOURCE = "queryLogic";
@SuppressWarnings("unused")
- private static final long serialVersionUID = -4354990715046146110L;
+ private static final long serialVersionUID = 2321985989282659247L;
private static final Logger log = Logger.getLogger(ShardQueryConfiguration.class);
// is this a tld query, explicitly default to false
@@ -99,6 +99,8 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private int maxIndexBatchSize = 1000;
private boolean allTermsIndexOnly;
private long maxIndexScanTimeMillis = Long.MAX_VALUE;
+ private long maxAnyFieldScanTimeMillis = Long.MAX_VALUE;
+
// Allows this query to parse the root uids from TLD uids found in the global shard index. This effectively ignores hits in child documents.
private boolean parseTldUids = false;
private boolean collapseUids = false;
@@ -451,6 +453,11 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private int tfFieldSeek = -1;
private int tfNextSeek = -1;
+ /**
+ * Flag that enables a field-based seeking aggregation in the standard event query. Must be used in conjunction with {@link #eventFieldSeek}
+ */
+ private boolean seekingEventAggregation = false;
+
/**
* The maximum weight for entries in the visitor function cache. The weight is calculated as the total number of characters for each key and value in the
* cache. Default is 5m characters, which is roughly 10MB
@@ -477,24 +484,26 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private boolean pruneQueryOptions = false;
/**
- * Flag to control gathering field counts from the global index and persisting those to the query iterator. Negated terms and branches are not considered.
+ * Flag that sorts the query prior to the global index lookup using inferred costs. This step may reduce time spent in the global index depending on
+ * individual term selectivity.
*/
- private boolean useFieldCounts = false;
+ private boolean sortQueryPreIndexWithImpliedCounts = false;
+
/**
- * Flag to control gathering term counts from the global index and persisting those to the query iterator. Negated terms and branches are not considered.
+ * Flag that sorts the query prior to the global index lookup using field counts from the {@link TableName#METADATA} table. This option opens a scanner and
+ * thus is more expensive than sorting by implied counts, but is potentially more accurate.
*/
- private boolean useTermCounts = false;
+ private boolean sortQueryPreIndexWithFieldCounts = false;
+
/**
- * Flag to control sorting a query by inferred default costs prior to the global index lookup. This step may reduce time performing a secondary sort as when
- * {@link #sortQueryByCounts} is enabled.
+ * Flag that sorts the query using field counts gathered as part of the global index lookup. Negated terms and branches are not considered.
*/
- private boolean sortQueryBeforeGlobalIndex = false;
+ private boolean sortQueryPostIndexWithFieldCounts = false;
/**
- * Flag to control if a query is sorted by either field or term counts. Either {@link #useFieldCounts} or {@link #useTermCounts} must be set for this option
- * to take effect.
+ * Flag that sorts the query using term counts gathered as part of the global index lookup. Negated terms and branches are not considered.
*/
- private boolean sortQueryByCounts = false;
+ private boolean sortQueryPostIndexWithTermCounts = false;
/**
* Insert rules for processing the QueryTree to automatically apply hints to queries. Hints will be passed to the ScannerFactory
@@ -547,6 +556,7 @@ public void copyFrom(ShardQueryConfiguration other) {
this.setMaxIndexBatchSize(other.getMaxIndexBatchSize());
this.setAllTermsIndexOnly(other.isAllTermsIndexOnly());
this.setMaxIndexScanTimeMillis(other.getMaxIndexScanTimeMillis());
+ this.setMaxAnyFieldScanTimeMillis(other.getMaxAnyFieldScanTimeMillis());
this.setCollapseUids(other.getCollapseUids());
this.setCollapseUidsThreshold(other.getCollapseUidsThreshold());
this.setEnforceUniqueTermsWithinExpressions(other.getEnforceUniqueTermsWithinExpressions());
@@ -733,6 +743,7 @@ public void copyFrom(ShardQueryConfiguration other) {
this.setEventNextSeek(other.getEventNextSeek());
this.setTfFieldSeek(other.getTfFieldSeek());
this.setTfNextSeek(other.getTfNextSeek());
+ this.setSeekingEventAggregation(other.isSeekingEventAggregation());
this.setVisitorFunctionMaxWeight(other.getVisitorFunctionMaxWeight());
this.setQueryExecutionForPageTimeout(other.getQueryExecutionForPageTimeout());
this.setLazySetMechanismEnabled(other.isLazySetMechanismEnabled());
@@ -740,10 +751,10 @@ public void copyFrom(ShardQueryConfiguration other) {
this.setTfAggregationThresholdMs(other.getTfAggregationThresholdMs());
this.setGroupFields(GroupFields.copyOf(other.getGroupFields()));
this.setPruneQueryOptions(other.getPruneQueryOptions());
- this.setUseFieldCounts(other.getUseFieldCounts());
- this.setUseTermCounts(other.getUseTermCounts());
- this.setSortQueryBeforeGlobalIndex(other.isSortQueryBeforeGlobalIndex());
- this.setSortQueryByCounts(other.isSortQueryByCounts());
+ this.setSortQueryPreIndexWithImpliedCounts(other.isSortQueryPreIndexWithImpliedCounts());
+ this.setSortQueryPreIndexWithFieldCounts(other.isSortQueryPreIndexWithFieldCounts());
+ this.setSortQueryPostIndexWithTermCounts(other.isSortQueryPostIndexWithTermCounts());
+ this.setSortQueryPostIndexWithFieldCounts(other.isSortQueryPostIndexWithFieldCounts());
this.setUseQueryTreeScanHintRules(other.isUseQueryTreeScanHintRules());
this.setQueryTreeScanHintRules(other.getQueryTreeScanHintRules());
this.setFieldIndexHoleMinThreshold(other.getFieldIndexHoleMinThreshold());
@@ -2651,6 +2662,14 @@ public void setTfNextSeek(int tfNextSeek) {
this.tfNextSeek = tfNextSeek;
}
+ public boolean isSeekingEventAggregation() {
+ return seekingEventAggregation;
+ }
+
+ public void setSeekingEventAggregation(boolean seekingEventAggregation) {
+ this.seekingEventAggregation = seekingEventAggregation;
+ }
+
public long getVisitorFunctionMaxWeight() {
return visitorFunctionMaxWeight;
}
@@ -2759,36 +2778,36 @@ public void setCachePreviouslyExpandedFields(boolean cachePreviouslyExpandedFiel
this.cachePreviouslyExpandedFields = cachePreviouslyExpandedFields;
}
- public boolean getUseTermCounts() {
- return useTermCounts;
+ public boolean isSortQueryPreIndexWithImpliedCounts() {
+ return sortQueryPreIndexWithImpliedCounts;
}
- public void setUseTermCounts(boolean useTermCounts) {
- this.useTermCounts = useTermCounts;
+ public void setSortQueryPreIndexWithImpliedCounts(boolean sortQueryPreIndexWithImpliedCounts) {
+ this.sortQueryPreIndexWithImpliedCounts = sortQueryPreIndexWithImpliedCounts;
}
- public boolean getUseFieldCounts() {
- return useFieldCounts;
+ public boolean isSortQueryPreIndexWithFieldCounts() {
+ return sortQueryPreIndexWithFieldCounts;
}
- public void setUseFieldCounts(boolean useFieldCounts) {
- this.useFieldCounts = useFieldCounts;
+ public void setSortQueryPreIndexWithFieldCounts(boolean sortQueryPreIndexWithFieldCounts) {
+ this.sortQueryPreIndexWithFieldCounts = sortQueryPreIndexWithFieldCounts;
}
- public boolean isSortQueryBeforeGlobalIndex() {
- return sortQueryBeforeGlobalIndex;
+ public boolean isSortQueryPostIndexWithFieldCounts() {
+ return sortQueryPostIndexWithFieldCounts;
}
- public void setSortQueryBeforeGlobalIndex(boolean sortQueryBeforeGlobalIndex) {
- this.sortQueryBeforeGlobalIndex = sortQueryBeforeGlobalIndex;
+ public void setSortQueryPostIndexWithFieldCounts(boolean sortQueryPostIndexWithFieldCounts) {
+ this.sortQueryPostIndexWithFieldCounts = sortQueryPostIndexWithFieldCounts;
}
- public boolean isSortQueryByCounts() {
- return sortQueryByCounts;
+ public boolean isSortQueryPostIndexWithTermCounts() {
+ return sortQueryPostIndexWithTermCounts;
}
- public void setSortQueryByCounts(boolean sortQueryByCounts) {
- this.sortQueryByCounts = sortQueryByCounts;
+ public void setSortQueryPostIndexWithTermCounts(boolean sortQueryPostIndexWithTermCounts) {
+ this.sortQueryPostIndexWithTermCounts = sortQueryPostIndexWithTermCounts;
}
@Override
@@ -2988,16 +3007,17 @@ public boolean equals(Object o) {
getEventNextSeek() == that.getEventNextSeek() &&
getTfFieldSeek() == that.getTfFieldSeek() &&
getTfNextSeek() == that.getTfNextSeek() &&
+ isSeekingEventAggregation() == that.isSeekingEventAggregation() &&
getVisitorFunctionMaxWeight() == that.getVisitorFunctionMaxWeight() &&
getQueryExecutionForPageTimeout() == that.getQueryExecutionForPageTimeout() &&
isLazySetMechanismEnabled() == that.isLazySetMechanismEnabled() &&
getDocAggregationThresholdMs() == that.getDocAggregationThresholdMs() &&
getTfAggregationThresholdMs() == that.getTfAggregationThresholdMs() &&
getPruneQueryOptions() == that.getPruneQueryOptions() &&
- getUseFieldCounts() == that.getUseFieldCounts() &&
- getUseTermCounts() == that.getUseTermCounts() &&
- isSortQueryBeforeGlobalIndex() == that.isSortQueryBeforeGlobalIndex() &&
- isSortQueryByCounts() == that.isSortQueryByCounts();
+                        isSortQueryPreIndexWithImpliedCounts() == that.isSortQueryPreIndexWithImpliedCounts() &&
+                        isSortQueryPreIndexWithFieldCounts() == that.isSortQueryPreIndexWithFieldCounts() &&
+                        isSortQueryPostIndexWithTermCounts() == that.isSortQueryPostIndexWithTermCounts() &&
+                        isSortQueryPostIndexWithFieldCounts() == that.isSortQueryPostIndexWithFieldCounts();
// @formatter:on
}
@@ -3192,16 +3212,18 @@ public int hashCode() {
getEventNextSeek(),
getTfFieldSeek(),
getTfNextSeek(),
+ isSeekingEventAggregation(),
getVisitorFunctionMaxWeight(),
getQueryExecutionForPageTimeout(),
isLazySetMechanismEnabled(),
getDocAggregationThresholdMs(),
getTfAggregationThresholdMs(),
getPruneQueryOptions(),
- getUseFieldCounts(),
- getUseTermCounts(),
- isSortQueryBeforeGlobalIndex(),
- isSortQueryByCounts());
+ isSortQueryPreIndexWithImpliedCounts(),
+ isSortQueryPreIndexWithFieldCounts(),
+ isSortQueryPostIndexWithTermCounts(),
+ isSortQueryPostIndexWithFieldCounts()
+ );
// @formatter:on
}
@@ -3227,4 +3249,12 @@ public List<ScanHintRule<JexlNode>> getQueryTreeScanHintRules() {
    public void setQueryTreeScanHintRules(List<ScanHintRule<JexlNode>> queryTreeScanHintRules) {
this.queryTreeScanHintRules = queryTreeScanHintRules;
}
+
+ public long getMaxAnyFieldScanTimeMillis() {
+ return maxAnyFieldScanTimeMillis;
+ }
+
+ public void setMaxAnyFieldScanTimeMillis(long maxAnyFieldScanTimeMillis) {
+ this.maxAnyFieldScanTimeMillis = maxAnyFieldScanTimeMillis;
+ }
}
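A rough sketch of how the removed sort options map onto the renamed ones, based on the javadoc above; the correspondence is an interpretation, and 'config' is an assumed ShardQueryConfiguration instance.

    // Old: setSortQueryBeforeGlobalIndex(true) -> sort by inferred (implied) costs before the index lookup
    config.setSortQueryPreIndexWithImpliedCounts(true);
    // New in this change: sort before the index lookup using field counts from the metadata table (opens a scanner, potentially more accurate)
    config.setSortQueryPreIndexWithFieldCounts(false);
    // Old: setUseFieldCounts(true) + setSortQueryByCounts(true) -> sort using field counts gathered during the index lookup
    config.setSortQueryPostIndexWithFieldCounts(true);
    // Old: setUseTermCounts(true) + setSortQueryByCounts(true) -> sort using term counts gathered during the index lookup
    config.setSortQueryPostIndexWithTermCounts(false);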
diff --git a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveredThing.java b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveredThing.java
index ec0987fdb88..3e2c3e21e95 100644
--- a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveredThing.java
+++ b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveredThing.java
@@ -3,14 +3,14 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
+import java.util.StringJoiner;
import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.WritableComparable;
-import com.google.common.base.Objects;
-
import datawave.core.query.configuration.ResultContext;
public class DiscoveredThing implements WritableComparable<DiscoveredThing> {
@@ -86,6 +86,7 @@ public void readFields(DataInput in) throws IOException {
@Override
public int compareTo(DiscoveredThing o) {
+
CompareToBuilder cmp = new CompareToBuilder();
if (o == null) {
return 1;
@@ -96,28 +97,34 @@ public int compareTo(DiscoveredThing o) {
cmp.append(getDate(), o.getDate());
cmp.append(getColumnVisibility(), o.getColumnVisibility());
cmp.append(getCount(), o.getCount());
+ cmp.append(getCountsByColumnVisibility(), o.getCountsByColumnVisibility());
return cmp.toComparison();
}
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof DiscoveredThing))
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
- DiscoveredThing other = (DiscoveredThing) o;
- return Objects.equal(getTerm(), other.getTerm()) && Objects.equal(getField(), other.getField()) && Objects.equal(getType(), other.getType())
- && Objects.equal(getDate(), other.getDate()) && Objects.equal(getColumnVisibility(), other.getColumnVisibility())
- && Objects.equal(getCount(), other.getCount());
+ }
+ DiscoveredThing that = (DiscoveredThing) o;
+ return Objects.equals(term, that.term) && Objects.equals(field, that.field) && Objects.equals(type, that.type) && Objects.equals(date, that.date)
+ && Objects.equals(columnVisibility, that.columnVisibility) && Objects.equals(count, that.count)
+ && Objects.equals(countsByColumnVisibility, that.countsByColumnVisibility);
}
@Override
public int hashCode() {
- return Objects.hashCode(getTerm(), getField(), getType(), getDate(), getColumnVisibility(), getCount());
+ return Objects.hash(term, field, type, date, columnVisibility, count, countsByColumnVisibility);
}
@Override
public String toString() {
- return "DiscoveredThing [term=" + term + ", field=" + field + ", type=" + type + ", date=" + date + ", columnVisibility=" + columnVisibility
- + ", count=" + count + "]";
+ return new StringJoiner(", ", DiscoveredThing.class.getSimpleName() + "[", "]").add("term='" + term + "'").add("field='" + field + "'")
+ .add("type='" + type + "'").add("date='" + date + "'").add("columnVisibility='" + columnVisibility + "'").add("count=" + count)
+ .add("countsByColumnVisibility=" + countsByColumnVisibility).toString();
}
}
diff --git a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryIterator.java b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryIterator.java
index 1400308f3c2..404d9c29dda 100644
--- a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryIterator.java
+++ b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryIterator.java
@@ -1,14 +1,15 @@
package datawave.query.discovery;
-import static com.google.common.collect.Collections2.filter;
-import static com.google.common.collect.Collections2.transform;
-import static com.google.common.collect.Lists.newArrayList;
-
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -17,141 +18,312 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.log4j.Logger;
-import com.google.common.base.Predicates;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import datawave.ingest.protobuf.Uid;
+import datawave.marking.MarkingFunctions;
+import datawave.query.Constants;
public class DiscoveryIterator implements SortedKeyValueIterator<Key,Value> {
+
private static final Logger log = Logger.getLogger(DiscoveryIterator.class);
+ private static final MarkingFunctions markingFunctions = MarkingFunctions.Factory.createMarkingFunctions();
- private Key tk;
- private Value tv;
-    private SortedKeyValueIterator<Key,Value> itr;
+ private Key key;
+ private Value value;
+    private SortedKeyValueIterator<Key,Value> iterator;
private boolean separateCountsByColVis = false;
private boolean showReferenceCount = false;
private boolean reverseIndex = false;
+ private boolean sumCounts = false;
@Override
public DiscoveryIterator deepCopy(IteratorEnvironment env) {
- DiscoveryIterator i = new DiscoveryIterator();
- i.itr = itr.deepCopy(env);
- return i;
+ DiscoveryIterator copy = new DiscoveryIterator();
+ copy.iterator = iterator.deepCopy(env);
+ return copy;
}
@Override
public void next() throws IOException {
- tk = null;
- tv = null;
+ this.key = null;
+ this.value = null;
- while (itr.hasTop() && tk == null) {
-            Multimap<String,TermInfo> terms = aggregateDate();
+ while (iterator.hasTop() && key == null) {
+ // Get the entries to aggregate.
+            Multimap<String,TermEntry> terms = getTermsByDatatype();
if (terms.isEmpty()) {
- if (log.isTraceEnabled())
- log.trace("Couldn't aggregate index info; moving onto next date/field/term if data is available.");
- continue;
+ log.trace("Couldn't aggregate index info; moving onto next date/field/term if data is available.");
} else {
- if (log.isTraceEnabled())
- log.trace("Received term info multimap of size [" + terms.size() + "]");
-                ArrayList<DiscoveredThing> things = newArrayList(
- filter(transform(terms.asMap().values(), new TermInfoAggregation(separateCountsByColVis, showReferenceCount, reverseIndex)),
- Predicates.notNull()));
- if (log.isTraceEnabled())
- log.trace("After conversion to discovery objects, there are [" + things.size() + "] term info objects.");
- if (things.isEmpty()) {
- continue;
- } else {
-                    Pair<Key,Value> top = makeTop(things);
- tk = top.getFirst();
- tv = top.getSecond();
+ // Aggregate the entries.
+                List<DiscoveredThing> things = terms.asMap().values().stream().map(this::aggregate).filter(Objects::nonNull).collect(Collectors.toList());
+ // Establish the next top of this iterator.
+ if (!things.isEmpty()) {
+ setTop(things);
return;
}
}
}
- if (log.isTraceEnabled())
- log.trace("No data found.");
+ log.trace("No data found.");
}
-    private Multimap<String,TermInfo> aggregateDate() throws IOException {
-        Multimap<String,TermInfo> terms = ArrayListMultimap.create();
- Key start = new Key(itr.getTopKey()), key = null;
- while (itr.hasTop() && start.equals((key = itr.getTopKey()), PartialKey.ROW_COLFAM) && datesMatch(start, key)) {
- TermInfo ti = new TermInfo(key, itr.getTopValue());
- if (ti.valid)
- terms.put(ti.datatype, ti);
+ /**
+ * Return a multimap containing mappings of datatypes to term entries that should be aggregated.
+ */
+    private Multimap<String,TermEntry> getTermsByDatatype() throws IOException {
+        Multimap<String,TermEntry> terms = ArrayListMultimap.create();
+ Key start = new Key(iterator.getTopKey());
+ Key key;
+ // If we should sum up counts, we want to collect the term entries for each date seen for the current field and term of start. Otherwise, we only want
+ // to collect the term entries for the current field, term, and date of start.
+        BiFunction<Key,Key,Boolean> dateMatchingFunction = sumCounts ? (first, second) -> true : this::datesMatch;
+ // Find all matching entries and parse term entries from them.
+ while (iterator.hasTop() && start.equals((key = iterator.getTopKey()), PartialKey.ROW_COLFAM) && dateMatchingFunction.apply(start, key)) {
+ TermEntry termEntry = new TermEntry(key, iterator.getTopValue());
+ if (termEntry.isValid())
+ terms.put(termEntry.getDatatype(), termEntry);
else {
- if (log.isTraceEnabled())
- log.trace("Received invalid term info from key: " + key);
+ if (log.isTraceEnabled()) {
+ log.trace("Received invalid term entry from key: " + key);
+ }
}
- itr.next();
+ iterator.next();
}
return terms;
}
- private static boolean datesMatch(Key reference, Key test) {
- ByteSequence a = reference.getColumnQualifierData(), b = test.getColumnQualifierData();
+ /**
+ * Return true if the dates for the two keys match, or false otherwise.
+ */
+ private boolean datesMatch(Key left, Key right) {
+ ByteSequence leftBytes = left.getColumnQualifierData();
+ ByteSequence rightBytes = right.getColumnQualifierData();
for (int i = 0; i < 8; i++) {
- if (a.byteAt(i) != b.byteAt(i)) {
+ if (leftBytes.byteAt(i) != rightBytes.byteAt(i)) {
return false;
}
}
return true;
}
-    private Pair<Key,Value> makeTop(List<DiscoveredThing> things) {
- Writable[] returnedThings = new Writable[things.size()];
- for (int i = 0; i < returnedThings.length; ++i)
- returnedThings[i] = things.get(i);
- ArrayWritable aw = new ArrayWritable(DiscoveredThing.class);
- aw.set(returnedThings);
+ /**
+ * Return the given term entries aggregated into a single {@link DiscoveredThing} if possible, or return null if any issues occurred.
+ */
+    private DiscoveredThing aggregate(Collection<TermEntry> termEntries) {
+ if (termEntries.isEmpty()) {
+ return null;
+ } else {
+ TermEntry first = termEntries.iterator().next();
+ String term = reverseIndex ? new StringBuilder(first.getTerm()).reverse().toString() : first.getTerm();
+ String date = sumCounts ? "" : first.date;
+
+            Set<ColumnVisibility> visibilities = new HashSet<>();
+            Map<String,Long> visibilityToCounts = new HashMap<>();
+ long count = 0L;
+
+ // Aggregate the counts and visibilities from each entry.
+ for (TermEntry termEntry : termEntries) {
+                // Fetch the count to aggregate based on whether we should show the term count or the reference count.
+ long currentCount = this.showReferenceCount ? termEntry.getUidListSize() : termEntry.getUidCount();
+ try {
+ // Track the distinct visibilities seen.
+ visibilities.add(termEntry.getVisibility());
+ // If counts by visibility should be tracked, do so.
+ if (this.separateCountsByColVis) {
+ String visibility = new String(termEntry.getVisibility().flatten());
+ visibilityToCounts.compute(visibility, (k, v) -> v == null ? currentCount : v + currentCount);
+ }
+ } catch (Exception e) {
+ // If an exception occurs, skip to the next entry.
+ log.trace(e);
+ continue;
+ }
+ // Increment the overall count.
+ count += currentCount;
+ }
+
+ // If we do not have a count greater than 0, return null.
+ if (count <= 0) {
+ if (log.isTraceEnabled()) {
+ log.trace("Did not aggregate any counts for [" + first.getTerm() + "][" + first.getField() + "][" + first.getDatatype() + "]["
+ + first.getDate() + "]. Returning null");
+ }
+ return null;
+ } else {
+ // Otherwise, combine the visibilities, and return the aggregated result.
+ try {
+ ColumnVisibility visibility = markingFunctions.combine(visibilities);
+ MapWritable countsByVis = new MapWritable();
+ visibilityToCounts.forEach((key, value) -> countsByVis.put(new Text(key), new LongWritable(value)));
+ return new DiscoveredThing(term, first.getField(), first.getDatatype(), date, new String(visibility.flatten()), count, countsByVis);
+ } catch (Exception e) {
+                    log.warn("Invalid column visibilities after combining " + visibilities);
+ return null;
+ }
+ }
+ }
+ }
+ /**
+ * Set the top {@link Key} and {@link Value} of this iterator, created from the given list of {@link DiscoveredThing} instances.
+ */
+    private void setTop(List<DiscoveredThing> things) {
+ // We want the key to be the last possible key for this date. Return the key as it is in the index (reversed if necessary) to ensure the keys are
+ // consistent with the initial seek range.
DiscoveredThing thing = things.get(0);
- // we want the key to be the last possible key for this date. Return the key as it is in the index (reversed if necessary) to
- // ensure the keys are consistent with the initial seek range.
- String row = (reverseIndex ? new StringBuilder().append(thing.getTerm()).reverse().toString() : thing.getTerm());
- return new Pair<>(new Key(row, thing.getField(), thing.getDate() + '\uffff'), new Value(WritableUtils.toByteArray(aw)));
+ String row = (this.reverseIndex ? new StringBuilder().append(thing.getTerm()).reverse().toString() : thing.getTerm());
+ Key newKey = new Key(row, thing.getField(), thing.getDate() + "\uffff");
+
+ // Create a value from the list of things.
+ ArrayWritable thingArray = new ArrayWritable(DiscoveredThing.class, things.toArray(new DiscoveredThing[0]));
+ Value newValue = new Value(WritableUtils.toByteArray(thingArray));
+
+ this.key = newKey;
+ this.value = newValue;
}
@Override
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
- itr.seek(range, columnFamilies, inclusive);
- if (log.isTraceEnabled())
- log.trace("My source " + (itr.hasTop() ? "does" : "does not") + " have a top.");
+ this.iterator.seek(range, columnFamilies, inclusive);
+ if (log.isTraceEnabled()) {
+ log.trace("My source " + (this.iterator.hasTop() ? "does" : "does not") + " have a top.");
+ }
next();
}
@Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- itr = source;
- separateCountsByColVis = Boolean.parseBoolean(options.get(DiscoveryLogic.SEPARATE_COUNTS_BY_COLVIS));
- showReferenceCount = Boolean.parseBoolean(options.get(DiscoveryLogic.SHOW_REFERENCE_COUNT));
- reverseIndex = Boolean.parseBoolean(options.get(DiscoveryLogic.REVERSE_INDEX));
+ this.iterator = source;
+ this.separateCountsByColVis = Boolean.parseBoolean(options.get(DiscoveryLogic.SEPARATE_COUNTS_BY_COLVIS));
+ this.showReferenceCount = Boolean.parseBoolean(options.get(DiscoveryLogic.SHOW_REFERENCE_COUNT));
+ this.reverseIndex = Boolean.parseBoolean(options.get(DiscoveryLogic.REVERSE_INDEX));
+ this.sumCounts = Boolean.parseBoolean(options.get(DiscoveryLogic.SUM_COUNTS));
if (log.isTraceEnabled()) {
- log.trace("My source is a " + source.getClass().getName());
- log.trace("Separate counts by column visibility = " + separateCountsByColVis);
- log.trace("Show reference count only = " + showReferenceCount);
+ log.trace("Source: " + source.getClass().getName());
+ log.trace("Separate counts by column visibility: " + this.separateCountsByColVis);
+ log.trace("Show reference counts only: " + this.showReferenceCount);
+ log.trace("Reverse index: " + this.reverseIndex);
+ log.trace("Sum counts: " + this.sumCounts);
}
}
@Override
public boolean hasTop() {
- return tk != null;
+ return key != null;
}
@Override
public Key getTopKey() {
- return tk;
+ return key;
}
@Override
public Value getTopValue() {
- return tv;
+ return value;
+ }
+
+ /**
+ * Represents term information parsed from a {@link Key}, {@link Value} pair.
+ */
+ private static class TermEntry {
+
+ private final String term;
+ private final String field;
+ private String date;
+ private String datatype;
+ private ColumnVisibility visibility;
+ private long uidCount;
+ private long uidListSize;
+ private boolean valid;
+
+ public TermEntry(Key key, Value value) {
+ term = key.getRow().toString();
+ field = key.getColumnFamily().toString();
+
+ String colq = key.getColumnQualifier().toString();
+ int firstSeparatorPos = colq.indexOf(Constants.NULL_BYTE_STRING);
+ if (firstSeparatorPos != -1) {
+ int lastSeparatorPos = colq.lastIndexOf(Constants.NULL_BYTE_STRING);
+ // If multiple separators are present, this is a task datatype entry.
+ if (firstSeparatorPos != lastSeparatorPos) {
+ // Ensure that we at least have yyyyMMdd.
+ if ((lastSeparatorPos - firstSeparatorPos) < 9) {
+ return;
+ }
+ // The form is datatype\0date\0task status (old knowledge entry).
+ date = colq.substring(firstSeparatorPos + 1, firstSeparatorPos + 9);
+ datatype = colq.substring(0, firstSeparatorPos);
+ } else {
+ // Ensure that we at least have yyyyMMdd.
+ if (firstSeparatorPos < 8) {
+ return;
+ }
+ // The form is shardId\0datatype.
+ date = colq.substring(0, 8);
+ datatype = colq.substring(firstSeparatorPos + 1);
+ }
+
+ // Parse the UID.List object from the value.
+ try {
+ Uid.List uidList = Uid.List.parseFrom(value.get());
+ if (uidList != null) {
+ uidCount = uidList.getCOUNT();
+ uidListSize = uidList.getUIDList().size();
+ }
+ } catch (InvalidProtocolBufferException e) {
+ // Don't add UID information. At least we know what shard it's located in.
+ }
+
+ visibility = new ColumnVisibility(key.getColumnVisibility());
+
+ // This is now considered a valid entry for aggregation.
+ valid = true;
+ }
+ }
+
+ public String getTerm() {
+ return term;
+ }
+
+ public String getField() {
+ return field;
+ }
+
+ public String getDate() {
+ return date;
+ }
+
+ public String getDatatype() {
+ return datatype;
+ }
+
+ public ColumnVisibility getVisibility() {
+ return visibility;
+ }
+
+ public long getUidCount() {
+ return uidCount;
+ }
+
+ public long getUidListSize() {
+ return uidListSize;
+ }
+
+ public boolean isValid() {
+ return valid;
+ }
}
}
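An illustrative sketch of the two global index column qualifier layouts that the private TermEntry class above distinguishes. The row, field, shard, datatype, and task-status values are made up; only the layouts and the extracted date/datatype follow from the parsing code.

    // Layout 1 (single null byte): shardId \0 datatype
    Key shardForm = new Key("engine", "MAKE", "20240115_7" + Constants.NULL_BYTE_STRING + "csv");
    // TermEntry would extract: date = "20240115", datatype = "csv"

    // Layout 2 (multiple null bytes): datatype \0 date \0 task status (old knowledge entry)
    Key knowledgeForm = new Key("engine", "MAKE", "csv" + Constants.NULL_BYTE_STRING + "20240115" + Constants.NULL_BYTE_STRING + "LOADED");
    // TermEntry would extract: date = "20240115", datatype = "csv"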
diff --git a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java
index 91424b1afb1..315a0b5343e 100644
--- a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java
+++ b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java
@@ -51,6 +51,7 @@
import datawave.core.query.configuration.QueryData;
import datawave.data.type.Type;
import datawave.microservice.query.Query;
+import datawave.microservice.query.QueryImpl;
import datawave.query.Constants;
import datawave.query.QueryParameters;
import datawave.query.discovery.FindLiteralsAndPatternsVisitor.QueryValues;
@@ -72,18 +73,45 @@ public class DiscoveryLogic extends ShardIndexQueryTable {
private static final Logger log = Logger.getLogger(DiscoveryLogic.class);
+ /**
+ * Used to specify if counts should be separated by column visibility.
+ */
public static final String SEPARATE_COUNTS_BY_COLVIS = "separate.counts.by.colvis";
+
+ /**
+ * Used to specify if reference counts should be shown instead of term counts.
+ */
public static final String SHOW_REFERENCE_COUNT = "show.reference.count";
+
+ /**
+ * Used to specify whether to sum up the counts instead of returning counts per date.
+ */
+ public static final String SUM_COUNTS = "sum.counts";
+
+ /**
+ * Used to specify whether to search against the reversed index.
+ */
public static final String REVERSE_INDEX = "reverse.index";
+
private DiscoveryQueryConfiguration config;
private MetadataHelper metadataHelper;
+ /**
+ * Basic constructor.
+ */
public DiscoveryLogic() {
super();
}
+ /**
+ * Copy constructor.
+ *
+ * @param other
+ * the other logic to copy
+ */
public DiscoveryLogic(DiscoveryLogic other) {
super(other);
+ this.config = new DiscoveryQueryConfiguration(other.config);
this.metadataHelper = other.metadataHelper;
}
@@ -92,7 +120,6 @@ public DiscoveryQueryConfiguration getConfig() {
if (this.config == null) {
this.config = DiscoveryQueryConfiguration.create();
}
-
return this.config;
}
@@ -111,56 +138,51 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting
log.debug("Query parameters set to " + settings.getParameters());
}
- // Check if the default modelName and modelTableNames have been overriden by custom parameters.
- if (null != settings.findParameter(QueryParameters.PARAMETER_MODEL_NAME)
- && !settings.findParameter(QueryParameters.PARAMETER_MODEL_NAME).getParameterValue().trim().isEmpty()) {
- setModelName(settings.findParameter(QueryParameters.PARAMETER_MODEL_NAME).getParameterValue().trim());
- }
- if (null != settings.findParameter(QueryParameters.PARAMETER_MODEL_TABLE_NAME)
- && !settings.findParameter(QueryParameters.PARAMETER_MODEL_TABLE_NAME).getParameterValue().trim().isEmpty()) {
- setModelTableName(settings.findParameter(QueryParameters.PARAMETER_MODEL_TABLE_NAME).getParameterValue().trim());
- }
+ // Check if the default model name and model table name have been overridden.
+ setModelName(getOrDefault(settings, QueryParameters.PARAMETER_MODEL_NAME, getConfig().getModelName()));
+ setModelTableName(getOrDefault(settings, QueryParameters.PARAMETER_MODEL_TABLE_NAME, getConfig().getModelTableName()));
- // Check if user would like counts separated by column visibility
- if (null != settings.findParameter(SEPARATE_COUNTS_BY_COLVIS)
- && !settings.findParameter(SEPARATE_COUNTS_BY_COLVIS).getParameterValue().trim().isEmpty()) {
- boolean separateCountsByColVis = Boolean.valueOf(settings.findParameter(SEPARATE_COUNTS_BY_COLVIS).getParameterValue().trim());
- getConfig().setSeparateCountsByColVis(separateCountsByColVis);
- }
+ // Check if counts should be separated by column visibility.
+ setSeparateCountsByColVis(getOrDefaultBoolean(settings, SEPARATE_COUNTS_BY_COLVIS, getSeparateCountsByColVis()));
- // Check if user would like to show reference counts instead of term counts
- if (null != settings.findParameter(SHOW_REFERENCE_COUNT) && !settings.findParameter(SHOW_REFERENCE_COUNT).getParameterValue().trim().isEmpty()) {
- boolean showReferenceCount = Boolean.valueOf(settings.findParameter(SHOW_REFERENCE_COUNT).getParameterValue().trim());
- getConfig().setShowReferenceCount(showReferenceCount);
- }
+ // Check if reference counts should be shown.
+ setShowReferenceCount(getOrDefaultBoolean(settings, SHOW_REFERENCE_COUNT, getShowReferenceCount()));
+
+ // Check if counts should be summed.
+ setSumCounts(getOrDefaultBoolean(settings, SUM_COUNTS, getSumCounts()));
+
+ // Check if any datatype filters were specified.
+ getConfig().setDatatypeFilter(getOrDefaultSet(settings, QueryParameters.DATATYPE_FILTER_SET, getConfig().getDatatypeFilter()));
+
+ // Update the query model.
setQueryModel(metadataHelper.getQueryModel(getModelTableName(), getModelName(), null));
- // get the data type filter set if any
- if (null != settings.findParameter(QueryParameters.DATATYPE_FILTER_SET)
- && !settings.findParameter(QueryParameters.DATATYPE_FILTER_SET).getParameterValue().trim().isEmpty()) {
-            Set<String> dataTypeFilter = new HashSet<>(Arrays.asList(StringUtils
- .split(settings.findParameter(QueryParameters.DATATYPE_FILTER_SET).getParameterValue().trim(), Constants.PARAM_VALUE_SEP)));
- getConfig().setDatatypeFilter(dataTypeFilter);
- if (log.isDebugEnabled()) {
- log.debug("Data type filter set to " + dataTypeFilter);
- }
- }
- // Set the connector
+ // Set the currently indexed fields
+ getConfig().setIndexedFields(metadataHelper.getIndexedFields(Collections.emptySet()));
+
+ // Set the connector.
getConfig().setClient(client);
- // Set the auths
+
+ // Set the auths.
getConfig().setAuthorizations(auths);
- // Get the ranges
+ // Get the ranges.
getConfig().setBeginDate(settings.getBeginDate());
getConfig().setEndDate(settings.getEndDate());
- if (null == getConfig().getBeginDate() || null == getConfig().getEndDate()) {
- getConfig().setBeginDate(new Date(0));
+ // If a begin date was not specified, default to the earliest date.
+ if (getConfig().getBeginDate() == null) {
+ getConfig().setBeginDate(new Date(0L));
+ log.warn("Begin date not specified, using earliest begin date.");
+ }
+
+ // If an end date was not specified, default to the latest date.
+ if (getConfig().getEndDate() == null) {
getConfig().setEndDate(new Date(Long.MAX_VALUE));
- log.warn("Dates not specified, using entire date range");
+ log.warn("End date not specified, using latest end date.");
}
- // start with a trimmed version of the query, converted to JEXL
+ // Start with a trimmed version of the query, converted to JEXL
LuceneToJexlQueryParser parser = new LuceneToJexlQueryParser();
parser.setAllowLeadingWildCard(isAllowLeadingWildcard());
QueryNode node = parser.parse(settings.getQuery().trim());
@@ -173,9 +195,9 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting
// Parse & flatten the query
ASTJexlScript script = JexlASTHelper.parseAndFlattenJexlQuery(getConfig().getQueryString());
+ CaseSensitivityVisitor.upperCaseIdentifiers(getConfig(), metadataHelper, script);
- script = CaseSensitivityVisitor.upperCaseIdentifiers(getConfig(), metadataHelper, script);
-
+ // Apply the query model.
        Set<String> dataTypes = getConfig().getDatatypeFilter();
        Set<String> allFields;
allFields = metadataHelper.getAllFields(dataTypes);
@@ -183,14 +205,13 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting
QueryValues literalsAndPatterns = FindLiteralsAndPatternsVisitor.find(script);
Stopwatch timer = Stopwatch.createStarted();
- // no caching for getAllNormalizers, so try some magic with getFields...
+ // No caching for getAllNormalizers, so try some magic with getFields...
        Multimap<String,Type<?>> dataTypeMap = ArrayListMultimap.create(metadataHelper.getFieldsToDatatypes(getConfig().getDatatypeFilter()));
- /*
- * we have a mapping of FIELD->DataType, but not a mapping of ANYFIELD->DataType which should be all dataTypes
- */
- dataTypeMap.putAll(Constants.ANY_FIELD, uniqueByType(dataTypeMap.values()));
+ // We have a mapping of FIELD->DataType, but not a mapping of ANYFIELD->DataType which should be all datatypes.
+ dataTypeMap.putAll(Constants.ANY_FIELD, getUniqueTypes(dataTypeMap.values()));
timer.stop();
log.debug("Took " + timer.elapsed(TimeUnit.MILLISECONDS) + "ms to get all the dataTypes.");
+
getConfig().setLiterals(normalize(new LiteralNormalization(), literalsAndPatterns.getLiterals(), dataTypeMap));
getConfig().setPatterns(normalize(new PatternNormalization(), literalsAndPatterns.getPatterns(), dataTypeMap));
getConfig().setRanges(normalizeRanges(new LiteralNormalization(), literalsAndPatterns.getRanges(), dataTypeMap));
@@ -199,44 +220,143 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting
log.debug("Normalized Patterns = " + getConfig().getPatterns());
}
+ // Set the planned queries to execute.
getConfig().setQueries(createQueries(getConfig()));
return getConfig();
}
-    public List<QueryData> createQueries(DiscoveryQueryConfiguration config) throws QueryException, TableNotFoundException, IOException, ExecutionException {
-        final List<QueryData> queries = Lists.newLinkedList();
+ /**
+ * If present, return the value of the given parameter from the given settings, or return the default value otherwise.
+ */
+ private String getOrDefault(Query settings, String parameterName, String defaultValue) {
+ String value = getTrimmedParameter(settings, parameterName);
+ return StringUtils.isBlank(value) ? defaultValue : value;
+ }
- Set familiesToSeek = Sets.newHashSet();
-        Pair<Set<Range>,Set<Range>> seekRanges = makeRanges(getConfig(), familiesToSeek, metadataHelper);
-        Collection<Range> forward = seekRanges.getValue0();
+ /**
+ * If present, return the value of the given parameter from the given settings as a boolean, or return the default value otherwise.
+ */
+ private boolean getOrDefaultBoolean(Query settings, String parameterName, boolean defaultValue) {
+ String value = getTrimmedParameter(settings, parameterName);
+ log.debug("Trimmed value for " + parameterName + ": " + value);
+ return StringUtils.isBlank(value) ? defaultValue : Boolean.parseBoolean(value);
+ }
- if (!forward.isEmpty()) {
-            List<IteratorSetting> settings = getIteratorSettingsForDiscovery(getConfig(), getConfig().getLiterals(), getConfig().getPatterns(),
- getConfig().getRanges(), false);
- if (isCheckpointable()) {
- // if checkpointable, then only one range per query data so that the whole checkpointing thing works correctly
- for (Range range : forward) {
- queries.add(new QueryData(config.getIndexTableName(), null, Collections.singleton(range), familiesToSeek, settings));
+ /**
+ * If present, return the value of the given parameter from the given settings as a set, or return the default value otherwise.
+ */
+    private Set<String> getOrDefaultSet(Query settings, String parameterName, Set<String> defaultValue) {
+ String value = getTrimmedParameter(settings, parameterName);
+ return StringUtils.isBlank(value) ? defaultValue : new HashSet<>(Arrays.asList(StringUtils.split(value, Constants.PARAM_VALUE_SEP)));
+ }
+
+ /**
+ * Return the trimmed value of the given parameter from the given settings, or null if a value is not present.
+ */
+ private String getTrimmedParameter(Query settings, String parameterName) {
+ QueryImpl.Parameter parameter = settings.findParameter(parameterName);
+ return parameter != null ? parameter.getParameterValue().trim() : null;
+ }
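
The getOrDefault* helpers above all share one rule: a missing or blank parameter value falls back to the supplied default. A minimal standalone sketch of that rule, using commons-lang3 only (the class and method names below are illustrative, not part of DataWave):

import org.apache.commons.lang3.StringUtils;

public class ParamDefaultSketch {
    // Same fallback rule as the helpers above: blank or missing values yield the default.
    static boolean getOrDefaultBoolean(String rawValue, boolean defaultValue) {
        return StringUtils.isBlank(rawValue) ? defaultValue : Boolean.parseBoolean(rawValue);
    }

    public static void main(String[] args) {
        System.out.println(getOrDefaultBoolean(null, true));    // true (default)
        System.out.println(getOrDefaultBoolean("   ", true));   // true (default)
        System.out.println(getOrDefaultBoolean("false", true)); // false (explicit value wins)
    }
}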
+
+ /**
+ * Given a sequence of objects of type T, this method will return a single object for every unique type passed in. This is used to dedupe normalizer
+ * instances by their type, so that we only get one instance per normalizer type.
+ */
+ private Collection> getUniqueTypes(Iterable> things) {
+ Map,Type>> map = Maps.newHashMap();
+ for (Type> t : things) {
+ map.put(t.getClass(), t);
+ }
+ return map.values();
+ }
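
A standalone sketch of the dedupe-by-class idea used above, written against plain JDK collections rather than DataWave's Type hierarchy:

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupeByClassSketch {
    // Keep exactly one instance per concrete class; later instances of the same class replace earlier ones.
    static Collection<Object> uniqueByType(Iterable<?> things) {
        Map<Class<?>, Object> map = new HashMap<>();
        for (Object t : things) {
            map.put(t.getClass(), t);
        }
        return map.values();
    }

    public static void main(String[] args) {
        // Two Strings, two Integers, one Double -> three unique classes.
        System.out.println(uniqueByType(List.of("a", "b", 1, 2, 3.0)).size()); // 3
    }
}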
+
+ /**
+ * This attempts to normalize all of the {@code } tuples with the corresponding {@code } tuple. The Normalization object
+ * will determine whether a regex or literal is being normalized.
+ *
+ * See the {@link PatternNormalization} and {@link LiteralNormalization} implementations.
+ *
+ * @param normalization
+ * the normalizer object
+ * @param valuesToFields
+ * mapping of values to fields
+ * @param dataTypeMap
+ * the data type map
+ * @return a mapping of the normalized tuples
+ */
+ private Multimap normalize(Normalization normalization, Multimap valuesToFields, Multimap> dataTypeMap) {
+ Multimap normalizedValuesToFields = HashMultimap.create();
+ for (Entry valueAndField : valuesToFields.entries()) {
+ String value = valueAndField.getKey(), field = valueAndField.getValue();
+ for (Type> dataType : dataTypeMap.get(field)) {
+ try {
+ log.debug("Attempting to normalize [" + value + "] with [" + dataType.getClass() + "]");
+ String normalized = normalization.normalize(dataType, field, value);
+ normalizedValuesToFields.put(normalized, field);
+ log.debug("Normalization succeeded!");
+ } catch (Exception exception) {
+ log.debug("Normalization failed.");
}
- } else {
- queries.add(new QueryData(config.getIndexTableName(), null, forward, familiesToSeek, settings));
}
}
+ return normalizedValuesToFields;
+ }
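
A rough sketch of the same try-each-normalizer loop, using java.util.function.UnaryOperator as a stand-in for DataWave's Type/Normalization objects (assumed names, not the real API): successes are kept, failures are silently skipped.

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.UnaryOperator;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

public class NormalizeSketch {
    // For each (value, field) pair, apply every candidate normalizer registered for that field.
    static Multimap<String, String> normalize(Multimap<String, String> valuesToFields,
                    Map<String, List<UnaryOperator<String>>> normalizersByField) {
        Multimap<String, String> normalized = HashMultimap.create();
        for (Map.Entry<String, String> entry : valuesToFields.entries()) {
            String value = entry.getKey(), field = entry.getValue();
            for (UnaryOperator<String> normalizer : normalizersByField.getOrDefault(field, List.of())) {
                try {
                    normalized.put(normalizer.apply(value), field);
                } catch (Exception e) {
                    // a failed normalization is simply dropped, as in the method above
                }
            }
        }
        return normalized;
    }

    public static void main(String[] args) {
        Multimap<String, String> valuesToFields = HashMultimap.create();
        valuesToFields.put("Foo", "NAME");
        List<UnaryOperator<String>> lowerCase = List.of(s -> s.toLowerCase(Locale.ROOT));
        System.out.println(normalize(valuesToFields, Map.of("NAME", lowerCase))); // {foo=[NAME]}
    }
}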
- Collection reverse = seekRanges.getValue1();
- if (!reverse.isEmpty()) {
- List settings = getIteratorSettingsForDiscovery(getConfig(), getConfig().getLiterals(), getConfig().getPatterns(),
- getConfig().getRanges(), true);
- if (isCheckpointable()) {
- // if checkpointable, then only one range per query data so that the whole checkpointing thing works correctly
- for (Range range : reverse) {
- queries.add(new QueryData(config.getReverseIndexTableName(), null, Collections.singleton(range), familiesToSeek, settings));
+ /**
+ * This attempts to normalize all of the {@code } tuples with the corresponding {@code } tuple. The Normalization object
+ * will determine whether a regex or literal is being normalized.
+ *
+ * See the {@link PatternNormalization} and {@link LiteralNormalization} implementations.
+ *
+ * @param normalization
+ * the normalizer object
+ * @param valuesToFields
+ * mapping of values to fields
+ * @param dataTypeMap
+ * the data type map
+ * @return a mapping of the normalized ranges
+ */
+ private Multimap> normalizeRanges(Normalization normalization, Multimap> valuesToFields,
+ Multimap> dataTypeMap) {
+ Multimap> normalizedValuesToFields = HashMultimap.create();
+ for (Entry> valueAndField : valuesToFields.entries()) {
+ String field = valueAndField.getKey();
+ LiteralRange> value = valueAndField.getValue();
+ for (Type> dataType : dataTypeMap.get(field)) {
+ try {
+ log.debug("Attempting to normalize [" + value + "] with [" + dataType.getClass() + "]");
+ String normalizedLower = normalization.normalize(dataType, field, value.getLower().toString());
+ String normalizedUpper = normalization.normalize(dataType, field, value.getUpper().toString());
+ normalizedValuesToFields.put(field, new LiteralRange<>(normalizedLower, value.isLowerInclusive(), normalizedUpper, value.isUpperInclusive(),
+ value.getFieldName(), value.getNodeOperand()));
+ log.debug("Normalization succeeded!");
+ } catch (Exception exception) {
+ log.debug("Normalization failed.");
}
- } else {
- queries.add(new QueryData(config.getReverseIndexTableName(), null, reverse, familiesToSeek, settings));
}
}
+ return normalizedValuesToFields;
+ }
+
+ /**
+ * Create and return a list of planned queries.
+ *
+ * @param config
+ * the config
+ * @return the list of query data
+ */
+ private List createQueries(DiscoveryQueryConfiguration config) throws TableNotFoundException, ExecutionException {
+ final List queries = Lists.newLinkedList();
+
+ Set familiesToSeek = Sets.newHashSet(); // This will be populated by createRanges().
+ Pair,Set> seekRanges = createRanges(config, familiesToSeek, metadataHelper);
+
+ // Create the forward queries.
+ queries.addAll(createQueriesFromRanges(config, seekRanges.getValue0(), familiesToSeek, false));
+
+ // Create the reverse queries.
+ queries.addAll(createQueriesFromRanges(config, seekRanges.getValue1(), familiesToSeek, true));
if (log.isDebugEnabled()) {
log.debug("Created ranges: " + queries);
@@ -245,67 +365,161 @@ public List createQueries(DiscoveryQueryConfiguration config) throws
return queries;
}
- @Override
- public void setupQuery(GenericQueryConfiguration genericConfig) throws QueryException, TableNotFoundException, IOException, ExecutionException {
- if (!genericConfig.getClass().getName().equals(DiscoveryQueryConfiguration.class.getName())) {
- throw new QueryException("Did not receive a DiscoveryQueryConfiguration instance!!");
+ /**
+ * Create planned queries for the given ranges.
+ *
+ * @param config
+ * the config
+ * @param ranges
+ * the ranges
+ * @param familiesToSeek
+ * the families to seek
+ * @param reversed
+ * whether the ranges are for the reverse index
+ * @return the queries
+ */
+ private List createQueriesFromRanges(DiscoveryQueryConfiguration config, Set ranges, Set familiesToSeek, boolean reversed) {
+ List queries = new ArrayList<>();
+ if (!ranges.isEmpty()) {
+ List settings = getIteratorSettings(config, reversed);
+ String tableName = reversed ? config.getReverseIndexTableName() : config.getIndexTableName();
+ if (isCheckpointable()) {
+ for (Range range : ranges) {
+ queries.add(new QueryData(tableName, null, Collections.singleton(range), familiesToSeek, settings));
+ }
+ } else {
+ queries.add(new QueryData(tableName, null, ranges, familiesToSeek, settings));
+ }
}
- this.config = (DiscoveryQueryConfiguration) genericConfig;
- final List> iterators = Lists.newArrayList();
+ return queries;
+ }
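
The checkpointable branch above turns each range into its own QueryData so that progress can be recorded per range. A simplified sketch of that split, with Strings standing in for Accumulo Ranges and lists standing in for QueryData:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class CheckpointSplitSketch {
    // When checkpointable, each range becomes its own unit of work; otherwise all ranges ride together.
    static List<List<String>> split(Set<String> ranges, boolean checkpointable) {
        List<List<String>> unitsOfWork = new ArrayList<>();
        if (checkpointable) {
            for (String range : ranges) {
                unitsOfWork.add(Collections.singletonList(range));
            }
        } else {
            unitsOfWork.add(new ArrayList<>(ranges));
        }
        return unitsOfWork;
    }

    public static void main(String[] args) {
        System.out.println(split(Set.of("r1", "r2"), true));  // e.g. [[r1], [r2]]
        System.out.println(split(Set.of("r1", "r2"), false)); // e.g. [[r1, r2]]
    }
}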
- for (QueryData qd : config.getQueries()) {
- if (log.isDebugEnabled()) {
- log.debug("Creating scanner for " + qd);
+ /**
+ * Creates two collections of ranges: one for the forward index (value0) and one for the reverse index (value1). If a literal has a field name, then the
+ * Range for that term will include the column family. If there are multiple fields, then multiple ranges are created.
+ *
+ * @param config
+ * the discovery config
+ * @param familiesToSeek
+ * the families to seek
+ * @param metadataHelper
+ * a metadata helper
+ * @return a pair of ranges
+ * @throws TableNotFoundException
+ * if the table is not found
+ * @throws ExecutionException
+ * for execution exceptions
+ */
+ private Pair,Set> createRanges(DiscoveryQueryConfiguration config, Set familiesToSeek, MetadataHelper metadataHelper)
+ throws TableNotFoundException, ExecutionException {
+ Set forwardRanges = new HashSet<>();
+ Set reverseRanges = new HashSet<>();
+
+ // Evaluate the literals.
+ for (Entry literalAndField : config.getLiterals().entries()) {
+ String literal = literalAndField.getKey(), field = literalAndField.getValue();
+ // If the field is _ANYFIELD_, use null when making the range.
+ field = Constants.ANY_FIELD.equals(field) ? null : field;
+ // Mark the field as a family to seek if not null.
+ if (field != null) {
+ familiesToSeek.add(field);
}
- // scan the table
- BatchScanner bs = scannerFactory.newScanner(qd.getTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery());
+ forwardRanges.add(ShardIndexQueryTableStaticMethods.getLiteralRange(field, literal));
+ }
- bs.setRanges(qd.getRanges());
- for (IteratorSetting setting : qd.getSettings()) {
- bs.addScanIterator(setting);
+ // Evaluate the ranges.
+ for (Entry> rangeEntry : config.getRanges().entries()) {
+ LiteralRange range = rangeEntry.getValue();
+ String field = rangeEntry.getKey();
+ // If the field is _ANYFIELD_, use null when making the range.
+ field = Constants.ANY_FIELD.equals(field) ? null : field;
+ // Mark the field as a family to seek if not null.
+ if (field != null) {
+ familiesToSeek.add(field);
}
- for (String cf : qd.getColumnFamilies()) {
- bs.fetchColumnFamily(new Text(cf));
+ try {
+ forwardRanges.add(ShardIndexQueryTableStaticMethods.getBoundedRangeRange(range));
+ } catch (IllegalRangeArgumentException e) {
+ log.error("Error using range [" + range + "]", e);
}
+ }
- iterators.add(transformScanner(bs, qd));
+ // Evaluate the patterns.
+ for (Entry patternAndField : config.getPatterns().entries()) {
+ String pattern = patternAndField.getKey(), field = patternAndField.getValue();
+ // If the field is _ANYFIELD_, use null when making the range.
+ field = Constants.ANY_FIELD.equals(field) ? null : field;
+ // Mark the field as a family to seek if not null.
+ if (field != null) {
+ familiesToSeek.add(field);
+ }
+ ShardIndexQueryTableStaticMethods.RefactoredRangeDescription description;
+ try {
+ description = ShardIndexQueryTableStaticMethods.getRegexRange(field, pattern, false, metadataHelper, config);
+ } catch (JavaRegexParseException e) {
+ log.error("Error parsing pattern [" + pattern + "]", e);
+ continue;
+ }
+ if (description.isForReverseIndex) {
+ reverseRanges.add(description.range);
+ } else {
+ forwardRanges.add(description.range);
+ }
}
- this.iterator = concat(iterators.iterator());
- }
- public static List getIteratorSettingsForDiscovery(DiscoveryQueryConfiguration config, Multimap literals,
- Multimap patterns, Multimap> ranges, boolean reverseIndex) {
+ return Pair.with(forwardRanges, reverseRanges);
+ }
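
The forward/reverse split above is decided by ShardIndexQueryTableStaticMethods.getRegexRange. As a rough rule of thumb only (not the actual implementation), a pattern with a leading wildcard cannot be bounded on the forward index and is typically served by the reverse index, where terms are stored reversed:

public class PatternRoutingSketch {
    // Illustrative heuristic only: leading-wildcard patterns go to the reverse index.
    static boolean isForReverseIndex(String pattern) {
        return pattern.startsWith(".*") || pattern.startsWith(".+");
    }

    public static void main(String[] args) {
        System.out.println(isForReverseIndex(".*ville")); // true  -> reverse index
        System.out.println(isForReverseIndex("bar.*"));   // false -> forward index
    }
}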
+ /**
+ * Return the list of iterator settings that should be applied to queries for the given configuration.
+ *
+ * @param config
+ * the config
+ * @param reverseIndex
+ * whether the iterator settings should be configured for the reverse index
+ * @return the iterator settings
+ */
+ private List getIteratorSettings(DiscoveryQueryConfiguration config, boolean reverseIndex) {
List settings = Lists.newLinkedList();
- // The begin date from the query may be down to the second, for doing lookups in the index we want to use the day because
- // the times in the index table have been truncated to the day.
+
+ // Add a date range filter.
+ // The begin date from the query may be down to the second; for look-ups in the index we want to use the day, because the times in the index table
+ // have been truncated to the day.
Date begin = DateUtils.truncate(config.getBeginDate(), Calendar.DAY_OF_MONTH);
- // we don't need to bump up the end date any more because it's not apart of the range set on the scanner
+ // We don't need to bump up the end date any more because it's not part of the range set on the scanner.
Date end = config.getEndDate();
-
LongRange dateRange = new LongRange(begin.getTime(), end.getTime());
-
settings.add(ShardIndexQueryTableStaticMethods.configureGlobalIndexDateRangeFilter(config, dateRange));
+
+ // Add a datatype filter.
settings.add(ShardIndexQueryTableStaticMethods.configureGlobalIndexDataTypeFilter(config, config.getDatatypeFilter()));
- IteratorSetting matchingIterator = configureIndexMatchingIterator(config, literals, patterns, ranges, reverseIndex);
+ // Add an iterator to match literals, patterns, and ranges against the index.
+ IteratorSetting matchingIterator = configureIndexMatchingIterator(config, reverseIndex);
if (matchingIterator != null) {
settings.add(matchingIterator);
}
- IteratorSetting discoveryIteratorSetting = new IteratorSetting(config.getBaseIteratorPriority() + 50, DiscoveryIterator.class);
- discoveryIteratorSetting.addOption(REVERSE_INDEX, Boolean.toString(reverseIndex));
- discoveryIteratorSetting.addOption(SEPARATE_COUNTS_BY_COLVIS, config.getSeparateCountsByColVis().toString());
- if (config.getShowReferenceCount()) {
- discoveryIteratorSetting.addOption(SHOW_REFERENCE_COUNT, config.getShowReferenceCount().toString());
- }
- settings.add(discoveryIteratorSetting);
+ // Add an iterator to create the actual DiscoveryThings.
+ settings.add(configureDiscoveryIterator(config, reverseIndex));
return settings;
}
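
The comment above is the key detail: global index entries carry day-granularity timestamps, so the begin date is truncated to the day before the date-range filter is built. A small sketch of that truncation with commons-lang3:

import java.util.Calendar;
import java.util.Date;

import org.apache.commons.lang3.time.DateUtils;

public class DateTruncationSketch {
    public static void main(String[] args) {
        Date begin = new Date();
        // Drop hours/minutes/seconds so the filter lines up with the day-truncated index timestamps.
        Date truncated = DateUtils.truncate(begin, Calendar.DAY_OF_MONTH);
        System.out.println("begin=" + begin + ", truncated=" + truncated);
    }
}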
- public static final IteratorSetting configureIndexMatchingIterator(DiscoveryQueryConfiguration config, Multimap literals,
- Multimap patterns, Multimap> ranges, boolean reverseIndex) {
+ /**
+ * Return an {@link IteratorSetting} for an {@link IndexMatchingIterator}.
+ *
+ * @param config
+ * the config
+ * @param reverseIndex
+ * whether the search is against the reverse index
+ * @return the iterator setting
+ */
+ private IteratorSetting configureIndexMatchingIterator(DiscoveryQueryConfiguration config, boolean reverseIndex) {
+ Multimap literals = config.getLiterals();
+ Multimap patterns = config.getPatterns();
+ Multimap> ranges = config.getRanges();
+
if ((literals == null || literals.isEmpty()) && (patterns == null || patterns.isEmpty()) && (ranges == null || ranges.isEmpty())) {
return null;
}
@@ -314,6 +528,7 @@ public static final IteratorSetting configureIndexMatchingIterator(DiscoveryQuer
IteratorSetting cfg = new IteratorSetting(config.getBaseIteratorPriority() + 23, "termMatcher", IndexMatchingIterator.class);
IndexMatchingIterator.Configuration conf = new IndexMatchingIterator.Configuration();
+ // Add literals.
if (literals != null) {
for (Entry literal : literals.entries()) {
if (Constants.ANY_FIELD.equals(literal.getValue())) {
@@ -323,6 +538,7 @@ public static final IteratorSetting configureIndexMatchingIterator(DiscoveryQuer
}
}
}
+ // Add patterns.
if (patterns != null) {
for (Entry pattern : patterns.entries()) {
if (Constants.ANY_FIELD.equals(pattern.getValue())) {
@@ -332,6 +548,7 @@ public static final IteratorSetting configureIndexMatchingIterator(DiscoveryQuer
}
}
}
+ // Add ranges.
if (ranges != null) {
for (Entry> range : ranges.entries()) {
if (Constants.ANY_FIELD.equals(range.getKey())) {
@@ -343,25 +560,73 @@ public static final IteratorSetting configureIndexMatchingIterator(DiscoveryQuer
}
cfg.addOption(IndexMatchingIterator.CONF, IndexMatchingIterator.gson().toJson(conf));
-
cfg.addOption(IndexMatchingIterator.REVERSE_INDEX, Boolean.toString(reverseIndex));
return cfg;
}
+ /**
+ * Return an {@link IteratorSetting} for a {@link DiscoveryIterator}.
+ *
+ * @param config
+ * the config
+ * @param reverseIndex
+ * whether the search is against the reverse index
+ * @return the iterator setting
+ */
+ private IteratorSetting configureDiscoveryIterator(DiscoveryQueryConfiguration config, boolean reverseIndex) {
+ IteratorSetting setting = new IteratorSetting(config.getBaseIteratorPriority() + 50, DiscoveryIterator.class);
+ setting.addOption(REVERSE_INDEX, Boolean.toString(reverseIndex));
+ setting.addOption(SEPARATE_COUNTS_BY_COLVIS, Boolean.toString(config.getSeparateCountsByColVis()));
+ setting.addOption(SHOW_REFERENCE_COUNT, Boolean.toString(config.getShowReferenceCount()));
+ setting.addOption(SUM_COUNTS, Boolean.toString(config.getSumCounts()));
+ return setting;
+ }
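
configureDiscoveryIterator follows the standard Accumulo pattern of an IteratorSetting built from a priority, an iterator class, and string-valued options. A generic sketch of that pattern using a built-in Accumulo iterator (RegExFilter) rather than the DataWave DiscoveryIterator:

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.iterators.user.RegExFilter;

public class IteratorSettingSketch {
    public static void main(String[] args) {
        // Priority + name + iterator class, then options added as strings (booleans via Boolean.toString).
        IteratorSetting setting = new IteratorSetting(50, "exampleMatcher", RegExFilter.class);
        setting.addOption(RegExFilter.ROW_REGEX, "term.*");
        setting.addOption("negate", Boolean.toString(false));
        System.out.println(setting);
    }
}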
+
+ @Override
+ public void setupQuery(GenericQueryConfiguration genericConfig) throws QueryException, TableNotFoundException, IOException, ExecutionException {
+ if (!genericConfig.getClass().getName().equals(DiscoveryQueryConfiguration.class.getName())) {
+ throw new QueryException("Did not receive a DiscoveryQueryConfiguration instance!!");
+ }
+ this.config = (DiscoveryQueryConfiguration) genericConfig;
+ final List> iterators = Lists.newArrayList();
+
+ for (QueryData qd : config.getQueries()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating scanner for " + qd);
+ }
+ // scan the table
+ BatchScanner bs = scannerFactory.newScanner(qd.getTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery());
+
+ bs.setRanges(qd.getRanges());
+ for (IteratorSetting setting : qd.getSettings()) {
+ bs.addScanIterator(setting);
+ }
+ for (String cf : qd.getColumnFamilies()) {
+ bs.fetchColumnFamily(new Text(cf));
+ }
+
+ iterators.add(transformScanner(bs, qd, config.getIndexedFields()));
+ }
+ this.iterator = concat(iterators.iterator());
+ }
+
@Override
public ShardIndexQueryTable clone() {
return new DiscoveryLogic(this);
}
/**
- * Takes in a batch scanner and returns an iterator over the DiscoveredThing objects contained in the value.
+ * Takes in a batch scanner, removes all DiscoveredThings that do not have an indexed field, and returns an iterator over the DiscoveredThing objects
+ * contained in the value.
*
* @param scanner
* a batch scanner
+ * @param indexedFields
+ * set of currently indexed fields
* @return iterator for discoveredthings
*/
- public static Iterator transformScanner(final BatchScanner scanner, final QueryData queryData) {
+ private Iterator transformScanner(final BatchScanner scanner, final QueryData queryData, Set indexedFields) {
return concat(transform(scanner.iterator(), new Function,Iterator>() {
DataInputBuffer in = new DataInputBuffer();
@@ -379,190 +644,47 @@ public Iterator apply(Entry from) {
}
ArrayList thangs = Lists.newArrayListWithCapacity(aw.get().length);
for (Writable w : aw.get()) {
- thangs.add((DiscoveredThing) w);
+ // Check whether the field is currently indexed; if it's not, it should NOT be added to 'thangs'.
+ if (indexedFields.contains(((DiscoveredThing) w).getField())) {
+ thangs.add((DiscoveredThing) w);
+ } else {
+ log.debug(((DiscoveredThing) w).getField() + " was NOT found in IndexedFields");
+ }
}
return thangs.iterator();
}
}));
}
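
The new indexedFields check above is a simple membership filter on the field of each DiscoveredThing. A standalone sketch of the same filtering idea with Guava's Iterators.filter:

import java.util.Iterator;
import java.util.List;
import java.util.Set;

import com.google.common.collect.Iterators;

public class IndexedFieldFilterSketch {
    public static void main(String[] args) {
        Set<String> indexedFields = Set.of("NAME", "COLOR");
        // Anything whose field is not currently indexed is dropped, mirroring the check above.
        Iterator<String> kept = Iterators.filter(List.of("NAME", "SHAPE", "COLOR").iterator(), indexedFields::contains);
        kept.forEachRemaining(System.out::println); // NAME, COLOR
    }
}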
- /**
- * Makes two collections of ranges: one for the forward index (value0) and one for the reverse index (value1).
- *
- * If a literal has a field name, then the Range for that term will include the column family. If there are multiple fields, then multiple ranges are
- * created.
- *
- * @param config
- * the discovery config
- * @param familiesToSeek
- * the families to seek
- * @param metadataHelper
- * a metadata helper
- * @return a pair of ranges
- * @throws TableNotFoundException
- * if the table is not found
- * @throws ExecutionException
- * for execution exceptions
- */
- @SuppressWarnings("unchecked")
- public static Pair,Set> makeRanges(DiscoveryQueryConfiguration config, Set familiesToSeek, MetadataHelper metadataHelper)
- throws TableNotFoundException, ExecutionException {
- Set forwardRanges = new HashSet<>();
- for (Entry literalAndField : config.getLiterals().entries()) {
- String literal = literalAndField.getKey(), field = literalAndField.getValue();
- // if we're _ANYFIELD_, then use null when making the literal range
- field = Constants.ANY_FIELD.equals(field) ? null : field;
- if (field != null) {
- familiesToSeek.add(field);
- }
- forwardRanges.add(ShardIndexQueryTableStaticMethods.getLiteralRange(field, literal));
- }
- for (Entry> rangeEntry : config.getRanges().entries()) {
- LiteralRange range = rangeEntry.getValue();
- String field = rangeEntry.getKey();
- // if we're _ANYFIELD_, then use null when making the literal range
- field = Constants.ANY_FIELD.equals(field) ? null : field;
- if (field != null) {
- familiesToSeek.add(field);
- }
- try {
- forwardRanges.add(ShardIndexQueryTableStaticMethods.getBoundedRangeRange(range));
- } catch (IllegalRangeArgumentException e) {
- log.error("Error using range [" + range + "]", e);
- continue;
- }
- }
- Set reverseRanges = new HashSet<>();
- for (Entry patternAndField : config.getPatterns().entries()) {
- String pattern = patternAndField.getKey(), field = patternAndField.getValue();
- // if we're _ANYFIELD_, then use null when making the literal range
- field = Constants.ANY_FIELD.equals(field) ? null : field;
- ShardIndexQueryTableStaticMethods.RefactoredRangeDescription description;
- try {
- if (field != null) {
- familiesToSeek.add(field);
- }
- description = ShardIndexQueryTableStaticMethods.getRegexRange(field, pattern, false, metadataHelper, config);
- } catch (JavaRegexParseException e) {
- log.error("Error parsing pattern [" + pattern + "]", e);
- continue;
- }
- if (description.isForReverseIndex) {
- reverseRanges.add(description.range);
- } else {
- forwardRanges.add(description.range);
- }
- }
- return Pair.with(forwardRanges, reverseRanges);
- }
-
- /**
- * This attempts to normalize all of the {@code } tuples with the corresponding {@code } tuple. The Normalization object
- * will determine whether or not a regex or literal is being normalized.
- *
- * See the {@link PatternNormalization} and {@link LiteralNormalization} implementations.
- *
- * @param normalization
- * the normalizer object
- * @param valuesToFields
- * mapping of values to fields
- * @param dataTypeMap
- * the data type map
- * @return a mapping of the noramlized tuples
- */
- public static Multimap normalize(Normalization normalization, Multimap valuesToFields, Multimap> dataTypeMap) {
- Multimap normalizedValuesToFields = HashMultimap.create();
- for (Entry valueAndField : valuesToFields.entries()) {
- String value = valueAndField.getKey(), field = valueAndField.getValue();
- for (Type> dataType : dataTypeMap.get(field)) {
- try {
- log.debug("Attempting to normalize [" + value + "] with [" + dataType.getClass() + "]");
- String normalized = normalization.normalize(dataType, field, value);
- normalizedValuesToFields.put(normalized, field);
- log.debug("Normalization succeeded!");
- } catch (Exception exception) {
- log.debug("Normalization failed.");
- }
- }
- }
- return normalizedValuesToFields;
- }
-
- /**
- * This attempts to normalize all of the {@code } tuples with the corresponding {@code } tuple. The Normalization object
- * will determine whether or not a regex or literal is being normalized.
- *
- * See the {@link PatternNormalization} and {@link LiteralNormalization} implementations.
- *
- * @param normalization
- * the normalizer object
- * @param valuesToFields
- * mapping of values to fields
- * @param dataTypeMap
- * the data type map
- * @return a mapping of the normalized ranges
- */
- public static Multimap> normalizeRanges(Normalization normalization, Multimap> valuesToFields,
- Multimap> dataTypeMap) {
- Multimap> normalizedValuesToFields = HashMultimap.create();
- for (Entry> valueAndField : valuesToFields.entries()) {
- String field = valueAndField.getKey();
- LiteralRange> value = valueAndField.getValue();
- for (Type> dataType : dataTypeMap.get(field)) {
- try {
- log.debug("Attempting to normalize [" + value + "] with [" + dataType.getClass() + "]");
- String normalizedLower = normalization.normalize(dataType, field, value.getLower().toString());
- String normalizedUpper = normalization.normalize(dataType, field, value.getUpper().toString());
- normalizedValuesToFields.put(field, new LiteralRange<>(normalizedLower, value.isLowerInclusive(), normalizedUpper, value.isUpperInclusive(),
- value.getFieldName(), value.getNodeOperand()));
- log.debug("Normalization succeeded!");
- } catch (Exception exception) {
- log.debug("Normalization failed.");
- }
- }
- }
- return normalizedValuesToFields;
- }
-
- /**
- * Given a sequence of objects of type T, this method will return a single object for every unique type passed in. This is used to dedupe normalizer
- * instances by their type, so that we only get 1 instance per type of normalizer.
- *
- * @param things
- * iterable list of objects
- * @param
- * type of the objects
- * @return an object for each type passed in
- */
- public static Collection uniqueByType(Iterable things) {
- Map,T> map = Maps.newHashMap();
- for (T t : things) {
- map.put(t.getClass(), t);
- }
- return map.values();
- }
-
@Override
public Set getOptionalQueryParameters() {
Set params = super.getOptionalQueryParameters();
params.add(SEPARATE_COUNTS_BY_COLVIS);
+ params.add(SUM_COUNTS);
return params;
}
- public Boolean getSeparateCountsByColVis() {
+ public boolean getSeparateCountsByColVis() {
return getConfig().getSeparateCountsByColVis();
}
- public void setSeparateCountsByColVis(Boolean separateCountsByColVis) {
+ public void setSeparateCountsByColVis(boolean separateCountsByColVis) {
getConfig().setSeparateCountsByColVis(separateCountsByColVis);
}
- public Boolean getShowReferenceCount() {
+ public boolean getShowReferenceCount() {
return getConfig().getShowReferenceCount();
}
- public void setShowReferenceCount(Boolean showReferenceCount) {
+ public void setShowReferenceCount(boolean showReferenceCount) {
getConfig().setShowReferenceCount(showReferenceCount);
}
+ public boolean getSumCounts() {
+ return getConfig().getSumCounts();
+ }
+
+ public void setSumCounts(boolean sumCounts) {
+ getConfig().setSumCounts(sumCounts);
+ }
}
diff --git a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryQueryConfiguration.java b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryQueryConfiguration.java
index 13c8fa25d75..59d09666450 100644
--- a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryQueryConfiguration.java
+++ b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryQueryConfiguration.java
@@ -3,6 +3,7 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.Objects;
+import java.util.StringJoiner;
import com.google.common.collect.Multimap;
@@ -17,8 +18,9 @@
public class DiscoveryQueryConfiguration extends ShardIndexQueryConfiguration implements Serializable {
private Multimap literals, patterns;
private Multimap> ranges;
- private Boolean separateCountsByColVis = false;
- private Boolean showReferenceCount = false;
+ private boolean separateCountsByColVis = false;
+ private boolean showReferenceCount = false;
+ private boolean sumCounts = false;
public DiscoveryQueryConfiguration() {}
@@ -116,23 +118,31 @@ public void setPatterns(Multimap patterns) {
this.patterns = patterns;
}
- public Boolean getSeparateCountsByColVis() {
+ public boolean getSeparateCountsByColVis() {
return separateCountsByColVis;
}
- public Boolean getShowReferenceCount() {
+ public boolean getShowReferenceCount() {
return showReferenceCount;
}
+ public boolean getSumCounts() {
+ return sumCounts;
+ }
+
public void setSeparateCountsByColVis(boolean separateCountsByColVis) {
this.separateCountsByColVis = separateCountsByColVis;
}
- public void setShowReferenceCount(Boolean showReferenceCount) {
+ public void setShowReferenceCount(boolean showReferenceCount) {
this.showReferenceCount = showReferenceCount;
}
+ public void setSumCounts(boolean sumCounts) {
+ this.sumCounts = sumCounts;
+ }
+
@Override
public DiscoveryQueryConfiguration checkpoint() {
// Create a new config that only contains what is needed to execute the specified ranges
@@ -156,4 +166,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(super.hashCode(), literals, patterns, ranges, separateCountsByColVis, showReferenceCount);
}
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", DiscoveryQueryConfiguration.class.getSimpleName() + "[", "]").add("literals=" + literals).add("patterns=" + patterns)
+ .add("ranges=" + ranges).add("separateCountsByColVis=" + separateCountsByColVis).add("showReferenceCount=" + showReferenceCount)
+ .add("sumCounts=" + sumCounts).toString();
+ }
}
diff --git a/warehouse/query-core/src/main/java/datawave/query/exceptions/DatawaveAsyncOperationException.java b/warehouse/query-core/src/main/java/datawave/query/exceptions/DatawaveAsyncOperationException.java
new file mode 100644
index 00000000000..d8296f1e68f
--- /dev/null
+++ b/warehouse/query-core/src/main/java/datawave/query/exceptions/DatawaveAsyncOperationException.java
@@ -0,0 +1,32 @@
+package datawave.query.exceptions;
+
+import datawave.query.planner.DefaultQueryPlanner;
+
+/**
+ * An exception thrown when the {@link DefaultQueryPlanner} encounters a problem during an async operation, such as fetching field sets or serializing
+ * iterator options in another thread.
+ */
+public class DatawaveAsyncOperationException extends RuntimeException {
+
+ private static final long serialVersionUID = -5455973957749708049L;
+
+ public DatawaveAsyncOperationException() {
+ super();
+ }
+
+ public DatawaveAsyncOperationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DatawaveAsyncOperationException(String message) {
+ super(message);
+ }
+
+ public DatawaveAsyncOperationException(Throwable cause) {
+ super(cause);
+ }
+
+ protected DatawaveAsyncOperationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
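
A hedged usage sketch: the new exception is intended to wrap failures from work done on another thread, e.g. an ExecutionException coming out of a Future. The snippet below uses a plain RuntimeException so it stays self-contained; in the planner the wrapper would presumably be DatawaveAsyncOperationException.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncWrapSketch {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> fieldSets = CompletableFuture.supplyAsync(() -> "INDEXED_FIELDS");
        try {
            System.out.println(fieldSets.get());
        } catch (ExecutionException e) {
            // Unwrap the async failure and rethrow; DatawaveAsyncOperationException plays this role in the planner.
            throw new RuntimeException("async operation failed", e.getCause());
        }
    }
}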
diff --git a/warehouse/query-core/src/main/java/datawave/query/function/DescendantCountFunction.java b/warehouse/query-core/src/main/java/datawave/query/function/DescendantCountFunction.java
index 1aa09e1753d..e21d78aa7de 100644
--- a/warehouse/query-core/src/main/java/datawave/query/function/DescendantCountFunction.java
+++ b/warehouse/query-core/src/main/java/datawave/query/function/DescendantCountFunction.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
+import com.google.common.collect.Lists;
+
import datawave.data.hash.UID;
import datawave.data.hash.UIDConstants;
import datawave.query.Constants;
@@ -75,9 +77,9 @@ public class DescendantCountFunction implements SourcedFunction
- private Collection columnFamilies = KeyToDocumentData.columnFamilies;
+ private Collection columnFamilies = Lists.newArrayList(new ArrayByteSequence("tf"), new ArrayByteSequence("d"));
- private boolean inclusive = KeyToDocumentData.inclusive;
+ private boolean inclusive = false;
private Text indexCf;
@@ -133,7 +135,7 @@ public DescendantCount apply(final Tuple3>> tupl
long timestamp = key.getTimestamp();
boolean hasChildren = count.hasDescendants();
final Key hasChildrenKey = new Key(key.getRow(), key.getColumnFamily(),
- new Text(QueryOptions.DEFAULT_HAS_CHILDREN_FIELDNAME + '\0' + Boolean.toString(hasChildren)), visibility, timestamp);
+ new Text(QueryOptions.DEFAULT_HAS_CHILDREN_FIELDNAME + '\0' + hasChildren), visibility, timestamp);
countKeys.add(hasChildrenKey);
}
@@ -142,7 +144,7 @@ public DescendantCount apply(final Tuple3>> tupl
long timestamp = key.getTimestamp();
int numChildren = count.getFirstGenerationCount();
final Key childCountKey = new Key(key.getRow(), key.getColumnFamily(),
- new Text(QueryOptions.DEFAULT_CHILD_COUNT_FIELDNAME + '\0' + Integer.toString(numChildren)), visibility, timestamp);
+ new Text(QueryOptions.DEFAULT_CHILD_COUNT_FIELDNAME + '\0' + numChildren), visibility, timestamp);
countKeys.add(childCountKey);
}
@@ -152,9 +154,9 @@ public DescendantCount apply(final Tuple3>> tupl
int numDescendants = count.getAllGenerationsCount();
final Text text;
if (count.skippedDescendants()) {
- text = new Text(QueryOptions.DEFAULT_DESCENDANT_COUNT_FIELDNAME + '\0' + Integer.toString(numDescendants - 1) + '+');
+ text = new Text(QueryOptions.DEFAULT_DESCENDANT_COUNT_FIELDNAME + '\0' + (numDescendants - 1) + '+');
} else {
- text = new Text(QueryOptions.DEFAULT_DESCENDANT_COUNT_FIELDNAME + '\0' + Integer.toString(numDescendants));
+ text = new Text(QueryOptions.DEFAULT_DESCENDANT_COUNT_FIELDNAME + '\0' + numDescendants);
}
final Key descendantCountKey = new Key(key.getRow(), key.getColumnFamily(), text, visibility, timestamp);
@@ -266,7 +268,7 @@ private int getCountByEventScan(final Range seekRange, final Text row, final Str
Key endKey = new Key(row, new Text(dataType + '\0' + baseUid + Constants.MAX_UNICODE_STRING));
Range range = new Range(startKey, true, endKey, false);
- // seek too the new range
+ // seek to the new range
Set emptyCfs = Collections.emptySet();
this.source.seek(range, emptyCfs, false);
@@ -388,7 +390,7 @@ private CountResult getCountByFieldIndexScan(final Range seekRange, final Text r
} else {
// If configured, past an exceptionally large number of irrelevant grandchildren.
// Although this would potentially throw off the descendant count, it may be necessary
- // if a given event has thousands (or millions) of grandchildren and we're mainly interested
+ // if a given event has thousands (or millions) of grandchildren, and we're mainly interested
// in the number of 1st generation children.
nonMatchingDescendants++;
if ((this.skipThreshold > 0) && (nonMatchingDescendants >= this.skipThreshold)) {
diff --git a/warehouse/query-core/src/main/java/datawave/query/function/IndexOnlyKeyToDocumentData.java b/warehouse/query-core/src/main/java/datawave/query/function/IndexOnlyKeyToDocumentData.java
index 8628206161f..6ff31065b71 100644
--- a/warehouse/query-core/src/main/java/datawave/query/function/IndexOnlyKeyToDocumentData.java
+++ b/warehouse/query-core/src/main/java/datawave/query/function/IndexOnlyKeyToDocumentData.java
@@ -38,9 +38,9 @@
* Fetches index-only tf key/values and outputs them as "standard" field key/value pairs
*/
public class IndexOnlyKeyToDocumentData extends KeyToDocumentData implements Iterator> {
- private static final Collection COLUMN_FAMILIES = Lists. newArrayList(new ArrayByteSequence("d"));
+ private static final Collection COLUMN_FAMILIES = Lists.newArrayList(new ArrayByteSequence("d"));
- private static Logger LOG = Logger.getLogger(IndexOnlyKeyToDocumentData.class);
+ private static final Logger LOG = Logger.getLogger(IndexOnlyKeyToDocumentData.class);
private static final Entry INVALID_COLUMNQUALIFIER_FORMAT_KEY = Maps.immutableEntry(new Key("INVALID_COLUMNQUALIFIER_FORMAT_KEY"), EMPTY_VALUE);
@@ -159,7 +159,7 @@ public Entry apply(final Entry from) {
// get the document key
Key docKey = getDocKey(from.getKey());
- // Ensure that we have a non-empty colqual
+ // Ensure that we have a non-empty column qualifier
final Key stopKey = new Key(from.getKey().getRow().toString(), from.getKey().getColumnFamily().toString(),
from.getKey().getColumnQualifier().toString() + '\u0000' + '\uffff');
diff --git a/warehouse/query-core/src/main/java/datawave/query/function/KeyToDocumentData.java b/warehouse/query-core/src/main/java/datawave/query/function/KeyToDocumentData.java
index 5e1e147b5f3..fabd48d09e1 100644
--- a/warehouse/query-core/src/main/java/datawave/query/function/KeyToDocumentData.java
+++ b/warehouse/query-core/src/main/java/datawave/query/function/KeyToDocumentData.java
@@ -54,8 +54,7 @@ public class KeyToDocumentData implements Function,Entry
- protected static final Collection columnFamilies = Lists.newArrayList(new ArrayByteSequence("tf"), new ArrayByteSequence("d"));
- protected static final boolean inclusive = false;
+ protected final Collection columnFamilies = Lists.newArrayList(new ArrayByteSequence("tf"), new ArrayByteSequence("d"));
private final DescendantCountFunction countFunction;
@@ -153,7 +152,7 @@ public List> appendHierarchyFields(final List>
@Override
public Entry apply(Entry from) {
- // We want to ensure that we have a non-empty colqual
+ // We want to ensure that we have a non-empty column qualifier
if (null == from || null == from.getKey() || null == from.getValue()) {
return null;
}
@@ -162,7 +161,7 @@ public Entry apply(Entry from) {
try {
logStart();
- source.seek(keyRange, columnFamilies, inclusive);
+ source.seek(keyRange, columnFamilies, false);
if (log.isDebugEnabled())
log.debug(source.hasTop() + " Key range is " + keyRange);
@@ -201,32 +200,6 @@ public Entry apply(Entry from) {
* for issues with read/write
*/
public List> collectDocumentAttributes(final Key documentStartKey, final Set docKeys, final Range keyRange) throws IOException {
- return collectAttributesForDocumentKey(documentStartKey, source, equality, filter, docKeys, keyRange);
- }
-
- /**
- * Given a Key pointing to the start of an document to aggregate, construct a Range that should encapsulate the "document" to be aggregated together. Also
- * checks to see if data was found for the constructed Range before returning.
- *
- * @param documentStartKey
- * A Key of the form "bucket type\x00uid: "
- * @param keyRange
- * the Range used to initialize source with seek()
- * @param source
- * a source
- * @param equality
- * an equality
- * @param docKeys
- * set of keys
- * @param filter
- * a query filter
- * @return the attributes
- * @throws IOException
- * for issues with read/write
- */
- private static List> collectAttributesForDocumentKey(Key documentStartKey, SortedKeyValueIterator source, Equality equality,
- EventDataQueryFilter filter, Set docKeys, Range keyRange) throws IOException {
-
// set up the document key we are filtering for on the EventDataQueryFilter
if (filter != null) {
filter.startNewDocument(documentStartKey);
@@ -256,7 +229,7 @@ private static List> collectAttributesForDocumentKey(Key docume
// request a seek range from the filter
Range seekRange = filter.getSeekRange(docAttrKey.get(), keyRange.getEndKey(), keyRange.isEndKeyInclusive());
if (seekRange != null) {
- source.seek(seekRange, columnFamilies, inclusive);
+ source.seek(seekRange, columnFamilies, false);
seeked = true;
}
}
@@ -292,7 +265,7 @@ public static Key getDocKey(Key key) {
private static List> appendHierarchyFields(List> documentAttributes, Key key, Range seekRange,
DescendantCountFunction function, boolean includeParent) {
- if ((null != function) || includeParent) {
+ if (function != null || includeParent) {
// get the minimal timestamp and majority visibility from the
// attributes
@@ -329,7 +302,7 @@ private static List> appendHierarchyFields(List> appendHierarchyFields(List> documentAttributes, final String visibility, long timestamp) {
int basicChildCount = 0;
- if ((null != function) && (null != key)) {
- // Count the descendants, generating keys based on query options and
- // document attributes
- final Tuple3>> tuple = new Tuple3<>(range, key, documentAttributes);
- final DescendantCount count = function.apply(tuple);
-
- // No need to do any more work if there aren't any descendants
- if ((null != count) && count.hasDescendants()) {
- // Extract the basic, first-generation count
- basicChildCount = count.getFirstGenerationCount();
-
- // Get any generated keys, apply any specified visibility, and
- // add to the document attributes
- final List keys = count.getKeys();
- if ((null != documentAttributes) && !documentAttributes.isEmpty() && !keys.isEmpty()) {
- // Create a Text for the Keys' visibility
- Text appliedVis;
- if ((null != visibility) && !visibility.isEmpty()) {
- appliedVis = new Text(visibility);
- } else {
- appliedVis = new Text();
- }
+ if (null == function || null == key) {
+ return basicChildCount;
+ }
- // Conditionally adjust visibility and timestamp
- for (final Key childCountKey : keys) {
- final Text appliedRow = childCountKey.getRow();
- final Text appliedCf = childCountKey.getColumnFamily();
- final Text appliedCq = childCountKey.getColumnQualifier();
- if ((null == visibility) || visibility.isEmpty()) {
- childCountKey.getColumnVisibility(appliedVis);
- }
- if (timestamp <= 0) {
- timestamp = childCountKey.getTimestamp();
- }
+ // Count the descendants, generating keys based on query options and
+ // document attributes
+ final Tuple3>> tuple = new Tuple3<>(range, key, documentAttributes);
+ final DescendantCount count = function.apply(tuple);
+
+ // No need to do any more work if there aren't any descendants
+ if (count != null && count.hasDescendants()) {
+ // Extract the basic, first-generation count
+ basicChildCount = count.getFirstGenerationCount();
+
+ // Get any generated keys, apply any specified visibility, and
+ // add to the document attributes
+ final List keys = count.getKeys();
+ if (documentAttributes != null && !documentAttributes.isEmpty() && !keys.isEmpty()) {
+ // Create a Text for the Keys' visibility
+ Text appliedVis;
+ if (visibility != null && !visibility.isEmpty()) {
+ appliedVis = new Text(visibility);
+ } else {
+ appliedVis = new Text();
+ }
- final Key appliedKey = new Key(appliedRow, appliedCf, appliedCq, appliedVis, timestamp);
- documentAttributes.add(Maps.immutableEntry(appliedKey, EMPTY_VALUE));
+ // Conditionally adjust visibility and timestamp
+ for (final Key childCountKey : keys) {
+ final Text appliedRow = childCountKey.getRow();
+ final Text appliedCf = childCountKey.getColumnFamily();
+ final Text appliedCq = childCountKey.getColumnQualifier();
+ if (visibility == null || visibility.isEmpty()) {
+ childCountKey.getColumnVisibility(appliedVis);
}
+ if (timestamp <= 0) {
+ timestamp = childCountKey.getTimestamp();
+ }
+
+ final Key appliedKey = new Key(appliedRow, appliedCf, appliedCq, appliedVis, timestamp);
+ documentAttributes.add(Maps.immutableEntry(appliedKey, EMPTY_VALUE));
}
}
}
diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/IndexInfo.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/IndexInfo.java
index 19bec2cb83b..bc2a0dc8781 100644
--- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/IndexInfo.java
+++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/IndexInfo.java
@@ -328,11 +328,17 @@ public IndexInfo union(IndexInfo o, List delayedNodes) {
merged.count = merged.uids.size();
}
- merged.setFieldCounts(this.getFieldCounts());
- merged.mergeFieldCounts(o.getFieldCounts());
+ if (this == o) {
+ // handle idiosyncrasy of the peeking iterator where the first term is merged with itself
+ merged.setFieldCounts(o.getFieldCounts());
+ merged.setTermCounts(o.getTermCounts());
+ } else {
+ merged.setFieldCounts(getFieldCounts());
+ merged.setTermCounts(getTermCounts());
- merged.setTermCounts(this.getTermCounts());
- merged.mergeTermCounts(o.getTermCounts());
+ merged.mergeFieldCounts(o.getFieldCounts());
+ merged.mergeTermCounts(o.getTermCounts());
+ }
/*
* If there are multiple levels within a union we could have an ASTOrNode. We cannot prune OrNodes as we would with an intersection, so propagate the
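
The guard above exists because the peeking iterator can hand the first term to union() merged with itself, and blindly merging counts in that case would double them. A simplified sketch of that self-merge guard over plain count maps:

import java.util.HashMap;
import java.util.Map;

public class CountMergeSketch {
    // When left and right are the same object, copy the counts once instead of doubling them.
    static Map<String, Long> union(Map<String, Long> left, Map<String, Long> right) {
        Map<String, Long> merged = new HashMap<>(left);
        if (left != right) {
            right.forEach((key, count) -> merged.merge(key, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = Map.of("FOO", 5L);
        System.out.println(union(counts, counts));            // {FOO=5}, not doubled
        System.out.println(union(counts, Map.of("FOO", 2L))); // {FOO=7}
    }
}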
diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java
index 0eb3fe6b144..9d1f2951563 100644
--- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java
+++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java
@@ -157,8 +157,8 @@ public RangeStream(ShardQueryConfiguration config, ScannerFactory scanners, Meta
streamExecutor = new ThreadPoolExecutor(executeLookupMin, maxLookup, 100, TimeUnit.MILLISECONDS, runnables);
fieldDataTypes = config.getQueryFieldsDatatypes();
collapseUids = config.getCollapseUids();
- fieldCounts = config.getUseFieldCounts();
- termCounts = config.getUseTermCounts();
+ fieldCounts = config.isSortQueryPostIndexWithFieldCounts();
+ termCounts = config.isSortQueryPostIndexWithTermCounts();
try {
Set ioFields = metadataHelper.getIndexOnlyFields(null);
if (null != ioFields) {
@@ -264,8 +264,8 @@ public Iterator iterator() {
this.itr = filter(concat(transform(queryStream, new TupleToRange(config.getShardTableName(), queryStream.currentNode(), config))),
getEmptyPlanPruner());
- if (config.isSortQueryByCounts() && (config.getUseFieldCounts() || config.getUseTermCounts())) {
- this.itr = transform(itr, new OrderingTransform(config.getUseFieldCounts(), config.getUseTermCounts()));
+ if (config.isSortQueryPostIndexWithFieldCounts() || config.isSortQueryPostIndexWithTermCounts()) {
+ this.itr = transform(itr, new OrderingTransform(config.isSortQueryPostIndexWithFieldCounts(), config.isSortQueryPostIndexWithTermCounts()));
}
}
} finally {
@@ -362,7 +362,7 @@ public QueryPlan apply(QueryPlan plan) {
Map counts = plan.getTermCounts().getCounts();
OrderByCostVisitor.orderByTermCount(plan.getQueryTree(), counts);
} else if (useFieldCounts) {
- Map counts = plan.getTermCounts().getCounts();
+ Map counts = plan.getFieldCounts().getCounts();
OrderByCostVisitor.orderByFieldCount(plan.getQueryTree(), counts);
}
return plan;
@@ -602,6 +602,10 @@ public ScannerStream visit(ASTEQNode node, Object data) {
String queryString = fieldName + "=='" + literal + "'";
options.addScanIterator(QueryScannerHelper.getQueryInfoIterator(config.getQuery(), false, queryString));
+ // easier to apply hints to new options than to deal with copying existing hints between them
+ options.applyExecutionHints(config.getIndexTableName(), config.getTableHints());
+ options.applyConsistencyLevel(config.getIndexTableName(), config.getTableConsistencyLevels());
+
scannerSession.setOptions(options);
scannerSession.setMaxResults(config.getMaxIndexBatchSize());
scannerSession.setExecutor(streamExecutor);
diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardLimitingIterator.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardLimitingIterator.java
index 6681b9ebd0b..4972d173f5f 100644
--- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardLimitingIterator.java
+++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardLimitingIterator.java
@@ -9,7 +9,7 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.commons.collections4.iterators.PeekingIterator;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java
index 2b437ea61c5..55cf0b0dff0 100644
--- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java
+++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java
@@ -1,5 +1,6 @@
package datawave.query.index.lookup;
+import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
@@ -12,15 +13,17 @@
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.commons.collections4.iterators.PeekingIterator;
import org.apache.commons.jexl3.parser.JexlNode;
import com.google.common.base.Function;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
+import datawave.data.type.Type;
import datawave.query.CloseableIterable;
import datawave.query.config.ShardQueryConfiguration;
-import datawave.query.exceptions.DatawaveQueryException;
import datawave.query.index.lookup.IndexStream.StreamContext;
import datawave.query.iterator.FieldIndexOnlyQueryIterator;
import datawave.query.iterator.QueryOptions;
@@ -65,7 +68,7 @@ public CloseableIterable streamPlans(JexlNode node) {
DefaultQueryPlanner.addOption(cfg, QueryOptions.DATATYPE_FILTER, config.getDatatypeFilterAsString(), false);
DefaultQueryPlanner.addOption(cfg, QueryOptions.END_TIME, Long.toString(config.getEndDate().getTime()), false);
- DefaultQueryPlanner.configureTypeMappings(config, cfg, metadataHelper, true);
+ configureTypeMappings(config, cfg, metadataHelper);
scanner.setRanges(Collections.singleton(rangeForTerm(null, null, config)));
@@ -97,7 +100,7 @@ public CloseableIterable streamPlans(JexlNode node) {
}
- } catch (TableNotFoundException | DatawaveQueryException e) {
+ } catch (TableNotFoundException e) {
throw new RuntimeException(e);
} finally {
// shut down the executor as all threads have completed
@@ -134,4 +137,29 @@ public QueryPlan apply(Entry entry) {
// @formatter:on
}
}
+
+ /**
+ * Lifted and shifted from DefaultQueryPlanner to avoid reliance on its static methods.
+ */
+ private void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper) {
+ DefaultQueryPlanner.addOption(cfg, QueryOptions.QUERY_MAPPING_COMPRESS, Boolean.toString(true), false);
+
+ Multimap> nonIndexedQueryFieldsDatatypes = HashMultimap.create(config.getQueryFieldsDatatypes());
+ nonIndexedQueryFieldsDatatypes.keySet().removeAll(config.getIndexedFields());
+ String nonIndexedTypes = QueryOptions.buildFieldNormalizerString(nonIndexedQueryFieldsDatatypes);
+ DefaultQueryPlanner.addOption(cfg, QueryOptions.NON_INDEXED_DATATYPES, nonIndexedTypes, false);
+
+ try {
+ String serializedTypeMetadata = metadataHelper.getTypeMetadata(config.getDatatypeFilter()).toString();
+ DefaultQueryPlanner.addOption(cfg, QueryOptions.TYPE_METADATA, serializedTypeMetadata, false);
+
+ String requiredAuthsString = metadataHelper.getUsersMetadataAuthorizationSubset();
+ requiredAuthsString = QueryOptions.compressOption(requiredAuthsString, QueryOptions.UTF8);
+ DefaultQueryPlanner.addOption(cfg, QueryOptions.TYPE_METADATA_AUTHS, requiredAuthsString, false);
+ } catch (TableNotFoundException | IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ DefaultQueryPlanner.addOption(cfg, QueryOptions.METADATA_TABLE_NAME, config.getMetadataTableName(), false);
+ }
}
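
One detail worth calling out in configureTypeMappings: the non-indexed field/type mappings are built by copying the full multimap and then removing every indexed field from its key set. A small Guava sketch of that copy-then-remove step (the field and type names below are made up):

import java.util.Set;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

public class NonIndexedTypesSketch {
    public static void main(String[] args) {
        Multimap<String, String> fieldTypes = HashMultimap.create();
        fieldTypes.put("NAME", "LcNoDiacriticsType");
        fieldTypes.put("AGE", "NumberType");

        // Copy, then drop every indexed field; what remains are the non-indexed field/type mappings.
        Multimap<String, String> nonIndexed = HashMultimap.create(fieldTypes);
        nonIndexed.keySet().removeAll(Set.of("NAME"));

        System.out.println(nonIndexed); // {AGE=[NumberType]}
    }
}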
diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/ParentQueryIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/ParentQueryIterator.java
index a5e5c948814..63a612f7b3e 100644
--- a/warehouse/query-core/src/main/java/datawave/query/iterator/ParentQueryIterator.java
+++ b/warehouse/query-core/src/main/java/datawave/query/iterator/ParentQueryIterator.java
@@ -69,6 +69,31 @@ public EventDataQueryFilter getEvaluationFilter() {
return evaluationFilter != null ? evaluationFilter.clone() : null;
}
+ /**
+ * In the Parent case replace the {@link QueryOptions#eventFilter} with an evaluation filter
+ *
+ * @return an evaluation filter
+ */
+ public EventDataQueryFilter getEventFilter() {
+ return getEvaluationFilter();
+ }
+
+ @Override
+ public EventDataQueryFilter getFiEvaluationFilter() {
+ if (fiEvaluationFilter == null) {
+ fiEvaluationFilter = getEvaluationFilter();
+ }
+ return fiEvaluationFilter.clone();
+ }
+
+ @Override
+ public EventDataQueryFilter getEventEvaluationFilter() {
+ if (eventEvaluationFilter == null) {
+ eventEvaluationFilter = getEvaluationFilter();
+ }
+ return eventEvaluationFilter.clone();
+ }
+
@Override
public Iterator> mapDocument(SortedKeyValueIterator deepSourceCopy, Iterator> documents,
CompositeMetadata compositeMetadata) {
diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java
index 1d4dc14dcb5..6509b471db3 100644
--- a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java
+++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java
@@ -759,22 +759,25 @@ public Iterator> createDocumentPipeline(SortedKeyValueIterat
if (log.isTraceEnabled()) {
log.trace("isFieldIndexSatisfyingQuery");
}
- docMapper = new Function,Entry>() {
+ docMapper = new Function<>() {
@Nullable
@Override
public Entry apply(@Nullable Entry input) {
-
Entry entry = null;
if (input != null) {
- entry = Maps.immutableEntry(new DocumentData(input.getKey(), Collections.singleton(input.getKey()), Collections.EMPTY_LIST, true),
+ entry = Maps.immutableEntry(new DocumentData(input.getKey(), Collections.singleton(input.getKey()), Collections.emptyList(), true),
input.getValue());
}
return entry;
}
};
} else {
- docMapper = new KeyToDocumentData(deepSourceCopy, myEnvironment, documentOptions, getEquality(), getEvaluationFilter(), this.includeHierarchyFields,
- this.includeHierarchyFields).withRangeProvider(getRangeProvider()).withAggregationThreshold(getDocAggregationThresholdMs());
+ // @formatter:off
+ docMapper = new KeyToDocumentData(deepSourceCopy, myEnvironment, documentOptions, getEquality(), getEventEvaluationFilter(), this.includeHierarchyFields,
+ this.includeHierarchyFields)
+ .withRangeProvider(getRangeProvider())
+ .withAggregationThreshold(getDocAggregationThresholdMs());
+ // @formatter:on
}
Iterator> sourceIterator = Iterators.transform(documentSpecificSource, from -> {
@@ -787,7 +790,7 @@ public Entry apply(@Nullable Entry input) {
// which do not fall within the expected time range
Iterator> documents = null;
Aggregation a = new Aggregation(this.getTimeFilter(), this.typeMetadataWithNonIndexed, compositeMetadata, this.isIncludeGroupingContext(),
- this.includeRecordId, this.disableIndexOnlyDocuments(), getEvaluationFilter(), isTrackSizes());
+ this.includeRecordId, this.disableIndexOnlyDocuments(), getEventEvaluationFilter(), isTrackSizes());
if (gatherTimingDetails()) {
documents = Iterators.transform(sourceIterator, new EvaluationTrackingFunction<>(QuerySpan.Stage.Aggregation, trackingSpan, a));
} else {
@@ -1094,15 +1097,18 @@ protected Iterator> mapDocument(SortedKeyValueIterator
Iterator> mappedDocuments = Iterators.transform(documents,
new GetDocument(docMapper,
new Aggregation(this.getTimeFilter(), typeMetadataWithNonIndexed, compositeMetadata,
this.isIncludeGroupingContext(), this.includeRecordId, this.disableIndexOnlyDocuments(),
- getEvaluationFilter(), isTrackSizes())));
+ getEventEvaluationFilter(), isTrackSizes())));
Iterator