diff --git a/warehouse/age-off/pom.xml b/warehouse/age-off/pom.xml index 696e1ea3278..727e191e0cf 100644 --- a/warehouse/age-off/pom.xml +++ b/warehouse/age-off/pom.xml @@ -63,6 +63,11 @@ ${version.hamcrest} test + + org.mockito + mockito-core + test + org.slf4j diff --git a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/AgeOffRuleLoader.java b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/AgeOffRuleLoader.java index 89fad8a4dcd..6d233fd91eb 100644 --- a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/AgeOffRuleLoader.java +++ b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/AgeOffRuleLoader.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -12,6 +13,7 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.hadoop.io.IOUtils; @@ -21,6 +23,7 @@ import org.w3c.dom.NamedNodeMap; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; import datawave.iterators.filter.AgeOffConfigParams; import datawave.iterators.filter.ageoff.FilterOptions; @@ -34,7 +37,7 @@ public AgeOffRuleLoader(AgeOffFileLoaderDependencyProvider loaderConfig) { this.loaderConfig = loaderConfig; } - public List load(InputStream in) throws IOException, java.lang.reflect.InvocationTargetException, NoSuchMethodException { + public List load(InputStream in) throws IOException { List mergedRuleConfigs = loadRuleConfigs(in); List filterRules = new ArrayList<>(); /** @@ -42,6 +45,10 @@ public List load(InputStream in) throws IOException, java.lang.refle */ for (RuleConfig ruleConfig : mergedRuleConfigs) { try { + if (ruleConfig.filterClassName == null) { + throw new 
IllegalArgumentException("The filter class must not be null"); + } + FilterRule filter = (FilterRule) Class.forName(ruleConfig.filterClassName).getDeclaredConstructor().newInstance(); FilterOptions option = new FilterOptions(); @@ -69,8 +76,9 @@ public List load(InputStream in) throws IOException, java.lang.refle filterRules.add(filter); - } catch (InstantiationException | ClassNotFoundException | IllegalAccessException e) { - log.error(e); + } catch (IllegalArgumentException | InstantiationException | ClassNotFoundException | IllegalAccessException | InvocationTargetException + | NoSuchMethodException e) { + log.trace("An error occurred while loading age-off rules, the exception will be rethrown", e); throw new IOException(e); } } @@ -133,8 +141,8 @@ protected List loadRuleConfigs(InputStream in) throws IOException { ruleConfigs.addAll(childRules); // @formatter:on - } catch (Exception ex) { - log.error("uh oh: " + ex); + } catch (ParserConfigurationException | SAXException ex) { + log.trace("An error occurred while loading age-off rules, the exception will be rethrown", ex); throw new IOException(ex); } finally { IOUtils.closeStream(in); diff --git a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileLoaderDependencyProvider.java b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileLoaderDependencyProvider.java new file mode 100644 index 00000000000..acf58f389dc --- /dev/null +++ b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileLoaderDependencyProvider.java @@ -0,0 +1,42 @@ +package datawave.ingest.util.cache.watch; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.w3c.dom.Node; + +class FileLoaderDependencyProvider implements AgeOffRuleLoader.AgeOffFileLoaderDependencyProvider { + private final FileSystem fs; + private final Path filePath; + 
private final IteratorEnvironment iterEnv; + + FileLoaderDependencyProvider(FileSystem fs, Path filePath, IteratorEnvironment iterEnv) { + this.fs = fs; + this.filePath = filePath; + this.iterEnv = iterEnv; + } + + @Override + public IteratorEnvironment getIterEnv() { + return iterEnv; + } + + @Override + public InputStream getParentStream(Node parent) throws IOException { + + String parentPathStr = parent.getTextContent(); + + if (null == parentPathStr || parentPathStr.isEmpty()) { + throw new IllegalArgumentException("Invalid parent config path, none specified!"); + } + // loading parent relative to dir that child is in. + Path parentPath = new Path(filePath.getParent(), parentPathStr); + if (!fs.exists(parentPath)) { + throw new IllegalArgumentException("Invalid parent config path specified, " + parentPathStr + " does not exist!"); + } + return fs.open(parentPath); + } +} diff --git a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleCacheLoader.java b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleCacheLoader.java new file mode 100644 index 00000000000..11de4a5622b --- /dev/null +++ b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleCacheLoader.java @@ -0,0 +1,48 @@ +package datawave.ingest.util.cache.watch; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; + +import com.google.common.cache.CacheLoader; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Cache loader implementation for loading {@link FileRuleCacheValue} referencing {@link Path} keys. + */ +public class FileRuleCacheLoader extends CacheLoader { + private final static int CONFIGURED_DIFF = 1; + + /** + * Reloads a new {@link FileRuleCacheValue} if the cached value has changes, otherwise returns the @param oldValue. 
+ * + * @param key + * the key to reload for + * @param oldValue + * the existing value + * @return a new value if there are changes, otherwise @param oldValue is returned + * @throws IOException + * if any errors occur when loading a new instance of the cache value + */ + @Override + public ListenableFuture reload(String key, FileRuleCacheValue oldValue) throws IOException { + // checks here are performed on the caller thread + FileRuleCacheValue resultValue = oldValue.hasChanges() ? load(key) : oldValue; + return Futures.immediateFuture(resultValue); + } + + /** + * Loads a new rule cache value instance + * + * @param key + * the non-null key whose value should be loaded + * @return a new rule cache value instance + * @throws IOException + * if any errors occur when loading a new instance of the cache value + */ + @Override + public FileRuleCacheValue load(String key) throws IOException { + return FileRuleCacheValue.newCacheValue(key, CONFIGURED_DIFF); + } +} diff --git a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleCacheValue.java b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleCacheValue.java new file mode 100644 index 00000000000..b38b33fbd2c --- /dev/null +++ b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleCacheValue.java @@ -0,0 +1,126 @@ +package datawave.ingest.util.cache.watch; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; + +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +import com.google.common.annotations.VisibleForTesting; + +import datawave.iterators.filter.ageoff.AppliedRule; +import datawave.iterators.filter.ageoff.FilterRule; + +/** + * Rule cache value implementation for use with age-off rule loading. 
The implementation is thread-safe and supports concurrent access for all methods. + */ +public class FileRuleCacheValue { + private final static Logger log = Logger.getLogger(FileRuleCacheValue.class); + + private final Path filePath; + private final long configuredDiff; + private final FileSystem fs; + + private volatile FileRuleReference ruleRef; + + FileRuleCacheValue(FileSystem fs, Path filePath, long configuredDiff) { + this.filePath = filePath; + this.configuredDiff = configuredDiff; + this.fs = fs; + } + + /** + * Creates a new instance of this class for the specified @param filePath. Actual evaluation of the @param filePath are deferred until calls to + * {@link #newRulesetView(long, IteratorEnvironment)} + * + * @param filePath + * the file path to prepare a cached representation on + * @param configuredDiff + * the threshold time (in milliseconds) for when timestamp differences are considered changes + * @return a new cache value instance + * @throws IOException + * if the cache value instance cannot be created + */ + public static FileRuleCacheValue newCacheValue(String filePath, long configuredDiff) throws IOException { + Path filePathObj = new Path(filePath); + FileSystem fs = filePathObj.getFileSystem(new Configuration()); + return new FileRuleCacheValue(fs, filePathObj, configuredDiff); + } + + /** + * Gets the file path of this instance. + * + * @return path for the instance + */ + public Path getFilePath() { + return filePath; + } + + /** + * Check if the cached representation has changes. Changes are determined by checking the baseline modification time when the cached representation was + * discovered against the current modification time of the file. 
+ * + * @return true if there are changes, otherwise false + */ + public boolean hasChanges() { + if (ruleRef == null) { + return true; + } + long currentTime; + try { + currentTime = fs.getFileStatus(filePath).getModificationTime(); + } catch (IOException e) { + log.debug("Error getting file status for: " + filePath, e); + return true; + } + long previousTime = ruleRef.getTimestamp(); + boolean changed = (currentTime - previousTime) > configuredDiff; + if (log.isTraceEnabled()) { + log.trace("Changes result: " + changed + ", current time: " + currentTime); + } + return changed; + } + + /** + * Creates a new ruleset view of the file. The initial call to the method will lazily create the base rules and return a view of the baseline rules. The + * next calls will create new view copies derived from the baseline rules. + * + * @param scanStart + * the start of a scan operation to use for the ruleset + * @param iterEnv + * the iterator environment for the scan + * @return a deep copy of the cached {@link AppliedRule} baseline rules + * @throws IOException + * if there are errors during the cache value creation, on initial call + */ + public Collection newRulesetView(long scanStart, IteratorEnvironment iterEnv) throws IOException { + // rule initialization/copies are performed on the calling thread + // the base iterator rules will use an iterator environment from the caller (and keep in the AppliedRule) + // the deep copy always creates new views of the rules with the caller's iterator environment + if (ruleRef == null) { + long ts = fs.getFileStatus(filePath).getModificationTime(); + Collection rulesBase = loadFilterRules(iterEnv); + ruleRef = new FileRuleReference(ts, rulesBase); + } + return ruleRef.deepCopy(scanStart, iterEnv); + } + + @VisibleForTesting + Collection loadFilterRules(IteratorEnvironment iterEnv) throws IOException { + AgeOffRuleLoader ruleLoader = new AgeOffRuleLoader(new FileLoaderDependencyProvider(fs, filePath, iterEnv)); + Collection rulesBase; 
+ try (InputStream in = fs.open(filePath)) { + rulesBase = ruleLoader.load(in); + } + return rulesBase; + } + + @VisibleForTesting + FileRuleReference getRuleRef() { + return ruleRef; + } +} diff --git a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleReference.java b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleReference.java new file mode 100644 index 00000000000..e11cdee6ed1 --- /dev/null +++ b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleReference.java @@ -0,0 +1,27 @@ +package datawave.ingest.util.cache.watch; + +import java.util.Collection; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.iterators.IteratorEnvironment; + +import datawave.iterators.filter.ageoff.AppliedRule; +import datawave.iterators.filter.ageoff.FilterRule; + +class FileRuleReference { + private final long ts; + private final Collection rulesBase; + + FileRuleReference(long ts, Collection rulesBase) { + this.ts = ts; + this.rulesBase = rulesBase; + } + + public long getTimestamp() { + return ts; + } + + public Collection deepCopy(long scanStart, IteratorEnvironment iterEnv) { + return rulesBase.stream().map(rule -> (AppliedRule) rule.deepCopy(scanStart, iterEnv)).collect(Collectors.toList()); + } +} diff --git a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleWatcher.java b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleWatcher.java index c35619e5ece..fd2397debd3 100644 --- a/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleWatcher.java +++ b/warehouse/age-off/src/main/java/datawave/ingest/util/cache/watch/FileRuleWatcher.java @@ -10,7 +10,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Logger; -import org.w3c.dom.Node; import datawave.iterators.filter.ageoff.FilterRule; @@ -89,37 +88,10 @@ public FileRuleWatcher(Path filePath, long configuredDiff, 
IteratorEnvironment i @Override protected Collection loadContents(InputStream in) throws IOException { try { - AgeOffRuleLoader ruleLoader = new AgeOffRuleLoader(new FileWatcherDependencyProvider()); + AgeOffRuleLoader ruleLoader = new AgeOffRuleLoader(new FileLoaderDependencyProvider(fs, filePath, iterEnv)); return ruleLoader.load(in); - } catch (Exception ex) { - log.error("uh oh: " + ex); - throw new IOException(ex); } finally { IOUtils.closeStream(in); } } - - private class FileWatcherDependencyProvider implements AgeOffRuleLoader.AgeOffFileLoaderDependencyProvider { - @Override - public IteratorEnvironment getIterEnv() { - return iterEnv; - } - - @Override - public InputStream getParentStream(Node parent) throws IOException { - - String parentPathStr = parent.getTextContent(); - - if (null == parentPathStr || parentPathStr.isEmpty()) { - throw new IllegalArgumentException("Invalid parent config path, none specified!"); - } - // loading parent relative to dir that child is in. - Path parentPath = new Path(filePath.getParent(), parentPathStr); - if (!fs.exists(parentPath)) { - throw new IllegalArgumentException("Invalid parent config path specified, " + parentPathStr + " does not exist!"); - } - return fs.open(parentPath); - } - } - } diff --git a/warehouse/age-off/src/main/java/datawave/iterators/filter/ConfigurableAgeOffFilter.java b/warehouse/age-off/src/main/java/datawave/iterators/filter/ConfigurableAgeOffFilter.java index d823b3639fa..61b8f23bb08 100644 --- a/warehouse/age-off/src/main/java/datawave/iterators/filter/ConfigurableAgeOffFilter.java +++ b/warehouse/age-off/src/main/java/datawave/iterators/filter/ConfigurableAgeOffFilter.java @@ -1,7 +1,6 @@ package datawave.iterators.filter; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; @@ -10,7 +9,6 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ExecutionException; -import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -24,22 +22,19 @@ import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.OptionDescriber; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import datawave.ingest.util.cache.ReloadableCacheBuilder; -import datawave.ingest.util.cache.watch.FileRuleWatcher; +import datawave.ingest.util.cache.watch.FileRuleCacheLoader; +import datawave.ingest.util.cache.watch.FileRuleCacheValue; import datawave.iterators.filter.ageoff.AgeOffPeriod; import datawave.iterators.filter.ageoff.AppliedRule; -import datawave.iterators.filter.ageoff.FilterRule; /** * This class provides a subclass of the {@code org.apache.accumulo.core.iterators.Filter} class and implements the {@code Option Describer} interface. 
It @@ -121,11 +116,7 @@ public class ConfigurableAgeOffFilter extends Filter implements OptionDescriber protected static final long DEFAULT_EXPIRATION_INTERVAL_MS = 60 * 60 * 1000L; // default 1 hour protected static long EXPIRATION_INTERVAL_MS = DEFAULT_EXPIRATION_INTERVAL_MS; - /** - * Changed filter list to use FilterRule - */ - - protected static LoadingCache> ruleCache = null; + protected static volatile LoadingCache ruleCache = null; protected Collection filterList; @@ -137,11 +128,11 @@ public class ConfigurableAgeOffFilter extends Filter implements OptionDescriber protected String filename; - protected static FileSystem fs = null; - protected IteratorEnvironment myEnv; - private PluginEnvironment pluginEnv; + protected PluginEnvironment pluginEnv; + + protected FileRuleCacheValue cacheValue; // Adding the ability to disable the filter checks in the case of a system-initialized major compaction for example. // The thought is that we force compactions where we want the data to aged off. 
@@ -435,14 +426,15 @@ private boolean shouldDisableForNonUserCompaction(Map options, It * if there is an error reading the configuration file */ private void initFilterRules() throws IllegalArgumentException, IOException { - // filename if (null == ruleCache) { synchronized (ConfigurableAgeOffFilter.class) { if (null == ruleCache) { UPDATE_INTERVAL_MS = getLongProperty(UPDATE_INTERVAL_MS_PROP, DEFAULT_UPDATE_INTERVAL_MS); // 5 ms EXPIRATION_INTERVAL_MS = getLongProperty(EXPIRATION_INTERVAL_MS_PROP, DEFAULT_EXPIRATION_INTERVAL_MS); // 1 hour + log.debug("Configured refresh interval (ms): " + UPDATE_INTERVAL_MS); + log.debug("Configured expiration interval (ms): " + EXPIRATION_INTERVAL_MS); ruleCache = CacheBuilder.newBuilder().refreshAfterWrite(UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) - .expireAfterAccess(EXPIRATION_INTERVAL_MS, TimeUnit.MILLISECONDS).build(new ReloadableCacheBuilder()); + .expireAfterAccess(EXPIRATION_INTERVAL_MS, TimeUnit.MILLISECONDS).build(new FileRuleCacheLoader()); // this will schedule a check to see if the update or expiration intervals have changed // if so the ruleCache will be rebuilt with these new intervals SIMPLE_TIMER.scheduleWithFixedDelay(() -> { @@ -455,7 +447,7 @@ private void initFilterRules() throws IllegalArgumentException, IOException { log.info("Changing " + EXPIRATION_INTERVAL_MS_PROP + " to " + expiration); EXPIRATION_INTERVAL_MS = expiration; ruleCache = CacheBuilder.newBuilder().refreshAfterWrite(UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) - .expireAfterAccess(EXPIRATION_INTERVAL_MS, TimeUnit.MILLISECONDS).build(new ReloadableCacheBuilder()); + .expireAfterAccess(EXPIRATION_INTERVAL_MS, TimeUnit.MILLISECONDS).build(new FileRuleCacheLoader()); } } catch (Throwable t) { log.error(t, t); @@ -465,25 +457,17 @@ private void initFilterRules() throws IllegalArgumentException, IOException { } } - Path filePath = new Path(filename); - if (null == fs) { - synchronized (ConfigurableAgeOffFilter.class) { - if (null == fs) { - if 
(log.isTraceEnabled()) { - log.trace("Setting FileSystem reference"); - } - fs = filePath.getFileSystem(new Configuration()); - } - } - } else { - if (log.isTraceEnabled()) { - log.trace("Reusing file system reference."); - } + try { + cacheValue = ruleCache.get(filename); + } catch (ExecutionException e) { + throw new IOException(e); } - FileRuleWatcher watcherKey = new FileRuleWatcher(fs, filePath, 1, myEnv); - - copyRules(watcherKey); + // the rule cache value will be static + // initial load of the baseline rules will occur here, that call may take time + // after initial load, each subsequent initialize performs a deep copy (init) against the AppliedRule + // the internal deep copy operation (i.e. calls after initial load) are anticipated to be quick + filterList = cacheValue.newRulesetView(scanStart, myEnv); } private long getLongProperty(final String prop, final long defaultValue) { @@ -496,27 +480,6 @@ private long getLongProperty(final String prop, final long defaultValue) { return defaultValue; } - protected void copyRules(FileRuleWatcher watcherKey) throws IOException { - filterList = new ArrayList<>(); - try { - // rule cache is lazily loaded, so the act of getting the key will populate it with the key - // and trigger a bunch of loading logic which will ultimately call - // FileRuleWatcher.loadContents() which will return the rules - Collection rules = ruleCache.get(watcherKey); - - if (rules != null) { - for (FilterRule rule : rules) { - // NOTE: this propagates the anchor time (scanStart) to all of the applied rules - // This is used to calculate the AgeOffPeriod for all of the rules - filterList.add((AppliedRule) rule.deepCopy(this.scanStart, myEnv)); - } - } - - } catch (ExecutionException e) { - throw new IOException(e); - } - } - /** * This method is used by accumulo and its command line shell to prompt the user for the configuration options for this {@code Filter}. 
* @@ -589,6 +552,11 @@ private boolean validatePropertyIsBoolean(Map options, String pro return true; } + @VisibleForTesting + FileRuleCacheValue getFileRuleCacheValue() { + return cacheValue; + } + /** * Clear the file watcher cache. */ @@ -599,7 +567,7 @@ public static void clearCache() { } } - public static LoadingCache> getCache() { + public static LoadingCache getCache() { return ruleCache; } diff --git a/warehouse/age-off/src/main/java/datawave/iterators/filter/ageoff/AppliedRule.java b/warehouse/age-off/src/main/java/datawave/iterators/filter/ageoff/AppliedRule.java index 18919877b2f..d123f7d5104 100644 --- a/warehouse/age-off/src/main/java/datawave/iterators/filter/ageoff/AppliedRule.java +++ b/warehouse/age-off/src/main/java/datawave/iterators/filter/ageoff/AppliedRule.java @@ -112,7 +112,9 @@ public FilterRule deepCopy(AgeOffPeriod period, IteratorEnvironment iterEnv) { newFilter.deepCopyInit(currentOptions, this); // for some reason this needs to come after deep copy init newFilter.ageOffPeriod = new AgeOffPeriod(period.getCutOffMilliseconds()); - log.trace("Age off is " + newFilter.ageOffPeriod.getCutOffMilliseconds()); + if (log.isTraceEnabled()) { + log.trace("Age off is " + newFilter.ageOffPeriod.getCutOffMilliseconds()); + } return newFilter; } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { log.error(e); @@ -137,7 +139,9 @@ public FilterRule deepCopy(long scanStart, IteratorEnvironment iterEnv) { newFilter.deepCopyInit(newOptions, this); // for some reason this needs to come after deep copy init newFilter.ageOffPeriod = new AgeOffPeriod(scanStart, currentOptions.ttl, currentOptions.ttlUnits); - log.trace("Age off is " + newFilter.ageOffPeriod.getCutOffMilliseconds()); + if (log.isTraceEnabled()) { + log.trace("Age off is " + newFilter.ageOffPeriod.getCutOffMilliseconds()); + } return newFilter; } catch (InstantiationException | IllegalAccessException e) { log.error(e); diff --git 
a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/DifferentClassesMergeTest.java b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/DifferentClassesMergeTest.java index dbdde213961..aacbe3de00e 100644 --- a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/DifferentClassesMergeTest.java +++ b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/DifferentClassesMergeTest.java @@ -30,7 +30,6 @@ public class DifferentClassesMergeTest { private static final long TIMESTAMP_IN_FUTURE = 1000L * 60 * 60 * 24 * 365 * 1000; // 1,000 years in the future public static final long DAYS_AGO = -1000L * 60 * 60 * 24; - private FileRuleWatcher watcher; private ColumnVisibilityLabeledFilter parentFilter; // childFilter inherits matchPattern contents from parentFilter private EdgeColumnQualifierTokenFilter childFilter; @@ -40,12 +39,11 @@ public void before() throws IOException { // create childFilter Path childPath = new Path(this.getClass().getResource(CHILD_FILTER_CONFIGURATION_FILE).toString()); FileSystem fs = childPath.getFileSystem(new Configuration()); - watcher = new FileRuleWatcher(fs, childPath, 1); - childFilter = (EdgeColumnQualifierTokenFilter) loadRulesFromFile(watcher, fs, childPath); + childFilter = (EdgeColumnQualifierTokenFilter) loadRulesFromFile(fs, childPath); // create parentFilter Path rootPath = new Path(this.getClass().getResource(ROOT_FILTER_CONFIGURATION_FILE).toString()); - parentFilter = (ColumnVisibilityLabeledFilter) loadRulesFromFile(watcher, fs, rootPath); + parentFilter = (ColumnVisibilityLabeledFilter) loadRulesFromFile(fs, rootPath); } @Test @@ -208,8 +206,9 @@ private void assertParentRejects(String colVis, long timestamp) { assertFalse(parentFilter.accept(key, new Value())); } - private static AppliedRule loadRulesFromFile(FileRuleWatcher watcher, FileSystem fs, Path filePath) throws IOException { - Collection rules = watcher.loadContents(fs.open(filePath)); + private static AppliedRule 
loadRulesFromFile(FileSystem fs, Path filePath) throws IOException { + FileRuleCacheValue ruleValue = new FileRuleCacheValue(fs, filePath, 1); + Collection rules = ruleValue.loadFilterRules(null); // should only have the single rule assertEquals(1, rules.size()); return (AppliedRule) rules.iterator().next(); diff --git a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleCacheLoaderTest.java b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleCacheLoaderTest.java new file mode 100644 index 00000000000..d2506f755e9 --- /dev/null +++ b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleCacheLoaderTest.java @@ -0,0 +1,44 @@ +package datawave.ingest.util.cache.watch; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; + +import com.google.common.util.concurrent.ListenableFuture; + +public class FileRuleCacheLoaderTest { + @Test + public void testReloadReturnsNewInstanceWhenChanged() throws Exception { + String path = "file:/path/to/file"; + FileRuleCacheLoader loader = new FileRuleCacheLoader(); + FileRuleCacheValue val = mock(FileRuleCacheValue.class); + when(val.hasChanges()).thenReturn(true); + ListenableFuture reloadedVal = loader.reload(path, val); + + assertNotSame(val, reloadedVal.get()); + } + + @Test + public void testReloadReturnsSameInstanceWhenNotChanged() throws Exception { + String path = "file:/path/to/file"; + FileRuleCacheLoader loader = new FileRuleCacheLoader(); + FileRuleCacheValue val = mock(FileRuleCacheValue.class); + when(val.hasChanges()).thenReturn(false); + ListenableFuture reloadedVal = loader.reload(path, val); + + assertSame(val, reloadedVal.get()); + } + + @Test + public void testLoadWillCreateNewInstance() throws Exception { + String path = "file:/path/to/file"; + 
FileRuleCacheLoader loader = new FileRuleCacheLoader(); + FileRuleCacheValue loadedVal = loader.load(path); + + assertEquals(path, loadedVal.getFilePath().toString()); + } +} diff --git a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleCacheValueTest.java b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleCacheValueTest.java new file mode 100644 index 00000000000..ef6660ed9e7 --- /dev/null +++ b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleCacheValueTest.java @@ -0,0 +1,167 @@ +package datawave.ingest.util.cache.watch; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import datawave.iterators.filter.AgeOffConfigParams; +import datawave.iterators.filter.ageoff.AppliedRule; +import datawave.iterators.filter.ageoff.FilterRule; + +public class FileRuleCacheValueTest { + // Derived from original FileRuleWatcherTest unit tests + + private static final String FILTER_CONFIGURATION_FILE = "/filter/test-filter-rules.xml"; + private static final String FILE_WITH_MISSING_FILTER_CLASS = "/filter/missing-filter-class.xml"; + private static final String DEFAULT_UNITS = "d"; + private FileRuleCacheValue ruleValue; + private FileSystem fs; + private Path filePath; + private Collection rules; + private Map rulesByMatchPattern; + + @Before + public void before() throws IOException { + rulesByMatchPattern = new HashMap<>(); + filePath = new Path(this.getClass().getResource(FILTER_CONFIGURATION_FILE).toString()); + fs = filePath.getFileSystem(new Configuration()); + ruleValue = new FileRuleCacheValue(fs, filePath, 1); + rules = ruleValue.loadFilterRules(null); + 
Assert.assertEquals(5, rules.size()); + for (FilterRule rule : rules) { + Assert.assertEquals(TestFilter.class, rule.getClass()); + TestFilter testFilter = (TestFilter) rule; + String matchPattern = testFilter.options.getOption(AgeOffConfigParams.MATCHPATTERN); + rulesByMatchPattern.put(matchPattern, testFilter); + } + } + + @Test + public void verifyNoBleedOverOfTTlValue() { + Assert.assertEquals(10, rulesByMatchPattern.get("1").options.getTTL()); + Assert.assertEquals(-1, rulesByMatchPattern.get("A").options.getTTL()); + Assert.assertEquals(50, rulesByMatchPattern.get("B").options.getTTL()); + Assert.assertEquals(-1, rulesByMatchPattern.get("C").options.getTTL()); + Assert.assertEquals(10, rulesByMatchPattern.get("D").options.getTTL()); + } + + @Test + public void verifyNoBleedOverOfTTlUnits() { + Assert.assertEquals("ms", rulesByMatchPattern.get("1").options.getTTLUnits()); + Assert.assertEquals(DEFAULT_UNITS, rulesByMatchPattern.get("A").options.getTTLUnits()); + Assert.assertEquals("d", rulesByMatchPattern.get("B").options.getTTLUnits()); + Assert.assertEquals(DEFAULT_UNITS, rulesByMatchPattern.get("C").options.getTTLUnits()); + Assert.assertEquals("ms", rulesByMatchPattern.get("D").options.getTTLUnits()); + } + + @Test + public void verifyNoBleedOverOfExtendedOptions() { + Assert.assertEquals("false", rulesByMatchPattern.get("1").options.getOption("filtersWater")); + Assert.assertNull(rulesByMatchPattern.get("A").options.getOption("filtersWater")); + Assert.assertEquals("true", rulesByMatchPattern.get("B").options.getOption("filtersWater")); + Assert.assertNull(rulesByMatchPattern.get("C").options.getOption("filtersWater")); + Assert.assertEquals("false", rulesByMatchPattern.get("D").options.getOption("filtersWater")); + + Assert.assertEquals("1234", rulesByMatchPattern.get("1").options.getOption("myTagName.ttl")); + Assert.assertNull(rulesByMatchPattern.get("A").options.getOption("myTagName.ttl")); + 
Assert.assertNull(rulesByMatchPattern.get("B").options.getOption("myTagName.ttl")); + Assert.assertNull(rulesByMatchPattern.get("C").options.getOption("myTagName.ttl")); + Assert.assertNull(rulesByMatchPattern.get("D").options.getOption("myTagName.ttl")); + } + + @Test + public void verifyDeepCopyWillSeeDifferentRules() throws IOException { + Collection v1 = ruleValue.newRulesetView(0, null); + FileRuleReference r1 = ruleValue.getRuleRef(); + Collection v2 = ruleValue.newRulesetView(0, null); + FileRuleReference r2 = ruleValue.getRuleRef(); + + // check to ensure the applied rules returned are different objects + // but the underlying rule reference is not changing + Assert.assertNotSame(v1, v2); + Assert.assertSame(r1, r2); + } + + @Test(expected = IOException.class) + public void verifyFilterClass() throws IOException { + Path fileWithMissingClassname = new Path(this.getClass().getResource(FILE_WITH_MISSING_FILTER_CLASS).toString()); + FileRuleCacheValue exValue = new FileRuleCacheValue(fs, fileWithMissingClassname, 1); + rules = exValue.loadFilterRules(null); + } + + @Test + public void verifyNumericFieldInOptions() { + // backwards compatibility + Assert.assertEquals("2468", rulesByMatchPattern.get("D").options.getOption("last.ttl")); + Assert.assertEquals(10, rulesByMatchPattern.get("D").options.getTTL()); + Assert.assertEquals("ms", rulesByMatchPattern.get("D").options.getTTLUnits()); + // revised options + Assert.assertEquals("first,last", rulesByMatchPattern.get("D").options.getOption("fields")); + Assert.assertEquals("1234", rulesByMatchPattern.get("D").options.getOption("field.middle.ttl")); + Assert.assertEquals("m", rulesByMatchPattern.get("D").options.getOption("field.middle.ttlUnits")); + Assert.assertEquals("10", rulesByMatchPattern.get("D").options.getOption("field.suffix.ttl")); + Assert.assertEquals("d", rulesByMatchPattern.get("D").options.getOption("field.suffix.ttlUnits")); + Assert.assertEquals("77", 
rulesByMatchPattern.get("D").options.getOption("datatype.012345.ttl")); + Assert.assertEquals("ms", rulesByMatchPattern.get("D").options.getOption("datatype.012345.ttlUnits")); + } + + @Test + public void verifyHasChangesIfNotInitializedReturnsChanges() { + FileSystem fs = Mockito.mock(FileSystem.class); + Path path = new Path("hdfs://path/to/file"); + FileRuleCacheValue val = new FileRuleCacheValue(fs, path, 1L); + Assert.assertTrue(val.hasChanges()); + } + + @Test + public void verifyHasChangesIfThrowsReturnsChanges() throws IOException { + FileSystem fsSpy = Mockito.spy(fs); + Mockito.when(fsSpy.getFileStatus(filePath)).thenThrow(new IllegalStateException("Unable to fetch status")); + FileRuleCacheValue val = new FileRuleCacheValue(fs, filePath, 1L); + Assert.assertTrue(val.hasChanges()); + } + + @Test + public void verifyHasChangesWhenChanges() throws IOException { + long timestampBaseline = 0L; + long configuredDiff = 1L; + FileSystem fsSpy = Mockito.spy(fs); + FileStatus statusBase = Mockito.mock(FileStatus.class); + FileStatus statusUnchanged = Mockito.mock(FileStatus.class); + FileStatus statusChanged = Mockito.mock(FileStatus.class); + Mockito.when(statusBase.getModificationTime()).thenReturn(timestampBaseline); + Mockito.when(statusUnchanged.getModificationTime()).thenReturn(timestampBaseline + configuredDiff); + Mockito.when(statusChanged.getModificationTime()).thenReturn(timestampBaseline + configuredDiff + 1); + Mockito.when(fsSpy.getFileStatus(filePath)).thenReturn(statusBase); + + FileRuleCacheValue val = new FileRuleCacheValue(fsSpy, filePath, configuredDiff); + val.newRulesetView(0L, null); + + Assert.assertNotNull(val.getRuleRef()); + + // assert no changes after loading + Assert.assertFalse("Expected no changes initial evaluation", val.hasChanges()); + + // reset status to be unchanged (more than baseline) + Mockito.when(fsSpy.getFileStatus(filePath)).thenReturn(statusUnchanged); + + // assert no changes after loading + 
Assert.assertFalse("Expected no changes at threshold evaluation", val.hasChanges()); + + // reset status to be changed + Mockito.when(fsSpy.getFileStatus(filePath)).thenReturn(statusChanged); + + // assert changes now detected when (modificationTime - baseline) > configuredDiff + Assert.assertTrue("Expected evaluation has changes", val.hasChanges()); + } +} diff --git a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleDataTypeMergeTest.java b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleDataTypeMergeTest.java index ea185ebd9df..03845eaa3aa 100644 --- a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleDataTypeMergeTest.java +++ b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleDataTypeMergeTest.java @@ -22,7 +22,6 @@ public class FileRuleDataTypeMergeTest { private static final String ROOT_FILTER_CONFIGURATION_FILE = "/filter/test-root-data-type.xml"; private static final String CHILD_FILTER_CONFIGURATION_FILE = "/filter/test-customized-data-type.xml"; - private FileRuleWatcher watcher; private TestDataTypeFilter parentFilter; // this one inherits defaults from parentFilter private TestDataTypeFilter childFilter; @@ -32,9 +31,8 @@ public void before() throws IOException { Path childPath = new Path(this.getClass().getResource(CHILD_FILTER_CONFIGURATION_FILE).toString()); Path rootPath = new Path(this.getClass().getResource(ROOT_FILTER_CONFIGURATION_FILE).toString()); FileSystem fs = childPath.getFileSystem(new Configuration()); - watcher = new FileRuleWatcher(fs, childPath, 1); - parentFilter = (TestDataTypeFilter) loadRulesFromFile(watcher, fs, rootPath); - childFilter = (TestDataTypeFilter) loadRulesFromFile(watcher, fs, childPath); + parentFilter = (TestDataTypeFilter) loadRulesFromFile(fs, rootPath); + childFilter = (TestDataTypeFilter) loadRulesFromFile(fs, childPath); } @Test @@ -66,8 +64,9 @@ public void verifyOverridenValues() throws IOException { 
assertThat(childFilter.options.getOption("zip.ttl"), is("123"));
     }
 
-    private static FilterRule loadRulesFromFile(FileRuleWatcher watcher, FileSystem fs, Path filePath) throws IOException {
-        Collection rules = watcher.loadContents(fs.open(filePath));
+    private static FilterRule loadRulesFromFile(FileSystem fs, Path filePath) throws IOException {
+        FileRuleCacheValue ruleValue = new FileRuleCacheValue(fs, filePath, 1);
+        Collection rules = ruleValue.loadFilterRules(null);
         // should only have the single rule
         assertThat(rules.size(), is(1));
         for (FilterRule rule : rules) {
diff --git a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleFieldMergeTest.java b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleFieldMergeTest.java
index 14b84008fe0..41a331f550b 100644
--- a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleFieldMergeTest.java
+++ b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleFieldMergeTest.java
@@ -24,7 +24,6 @@ public class FileRuleFieldMergeTest {
     static final String ROOT_FILTER_CONFIGURATION_FILE = "/filter/test-root-field.xml";
     private static final String CHILD_FILTER_CONFIGURATION_FILE = "/filter/test-customized-field.xml";
 
-    private FileRuleWatcher watcher;
     private TestFieldFilter parentFilter;
     // this one inherits defaults from parentFilter
     private TestFieldFilter childFilter;
@@ -34,9 +33,8 @@ public void before() throws IOException {
         Path childPath = new Path(this.getClass().getResource(CHILD_FILTER_CONFIGURATION_FILE).toString());
         Path rootPath = new Path(this.getClass().getResource(ROOT_FILTER_CONFIGURATION_FILE).toString());
         FileSystem fs = childPath.getFileSystem(new Configuration());
-        watcher = new FileRuleWatcher(fs, childPath, 1);
-        parentFilter = (TestFieldFilter) loadRulesFromFile(watcher, fs, rootPath);
-        childFilter = (TestFieldFilter) loadRulesFromFile(watcher, fs, childPath);
+        parentFilter = (TestFieldFilter) loadRulesFromFile(fs, rootPath);
+        childFilter = (TestFieldFilter) loadRulesFromFile(fs, childPath);
     }
 
     @Test
@@ -62,8 +60,9 @@ private Boolean isIndexTable(TestFieldFilter filter) {
         return Boolean.valueOf(filter.options.getOption("isindextable", "false"));
     }
 
-    private static FilterRule loadRulesFromFile(FileRuleWatcher watcher, FileSystem fs, Path filePath) throws IOException {
-        Collection rules = watcher.loadContents(fs.open(filePath));
+    private static FilterRule loadRulesFromFile(FileSystem fs, Path filePath) throws IOException {
+        FileRuleCacheValue cacheValue = new FileRuleCacheValue(fs, filePath, 1);
+        Collection rules = cacheValue.loadFilterRules(null);
         // should only have the single rule
         assertThat(rules.size(), is(1));
         for (FilterRule rule : rules) {
diff --git a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleLoadContentsMergeFiltersTest.java b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleLoadContentsMergeFiltersTest.java
index 74994b62f8e..7fd7a325c70 100644
--- a/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleLoadContentsMergeFiltersTest.java
+++ b/warehouse/age-off/src/test/java/datawave/ingest/util/cache/watch/FileRuleLoadContentsMergeFiltersTest.java
@@ -35,7 +35,6 @@ public class FileRuleLoadContentsMergeFiltersTest {
     private static final long MILLIS_IN_DAY = 24 * 60 * 60 * 1000;
     private static final int MILLIS_IN_ONE_SEC = 60 * 1000;
 
-    private FileRuleWatcher watcher;
     private TestTrieFilter parentFilter;
     // this one inherits defaults from parentFilter
     private TestTrieFilter childFilter;
@@ -50,9 +49,8 @@ public void before() throws IOException {
         Path childPath = new Path(this.getClass().getResource(CHILD_FILTER_CONFIGURATION_FILE).toString());
         Path rootPath = new Path(this.getClass().getResource(ROOT_FILTER_CONFIGURATION_FILE).toString());
         FileSystem fs = childPath.getFileSystem(new Configuration());
-        watcher = new FileRuleWatcher(fs, childPath, 1);
-        parentFilter = (TestTrieFilter) loadRulesFromFile(watcher, fs, rootPath, 3);
-        childFilter = (TestTrieFilter) loadRulesFromFile(watcher, fs, childPath, 4);
+        parentFilter = (TestTrieFilter) loadRulesFromFile(fs, rootPath, 3);
+        childFilter = (TestTrieFilter) loadRulesFromFile(fs, childPath, 4);
         anchorTime = System.currentTimeMillis();
         // use this to configure the age off evaluation, all units expected to be in days
         filterOptions = new FilterOptions();
@@ -99,9 +97,11 @@ public void testNewConfigMaintainsOrder() throws Exception {
         Path rootPath = new Path(this.getClass().getResource(ROOT_FILTER_CONFIGURATION_FILE).toString());
         Path childPath = new Path(this.getClass().getResource(CHILD_FILTER_CONFIGURATION_FILE).toString());
         FileSystem fs = childPath.getFileSystem(new Configuration());
+        FileRuleCacheValue parentValue = new FileRuleCacheValue(fs, rootPath, 1);
+        FileRuleCacheValue childValue = new FileRuleCacheValue(fs, childPath, 1);
 
-        List parentRules = (List) watcher.loadContents(fs.open(rootPath));
-        List childRules = (List) watcher.loadContents(fs.open(childPath));
+        List parentRules = (List) parentValue.loadFilterRules(null);
+        List childRules = (List) childValue.loadFilterRules(null);
 
         // should have one extra rule in child
         assertThat(childRules.size(), is(equalTo(parentRules.size() + 1)));
@@ -186,8 +186,9 @@ public void testTtl() {
         assertTrue(filter.isFilterRuleApplied());
     }
 
-    private static FilterRule loadRulesFromFile(FileRuleWatcher watcher, FileSystem fs, Path filePath, int expectedNumRules) throws IOException {
-        Collection rules = watcher.loadContents(fs.open(filePath));
+    private static FilterRule loadRulesFromFile(FileSystem fs, Path filePath, int expectedNumRules) throws IOException {
+        FileRuleCacheValue cacheValue = new FileRuleCacheValue(fs, filePath, 1);
+        Collection rules = cacheValue.loadFilterRules(null);
         assertThat(rules.size(), is(expectedNumRules));
         // only return the TestTrieFilter for this test
         Optional first = rules.stream().filter(r -> r instanceof TestTrieFilter).findFirst();
diff --git a/warehouse/age-off/src/test/java/datawave/iterators/filter/ConfigurableAgeOffFilterTest.java b/warehouse/age-off/src/test/java/datawave/iterators/filter/ConfigurableAgeOffFilterTest.java
index e577c336563..b35f976f4bf 100644
--- a/warehouse/age-off/src/test/java/datawave/iterators/filter/ConfigurableAgeOffFilterTest.java
+++ b/warehouse/age-off/src/test/java/datawave/iterators/filter/ConfigurableAgeOffFilterTest.java
@@ -3,6 +3,7 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 
 import java.io.IOException;
 import java.net.URL;
@@ -22,6 +23,7 @@
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.junit.Test;
 
+import datawave.ingest.util.cache.watch.FileRuleCacheValue;
 import datawave.iterators.filter.ageoff.AppliedRule;
 import datawave.iterators.filter.ageoff.ConfigurableIteratorEnvironment;
 import datawave.iterators.filter.ageoff.FilterOptions;
@@ -101,6 +103,27 @@ public void testAcceptKeyValue_WithFile() throws Exception {
         assertThat(filter.accept(getKey(daysAgo(123)), VALUE), is(false));
     }
 
+    @Test
+    public void testInit_WillCachePreviousValue() throws Exception {
+        Map options = getOptionsMap(30, AgeOffTtlUnits.DAYS);
+        ConfigurableAgeOffFilter filter1 = new ConfigurableAgeOffFilter();
+        options.put(AgeOffConfigParams.FILTER_CONFIG, pathFromClassloader("/filter/test-root-rules.xml"));
+        filter1.init(source, options, env);
+
+        ConfigurableAgeOffFilter filter2 = new ConfigurableAgeOffFilter();
+        filter2.init(source, options, env);
+
+        FileRuleCacheValue cacheValue1 = filter1.getFileRuleCacheValue();
+        FileRuleCacheValue cacheValue2 = filter2.getFileRuleCacheValue();
+
+        assertNotNull(cacheValue1);
+        assertNotNull(cacheValue2);
+
+        // identity check: two filters initialized with the same options must expose the very same
+        // FileRuleCacheValue instance, i.e. the second init() saw the value the first one obtained
+        assertSame(cacheValue1, cacheValue2);
+    }
+
     @Test
     public void testAcceptKeyValue_TtlSet() throws Exception {
         ConfigurableAgeOffFilter filter = new ConfigurableAgeOffFilter();
@@ -144,7 +167,7 @@ public void testAcceptKeyValue_MultipleFilters() throws Exception {
         rules.addAll(singleColumnFamilyMatcher("bar", options));
         // for holding the filters
         FilterWrapper wrapper = getWrappedFilterWithRules(rules, source, options, env);
-        // copy cofigs to actual filter we are testing
+        // copy configs to actual filter we are testing
         filter.initialize(wrapper);
 
         // created two rules