diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/AggregatingReducer.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/AggregatingReducer.java
index 68cf0bbf935..37adc84981e 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/AggregatingReducer.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/AggregatingReducer.java
@@ -380,7 +380,9 @@ public CustomColumnToClassMapping(Integer priority, Map<String,String> opts) {
 
         for (Entry<String,String> entry : opts.entrySet()) {
             String column = entry.getKey();
-            final String className = entry.getValue();
+            final String val = entry.getValue().trim();
+            int spaceIdx = val.indexOf(' ');
+            final String className = spaceIdx < 0 ? val : val.substring(0, spaceIdx);
 
             Pair pcic;
             if (ALL_CF_STR.equals(column)) {
@@ -397,6 +399,17 @@ public CustomColumnToClassMapping(Integer priority, Map<String,String> opts) {
 
                 agg = clazz.getDeclaredConstructor().newInstance();
+                //@formatter:off
+                if (spaceIdx > 0) {
+                    final String encodedOpts = val.substring(spaceIdx + 1);
+                    Map<String,String> aggOpts = Splitter
+                                    .on(';')
+                                    .trimResults()
+                                    .withKeyValueSeparator('=')
+                                    .split(encodedOpts);
+                    agg.validateOptions(aggOpts);
+                }
+                //@formatter:on
             } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) {
                 throw new RuntimeException(e);
             }
 
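For context, a minimal sketch (not part of the patch) of the option-value format the new parsing above accepts: the combiner class name, optionally followed by a space and a semicolon-delimited list of key=value combiner options. The class name and option values below are illustrative only.

    // Sketch: parsing "<combiner class> key1=val1;key2=val2" the same way the patch does.
    import java.util.Map;
    import com.google.common.base.Splitter;

    class EncodedAggregatorOptionSketch {
        public static void main(String[] args) {
            String val = "datawave.ingest.table.aggregator.GlobalIndexUidAggregator all=true;maxuids=100".trim();
            int spaceIdx = val.indexOf(' ');
            String className = spaceIdx < 0 ? val : val.substring(0, spaceIdx);
            if (spaceIdx > 0) {
                String encodedOpts = val.substring(spaceIdx + 1);
                // MapSplitter turns "all=true;maxuids=100" into {all=true, maxuids=100}
                Map<String,String> aggOpts = Splitter.on(';').trimResults().withKeyValueSeparator('=').split(encodedOpts);
                System.out.println(className + " -> " + aggOpts);
            }
        }
    }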

+ * Although this combiner allows the max UIDs kept to be configured, anyone using this feature should consider the impact of using it once data has been loaded + * into the system. Decreasing the max size will likely cause UID lists to be purged as they exceed the new max UID count. Increasing the max UID could is also + * unlikely to work as one would expect since any lists that already had their UIDs purged and the ignore flag set won't start collecting UIDs even if the + * previous count is less than the new max UID count. In practice, the main intent for this feature is to configure a new cluster with a different max UID count + * (without having to re-compile the code). With the caveats mentioned, it could be used on a system with data loaded, for example if a new data type becomes + * available and one wished to increase the max UID count while data for other data types is relatively stable or the side effects of the change don't matter. + *

+ * When this class is used with {@link datawave.iterators.PropogatingIterator}, one must be aware that this class is not actually used as a combiner but rather + * is only used for its {@link #reset()} and {@link #aggregate()} methods. PropogatingIterator allows combiner options to be passed, which means when this class + * is used with PropogatingIterator one could change the max UID count. However, if that option is used it should be noted that the base + * {@link org.apache.accumulo.core.iterators.Combiner} option validation is used and therefore the option "all" must also be set to "true" or the "columns" must + * be set in order to pass option validation. While set, the "all" or "columns" option will have no effect when used with PropogatingIterator since only the + * {@link #reset()} and {@link #aggregate()} methods are invoked. */ public class GlobalIndexUidAggregator extends PropogatingCombiner { private static final Logger log = LoggerFactory.getLogger(GlobalIndexUidAggregator.class); + private static final String MAX_UIDS_OPT = "maxuids"; private static final String TIMESTAMPS_IGNORED = "timestampsIgnored"; /** @@ -123,7 +138,7 @@ public Value aggregate() { * can store up to a certain number of UIDs and after that, the lists are no longer tracked (the ignore flag will be set) and only counts are tracked. * REMOVEDUIDs are tracked to handle minor and partial major compactions where this reduce method won't necessarily see all possible values for a given key * (e.g., the UIDs that are being removed might be in a different RFile that isn't involved in the current compaction). - * + *

* Aggregation operates in one of two modes depending on whether or not timestamps are ignored. By default, timestamps are ignored since DataWave uses date * to the day as the timestamp in the global term index. When timestamps are ignored, we cannot infer anything about the order of values under aggregation. * Therefore, a decision must be made about how to handle removed UIDs vs added UIDs. In that case, removed UIDs take priority. This means that adding a @@ -323,6 +338,13 @@ public boolean propogateKey() { return propogate || !uids.isEmpty() || count > 0; } + @Override + public IteratorOptions describeOptions() { + IteratorOptions io = super.describeOptions(); + io.addNamedOption(MAX_UIDS_OPT, "The maximum number of UIDs to keep in the list. Default is " + MAX + "."); + return io; + } + @Override public boolean validateOptions(Map options) { boolean valid = super.validateOptions(options); @@ -330,6 +352,12 @@ public boolean validateOptions(Map options) { if (options.containsKey(TIMESTAMPS_IGNORED)) { timestampsIgnored = Boolean.parseBoolean(options.get(TIMESTAMPS_IGNORED)); } + if (options.containsKey(MAX_UIDS_OPT)) { + maxUids = Integer.parseInt(options.get(MAX_UIDS_OPT)); + if (maxUids <= 0) { + throw new IllegalArgumentException("Max UIDs must be greater than 0."); + } + } } return valid; } @@ -349,9 +377,16 @@ public void init(SortedKeyValueIterator source, Map op if (options.containsKey(TIMESTAMPS_IGNORED)) { timestampsIgnored = Boolean.parseBoolean(options.get(TIMESTAMPS_IGNORED)); } + if (options.containsKey(MAX_UIDS_OPT)) { + maxUids = Integer.parseInt(options.get(MAX_UIDS_OPT)); + } } public static void setTimestampsIgnoredOpt(IteratorSetting is, boolean timestampsIgnored) { is.addOption(TIMESTAMPS_IGNORED, Boolean.toString(timestampsIgnored)); } + + public static void setMaxUidsOpt(IteratorSetting is, int maxUids) { + is.addOption(MAX_UIDS_OPT, Integer.toString(maxUids)); + } } diff --git a/warehouse/ingest-core/src/main/java/datawave/iterators/PropogatingIterator.java b/warehouse/ingest-core/src/main/java/datawave/iterators/PropogatingIterator.java index d7eadac85fe..1d36b9b294e 100644 --- a/warehouse/ingest-core/src/main/java/datawave/iterators/PropogatingIterator.java +++ b/warehouse/ingest-core/src/main/java/datawave/iterators/PropogatingIterator.java @@ -22,16 +22,17 @@ import org.apache.log4j.Logger; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.collect.Maps; import datawave.ingest.table.aggregator.PropogatingCombiner; /** * Purpose: Handle arbitrary propogating aggregations. - * + *
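A hedged sketch (not part of the patch) of how the new "maxuids" option could be supplied when the aggregator is attached directly as a combiner. The priority, iterator name, and option values are made up for illustration; the resulting setting would be attached with the usual tableOperations().attachIterator(...) call.

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.Combiner;
    import datawave.ingest.table.aggregator.GlobalIndexUidAggregator;

    class MaxUidsSettingSketch {
        public static void main(String[] args) {
            IteratorSetting is = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidAggregator.class);
            // "all" (or "columns") is required so the base Combiner option validation passes
            Combiner.setCombineAllColumns(is, true);
            GlobalIndexUidAggregator.setTimestampsIgnoredOpt(is, true);
            GlobalIndexUidAggregator.setMaxUidsOpt(is, 100); // helper added by this change
            System.out.println(is.getOptions()); // e.g. {all=true, timestampsIgnored=true, maxuids=100}
        }
    }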

* Design: Though very similar to the DeletingIterator, due to private methods and members, we cannot directly extend the DeletingIterator. As a result, the * class extends SKVI. This class {@code USES --> PropogatingAggregator}. Note that propAgg can be null - * + *

* Initially the TotalAggregatingIterator, this class was a direct copy. At some point it was identified that there was an artifact where deletes would not be * propogated. As a result, this class becomes nearly identical to the DeletingIterator, whereby deletes are always propogated until a full major compaction. */ @@ -41,7 +42,7 @@ public class PropogatingIterator implements SortedKeyValueIterator, O public static final String ATTRIBUTE_DESCRIPTION = "Aggregators apply aggregating functions to values with identical keys. You can specify the column family. DEFAULT matches the default locality group"; - public static final String UNNAMED_OPTION_DESCRIPTION = " "; + public static final String UNNAMED_OPTION_DESCRIPTION = " "; public static final String AGGREGATOR_DEFAULT = "DEFAULT"; @@ -82,12 +83,7 @@ public class PropogatingIterator implements SortedKeyValueIterator, O protected Map aggMap; /** - * variable to determine if we should propogate deletes - */ - private boolean shouldPropogate; - - /** - * Combiner options so that we can effectively deep copy + * Combiner options so that we can effectively deepCopy */ protected Map options = Maps.newHashMap(); @@ -127,10 +123,8 @@ public PropogatingIterator() { * Aggregates the same partial key. * * @return a partial key - * @throws IOException - * for issues with read/write */ - private boolean aggregateRowColumn() throws IOException { + private boolean aggregateRowColumn() { // this function assumes that first value is not delete workKey.set(iterator.getTopKey()); @@ -195,12 +189,9 @@ private PropogatingCombiner getAggregator(Key key) { } /** - * Find Top method, will attempt to aggregate, iff an aggregator is specified - * - * @throws IOException - * for issues with read/write + * Find Top method, will attempt to aggregate, if an aggregator is specified */ - private void findTop() throws IOException { + private void findTop() { // check if aggregation is needed while (iterator.hasTop() && !aggregateRowColumn()) ; @@ -214,10 +205,8 @@ private void findTop() throws IOException { * an iterator * @param Aggregators * mapping of aggregators - * @throws IOException - * for issues with read/write */ - public PropogatingIterator(SortedKeyValueIterator iterator, ColumnToClassMapping Aggregators) throws IOException { + public PropogatingIterator(SortedKeyValueIterator iterator, ColumnToClassMapping Aggregators) { this.iterator = iterator; findTop(); } @@ -305,7 +294,7 @@ public void init(SortedKeyValueIterator source, Map op @Override public IteratorOptions describeOptions() { - return new IteratorOptions(ATTRIBUTE_NAME, ATTRIBUTE_DESCRIPTION, defaultMapOptions, Collections.singletonList(" ")); + return new IteratorOptions(ATTRIBUTE_NAME, ATTRIBUTE_DESCRIPTION, defaultMapOptions, Collections.singletonList(UNNAMED_OPTION_DESCRIPTION)); } @Override @@ -317,24 +306,36 @@ public boolean validateOptions(Map options) { // Don't propagate for either scan or full major compaction. In either case, the aggregated result has combined // all existing values for a key so we don't need to propagate temporary state that is only used to combine // partial results with new info. 
- shouldPropogate = !(env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction()) && !(env.getIteratorScope() == IteratorScope.scan); - - PropogatingCombiner propAgg = null; - - for (Entry familyOption : options.entrySet()) { - Object agg = createAggregator(familyOption.getValue()); + boolean shouldPropogate = !(env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction()) + && !(env.getIteratorScope() == IteratorScope.scan); + + options.forEach((name, value) -> { + value = value.trim(); + int sepIdx = value.indexOf(' '); + String aggClass = (sepIdx < 0) ? value : value.substring(0, sepIdx); + Object agg = createAggregator(aggClass); if (agg instanceof PropogatingCombiner) { - propAgg = PropogatingCombiner.class.cast(agg); + PropogatingCombiner propAgg = (PropogatingCombiner) agg; + if (sepIdx > 0) { + String encodedOpts = value.substring(sepIdx + 1); + //@formatter:off + Map aggOpts = Splitter + .on(';') + .trimResults() + .withKeyValueSeparator('=') + .split(encodedOpts); + //@formatter:on + propAgg.validateOptions(aggOpts); + } propAgg.setPropogate(shouldPropogate); - if (familyOption.getKey().equals(AGGREGATOR_DEFAULT) || familyOption.getKey().equals(AGGREGATOR_DEFAULT_OPT)) { - if (log.isTraceEnabled()) - log.debug("Default aggregator is " + propAgg.getClass()); + if (name.equals(AGGREGATOR_DEFAULT) || name.equals(AGGREGATOR_DEFAULT_OPT)) { + log.trace("Default aggregator is {}"); defaultAgg = propAgg; } else { - aggMap.put(new ArrayByteSequence(familyOption.getKey().getBytes()), propAgg); + aggMap.put(new ArrayByteSequence(name.getBytes()), propAgg); } } - } + }); return true; } diff --git a/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java b/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java index f4ca220a6dd..e50ae6ded52 100644 --- a/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java @@ -43,12 +43,19 @@ public class PropogatingIteratorTest { private static final String FIELD_TO_AGGREGATE = "UUID"; private static final long TIMESTAMP = 1349541830; + private void validateOverfullUidList(Value topValue, int count) throws InvalidProtocolBufferException { + Uid.List v = Uid.List.parseFrom(topValue.get()); + + Assert.assertEquals(count, v.getCOUNT()); + Assert.assertEquals(0, v.getUIDList().size()); + } + private void validateUids(Value topValue, String... uids) throws InvalidProtocolBufferException { Uid.List v = Uid.List.parseFrom(topValue.get()); Assert.assertEquals(uids.length, v.getCOUNT()); for (String uid : uids) { - assertTrue(v.getUIDList().contains(uid)); + Assert.assertTrue(uid + " missing from UIDs list", v.getUIDList().contains(uid)); } } @@ -57,7 +64,7 @@ private void validateRemoval(Value topValue, String... 
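A small sketch (not part of the patch) of the option map the reworked validateOptions above now accepts: each key is a column family (or the DEFAULT marker) and each value is a PropogatingCombiner class name, optionally followed by a space and semicolon-delimited combiner options, mirroring the new test below.

    import java.util.HashMap;
    import java.util.Map;
    import datawave.ingest.table.aggregator.GlobalIndexUidAggregator;
    import datawave.iterators.PropogatingIterator;

    class PropogatingIteratorOptionSketch {
        static Map<String,String> options() {
            Map<String,String> options = new HashMap<>();
            // "all=true" satisfies the base Combiner validation invoked by
            // GlobalIndexUidAggregator.validateOptions; "maxuids=2" is the new option.
            options.put(PropogatingIterator.AGGREGATOR_DEFAULT,
                            GlobalIndexUidAggregator.class.getCanonicalName() + " all=true;maxuids=2");
            return options;
        }
    }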
diff --git a/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java b/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java
index f4ca220a6dd..e50ae6ded52 100644
--- a/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/iterators/PropogatingIteratorTest.java
@@ -43,12 +43,19 @@ public class PropogatingIteratorTest {
     private static final String FIELD_TO_AGGREGATE = "UUID";
     private static final long TIMESTAMP = 1349541830;
 
+    private void validateOverfullUidList(Value topValue, int count) throws InvalidProtocolBufferException {
+        Uid.List v = Uid.List.parseFrom(topValue.get());
+
+        Assert.assertEquals(count, v.getCOUNT());
+        Assert.assertEquals(0, v.getUIDList().size());
+    }
+
     private void validateUids(Value topValue, String... uids) throws InvalidProtocolBufferException {
         Uid.List v = Uid.List.parseFrom(topValue.get());
 
         Assert.assertEquals(uids.length, v.getCOUNT());
         for (String uid : uids) {
-            assertTrue(v.getUIDList().contains(uid));
+            Assert.assertTrue(uid + " missing from UIDs list", v.getUIDList().contains(uid));
         }
     }
 
@@ -57,7 +64,7 @@ private void validateRemoval(Value topValue, String... uids) throws InvalidProtocolBufferException {
         Uid.List v = Uid.List.parseFrom(topValue.get());
 
         Assert.assertEquals(-uids.length, v.getCOUNT());
         for (String uid : uids) {
-            assertTrue(v.getREMOVEDUIDList().contains(uid));
+            Assert.assertTrue(uid + " missing from Removed UIDs list", v.getREMOVEDUIDList().contains(uid));
         }
     }
 
@@ -262,6 +269,44 @@ private SortedMultiMapIterator createSourceWithTestData() {
         return new SortedMultiMapIterator(map);
     }
 
+    @Test
+    public void testAggregateOptions() throws IOException {
+        TreeMultimap<Key,Value> map = TreeMultimap.create();
+
+        map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.1").build().toByteArray()));
+        map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.2").build().toByteArray()));
+        map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.3").build().toByteArray()));
+        map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.4").build().toByteArray()));
+
+        map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abd"), new Value(createValueWithUid("abc.3").build().toByteArray()));
+
+        SortedMultiMapIterator data = new SortedMultiMapIterator(map);
+
+        PropogatingIterator iter = new PropogatingIterator();
+        Map<String,String> options = Maps.newHashMap();
+
+        String encodedOptions = "all=true;maxuids=2";
+        options.put(PropogatingIterator.AGGREGATOR_DEFAULT, GlobalIndexUidAggregator.class.getCanonicalName() + " " + encodedOptions);
+
+        IteratorEnvironment env = new MockIteratorEnvironment(false);
+
+        iter.init(data, options, env);
+
+        iter.seek(new Range(), Collections.emptyList(), false);
+
+        Assert.assertTrue(iter.hasTop());
+
+        Key topKey = iter.getTopKey();
+
+        Assert.assertEquals(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), topKey);
+        validateOverfullUidList(iter.getTopValue(), 4);
+        iter.next();
+        topKey = iter.getTopKey();
+        Assert.assertEquals(newKey(SHARD, FIELD_TO_AGGREGATE, "abd"), topKey);
+        validateUids(iter.getTopValue(), "abc.3");
+
+    }
+
     @Test(expected = NullPointerException.class)
     public void testNullEnvironment() throws IOException {
         PropogatingIterator iter = new PropogatingIterator();