Skip to content

Commit

Permalink
Optionally build a precise datatype filter when no filter is specified (
Browse files Browse the repository at this point in the history
  • Loading branch information
apmoriarty authored Apr 9, 2024
1 parent fb5b0f1 commit 6730346
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private int collapseUidsThreshold = -1;
// Should this query dedupe terms within ANDs and ORs
private boolean enforceUniqueTermsWithinExpressions = false;
// After query planning rebuild the datatype filter from the remaining query fields.
// The actual filter may be a subset of the requested datatypes. This has implications
// for the global index lookup and execution of regex terms.
private boolean rebuildDatatypeFilter = false;
private boolean rebuildDatatypeFilterPerShard = false;
// reduces the datatype filter, respecting the user-supplied datatypes
private boolean reduceIngestTypes = false;
private boolean reduceIngestTypesPerShard = false;
// should this query attempt to prune terms via their ingest types
Expand Down Expand Up @@ -520,6 +526,8 @@ public ShardQueryConfiguration(ShardQueryConfiguration other) {
this.setReduceQueryFieldsPerShard(other.getReduceQueryFieldsPerShard());
this.setReduceTypeMetadata(other.getReduceTypeMetadata());
this.setReduceTypeMetadataPerShard(other.getReduceTypeMetadataPerShard());
this.setRebuildDatatypeFilter(other.isRebuildDatatypeFilter());
this.setRebuildDatatypeFilterPerShard(other.isRebuildDatatypeFilterPerShard());
this.setParseTldUids(other.getParseTldUids());
this.setSequentialScheduler(other.getSequentialScheduler());
this.setCollectTimingDetails(other.getCollectTimingDetails());
Expand Down Expand Up @@ -2627,6 +2635,22 @@ public void setPruneQueryOptions(boolean pruneQueryOptions) {
this.pruneQueryOptions = pruneQueryOptions;
}

/**
 * Whether the datatype filter should be rebuilt from the remaining query fields after query planning.
 * <p>
 * The rebuilt filter may be a subset of the user-requested datatypes, which affects the global index
 * lookup and the execution of regex terms.
 *
 * @return true if the datatype filter is rebuilt after query planning
 */
public boolean isRebuildDatatypeFilter() {
return rebuildDatatypeFilter;
}

/**
 * Sets whether the datatype filter should be rebuilt from the remaining query fields after query planning.
 *
 * @param rebuildDatatypeFilter
 *            true to rebuild the datatype filter after query planning
 */
public void setRebuildDatatypeFilter(boolean rebuildDatatypeFilter) {
this.rebuildDatatypeFilter = rebuildDatatypeFilter;
}

/**
 * Whether the datatype filter should be rebuilt on a per-shard basis during query execution.
 *
 * @return true if the datatype filter is rebuilt per shard
 */
public boolean isRebuildDatatypeFilterPerShard() {
return rebuildDatatypeFilterPerShard;
}

/**
 * Sets whether the datatype filter should be rebuilt on a per-shard basis during query execution.
 *
 * @param rebuildDatatypeFilterPerShard
 *            true to rebuild the datatype filter per shard
 */
public void setRebuildDatatypeFilterPerShard(boolean rebuildDatatypeFilterPerShard) {
this.rebuildDatatypeFilterPerShard = rebuildDatatypeFilterPerShard;
}

/**
 * Whether the datatype filter should be reduced while respecting the user-supplied datatypes.
 *
 * @return true if ingest-type reduction is enabled
 */
public boolean getReduceIngestTypes() {
return reduceIngestTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.apache.commons.jexl3.parser.JexlNode;
import org.apache.log4j.Logger;

import com.google.common.collect.Sets;

import datawave.query.jexl.JexlASTHelper;
import datawave.query.jexl.functions.ContentFunctionsDescriptor;
import datawave.query.jexl.functions.EvaluationPhaseFilterFunctionsDescriptor;
Expand All @@ -45,9 +43,15 @@
import datawave.query.util.TypeMetadata;

/**
* Visitor that returns the set of all ingest types associated with an arbitrary JexlNode.
* Visitor that returns the <b>effective set</b> of ingest types associated with an arbitrary JexlNode.
* <p>
* The effective set is calculated by applying union and intersection logic to produce the reduced set of ingest types for complex tree structures. For example:
* <p>
* <code>(A AND B)</code> when the A term maps to ingest type 1 and the B term maps to ingest types 1, 2, and 3.
* <p>
* Much of this code is duplicated from the {@link IngestTypePruningVisitor}.
* The full set of ingest types is {1, 2, 3}, but the <b>effective set</b> is just ingest type 1.
* <p>
* Much of this code originated from the {@link IngestTypePruningVisitor}.
*/
public class IngestTypeVisitor extends BaseVisitor {

Expand Down Expand Up @@ -262,7 +266,7 @@ public Set<String> getIngestTypesForJunction(JexlNode node) {
public Set<String> getIngestTypesForLeaf(JexlNode node) {
node = JexlASTHelper.dereference(node);
if (node instanceof ASTEQNode) {
Object literal = JexlASTHelper.getLiteralValue(node);
Object literal = JexlASTHelper.getLiteralValueSafely(node);
if (literal == null) {
return Collections.singleton(UNKNOWN_TYPE);
}
Expand Down Expand Up @@ -356,6 +360,10 @@ private Set<String> getIngestTypesForIntersection(ASTAndNode node) {
JexlNode child = JexlASTHelper.dereference(node.jjtGetChild(i));
Set<String> childIngestTypes = (Set<String>) child.jjtAccept(this, null);

if (childIngestTypes == null) {
continue; // we could have a malformed query or a query with a _Drop_ marker
}

if (ingestTypes.isEmpty()) {
ingestTypes = childIngestTypes;
} else {
Expand All @@ -379,6 +387,7 @@ private Set<String> intersectTypes(Set<String> typesA, Set<String> typesB) {
if (typesA.contains(UNKNOWN_TYPE) || typesB.contains(UNKNOWN_TYPE)) {
return Collections.singleton(UNKNOWN_TYPE);
}
return Sets.intersection(typesA, typesB);
typesA.retainAll(typesB);
return typesA;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2220,6 +2220,11 @@ protected Future<IteratorSetting> loadQueryIterator(final MetadataHelper metadat
throw new DatawaveQueryException(qe);
}

if (!preloadOptions && config.isRebuildDatatypeFilter()) {
Set<String> datatypes = IngestTypeVisitor.getIngestTypes(config.getQueryTree(), getTypeMetadata());
config.setDatatypeFilter(datatypes);
}

String datatypeFilter = config.getDatatypeFilterAsString();

addOption(cfg, QueryOptions.DATATYPE_FILTER, datatypeFilter, false);
Expand Down Expand Up @@ -2652,6 +2657,12 @@ public Tuple2<CloseableIterable<QueryPlan>,Boolean> getQueryRanges(ScannerFactor
fullTableScanReason = state.reason;
}

// optionally build/rebuild the datatype filter with the fully planned query
if (config.isRebuildDatatypeFilter()) {
Set<String> ingestTypes = IngestTypeVisitor.getIngestTypes(config.getQueryTree(), getTypeMetadata());
config.setDatatypeFilter(ingestTypes);
}

Set<String> ingestTypes = null;
if (config.getReduceIngestTypes()) {
Set<String> userRequestedIngestTypes = config.getDatatypeFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2827,4 +2827,20 @@ public boolean getSortQueryByCounts() {
public void setSortQueryByCounts(boolean sortQueryByCounts) {
getConfig().setSortQueryByCounts(sortQueryByCounts);
}

/**
 * Delegates to the underlying configuration.
 *
 * @return true if the datatype filter is rebuilt after query planning
 */
public boolean isRebuildDatatypeFilter() {
return getConfig().isRebuildDatatypeFilter();
}

/**
 * Delegates to the underlying configuration.
 *
 * @param rebuildDatatypeFilter
 *            true to rebuild the datatype filter after query planning
 */
public void setRebuildDatatypeFilter(boolean rebuildDatatypeFilter) {
getConfig().setRebuildDatatypeFilter(rebuildDatatypeFilter);
}

/**
 * Delegates to the underlying configuration.
 *
 * @return true if the datatype filter is rebuilt per shard
 */
public boolean isRebuildDatatypeFilterPerShard() {
return getConfig().isRebuildDatatypeFilterPerShard();
}

/**
 * Delegates to the underlying configuration.
 *
 * @param rebuildDatatypeFilterPerShard
 *            true to rebuild the datatype filter per shard
 */
public void setRebuildDatatypeFilterPerShard(boolean rebuildDatatypeFilterPerShard) {
getConfig().setRebuildDatatypeFilterPerShard(rebuildDatatypeFilterPerShard);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -28,9 +29,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.geotools.data.Join;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -313,7 +316,7 @@ public ScannerChunk apply(@Nullable ScannerChunk input) {
reduceQueryFields(script, newIteratorSetting);
}

if (config.getReduceIngestTypesPerShard()) {
if (config.isRebuildDatatypeFilterPerShard() || config.getReduceIngestTypesPerShard()) {
reduceIngestTypes(script, newIteratorSetting);
}

Expand Down Expand Up @@ -491,17 +494,29 @@ private void reduceIngestTypes(ASTJexlScript script, IteratorSetting newIterator
cachedTypeMetadata = new TypeMetadata(serializedTypeMetadata);
}

Set<String> userRequestedDataTypes = config.getDatatypeFilter();
if (!userRequestedDataTypes.isEmpty()) {
Set<String> queryDataTypes = IngestTypeVisitor.getIngestTypes(script, cachedTypeMetadata);
Set<String> ingestTypes = Sets.intersection(userRequestedDataTypes, queryDataTypes);
if (ingestTypes.size() < userRequestedDataTypes.size()) {
newIteratorSetting.addOption(QueryOptions.DATATYPE_FILTER, Joiner.on(',').join(ingestTypes));
}
// get requested types
Set<String> requestedDatatypes;
String opt = newIteratorSetting.getOptions().get(QueryOptions.DATATYPE_FILTER);
if (opt == null) {
requestedDatatypes = Collections.emptySet();
} else {
requestedDatatypes = new HashSet<>(Splitter.on(',').splitToList(opt));
}

if (ingestTypes.isEmpty()) {
// get existing types from the query
Set<String> datatypes = IngestTypeVisitor.getIngestTypes(script, cachedTypeMetadata);

if (config.isRebuildDatatypeFilterPerShard()) {
newIteratorSetting.addOption(QueryOptions.DATATYPE_FILTER, Joiner.on(',').join(datatypes));
} else if (config.getReduceIngestTypesPerShard() && !requestedDatatypes.isEmpty() && !datatypes.isEmpty()) {
Set<String> intersectedTypes = Sets.intersection(requestedDatatypes, datatypes);
if (intersectedTypes.isEmpty()) {
// the EmptyPlanPruner in the RangeStream should have handled this situation, this exception indicates a bug exists
throw new DatawaveFatalQueryException("Reduced ingest types to zero, cannot execute query sub-plan");
throw new DatawaveFatalQueryException("Ingest types reduced to zero, cannot execute query sub-plan");
}

if (intersectedTypes.size() <= requestedDatatypes.size()) {
newIteratorSetting.addOption(QueryOptions.DATATYPE_FILTER, Joiner.on(',').join(intersectedTypes));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ public void setUp() throws Exception {
updatedValues.put("reduceQueryFields", true);
defaultValues.put("reduceQueryFieldsPerShard", false);
updatedValues.put("reduceQueryFieldsPerShard", true);
defaultValues.put("rebuildDatatypeFilter", false);
updatedValues.put("rebuildDatatypeFilter", true);
defaultValues.put("rebuildDatatypeFilterPerShard", false);
updatedValues.put("rebuildDatatypeFilterPerShard", true);
defaultValues.put("reduceTypeMetadata", false);
updatedValues.put("reduceTypeMetadata", true);
defaultValues.put("reduceTypeMetadataPerShard", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@

<property name="auditType" value="ACTIVE" />
<property name="logicDescription" value="Event query with rewritten logic" />
<property name="rebuildDatatypeFilter" value="true" />
<property name="rebuildDatatypeFilterPerShard" value="false" />
<!-- Determines how many events in the global index lookup will be
aggregated into a day range -->
<property name="eventPerDayThreshold" value="40000" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@
<property name="tfAggregationThresholdMs" value="-1" />
<!-- enable per-tablet pruning of certain query options -->
<property name="pruneQueryOptions" value="false" />
<property name="rebuildDatatypeFilter" value="true" />
<property name="rebuildDatatypeFilterPerShard" value="false" />
<property name="reduceIngestTypes" value="true"/>
<property name="reduceIngestTypesPerShard" value="true"/>
<property name="reduceQueryFields" value="false" />
Expand Down

0 comments on commit 6730346

Please sign in to comment.