Skip to content

Commit

Permalink
Adding ability to pass options to GlobalIndexUidAggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
mineralntl committed Nov 22, 2024
1 parent 6de9e50 commit 47c5f83
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ public CustomColumnToClassMapping(Integer priority, Map<String,String> opts) {
for (Entry<String,String> entry : opts.entrySet()) {
String column = entry.getKey();

final String className = entry.getValue();
final String val = entry.getValue().trim();
int spaceIdx = val.indexOf(' ');
final String className = spaceIdx < 0 ? val : val.substring(0, spaceIdx);

Pair<Text,Text> pcic;
if (ALL_CF_STR.equals(column)) {
Expand All @@ -397,6 +399,17 @@ public CustomColumnToClassMapping(Integer priority, Map<String,String> opts) {

agg = clazz.getDeclaredConstructor().newInstance();

//@formatter:off
if (spaceIdx > 0) {
final String encodedOpts = val.substring(spaceIdx + 1);
Map<String,String> aggOpts = Splitter
.on(';')
.trimResults()
.withKeyValueSeparator('=')
.split(encodedOpts);
agg.validateOptions(aggOpts);
}
//@formatter:on
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,24 @@
/**
* Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the shardIndex and shardReverseIndex, where the
* list of UIDs for events will be maintained in the global index for low cardinality terms.
* <p>
* Although this combiner allows the max UIDs kept to be configured, anyone using this feature should consider the impact of using it once data has been loaded
into the system. Decreasing the max size will likely cause UID lists to be purged as they exceed the new max UID count. Increasing the max UID count is also
* unlikely to work as one would expect since any lists that already had their UIDs purged and the ignore flag set won't start collecting UIDs even if the
* previous count is less than the new max UID count. In practice, the main intent for this feature is to configure a new cluster with a different max UID count
* (without having to re-compile the code). With the caveats mentioned, it could be used on a system with data loaded, for example if a new data type becomes
* available and one wished to increase the max UID count while data for other data types is relatively stable or the side effects of the change don't matter.
* <p>
* When this class is used with {@link datawave.iterators.PropogatingIterator}, one must be aware that this class is not actually used as a combiner but rather
* is only used for its {@link #reset()} and {@link #aggregate()} methods. PropogatingIterator allows combiner options to be passed, which means when this class
* is used with PropogatingIterator one could change the max UID count. However, if that option is used it should be noted that the base
* {@link org.apache.accumulo.core.iterators.Combiner} option validation is used and therefore the option "all" must also be set to "true" or the "columns" must
* be set in order to pass option validation. While set, the "all" or "columns" option will have no effect when used with PropogatingIterator since only the
* {@link #reset()} and {@link #aggregate()} methods are invoked.
*/
public class GlobalIndexUidAggregator extends PropogatingCombiner {
private static final Logger log = LoggerFactory.getLogger(GlobalIndexUidAggregator.class);
private static final String MAX_UIDS_OPT = "maxuids";
private static final String TIMESTAMPS_IGNORED = "timestampsIgnored";

/**
Expand Down Expand Up @@ -123,7 +138,7 @@ public Value aggregate() {
* can store up to a certain number of UIDs and after that, the lists are no longer tracked (the ignore flag will be set) and only counts are tracked.
* REMOVEDUIDs are tracked to handle minor and partial major compactions where this reduce method won't necessarily see all possible values for a given key
* (e.g., the UIDs that are being removed might be in a different RFile that isn't involved in the current compaction).
*
* <p>
* Aggregation operates in one of two modes depending on whether or not timestamps are ignored. By default, timestamps are ignored since DataWave uses date
* to the day as the timestamp in the global term index. When timestamps are ignored, we cannot infer anything about the order of values under aggregation.
* Therefore, a decision must be made about how to handle removed UIDs vs added UIDs. In that case, removed UIDs take priority. This means that adding a
Expand Down Expand Up @@ -323,13 +338,26 @@ public boolean propogateKey() {
return propogate || !uids.isEmpty() || count > 0;
}

@Override
public IteratorOptions describeOptions() {
    // Extend the inherited Combiner option descriptions with the max-UID knob.
    final IteratorOptions opts = super.describeOptions();
    opts.addNamedOption(MAX_UIDS_OPT, "The maximum number of UIDs to keep in the list. Default is " + MAX + ".");
    return opts;
}

/**
 * Validates (and applies) this combiner's options on top of the base {@code Combiner} validation.
 * Recognized options are {@code timestampsIgnored} (boolean) and {@code maxuids} (positive integer).
 *
 * @param options
 *            the iterator options to validate
 * @return the result of the base class validation; when {@code true}, recognized options have been applied
 * @throws IllegalArgumentException
 *             if {@code maxuids} is present but is not a parseable integer, or is not greater than 0
 */
@Override
public boolean validateOptions(Map<String,String> options) {
    boolean valid = super.validateOptions(options);
    if (valid) {
        if (options.containsKey(TIMESTAMPS_IGNORED)) {
            timestampsIgnored = Boolean.parseBoolean(options.get(TIMESTAMPS_IGNORED));
        }
        if (options.containsKey(MAX_UIDS_OPT)) {
            final String rawMaxUids = options.get(MAX_UIDS_OPT);
            final int parsedMaxUids;
            try {
                parsedMaxUids = Integer.parseInt(rawMaxUids);
            } catch (NumberFormatException e) {
                // NumberFormatException is an IllegalArgumentException, so callers see the same
                // exception type, but now with the offending option name and value in the message.
                throw new IllegalArgumentException("Invalid value for option " + MAX_UIDS_OPT + ": \"" + rawMaxUids + "\" is not an integer.", e);
            }
            if (parsedMaxUids <= 0) {
                throw new IllegalArgumentException("Max UIDs must be greater than 0.");
            }
            // Assign only after validation so a failed validation cannot leave this
            // combiner with an invalid max UID count.
            maxUids = parsedMaxUids;
        }
    }
    return valid;
}
Expand All @@ -349,9 +377,16 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
if (options.containsKey(TIMESTAMPS_IGNORED)) {
timestampsIgnored = Boolean.parseBoolean(options.get(TIMESTAMPS_IGNORED));
}
if (options.containsKey(MAX_UIDS_OPT)) {
maxUids = Integer.parseInt(options.get(MAX_UIDS_OPT));
}
}

/**
 * Convenience helper that stores the {@code timestampsIgnored} option on the given iterator setting.
 *
 * @param is
 *            the iterator setting to add the option to
 * @param timestampsIgnored
 *            whether timestamps should be ignored during aggregation
 */
public static void setTimestampsIgnoredOpt(IteratorSetting is, boolean timestampsIgnored) {
    is.addOption(TIMESTAMPS_IGNORED, Boolean.toString(timestampsIgnored));
}

/**
 * Convenience helper that stores the {@code maxuids} option on the given iterator setting.
 * <p>
 * The value is validated eagerly here, consistent with {@code validateOptions}, so a bad
 * configuration fails at setup time rather than when the iterator is loaded server-side.
 *
 * @param is
 *            the iterator setting to add the option to
 * @param maxUids
 *            the maximum number of UIDs to keep in the list; must be greater than 0
 * @throws IllegalArgumentException
 *             if {@code maxUids} is not greater than 0
 */
public static void setMaxUidsOpt(IteratorSetting is, int maxUids) {
    if (maxUids <= 0) {
        throw new IllegalArgumentException("Max UIDs must be greater than 0.");
    }
    is.addOption(MAX_UIDS_OPT, Integer.toString(maxUids));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;

import datawave.ingest.table.aggregator.PropogatingCombiner;

/**
* Purpose: Handle arbitrary propogating aggregations.
*
* <p>
* Design: Though very similar to the DeletingIterator, due to private methods and members, we cannot directly extend the DeletingIterator. As a result, the
* class extends SKVI. This class {@code USES --> PropogatingAggregator}. Note that propAgg can be null
*
* <p>
* Initially the TotalAggregatingIterator, this class was a direct copy. At some point it was identified that there was an artifact where deletes would not be
* propogated. As a result, this class becomes nearly identical to the DeletingIterator, whereby deletes are always propogated until a full major compaction.
*/
Expand All @@ -41,7 +42,7 @@ public class PropogatingIterator implements SortedKeyValueIterator<Key,Value>, O

public static final String ATTRIBUTE_DESCRIPTION = "Aggregators apply aggregating functions to values with identical keys. You can specify the column family. DEFAULT matches the default locality group";

public static final String UNNAMED_OPTION_DESCRIPTION = "<Column Family> <Combiner>";
public static final String UNNAMED_OPTION_DESCRIPTION = "<Column Family> <Combiner> <optional: combOpt1=comVal1;combOpt2=combVal2...>";

public static final String AGGREGATOR_DEFAULT = "DEFAULT";

Expand Down Expand Up @@ -82,12 +83,7 @@ public class PropogatingIterator implements SortedKeyValueIterator<Key,Value>, O
protected Map<ByteSequence,PropogatingCombiner> aggMap;

/**
* variable to determine if we should propogate deletes
*/
private boolean shouldPropogate;

/**
* Combiner options so that we can effectively deep copy
* Combiner options so that we can effectively deepCopy
*/
protected Map<String,String> options = Maps.newHashMap();

Expand Down Expand Up @@ -127,10 +123,8 @@ public PropogatingIterator() {
* Aggregates the same partial key.
*
* @return a partial key
* @throws IOException
* for issues with read/write
*/
private boolean aggregateRowColumn() throws IOException {
private boolean aggregateRowColumn() {
// this function assumes that first value is not delete

workKey.set(iterator.getTopKey());
Expand Down Expand Up @@ -195,12 +189,9 @@ private PropogatingCombiner getAggregator(Key key) {
}

/**
* Find Top method, will attempt to aggregate, iff an aggregator is specified
*
* @throws IOException
* for issues with read/write
* Find Top method, will attempt to aggregate, if an aggregator is specified
*/
private void findTop() throws IOException {
private void findTop() {
// check if aggregation is needed
while (iterator.hasTop() && !aggregateRowColumn())
;
Expand All @@ -214,10 +205,8 @@ private void findTop() throws IOException {
* an iterator
* @param Aggregators
* mapping of aggregators
* @throws IOException
* for issues with read/write
*/
public PropogatingIterator(SortedKeyValueIterator<Key,Value> iterator, ColumnToClassMapping<Combiner> Aggregators) throws IOException {
public PropogatingIterator(SortedKeyValueIterator<Key,Value> iterator, ColumnToClassMapping<Combiner> Aggregators) {
this.iterator = iterator;
findTop();
}
Expand Down Expand Up @@ -305,7 +294,7 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
@Override
public IteratorOptions describeOptions() {

return new IteratorOptions(ATTRIBUTE_NAME, ATTRIBUTE_DESCRIPTION, defaultMapOptions, Collections.singletonList("<ColumnFamily> <Combiner>"));
return new IteratorOptions(ATTRIBUTE_NAME, ATTRIBUTE_DESCRIPTION, defaultMapOptions, Collections.singletonList(UNNAMED_OPTION_DESCRIPTION));
}

@Override
Expand All @@ -317,24 +306,36 @@ public boolean validateOptions(Map<String,String> options) {
// Don't propagate for either scan or full major compaction. In either case, the aggregated result has combined
// all existing values for a key so we don't need to propagate temporary state that is only used to combine
// partial results with new info.
shouldPropogate = !(env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction()) && !(env.getIteratorScope() == IteratorScope.scan);

PropogatingCombiner propAgg = null;

for (Entry<String,String> familyOption : options.entrySet()) {
Object agg = createAggregator(familyOption.getValue());
boolean shouldPropogate = !(env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction())
&& !(env.getIteratorScope() == IteratorScope.scan);

options.forEach((name, value) -> {
value = value.trim();
int sepIdx = value.indexOf(' ');
String aggClass = (sepIdx < 0) ? value : value.substring(0, sepIdx);
Object agg = createAggregator(aggClass);
if (agg instanceof PropogatingCombiner) {
propAgg = PropogatingCombiner.class.cast(agg);
PropogatingCombiner propAgg = (PropogatingCombiner) agg;
if (sepIdx > 0) {
String encodedOpts = value.substring(sepIdx + 1);
//@formatter:off
Map<String,String> aggOpts = Splitter
.on(';')
.trimResults()
.withKeyValueSeparator('=')
.split(encodedOpts);
//@formatter:on
propAgg.validateOptions(aggOpts);
}
propAgg.setPropogate(shouldPropogate);
if (familyOption.getKey().equals(AGGREGATOR_DEFAULT) || familyOption.getKey().equals(AGGREGATOR_DEFAULT_OPT)) {
if (log.isTraceEnabled())
log.debug("Default aggregator is " + propAgg.getClass());
if (name.equals(AGGREGATOR_DEFAULT) || name.equals(AGGREGATOR_DEFAULT_OPT)) {
log.trace("Default aggregator is {}");
defaultAgg = propAgg;
} else {
aggMap.put(new ArrayByteSequence(familyOption.getKey().getBytes()), propAgg);
aggMap.put(new ArrayByteSequence(name.getBytes()), propAgg);
}
}
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,19 @@ public class PropogatingIteratorTest {
private static final String FIELD_TO_AGGREGATE = "UUID";
private static final long TIMESTAMP = 1349541830;

/**
 * Asserts that the value decodes to a {@code Uid.List} whose UID list has been purged
 * (i.e. the max UID count was exceeded), leaving only the aggregate count.
 *
 * @param topValue
 *            the serialized {@code Uid.List} value to check
 * @param count
 *            the expected aggregate UID count
 * @throws InvalidProtocolBufferException
 *             if the value cannot be parsed as a {@code Uid.List}
 */
private void validateOverfullUidList(Value topValue, int count) throws InvalidProtocolBufferException {
    Uid.List v = Uid.List.parseFrom(topValue.get());

    Assert.assertEquals(count, v.getCOUNT());
    // An over-full list keeps no individual UIDs, only the count.
    Assert.assertEquals(0, v.getUIDList().size());
}

/**
 * Asserts that the value decodes to a {@code Uid.List} containing exactly the given UIDs:
 * the count must equal the number of expected UIDs and each must appear in the list.
 *
 * @param topValue
 *            the serialized {@code Uid.List} value to check
 * @param uids
 *            the UIDs expected to be present
 * @throws InvalidProtocolBufferException
 *             if the value cannot be parsed as a {@code Uid.List}
 */
private void validateUids(Value topValue, String... uids) throws InvalidProtocolBufferException {
    Uid.List v = Uid.List.parseFrom(topValue.get());

    Assert.assertEquals(uids.length, v.getCOUNT());
    for (String uid : uids) {
        Assert.assertTrue(uid + " missing from UIDs list", v.getUIDList().contains(uid));
    }
}

Expand All @@ -57,7 +64,7 @@ private void validateRemoval(Value topValue, String... uids) throws InvalidProto

Assert.assertEquals(-uids.length, v.getCOUNT());
for (String uid : uids) {
assertTrue(v.getREMOVEDUIDList().contains(uid));
Assert.assertTrue(uid + " missing from Removed UIDs list", v.getREMOVEDUIDList().contains(uid));
}
}

Expand Down Expand Up @@ -262,6 +269,44 @@ private SortedMultiMapIterator createSourceWithTestData() {
return new SortedMultiMapIterator(map);
}

/**
 * Verifies that combiner options encoded after the class name in a PropogatingIterator
 * column mapping (here "all=true;maxuids=2") are parsed and passed to the aggregator.
 * With maxuids=2, the four "abc" UIDs must be purged down to a count-only list, while the
 * single-UID "abd" entry stays intact.
 */
@Test
public void testAggregateOptions() throws IOException {
    TreeMultimap<Key,Value> map = TreeMultimap.create();

    // Four UIDs under the same key — exceeds the configured max of 2.
    map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.1").build().toByteArray()));
    map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.2").build().toByteArray()));
    map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.3").build().toByteArray()));
    map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), new Value(createValueWithUid("abc.4").build().toByteArray()));

    // A single UID under a different key — stays below the max.
    map.put(newKey(SHARD, FIELD_TO_AGGREGATE, "abd"), new Value(createValueWithUid("abc.3").build().toByteArray()));

    SortedMultiMapIterator data = new SortedMultiMapIterator(map);

    PropogatingIterator iter = new PropogatingIterator();
    Map<String,String> options = Maps.newHashMap();

    // Options are appended after the class name, separated by a space;
    // "all=true" is required to satisfy the base Combiner option validation.
    String encodedOptions = "all=true;maxuids=2";
    options.put(PropogatingIterator.AGGREGATOR_DEFAULT, GlobalIndexUidAggregator.class.getCanonicalName() + " " + encodedOptions);

    IteratorEnvironment env = new MockIteratorEnvironment(false);

    iter.init(data, options, env);

    iter.seek(new Range(), Collections.emptyList(), false);

    Assert.assertTrue(iter.hasTop());

    Key topKey = iter.getTopKey();

    // "abc" exceeded maxuids=2: expect a purged, count-only list with count 4.
    Assert.assertEquals(newKey(SHARD, FIELD_TO_AGGREGATE, "abc"), topKey);
    validateOverfullUidList(iter.getTopValue(), 4);
    iter.next();
    topKey = iter.getTopKey();
    // "abd" stayed under the max: its single UID must still be present.
    Assert.assertEquals(newKey(SHARD, FIELD_TO_AGGREGATE, "abd"), topKey);
    validateUids(iter.getTopValue(), "abc.3");

}

@Test(expected = NullPointerException.class)
public void testNullEnvironment() throws IOException {
PropogatingIterator iter = new PropogatingIterator();
Expand Down

0 comments on commit 47c5f83

Please sign in to comment.