From cbabee9c98d68b151f87b90ddcbf6dbb29f02f78 Mon Sep 17 00:00:00 2001
From: gnydick
Date: Thu, 17 Jan 2019 10:39:56 -0800
Subject: [PATCH 1/7] commit just to save changes

(cherry picked from commit ace409b71e9d4c18661c5ebb29599fe8bcbd32ec)
---
 src/normalize/NormalizePlugin.java | 23 +++++++++++++++++++++++
 src/utils/PluginConfig.java        | 19 +++++++++++++++++++
 2 files changed, 42 insertions(+)
 create mode 100644 src/normalize/NormalizePlugin.java
 create mode 100644 src/utils/PluginConfig.java

diff --git a/src/normalize/NormalizePlugin.java b/src/normalize/NormalizePlugin.java
new file mode 100644
index 0000000000..50091b1d1e
--- /dev/null
+++ b/src/normalize/NormalizePlugin.java
@@ -0,0 +1,23 @@
+package net.opentsdb.normalize;
+
+import com.stumbleupon.async.Deferred;
+import net.opentsdb.core.TSDB;
+import net.opentsdb.stats.StatsCollector;
+import net.opentsdb.utils.PluginConfig;
+
+public abstract class NormalizePlugin {
+
+  private PluginConfig pluginConfig;
+
+  public NormalizePlugin(PluginConfig pluginConfig) {
+    this.pluginConfig = pluginConfig;
+  }
+
+  private NormalizePlugin() {}
+  public abstract void initialize(final TSDB tsdb);
+  public abstract Deferred<Object> shutdown();
+  public abstract String version();
+  public abstract void collectStats(final StatsCollector collector);
+
+
+}
diff --git a/src/utils/PluginConfig.java b/src/utils/PluginConfig.java
new file mode 100644
index 0000000000..a7ad8d9230
--- /dev/null
+++ b/src/utils/PluginConfig.java
@@ -0,0 +1,19 @@
+package net.opentsdb.utils;
+
+import java.util.Properties;
+
+public abstract class PluginConfig {
+
+  protected String configUrl;
+  protected Properties properties;
+  private PluginConfig() {}
+
+  public PluginConfig(String configUrl) {
+    this.configUrl = configUrl;
+  }
+
+  public Properties getConfig() {
+    return properties;
+  }
+
+}
From 2f73b5f29cc06cfcdc79b7c0c2a8a82f013e0163 Mon Sep 17 00:00:00 2001
From: gnydick
Date: Tue, 22 Jan 2019 19:16:47 -0800
Subject: [PATCH 2/7] adding the normalize plugin and plugin config

---
 src/core/TSDB.java | 4433 +++++++++++++++++++++++---------------------
 1 file changed, 2305 insertions(+), 2128 deletions(-)

diff --git a/src/core/TSDB.java b/src/core/TSDB.java
index 119c207a24..6abe9e5171 100644
--- a/src/core/TSDB.java
+++ b/src/core/TSDB.java
@@ -12,59 +12,17 @@
 // see <http://www.gnu.org/licenses/>.
package net.opentsdb.core; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - import com.google.common.base.Strings; import com.google.common.io.Files; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; import com.stumbleupon.async.DeferredGroupException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.hbase.async.AppendRequest; -import org.hbase.async.Bytes; -import org.hbase.async.Bytes.ByteMap; -import org.hbase.async.ClientStats; -import org.hbase.async.DeleteRequest; -import org.hbase.async.GetRequest; -import org.hbase.async.HBaseClient; -import org.hbase.async.HBaseException; -import org.hbase.async.KeyValue; -import org.hbase.async.PutRequest; -import org.hbase.async.TableNotFoundException; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.Timer; - import net.opentsdb.auth.Authentication; -import net.opentsdb.tree.TreeBuilder; -import net.opentsdb.tsd.RTPublisher; -import net.opentsdb.tsd.StorageExceptionHandler; -import net.opentsdb.uid.NoSuchUniqueId; -import net.opentsdb.uid.NoSuchUniqueName; -import net.opentsdb.uid.UniqueId; -import net.opentsdb.uid.UniqueIdFilterPlugin; -import net.opentsdb.uid.UniqueId.UniqueIdType; -import net.opentsdb.utils.Config; -import net.opentsdb.utils.DateTime; -import net.opentsdb.utils.JSON; -import net.opentsdb.utils.PluginLoader; -import net.opentsdb.utils.Threads; import net.opentsdb.meta.Annotation; import net.opentsdb.meta.MetaDataCache; import net.opentsdb.meta.TSMeta; import net.opentsdb.meta.UIDMeta; +import net.opentsdb.normalize.NormalizePlugin; import net.opentsdb.query.QueryLimitOverride; import net.opentsdb.query.expression.ExpressionFactory; import net.opentsdb.query.filter.TagVFilter; @@ -73,10 +31,34 @@ import net.opentsdb.rollup.RollupUtils; import net.opentsdb.search.SearchPlugin; import net.opentsdb.search.SearchQuery; -import net.opentsdb.tools.StartupPlugin; import net.opentsdb.stats.Histogram; import net.opentsdb.stats.QueryStats; import net.opentsdb.stats.StatsCollector; +import net.opentsdb.tools.StartupPlugin; +import net.opentsdb.tree.TreeBuilder; +import net.opentsdb.tsd.RTPublisher; +import net.opentsdb.tsd.StorageExceptionHandler; +import net.opentsdb.uid.NoSuchUniqueId; +import net.opentsdb.uid.NoSuchUniqueName; +import net.opentsdb.uid.UniqueId; +import net.opentsdb.uid.UniqueId.UniqueIdType; +import net.opentsdb.uid.UniqueIdFilterPlugin; +import net.opentsdb.utils.Config; +import net.opentsdb.utils.*; +import org.hbase.async.*; +import org.hbase.async.Bytes.ByteMap; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.charset.Charset; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; /** * Thread-safe implementation of the TSDB client. @@ -85,2108 +67,2303 @@ * points or query the database. 
*/ public final class TSDB { - private static final Logger LOG = LoggerFactory.getLogger(TSDB.class); - - static final byte[] FAMILY = { 't' }; - - /** Charset used to convert Strings to byte arrays and back. */ - private static final Charset CHARSET = Charset.forName("ISO-8859-1"); - private static final String METRICS_QUAL = "metrics"; - private static short METRICS_WIDTH = 3; - private static final String TAG_NAME_QUAL = "tagk"; - private static short TAG_NAME_WIDTH = 3; - private static final String TAG_VALUE_QUAL = "tagv"; - private static short TAG_VALUE_WIDTH = 3; - private static final int MIN_HISTOGRAM_BYTES = 1; - - /** The operation mode (role) of the TSD. */ - public enum OperationMode { - READWRITE, - READONLY, - WRITEONLY - } - - /** Client for the HBase cluster to use. */ - final HBaseClient client; - - /** The operation mode (role) of the TSD. */ - final OperationMode mode; - - /** Name of the table in which timeseries are stored. */ - final byte[] table; - /** Name of the table in which UID information is stored. */ - final byte[] uidtable; - /** Name of the table where tree data is stored. */ - final byte[] treetable; - /** Name of the table where meta data is stored. */ - final byte[] meta_table; - - /** Unique IDs for the metric names. */ - final UniqueId metrics; - /** Unique IDs for the tag names. */ - final UniqueId tag_names; - /** Unique IDs for the tag values. */ - final UniqueId tag_values; - - /** Configuration object for all TSDB components */ - final Config config; - - /** Timer used for various tasks such as idle timeouts or query timeouts */ - private final HashedWheelTimer timer; - - /** - * Row keys that need to be compacted. - * Whenever we write a new data point to a row, we add the row key to this - * set. Every once in a while, the compaction thread will go through old - * row keys and will read re-compact them. - */ - private final CompactionQueue compactionq; - - /** Authentication Plugin to use if configured */ - private Authentication authentication = null; - - /** Search indexer to use if configured */ - private SearchPlugin search = null; - - /** Optional Startup Plugin to use if configured */ - private StartupPlugin startup = null; - - /** Optional real time pulblisher plugin to use if configured */ - private RTPublisher rt_publisher = null; - - /** Optional plugin for handling meta data caching and updating */ - private MetaDataCache meta_cache = null; - - /** Plugin for dealing with data points that can't be stored */ - private StorageExceptionHandler storage_exception_handler = null; - - /** A filter plugin for allowing or blocking time series */ - private WriteableDataPointFilterPlugin ts_filter; - - /** A filter plugin for allowing or blocking UIDs */ - private UniqueIdFilterPlugin uid_filter; - - /** The rollup config object for storing and querying rollups */ - private final RollupConfig rollup_config; - - /** The default rollup interval. */ - private final RollupInterval default_interval; - - /** Name of the tag we use to determine aggregates */ - private final String agg_tag_key; - - /** Name of the tag we use use for raw data. */ - private final String raw_agg_tag_value; - - /** Whether or not to tag raw data with the raw value tag */ - private final boolean tag_raw_data; - - /** Whether or not to block writing of derived rollups/pre-ags */ - private final boolean rollups_block_derived; - - /** An optional histogram manger used when the TSD will be dealing with - * histograms and sketches. 
Instantiated ONLY if - * {@link #initializePlugins(boolean)} was called.*/ - private HistogramCodecManager histogram_manager; - - /** A list of query overrides for the scanners */ - private final QueryLimitOverride query_limits; - - /** Writes rejected by the filter */ - private final AtomicLong rejected_dps = new AtomicLong(); - private final AtomicLong rejected_aggregate_dps = new AtomicLong(); - - /** Datapoints Added */ - private static final AtomicLong datapoints_added = new AtomicLong(); - - /** - * Constructor - * @param client An initialized HBase client object - * @param config An initialized configuration object - * @since 2.1 - */ - public TSDB(final HBaseClient client, final Config config) { - this.config = config; - if (client == null) { - final org.hbase.async.Config async_config; - if (config.configLocation() != null && !config.configLocation().isEmpty()) { - try { - async_config = new org.hbase.async.Config(config.configLocation()); - } catch (final IOException e) { - throw new RuntimeException("Failed to read the config file: " + - config.configLocation(), e); - } - } else { - async_config = new org.hbase.async.Config(); - } - async_config.overrideConfig("hbase.zookeeper.znode.parent", - config.getString("tsd.storage.hbase.zk_basedir")); - async_config.overrideConfig("hbase.zookeeper.quorum", - config.getString("tsd.storage.hbase.zk_quorum")); - this.client = new HBaseClient(async_config); - } else { - this.client = client; - } - - String string_mode = config.getString("tsd.mode"); - if (Strings.isNullOrEmpty(string_mode)) { - mode = OperationMode.READWRITE; - } else if (string_mode.toLowerCase().equals("ro")) { - mode = OperationMode.READONLY; - } else if (string_mode.toLowerCase().equals("wo")) { - mode = OperationMode.WRITEONLY; - } else { - mode = OperationMode.READWRITE; - } - - // SALT AND UID WIDTHS - // Users really wanted this to be set via config instead of having to - // compile. Hopefully they know NOT to change these after writing data. 
- if (config.hasProperty("tsd.storage.uid.width.metric")) { - METRICS_WIDTH = config.getShort("tsd.storage.uid.width.metric"); - } - if (config.hasProperty("tsd.storage.uid.width.tagk")) { - TAG_NAME_WIDTH = config.getShort("tsd.storage.uid.width.tagk"); - } - if (config.hasProperty("tsd.storage.uid.width.tagv")) { - TAG_VALUE_WIDTH = config.getShort("tsd.storage.uid.width.tagv"); - } - if (config.hasProperty("tsd.storage.max_tags")) { - Const.setMaxNumTags(config.getShort("tsd.storage.max_tags")); - } - if (config.hasProperty("tsd.storage.salt.buckets")) { - Const.setSaltBuckets(config.getInt("tsd.storage.salt.buckets")); - } - if (config.hasProperty("tsd.storage.salt.width")) { - Const.setSaltWidth(config.getInt("tsd.storage.salt.width")); - } - - table = config.getString("tsd.storage.hbase.data_table").getBytes(CHARSET); - uidtable = config.getString("tsd.storage.hbase.uid_table").getBytes(CHARSET); - treetable = config.getString("tsd.storage.hbase.tree_table").getBytes(CHARSET); - meta_table = config.getString("tsd.storage.hbase.meta_table").getBytes(CHARSET); - - if (config.getBoolean("tsd.core.uid.random_metrics")) { - metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, true); - } else { - metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, false); - } - tag_names = new UniqueId(this, uidtable, TAG_NAME_QUAL, TAG_NAME_WIDTH, false); - tag_values = new UniqueId(this, uidtable, TAG_VALUE_QUAL, TAG_VALUE_WIDTH, false); - compactionq = new CompactionQueue(this); - - if (config.hasProperty("tsd.core.timezone")) { - DateTime.setDefaultTimezone(config.getString("tsd.core.timezone")); - } - - timer = Threads.newTimer("TSDB Timer"); - - if (config.getBoolean("tsd.rollups.enable")) { - String conf = config.getString("tsd.rollups.config"); - if (Strings.isNullOrEmpty(conf)) { - throw new IllegalArgumentException("Rollups were enabled but " - + "'tsd.rollups.config' is null or empty."); - } - if (conf.endsWith(".json")) { + private static final Logger LOG = LoggerFactory.getLogger(TSDB.class); + + static final byte[] FAMILY = {'t'}; + + /** + * Charset used to convert Strings to byte arrays and back. + */ + private static final Charset CHARSET = Charset.forName("ISO-8859-1"); + private static final String METRICS_QUAL = "metrics"; + private static short METRICS_WIDTH = 3; + private static final String TAG_NAME_QUAL = "tagk"; + private static short TAG_NAME_WIDTH = 3; + private static final String TAG_VALUE_QUAL = "tagv"; + private static short TAG_VALUE_WIDTH = 3; + private static final int MIN_HISTOGRAM_BYTES = 1; + + /** + * The operation mode (role) of the TSD. + */ + public enum OperationMode { + READWRITE, + READONLY, + WRITEONLY + } + + /** + * Client for the HBase cluster to use. + */ + final HBaseClient client; + + /** + * The operation mode (role) of the TSD. + */ + final OperationMode mode; + + /** + * Name of the table in which timeseries are stored. + */ + final byte[] table; + /** + * Name of the table in which UID information is stored. + */ + final byte[] uidtable; + /** + * Name of the table where tree data is stored. + */ + final byte[] treetable; + /** + * Name of the table where meta data is stored. + */ + final byte[] meta_table; + + /** + * Unique IDs for the metric names. + */ + final UniqueId metrics; + /** + * Unique IDs for the tag names. + */ + final UniqueId tag_names; + /** + * Unique IDs for the tag values. 
+     */
+    final UniqueId tag_values;
+
+    /**
+     * Configuration object for all TSDB components
+     */
+    final Config config;
+
+    /**
+     * Timer used for various tasks such as idle timeouts or query timeouts
+     */
+    private final HashedWheelTimer timer;
+
+    /**
+     * Row keys that need to be compacted.
+     * Whenever we write a new data point to a row, we add the row key to this
+     * set. Every once in a while, the compaction thread will go through old
+     * row keys and will read re-compact them.
+     */
+    private final CompactionQueue compactionq;
+
+    /**
+     * Authentication Plugin to use if configured
+     */
+    private Authentication authentication = null;
+
+    /**
+     * Normalization Plugin for fixing up tags
+     */
+    private NormalizePlugin normalize = null;
+
+    /**
+     * Search indexer to use if configured
+     */
+    private SearchPlugin search = null;
+
+    /**
+     * Optional Startup Plugin to use if configured
+     */
+    private StartupPlugin startup = null;
+
+    /**
+     * Optional real time publisher plugin to use if configured
+     */
+    private RTPublisher rt_publisher = null;
+
+    /**
+     * Optional plugin for handling meta data caching and updating
+     */
+    private MetaDataCache meta_cache = null;
+
+    /**
+     * Plugin for dealing with data points that can't be stored
+     */
+    private StorageExceptionHandler storage_exception_handler = null;
+
+    /**
+     * A filter plugin for allowing or blocking time series
+     */
+    private WriteableDataPointFilterPlugin ts_filter;
+
+    /**
+     * A filter plugin for allowing or blocking UIDs
+     */
+    private UniqueIdFilterPlugin uid_filter;
+
+    /**
+     * The rollup config object for storing and querying rollups
+     */
+    private final RollupConfig rollup_config;
+
+    /**
+     * The default rollup interval.
+     */
+    private final RollupInterval default_interval;
+
+    /**
+     * Name of the tag we use to determine aggregates
+     */
+    private final String agg_tag_key;
+
+    /**
+     * Name of the tag we use for raw data.
+     */
+    private final String raw_agg_tag_value;
+
+    /**
+     * Whether or not to tag raw data with the raw value tag
+     */
+    private final boolean tag_raw_data;
+
+    /**
+     * Whether or not to block writing of derived rollups/pre-aggregates
+     */
+    private final boolean rollups_block_derived;
+
+    /**
+     * An optional histogram manager used when the TSD will be dealing with
+     * histograms and sketches. Instantiated ONLY if
+     * {@link #initializePlugins(boolean)} was called.
+ */ + private HistogramCodecManager histogram_manager; + + /** + * A list of query overrides for the scanners + */ + private final QueryLimitOverride query_limits; + + /** + * Writes rejected by the filter + */ + private final AtomicLong rejected_dps = new AtomicLong(); + private final AtomicLong rejected_aggregate_dps = new AtomicLong(); + + /** + * Datapoints Added + */ + private static final AtomicLong datapoints_added = new AtomicLong(); + + /** + * Constructor + * + * @param client An initialized HBase client object + * @param config An initialized configuration object + * @since 2.1 + */ + public TSDB(final HBaseClient client, final Config config) { + this.config = config; + if (client == null) { + final org.hbase.async.Config async_config; + if (config.configLocation() != null && !config.configLocation().isEmpty()) { + try { + async_config = new org.hbase.async.Config(config.configLocation()); + } catch (final IOException e) { + throw new RuntimeException("Failed to read the config file: " + + config.configLocation(), e); + } + } else { + async_config = new org.hbase.async.Config(); + } + async_config.overrideConfig("hbase.zookeeper.znode.parent", + config.getString("tsd.storage.hbase.zk_basedir")); + async_config.overrideConfig("hbase.zookeeper.quorum", + config.getString("tsd.storage.hbase.zk_quorum")); + this.client = new HBaseClient(async_config); + } else { + this.client = client; + } + + String string_mode = config.getString("tsd.mode"); + if (Strings.isNullOrEmpty(string_mode)) { + mode = OperationMode.READWRITE; + } else if (string_mode.toLowerCase().equals("ro")) { + mode = OperationMode.READONLY; + } else if (string_mode.toLowerCase().equals("wo")) { + mode = OperationMode.WRITEONLY; + } else { + mode = OperationMode.READWRITE; + } + + // SALT AND UID WIDTHS + // Users really wanted this to be set via config instead of having to + // compile. Hopefully they know NOT to change these after writing data. 
+ if (config.hasProperty("tsd.storage.uid.width.metric")) { + METRICS_WIDTH = config.getShort("tsd.storage.uid.width.metric"); + } + if (config.hasProperty("tsd.storage.uid.width.tagk")) { + TAG_NAME_WIDTH = config.getShort("tsd.storage.uid.width.tagk"); + } + if (config.hasProperty("tsd.storage.uid.width.tagv")) { + TAG_VALUE_WIDTH = config.getShort("tsd.storage.uid.width.tagv"); + } + if (config.hasProperty("tsd.storage.max_tags")) { + Const.setMaxNumTags(config.getShort("tsd.storage.max_tags")); + } + if (config.hasProperty("tsd.storage.salt.buckets")) { + Const.setSaltBuckets(config.getInt("tsd.storage.salt.buckets")); + } + if (config.hasProperty("tsd.storage.salt.width")) { + Const.setSaltWidth(config.getInt("tsd.storage.salt.width")); + } + + table = config.getString("tsd.storage.hbase.data_table").getBytes(CHARSET); + uidtable = config.getString("tsd.storage.hbase.uid_table").getBytes(CHARSET); + treetable = config.getString("tsd.storage.hbase.tree_table").getBytes(CHARSET); + meta_table = config.getString("tsd.storage.hbase.meta_table").getBytes(CHARSET); + + if (config.getBoolean("tsd.core.uid.random_metrics")) { + metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, true); + } else { + metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, false); + } + tag_names = new UniqueId(this, uidtable, TAG_NAME_QUAL, TAG_NAME_WIDTH, false); + tag_values = new UniqueId(this, uidtable, TAG_VALUE_QUAL, TAG_VALUE_WIDTH, false); + compactionq = new CompactionQueue(this); + + if (config.hasProperty("tsd.core.timezone")) { + DateTime.setDefaultTimezone(config.getString("tsd.core.timezone")); + } + + timer = Threads.newTimer("TSDB Timer"); + + if (config.getBoolean("tsd.rollups.enable")) { + String conf = config.getString("tsd.rollups.config"); + if (Strings.isNullOrEmpty(conf)) { + throw new IllegalArgumentException("Rollups were enabled but " + + "'tsd.rollups.config' is null or empty."); + } + if (conf.endsWith(".json")) { + try { + conf = Files.toString(new File(conf), Const.UTF8_CHARSET); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to open conf file: " + + conf, e); + } + } + rollup_config = JSON.parseToObject(conf, RollupConfig.class); + RollupInterval config_default = null; + for (final RollupInterval interval : rollup_config.getRollups().values()) { + if (interval.isDefaultInterval()) { + config_default = interval; + break; + } + } + if (config_default == null) { + throw new IllegalArgumentException("None of the rollup intervals were " + + "marked as the \"default\"."); + } + default_interval = config_default; + tag_raw_data = config.getBoolean("tsd.rollups.tag_raw"); + agg_tag_key = config.getString("tsd.rollups.agg_tag_key"); + raw_agg_tag_value = config.getString("tsd.rollups.raw_agg_tag_value"); + rollups_block_derived = config.getBoolean("tsd.rollups.block_derived"); + } else { + rollup_config = null; + default_interval = null; + tag_raw_data = false; + agg_tag_key = null; + raw_agg_tag_value = null; + rollups_block_derived = false; + } + + QueryStats.setEnableDuplicates( + config.getBoolean("tsd.query.allow_simultaneous_duplicates")); + + if (config.getBoolean("tsd.core.preload_uid_cache")) { + final ByteMap uid_cache_map = new ByteMap(); + uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics); + uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names); + uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values); + UniqueId.preloadUidCache(this, uid_cache_map); + } + + if 
(config.getString("tsd.core.tag.allow_specialchars") != null) { + Tags.setAllowSpecialChars(config.getString("tsd.core.tag.allow_specialchars")); + } + + query_limits = new QueryLimitOverride(this); + + // load up the functions that require the TSDB object + ExpressionFactory.addTSDBFunctions(this); + + // set any extra tags from the config for stats + StatsCollector.setGlobalTags(config); + + LOG.debug(config.dumpConfiguration()); + } + + /** + * Constructor + * + * @param config An initialized configuration object + * @since 2.0 + */ + public TSDB(final Config config) { + this(null, config); + } + + /** + * @return The data point column family name + */ + public static byte[] FAMILY() { + return FAMILY; + } + + /** + * Called by initializePlugins, also used to load startup plugins. + * + * @since 2.3 + */ + public static void loadPluginPath(final String plugin_path) { + if (plugin_path != null && !plugin_path.isEmpty()) { + try { + PluginLoader.loadJARs(plugin_path); + } catch (Exception e) { + LOG.error("Error loading plugins from plugin path: " + plugin_path, e); + throw new RuntimeException("Error loading plugins from plugin path: " + + plugin_path, e); + } + } + } + + /** + * Should be called immediately after construction to initialize plugins and + * objects that rely on such. It also moves most of the potential exception + * throwing code out of the constructor so TSDMain can shutdown clients and + * such properly. + * + * @param init_rpcs Whether or not to initialize RPC plugins as well + * @throws RuntimeException if the plugin path could not be processed + * @throws IllegalArgumentException if a plugin could not be initialized + * @since 2.0 + */ + public void initializePlugins(final boolean init_rpcs) { + final String plugin_path = config.getString("tsd.core.plugin_path"); + loadPluginPath(plugin_path); + try { - conf = Files.toString(new File(conf), Const.UTF8_CHARSET); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to open conf file: " - + conf, e); - } - } - rollup_config = JSON.parseToObject(conf, RollupConfig.class); - RollupInterval config_default = null; - for (final RollupInterval interval: rollup_config.getRollups().values()) { - if (interval.isDefaultInterval()) { - config_default = interval; - break; - } - } - if (config_default == null) { - throw new IllegalArgumentException("None of the rollup intervals were " - + "marked as the \"default\"."); - } - default_interval = config_default; - tag_raw_data = config.getBoolean("tsd.rollups.tag_raw"); - agg_tag_key = config.getString("tsd.rollups.agg_tag_key"); - raw_agg_tag_value = config.getString("tsd.rollups.raw_agg_tag_value"); - rollups_block_derived = config.getBoolean("tsd.rollups.block_derived"); - } else { - rollup_config = null; - default_interval = null; - tag_raw_data = false; - agg_tag_key = null; - raw_agg_tag_value = null; - rollups_block_derived = false; - } - - QueryStats.setEnableDuplicates( - config.getBoolean("tsd.query.allow_simultaneous_duplicates")); - - if (config.getBoolean("tsd.core.preload_uid_cache")) { - final ByteMap uid_cache_map = new ByteMap(); - uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics); - uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names); - uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values); - UniqueId.preloadUidCache(this, uid_cache_map); - } - - if (config.getString("tsd.core.tag.allow_specialchars") != null) { - Tags.setAllowSpecialChars(config.getString("tsd.core.tag.allow_specialchars")); - } - - query_limits = new 
QueryLimitOverride(this); - - // load up the functions that require the TSDB object - ExpressionFactory.addTSDBFunctions(this); - - // set any extra tags from the config for stats - StatsCollector.setGlobalTags(config); - - LOG.debug(config.dumpConfiguration()); - } - - /** - * Constructor - * @param config An initialized configuration object - * @since 2.0 - */ - public TSDB(final Config config) { - this(null, config); - } - - /** @return The data point column family name */ - public static byte[] FAMILY() { - return FAMILY; - } - - /** - * Called by initializePlugins, also used to load startup plugins. - * @since 2.3 - */ - public static void loadPluginPath(final String plugin_path) { - if (plugin_path != null && !plugin_path.isEmpty()) { - try { - PluginLoader.loadJARs(plugin_path); - } catch (Exception e) { - LOG.error("Error loading plugins from plugin path: " + plugin_path, e); - throw new RuntimeException("Error loading plugins from plugin path: " + - plugin_path, e); - } - } - } - - /** - * Should be called immediately after construction to initialize plugins and - * objects that rely on such. It also moves most of the potential exception - * throwing code out of the constructor so TSDMain can shutdown clients and - * such properly. - * @param init_rpcs Whether or not to initialize RPC plugins as well - * @throws RuntimeException if the plugin path could not be processed - * @throws IllegalArgumentException if a plugin could not be initialized - * @since 2.0 - */ - public void initializePlugins(final boolean init_rpcs) { - final String plugin_path = config.getString("tsd.core.plugin_path"); - loadPluginPath(plugin_path); - - try { - TagVFilter.initializeFilterMap(this); - // @#$@%$%#$ing typed exceptions - } catch (SecurityException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } catch (IllegalArgumentException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } catch (NoSuchFieldException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } catch (InvocationTargetException e) { - throw new RuntimeException("Failed to instantiate filters", e); - } - - // load the authentication plugin if enabled - if (config.getBoolean("tsd.core.authentication.enable")) { - authentication = PluginLoader.loadSpecificPlugin( - config.getString("tsd.core.authentication.plugin"), Authentication.class); - if (authentication == null) { - throw new IllegalArgumentException("Unable to locate authentication " - + "plugin: " + config.getString("tsd.core.authentication.plugin")); - } - try { - authentication.initialize(this); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize authentication plugin", e); - } - } - - // load the search plugin if enabled - if (config.getBoolean("tsd.search.enable")) { - search = PluginLoader.loadSpecificPlugin( - config.getString("tsd.search.plugin"), SearchPlugin.class); - if (search == null) { - throw new IllegalArgumentException("Unable to locate search plugin: " + - config.getString("tsd.search.plugin")); - } - try { - search.initialize(this); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize search plugin", e); - } - LOG.info("Successfully 
initialized search plugin [" + - search.getClass().getCanonicalName() + "] version: " - + search.version()); - } else { - search = null; - } - - // load the real time publisher plugin if enabled - if (config.getBoolean("tsd.rtpublisher.enable")) { - rt_publisher = PluginLoader.loadSpecificPlugin( - config.getString("tsd.rtpublisher.plugin"), RTPublisher.class); - if (rt_publisher == null) { - throw new IllegalArgumentException( - "Unable to locate real time publisher plugin: " + - config.getString("tsd.rtpublisher.plugin")); - } - try { - rt_publisher.initialize(this); - } catch (Exception e) { - throw new RuntimeException( - "Failed to initialize real time publisher plugin", e); - } - LOG.info("Successfully initialized real time publisher plugin [" + - rt_publisher.getClass().getCanonicalName() + "] version: " - + rt_publisher.version()); - } else { - rt_publisher = null; - } - - // load the meta cache plugin if enabled - if (config.getBoolean("tsd.core.meta.cache.enable")) { - meta_cache = PluginLoader.loadSpecificPlugin( - config.getString("tsd.core.meta.cache.plugin"), MetaDataCache.class); - if (meta_cache == null) { - throw new IllegalArgumentException( - "Unable to locate meta cache plugin: " + - config.getString("tsd.core.meta.cache.plugin")); - } - try { - meta_cache.initialize(this); - } catch (Exception e) { - throw new RuntimeException( - "Failed to initialize meta cache plugin", e); - } - LOG.info("Successfully initialized meta cache plugin [" + - meta_cache.getClass().getCanonicalName() + "] version: " - + meta_cache.version()); - } - - // load the storage exception plugin if enabled - if (config.getBoolean("tsd.core.storage_exception_handler.enable")) { - storage_exception_handler = PluginLoader.loadSpecificPlugin( - config.getString("tsd.core.storage_exception_handler.plugin"), - StorageExceptionHandler.class); - if (storage_exception_handler == null) { - throw new IllegalArgumentException( - "Unable to locate storage exception handler plugin: " + - config.getString("tsd.core.storage_exception_handler.plugin")); - } - try { - storage_exception_handler.initialize(this); - } catch (Exception e) { - throw new RuntimeException( - "Failed to initialize storage exception handler plugin", e); - } - LOG.info("Successfully initialized storage exception handler plugin [" + - storage_exception_handler.getClass().getCanonicalName() + "] version: " - + storage_exception_handler.version()); - } - - // Writeable Data Point Filter - if (config.getBoolean("tsd.timeseriesfilter.enable")) { - ts_filter = PluginLoader.loadSpecificPlugin( - config.getString("tsd.timeseriesfilter.plugin"), - WriteableDataPointFilterPlugin.class); - if (ts_filter == null) { - throw new IllegalArgumentException( - "Unable to locate time series filter plugin plugin: " + - config.getString("tsd.timeseriesfilter.plugin")); - } - try { - ts_filter.initialize(this); - } catch (Exception e) { - throw new RuntimeException( - "Failed to initialize time series filter plugin", e); - } - LOG.info("Successfully initialized time series filter plugin [" + - ts_filter.getClass().getCanonicalName() + "] version: " - + ts_filter.version()); - } - - // UID Filter - if (config.getBoolean("tsd.uidfilter.enable")) { - uid_filter = PluginLoader.loadSpecificPlugin( - config.getString("tsd.uidfilter.plugin"), - UniqueIdFilterPlugin.class); - if (uid_filter == null) { - throw new IllegalArgumentException( - "Unable to locate UID filter plugin plugin: " + - config.getString("tsd.uidfilter.plugin")); - } - try { - 
uid_filter.initialize(this); - } catch (Exception e) { - throw new RuntimeException( - "Failed to initialize UID filter plugin", e); - } - LOG.info("Successfully initialized UID filter plugin [" + - uid_filter.getClass().getCanonicalName() + "] version: " - + uid_filter.version()); - } - - // finally load the histo manager after plugins have been loaded. - if (config.hasProperty("tsd.core.histograms.config")) { - histogram_manager = new HistogramCodecManager(this); - } else { - histogram_manager = null; - } - } - - /** - * Returns the configured Authentication Plugin - * @return The Authentication Plugin - * @since 2.4 - */ - public final Authentication getAuth() { - return this.authentication; - } - - /** - * Returns the configured HBase client - * @return The HBase client - * @since 2.0 - */ - public final HBaseClient getClient() { - return this.client; - } - - /** - * Sets the startup plugin so that it can be shutdown properly. - * Note that this method will not initialize or call any other methods - * belonging to the plugin's implementation. - * @param plugin The startup plugin that was used. - * @since 2.3 - */ - public final void setStartupPlugin(final StartupPlugin plugin) { - startup = plugin; - } - - /** - * Getter that returns the startup plugin object. - * @return The StartupPlugin object or null if the plugin was not set. - * @since 2.3 - */ - public final StartupPlugin getStartupPlugin() { - return startup; - } - - /** - * Getter that returns the configuration object - * @return The configuration object - * @since 2.0 - */ - public final Config getConfig() { - return this.config; - } - - /** - * Returns the storage exception handler. May be null if not enabled - * @return The storage exception handler - * @since 2.2 - */ - public final StorageExceptionHandler getStorageExceptionHandler() { - return storage_exception_handler; - } - - /** - * @return the TS filter object, may be null - * @since 2.3 - */ - public WriteableDataPointFilterPlugin getTSfilter() { - return ts_filter; - } - - /** - * @return The UID filter object, may be null. 
-   * @since 2.3
-   */
-  public UniqueIdFilterPlugin getUidFilter() {
-    return uid_filter;
-  }
-
-  /**
-   * Attempts to find the name for a unique identifier given a type
-   * @param type The type of UID
-   * @param uid The UID to search for
-   * @return The name of the UID object if found
-   * @throws IllegalArgumentException if the type is not valid
-   * @throws NoSuchUniqueId if the UID was not found
-   * @since 2.0
-   */
-  public Deferred<String> getUidName(final UniqueIdType type, final byte[] uid) {
-    if (uid == null) {
-      throw new IllegalArgumentException("Missing UID");
-    }
-
-    switch (type) {
-      case METRIC:
-        return this.metrics.getNameAsync(uid);
-      case TAGK:
-        return this.tag_names.getNameAsync(uid);
-      case TAGV:
-        return this.tag_values.getNameAsync(uid);
-      default:
-        throw new IllegalArgumentException("Unrecognized UID type");
-    }
-  }
-
-  /**
-   * Attempts to find the UID matching a given name
-   * @param type The type of UID
-   * @param name The name to search for
-   * @throws IllegalArgumentException if the type is not valid
-   * @throws NoSuchUniqueName if the name was not found
-   * @since 2.0
-   */
-  public byte[] getUID(final UniqueIdType type, final String name) {
-    try {
-      return getUIDAsync(type, name).join();
-    } catch (NoSuchUniqueName e) {
-      throw e;
-    } catch (IllegalArgumentException e) {
-      throw e;
-    } catch (Exception e) {
-      LOG.error("Unexpected exception", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Attempts to find the UID matching a given name
-   * @param type The type of UID
-   * @param name The name to search for
-   * @throws IllegalArgumentException if the type is not valid
-   * @throws NoSuchUniqueName if the name was not found
-   * @since 2.1
-   */
-  public Deferred<byte[]> getUIDAsync(final UniqueIdType type, final String name) {
-    if (name == null || name.isEmpty()) {
-      throw new IllegalArgumentException("Missing UID name");
-    }
-    switch (type) {
-      case METRIC:
-        return metrics.getIdAsync(name);
-      case TAGK:
-        return tag_names.getIdAsync(name);
-      case TAGV:
-        return tag_values.getIdAsync(name);
-      default:
-        throw new IllegalArgumentException("Unrecognized UID type");
-    }
-  }
-
-  /**
-   * Verifies that the data and UID tables exist in HBase and optionally the
-   * tree and meta data tables if the user has enabled meta tracking or tree
-   * building
-   * @return An ArrayList of objects to wait for
-   * @throws TableNotFoundException
-   * @since 2.0
-   */
-  public Deferred<ArrayList<Object>> checkNecessaryTablesExist() {
-    final ArrayList<Deferred<Object>> checks =
-      new ArrayList<Deferred<Object>>(2);
-    checks.add(client.ensureTableExists(
-      config.getString("tsd.storage.hbase.data_table")));
-    checks.add(client.ensureTableExists(
-      config.getString("tsd.storage.hbase.uid_table")));
-    if (config.enable_tree_processing()) {
-      checks.add(client.ensureTableExists(
-        config.getString("tsd.storage.hbase.tree_table")));
-    }
-    if (config.enable_realtime_ts() || config.enable_realtime_uid() ||
-        config.enable_tsuid_incrementing()) {
-      checks.add(client.ensureTableExists(
-        config.getString("tsd.storage.hbase.meta_table")));
-    }
-    return Deferred.group(checks);
-  }
-
-  /** Number of cache hits during lookups involving UIDs. */
-  public long uidCacheHits() {
-    return (metrics.cacheHits() + tag_names.cacheHits()
-            + tag_values.cacheHits());
-  }
-
-  /** Number of cache misses during lookups involving UIDs. */
-  public long uidCacheMisses() {
-    return (metrics.cacheMisses() + tag_names.cacheMisses()
-            + tag_values.cacheMisses());
-  }
-
-  /** Number of cache entries currently in RAM for lookups involving UIDs.
*/ - public long uidCacheSize() { - return (metrics.cacheSize() + tag_names.cacheSize() - + tag_values.cacheSize()); - } - - /** - * Collects the stats and metrics tracked by this instance. - * @param collector The collector to use. - */ - public void collectStats(final StatsCollector collector) { - final byte[][] kinds = { - METRICS_QUAL.getBytes(CHARSET), - TAG_NAME_QUAL.getBytes(CHARSET), - TAG_VALUE_QUAL.getBytes(CHARSET) - }; - try { - final Map used_uids = UniqueId.getUsedUIDs(this, kinds) - .joinUninterruptibly(); - - collectUidStats(metrics, collector); - if (config.getBoolean("tsd.core.uid.random_metrics")) { - collector.record("uid.ids-used", 0, "kind=" + METRICS_QUAL); - collector.record("uid.ids-available", 0, "kind=" + METRICS_QUAL); - } else { - collector.record("uid.ids-used", used_uids.get(METRICS_QUAL), - "kind=" + METRICS_QUAL); - collector.record("uid.ids-available", - (Internal.getMaxUnsignedValueOnBytes(metrics.width()) - - used_uids.get(METRICS_QUAL)), "kind=" + METRICS_QUAL); - } - - collectUidStats(tag_names, collector); - collector.record("uid.ids-used", used_uids.get(TAG_NAME_QUAL), - "kind=" + TAG_NAME_QUAL); - collector.record("uid.ids-available", - (Internal.getMaxUnsignedValueOnBytes(tag_names.width()) - - used_uids.get(TAG_NAME_QUAL)), - "kind=" + TAG_NAME_QUAL); - - collectUidStats(tag_values, collector); - collector.record("uid.ids-used", used_uids.get(TAG_VALUE_QUAL), - "kind=" + TAG_VALUE_QUAL); - collector.record("uid.ids-available", - (Internal.getMaxUnsignedValueOnBytes(tag_values.width()) - - used_uids.get(TAG_VALUE_QUAL)), "kind=" + TAG_VALUE_QUAL); - - } catch (Exception e) { - throw new RuntimeException("Shouldn't be here", e); - } - - collector.record("uid.filter.rejected", rejected_dps.get(), "kind=raw"); - collector.record("uid.filter.rejected", rejected_aggregate_dps.get(), - "kind=aggregate"); - - { - final Runtime runtime = Runtime.getRuntime(); - collector.record("jvm.ramfree", runtime.freeMemory()); - collector.record("jvm.ramused", runtime.totalMemory()); - } - - collector.addExtraTag("class", "IncomingDataPoints"); - try { - collector.record("hbase.latency", IncomingDataPoints.putlatency, "method=put"); - } finally { - collector.clearExtraTag("class"); - } - - collector.addExtraTag("class", "TSDB"); - try { - collector.record("datapoints.added", datapoints_added, "type=all"); - } finally { - collector.clearExtraTag("class"); - } - - collector.addExtraTag("class", "TsdbQuery"); - try { - collector.record("hbase.latency", TsdbQuery.scanlatency, "method=scan"); - } finally { - collector.clearExtraTag("class"); - } - final ClientStats stats = client.stats(); - collector.record("hbase.root_lookups", stats.rootLookups()); - collector.record("hbase.meta_lookups", - stats.uncontendedMetaLookups(), "type=uncontended"); - collector.record("hbase.meta_lookups", - stats.contendedMetaLookups(), "type=contended"); - collector.record("hbase.rpcs", - stats.atomicIncrements(), "type=increment"); - collector.record("hbase.rpcs", stats.deletes(), "type=delete"); - collector.record("hbase.rpcs", stats.gets(), "type=get"); - collector.record("hbase.rpcs", stats.puts(), "type=put"); - collector.record("hbase.rpcs", stats.appends(), "type=append"); - collector.record("hbase.rpcs", stats.rowLocks(), "type=rowLock"); - collector.record("hbase.rpcs", stats.scannersOpened(), "type=openScanner"); - collector.record("hbase.rpcs", stats.scans(), "type=scan"); - collector.record("hbase.rpcs.batched", stats.numBatchedRpcSent()); - collector.record("hbase.flushes", 
stats.flushes()); - collector.record("hbase.connections.created", stats.connectionsCreated()); - collector.record("hbase.connections.idle_closed", stats.idleConnectionsClosed()); - collector.record("hbase.nsre", stats.noSuchRegionExceptions()); - collector.record("hbase.nsre.rpcs_delayed", - stats.numRpcDelayedDueToNSRE()); - collector.record("hbase.region_clients.open", - stats.regionClients()); - collector.record("hbase.region_clients.idle_closed", - stats.idleConnectionsClosed()); - - compactionq.collectStats(collector); - // Collect Stats from Plugins - if (startup != null) { - try { - collector.addExtraTag("plugin", "startup"); - startup.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - if (rt_publisher != null) { - try { - collector.addExtraTag("plugin", "publish"); - rt_publisher.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - if (authentication != null) { - try { - collector.addExtraTag("plugin", "authentication"); - authentication.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - if (search != null) { - try { - collector.addExtraTag("plugin", "search"); - search.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - if (storage_exception_handler != null) { - try { - collector.addExtraTag("plugin", "storageExceptionHandler"); - storage_exception_handler.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - if (ts_filter != null) { - try { - collector.addExtraTag("plugin", "timeseriesFilter"); - ts_filter.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - if (uid_filter != null) { - try { - collector.addExtraTag("plugin", "uidFilter"); - uid_filter.collectStats(collector); - } finally { - collector.clearExtraTag("plugin"); - } - } - } - - /** Returns a latency histogram for Put RPCs used to store data points. */ - public Histogram getPutLatencyHistogram() { - return IncomingDataPoints.putlatency; - } - - /** Returns a latency histogram for Scan RPCs used to fetch data points. */ - public Histogram getScanLatencyHistogram() { - return TsdbQuery.scanlatency; - } - - /** - * Collects the stats for a {@link UniqueId}. - * @param uid The instance from which to collect stats. - * @param collector The collector to use. - */ - private static void collectUidStats(final UniqueId uid, - final StatsCollector collector) { - collector.record("uid.cache-hit", uid.cacheHits(), "kind=" + uid.kind()); - collector.record("uid.cache-miss", uid.cacheMisses(), "kind=" + uid.kind()); - collector.record("uid.cache-size", uid.cacheSize(), "kind=" + uid.kind()); - collector.record("uid.random-collisions", uid.randomIdCollisions(), - "kind=" + uid.kind()); - collector.record("uid.rejected-assignments", uid.rejectedAssignments(), - "kind=" + uid.kind()); - } - - /** @return the width, in bytes, of metric UIDs */ - public static short metrics_width() { - return METRICS_WIDTH; - } - - /** @return the width, in bytes, of tagk UIDs */ - public static short tagk_width() { - return TAG_NAME_WIDTH; - } - - /** @return the width, in bytes, of tagv UIDs */ - public static short tagv_width() { - return TAG_VALUE_WIDTH; - } - - /** - * Returns a new {@link Query} instance suitable for this TSDB. - */ - public Query newQuery() { - return new TsdbQuery(this); - } - - /** - * Returns a new {@link WritableDataPoints} instance suitable for this TSDB. - *
-   * If you want to add a single data-point, consider using {@link #addPoint}
-   * instead.
-   */
-  public WritableDataPoints newDataPoints() {
-    return new IncomingDataPoints(this);
-  }
-
-  /**
-   * Returns a new {@link BatchedDataPoints} instance suitable for this TSDB.
-   *
-   * @param metric Every data point that gets appended must be associated to this metric.
-   * @param tags The associated tags for all data points being added.
-   * @return data structure which can have data points appended.
-   */
-  public WritableDataPoints newBatch(String metric, Map<String, String> tags) {
-    return new BatchedDataPoints(this, metric, tags);
-  }
-
-  /**
-   * Adds a single integer value data point in the TSDB.
-   * <p>
-   * WARNING: The tags map may be modified by this method without a lock. Give
-   * the method a copy if you plan to use it elsewhere.
-   * <p>
-   * @param metric A non-empty string.
-   * @param timestamp The timestamp associated with the value.
-   * @param value The value of the data point.
-   * @param tags The tags on this series. This map must be non-empty.
-   * @return A deferred object that indicates the completion of the request.
-   * The {@link Object} has no special meaning and can be {@code null} (think
-   * of it as {@code Deferred<Void>}). But you probably want to attach at
-   * least an errback to this {@code Deferred} to handle failures.
-   * @throws IllegalArgumentException if the timestamp is less than or equal
-   * to the previous timestamp added or 0 for the first timestamp, or if the
-   * difference with the previous timestamp is too large.
-   * @throws IllegalArgumentException if the metric name is empty or contains
-   * illegal characters.
-   * @throws IllegalArgumentException if the tags list is empty or one of the
-   * elements contains illegal characters.
-   * @throws HBaseException (deferred) if there was a problem while persisting
-   * data.
-   */
-  public Deferred<Object> addPoint(final String metric,
-                                   final long timestamp,
-                                   final long value,
-                                   final Map<String, String> tags) {
-    final byte[] v;
-    if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
-      v = new byte[] { (byte) value };
-    } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
-      v = Bytes.fromShort((short) value);
-    } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
-      v = Bytes.fromInt((int) value);
-    } else {
-      v = Bytes.fromLong(value);
-    }
-
-    final short flags = (short) (v.length - 1);  // Just the length.
-    return addPointInternal(metric, timestamp, v, tags, flags);
-  }
-
-  /**
-   * Adds a double precision floating-point value data point in the TSDB.
-   * <p>
-   * WARNING: The tags map may be modified by this method without a lock. Give
-   * the method a copy if you plan to use it elsewhere.
-   * <p>
-   * @param metric A non-empty string.
-   * @param timestamp The timestamp associated with the value.
-   * @param value The value of the data point.
-   * @param tags The tags on this series. This map must be non-empty.
-   * @return A deferred object that indicates the completion of the request.
-   * The {@link Object} has no special meaning and can be {@code null} (think
-   * of it as {@code Deferred<Void>}). But you probably want to attach at
-   * least an errback to this {@code Deferred} to handle failures.
-   * @throws IllegalArgumentException if the timestamp is less than or equal
-   * to the previous timestamp added or 0 for the first timestamp, or if the
-   * difference with the previous timestamp is too large.
-   * @throws IllegalArgumentException if the metric name is empty or contains
-   * illegal characters.
-   * @throws IllegalArgumentException if the value is NaN or infinite.
-   * @throws IllegalArgumentException if the tags list is empty or one of the
-   * elements contains illegal characters.
-   * @throws HBaseException (deferred) if there was a problem while persisting
-   * data.
-   * @since 1.2
-   */
-  public Deferred<Object> addPoint(final String metric,
-                                   final long timestamp,
-                                   final double value,
-                                   final Map<String, String> tags) {
-    if (Double.isNaN(value) || Double.isInfinite(value)) {
-      throw new IllegalArgumentException("value is NaN or Infinite: " + value
-                                         + " for metric=" + metric
-                                         + " timestamp=" + timestamp);
-    }
-    final short flags = Const.FLAG_FLOAT | 0x7;  // A float stored on 8 bytes.
-    return addPointInternal(metric, timestamp,
-                            Bytes.fromLong(Double.doubleToRawLongBits(value)),
-                            tags, flags);
-  }
-
-  /**
-   * Adds a single floating-point value data point in the TSDB.
-   * <p>
-   * WARNING: The tags map may be modified by this method without a lock. Give
-   * the method a copy if you plan to use it elsewhere.
-   * <p>
-   * @param metric A non-empty string.
-   * @param timestamp The timestamp associated with the value.
-   * @param value The value of the data point.
-   * @param tags The tags on this series. This map must be non-empty.
-   * @return A deferred object that indicates the completion of the request.
-   * The {@link Object} has no special meaning and can be {@code null} (think
-   * of it as {@code Deferred<Void>}). But you probably want to attach at
-   * least an errback to this {@code Deferred} to handle failures.
-   * @throws IllegalArgumentException if the timestamp is less than or equal
-   * to the previous timestamp added or 0 for the first timestamp, or if the
-   * difference with the previous timestamp is too large.
-   * @throws IllegalArgumentException if the metric name is empty or contains
-   * illegal characters.
-   * @throws IllegalArgumentException if the value is NaN or infinite.
-   * @throws IllegalArgumentException if the tags list is empty or one of the
-   * elements contains illegal characters.
-   * @throws HBaseException (deferred) if there was a problem while persisting
-   * data.
-   */
-  public Deferred<Object> addPoint(final String metric,
-                                   final long timestamp,
-                                   final float value,
-                                   final Map<String, String> tags) {
-    if (Float.isNaN(value) || Float.isInfinite(value)) {
-      throw new IllegalArgumentException("value is NaN or Infinite: " + value
-                                         + " for metric=" + metric
-                                         + " timestamp=" + timestamp);
-    }
-    final short flags = Const.FLAG_FLOAT | 0x3;  // A float stored on 4 bytes.
-    return addPointInternal(metric, timestamp,
-                            Bytes.fromInt(Float.floatToRawIntBits(value)),
-                            tags, flags);
-  }
-
-  /**
-   * Adds an encoded Histogram data point in the TSDB.
-   * @param metric A non-empty string.
-   * @param timestamp The timestamp associated with the value.
-   * @param raw_data The encoded data blob of the Histogram point.
-   * @param tags The tags on this series. This map must be non-empty.
-   * @return A deferred object that indicates the completion of the request.
-   * The {@link Object} has no special meaning and can be {@code null} (think
-   * of it as {@code Deferred<Void>}). But you probably want to attach at
-   * least an errback to this {@code Deferred} to handle failures.
-   * @throws IllegalArgumentException if the timestamp is less than or equal
-   * to the previous timestamp added or 0 for the first timestamp, or if the
-   * difference with the previous timestamp is too large.
-   * @throws IllegalArgumentException if the metric name is empty or contains
-   * illegal characters.
-   * @throws IllegalArgumentException if the tags list is empty or one of the
-   * elements contains illegal characters.
-   * @throws HBaseException (deferred) if there was a problem while persisting
-   * data.
-   */
-  public Deferred<Object> addHistogramPoint(final String metric,
-                                            final long timestamp,
-                                            final byte[] raw_data,
-                                            final Map<String, String> tags) {
-    if (raw_data == null || raw_data.length < MIN_HISTOGRAM_BYTES) {
-      return Deferred.fromError(new IllegalArgumentException(
-          "The histogram raw data is invalid: " + Bytes.pretty(raw_data)));
-    }
-
-    checkTimestampAndTags(metric, timestamp, raw_data, tags, (short) 0);
-    final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
-
-    final byte[] qualifier = Internal.getQualifier(timestamp,
-        HistogramDataPoint.PREFIX);
-
-    return storeIntoDB(metric, timestamp, raw_data, tags, (short) 0, row, qualifier);
-  }
-
-  final Deferred<Object> addPointInternal(final String metric,
-                                          final long timestamp,
-                                          final byte[] value,
-                                          final Map<String, String> tags,
-                                          final short flags) {
-
-    checkTimestampAndTags(metric, timestamp, value, tags, flags);
-    final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
-
-    final byte[] qualifier = Internal.buildQualifier(timestamp, flags);
-
-    return storeIntoDB(metric, timestamp, value, tags, flags, row, qualifier);
-  }
-
-  private final Deferred<Object> storeIntoDB(final String metric,
-                                             final long timestamp,
-                                             final byte[] value,
-                                             final Map<String, String> tags,
-                                             final short flags,
-                                             final byte[] row,
-                                             final byte[] qualifier) {
-    final long base_time;
-
-    if ((timestamp & Const.SECOND_MASK) != 0) {
-      // drop the ms timestamp to seconds to calculate the base timestamp
-      base_time = ((timestamp / 1000) -
-          ((timestamp / 1000) % Const.MAX_TIMESPAN));
-    } else {
-      base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
-    }
-
-    /** Callback executed for chaining filter calls to see if the value
-     * should be written or not. */
-    final class WriteCB implements Callback<Deferred<Object>, Boolean> {
-      @Override
-      public Deferred<Object> call(final Boolean allowed) throws Exception {
-        if (!allowed) {
-          rejected_dps.incrementAndGet();
-          return Deferred.fromResult(null);
-        }
-
-        Bytes.setInt(row, (int) base_time, metrics.width() + Const.SALT_WIDTH());
-        RowKey.prefixKeyWithSalt(row);
-
-        Deferred<Object> result = null;
-        if (!isHistogram(qualifier) && config.enable_appends()) {
-          if (config.use_otsdb_timestamp()) {
-            LOG.error("Cannot use Date Tiered Compaction with AppendPoints.
Please turn off either of them."); - } - final AppendDataPoints kv = new AppendDataPoints(qualifier, value); - final AppendRequest point = new AppendRequest(table, row, FAMILY, - AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes()); - result = client.append(point); - } else if (!isHistogram(qualifier)) { - scheduleForCompaction(row, (int) base_time); - final PutRequest point = RequestBuilder.buildPutRequest(config, table, row, FAMILY, qualifier, value, timestamp); - result = client.put(point); + TagVFilter.initializeFilterMap(this); + // @#$@%$%#$ing typed exceptions + } catch (SecurityException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } catch (InvocationTargetException e) { + throw new RuntimeException("Failed to instantiate filters", e); + } + + // load the authentication plugin if enabled + if (config.getBoolean("tsd.core.authentication.enable")) { + authentication = PluginLoader.loadSpecificPlugin( + config.getString("tsd.core.authentication.plugin"), Authentication.class); + if (authentication == null) { + throw new IllegalArgumentException("Unable to locate authentication " + + "plugin: " + config.getString("tsd.core.authentication.plugin")); + } + try { + authentication.initialize(this); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize authentication plugin", e); + } + } + + + // load the normalize plugin if enabled + if (config.getBoolean("tsd.normalize.enable")) { + normalize = PluginLoader.loadSpecificPlugin( + config.getString("tsd.normalize.plugin"), NormalizePlugin.class); + if (normalize == null) { + throw new IllegalArgumentException("Unable to locate normalize plugin: " + + config.getString("tsd.normalize.plugin")); + } + try { + normalize.initialize(this); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize normalize plugin", e); + } + LOG.info("Successfully initialized normalize plugin [" + + normalize.getClass().getCanonicalName() + "] version: " + + normalize.version()); } else { - scheduleForCompaction(row, (int) base_time); - final PutRequest histo_point = new PutRequest(table, row, FAMILY, qualifier, value); - result = client.put(histo_point); + normalize = null; } - // Count all added datapoints, not just those that came in through PUT rpc - // Will there be others? Well, something could call addPoint programatically right? 
- datapoints_added.incrementAndGet(); + // load the search plugin if enabled + if (config.getBoolean("tsd.search.enable")) { + search = PluginLoader.loadSpecificPlugin( + config.getString("tsd.search.plugin"), SearchPlugin.class); + if (search == null) { + throw new IllegalArgumentException("Unable to locate search plugin: " + + config.getString("tsd.search.plugin")); + } + try { + search.initialize(this); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize search plugin", e); + } + LOG.info("Successfully initialized search plugin [" + + search.getClass().getCanonicalName() + "] version: " + + search.version()); + } else { + search = null; + } - // TODO(tsuna): Add a callback to time the latency of HBase and store the - // timing in a moving Histogram (once we have a class for this). + // load the real time publisher plugin if enabled + if (config.getBoolean("tsd.rtpublisher.enable")) { + rt_publisher = PluginLoader.loadSpecificPlugin( + config.getString("tsd.rtpublisher.plugin"), RTPublisher.class); + if (rt_publisher == null) { + throw new IllegalArgumentException( + "Unable to locate real time publisher plugin: " + + config.getString("tsd.rtpublisher.plugin")); + } + try { + rt_publisher.initialize(this); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize real time publisher plugin", e); + } + LOG.info("Successfully initialized real time publisher plugin [" + + rt_publisher.getClass().getCanonicalName() + "] version: " + + rt_publisher.version()); + } else { + rt_publisher = null; + } - if (!config.enable_realtime_ts() && !config.enable_tsuid_incrementing() && - !config.enable_tsuid_tracking() && rt_publisher == null) { - return result; + // load the meta cache plugin if enabled + if (config.getBoolean("tsd.core.meta.cache.enable")) { + meta_cache = PluginLoader.loadSpecificPlugin( + config.getString("tsd.core.meta.cache.plugin"), MetaDataCache.class); + if (meta_cache == null) { + throw new IllegalArgumentException( + "Unable to locate meta cache plugin: " + + config.getString("tsd.core.meta.cache.plugin")); + } + try { + meta_cache.initialize(this); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize meta cache plugin", e); + } + LOG.info("Successfully initialized meta cache plugin [" + + meta_cache.getClass().getCanonicalName() + "] version: " + + meta_cache.version()); } - final byte[] tsuid = UniqueId.getTSUIDFromKey(row, METRICS_WIDTH, - Const.TIMESTAMP_BYTES); + // load the storage exception plugin if enabled + if (config.getBoolean("tsd.core.storage_exception_handler.enable")) { + storage_exception_handler = PluginLoader.loadSpecificPlugin( + config.getString("tsd.core.storage_exception_handler.plugin"), + StorageExceptionHandler.class); + if (storage_exception_handler == null) { + throw new IllegalArgumentException( + "Unable to locate storage exception handler plugin: " + + config.getString("tsd.core.storage_exception_handler.plugin")); + } + try { + storage_exception_handler.initialize(this); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize storage exception handler plugin", e); + } + LOG.info("Successfully initialized storage exception handler plugin [" + + storage_exception_handler.getClass().getCanonicalName() + "] version: " + + storage_exception_handler.version()); + } - // if the meta cache plugin is instantiated then tracking goes through it - if (meta_cache != null) { - meta_cache.increment(tsuid); + // Writeable Data Point Filter + if 
(config.getBoolean("tsd.timeseriesfilter.enable")) { + ts_filter = PluginLoader.loadSpecificPlugin( + config.getString("tsd.timeseriesfilter.plugin"), + WriteableDataPointFilterPlugin.class); + if (ts_filter == null) { + throw new IllegalArgumentException( + "Unable to locate time series filter plugin plugin: " + + config.getString("tsd.timeseriesfilter.plugin")); + } + try { + ts_filter.initialize(this); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize time series filter plugin", e); + } + LOG.info("Successfully initialized time series filter plugin [" + + ts_filter.getClass().getCanonicalName() + "] version: " + + ts_filter.version()); + } + + // UID Filter + if (config.getBoolean("tsd.uidfilter.enable")) { + uid_filter = PluginLoader.loadSpecificPlugin( + config.getString("tsd.uidfilter.plugin"), + UniqueIdFilterPlugin.class); + if (uid_filter == null) { + throw new IllegalArgumentException( + "Unable to locate UID filter plugin plugin: " + + config.getString("tsd.uidfilter.plugin")); + } + try { + uid_filter.initialize(this); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize UID filter plugin", e); + } + LOG.info("Successfully initialized UID filter plugin [" + + uid_filter.getClass().getCanonicalName() + "] version: " + + uid_filter.version()); + } + + // finally load the histo manager after plugins have been loaded. + if (config.hasProperty("tsd.core.histograms.config")) { + histogram_manager = new HistogramCodecManager(this); } else { - if (config.enable_tsuid_tracking()) { - if (config.enable_realtime_ts()) { - if (config.enable_tsuid_incrementing()) { - TSMeta.incrementAndGetCounter(TSDB.this, tsuid); - } else { - TSMeta.storeIfNecessary(TSDB.this, tsuid); - } + histogram_manager = null; + } + } + + /** + * Returns the configured Authentication Plugin + * + * @return The Authentication Plugin + * @since 2.4 + */ + public final Authentication getAuth() { + return this.authentication; + } + + /** + * Returns the configured HBase client + * + * @return The HBase client + * @since 2.0 + */ + public final HBaseClient getClient() { + return this.client; + } + + /** + * Sets the startup plugin so that it can be shutdown properly. + * Note that this method will not initialize or call any other methods + * belonging to the plugin's implementation. + * + * @param plugin The startup plugin that was used. + * @since 2.3 + */ + public final void setStartupPlugin(final StartupPlugin plugin) { + startup = plugin; + } + + /** + * Getter that returns the startup plugin object. + * + * @return The StartupPlugin object or null if the plugin was not set. + * @since 2.3 + */ + public final StartupPlugin getStartupPlugin() { + return startup; + } + + /** + * Getter that returns the configuration object + * + * @return The configuration object + * @since 2.0 + */ + public final Config getConfig() { + return this.config; + } + + /** + * Returns the storage exception handler. May be null if not enabled + * + * @return The storage exception handler + * @since 2.2 + */ + public final StorageExceptionHandler getStorageExceptionHandler() { + return storage_exception_handler; + } + + /** + * @return the TS filter object, may be null + * @since 2.3 + */ + public WriteableDataPointFilterPlugin getTSfilter() { + return ts_filter; + } + + /** + * @return The UID filter object, may be null. 
+ * @since 2.3 + */ + public UniqueIdFilterPlugin getUidFilter() { + return uid_filter; + } + + /** + * Attempts to find the name for a unique identifier given a type + * + * @param type The type of UID + * @param uid The UID to search for + * @return The name of the UID object if found + * @throws IllegalArgumentException if the type is not valid + * @throws NoSuchUniqueId if the UID was not found + * @since 2.0 + */ + public Deferred getUidName(final UniqueIdType type, final byte[] uid) { + if (uid == null) { + throw new IllegalArgumentException("Missing UID"); + } + + switch (type) { + case METRIC: + return this.metrics.getNameAsync(uid); + case TAGK: + return this.tag_names.getNameAsync(uid); + case TAGV: + return this.tag_values.getNameAsync(uid); + default: + throw new IllegalArgumentException("Unrecognized UID type"); + } + } + + /** + * Attempts to find the UID matching a given name + * + * @param type The type of UID + * @param name The name to search for + * @throws IllegalArgumentException if the type is not valid + * @throws NoSuchUniqueName if the name was not found + * @since 2.0 + */ + public byte[] getUID(final UniqueIdType type, final String name) { + try { + return getUIDAsync(type, name).join(); + } catch (NoSuchUniqueName e) { + throw e; + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + LOG.error("Unexpected exception", e); + throw new RuntimeException(e); + } + } + + /** + * Attempts to find the UID matching a given name + * + * @param type The type of UID + * @param name The name to search for + * @throws IllegalArgumentException if the type is not valid + * @throws NoSuchUniqueName if the name was not found + * @since 2.1 + */ + public Deferred getUIDAsync(final UniqueIdType type, final String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Missing UID name"); + } + switch (type) { + case METRIC: + return metrics.getIdAsync(name); + case TAGK: + return tag_names.getIdAsync(name); + case TAGV: + return tag_values.getIdAsync(name); + default: + throw new IllegalArgumentException("Unrecognized UID type"); + } + } + + /** + * Verifies that the data and UID tables exist in HBase and optionally the + * tree and meta data tables if the user has enabled meta tracking or tree + * building + * + * @return An ArrayList of objects to wait for + * @throws TableNotFoundException + * @since 2.0 + */ + public Deferred> checkNecessaryTablesExist() { + final ArrayList> checks = + new ArrayList>(2); + checks.add(client.ensureTableExists( + config.getString("tsd.storage.hbase.data_table"))); + checks.add(client.ensureTableExists( + config.getString("tsd.storage.hbase.uid_table"))); + if (config.enable_tree_processing()) { + checks.add(client.ensureTableExists( + config.getString("tsd.storage.hbase.tree_table"))); + } + if (config.enable_realtime_ts() || config.enable_realtime_uid() || + config.enable_tsuid_incrementing()) { + checks.add(client.ensureTableExists( + config.getString("tsd.storage.hbase.meta_table"))); + } + return Deferred.group(checks); + } + + /** + * Number of cache hits during lookups involving UIDs. + */ + public long uidCacheHits() { + return (metrics.cacheHits() + tag_names.cacheHits() + + tag_values.cacheHits()); + } + + /** + * Number of cache misses during lookups involving UIDs. + */ + public long uidCacheMisses() { + return (metrics.cacheMisses() + tag_names.cacheMisses() + + tag_values.cacheMisses()); + } + + /** + * Number of cache entries currently in RAM for lookups involving UIDs. 
+ */ + public long uidCacheSize() { + return (metrics.cacheSize() + tag_names.cacheSize() + + tag_values.cacheSize()); + } + + /** + * Collects the stats and metrics tracked by this instance. + * + * @param collector The collector to use. + */ + public void collectStats(final StatsCollector collector) { + final byte[][] kinds = { + METRICS_QUAL.getBytes(CHARSET), + TAG_NAME_QUAL.getBytes(CHARSET), + TAG_VALUE_QUAL.getBytes(CHARSET) + }; + try { + final Map used_uids = UniqueId.getUsedUIDs(this, kinds) + .joinUninterruptibly(); + + collectUidStats(metrics, collector); + if (config.getBoolean("tsd.core.uid.random_metrics")) { + collector.record("uid.ids-used", 0, "kind=" + METRICS_QUAL); + collector.record("uid.ids-available", 0, "kind=" + METRICS_QUAL); } else { - final PutRequest tracking = new PutRequest(meta_table, tsuid, - TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1)); - client.put(tracking); + collector.record("uid.ids-used", used_uids.get(METRICS_QUAL), + "kind=" + METRICS_QUAL); + collector.record("uid.ids-available", + (Internal.getMaxUnsignedValueOnBytes(metrics.width()) - + used_uids.get(METRICS_QUAL)), "kind=" + METRICS_QUAL); } - } + + collectUidStats(tag_names, collector); + collector.record("uid.ids-used", used_uids.get(TAG_NAME_QUAL), + "kind=" + TAG_NAME_QUAL); + collector.record("uid.ids-available", + (Internal.getMaxUnsignedValueOnBytes(tag_names.width()) - + used_uids.get(TAG_NAME_QUAL)), + "kind=" + TAG_NAME_QUAL); + + collectUidStats(tag_values, collector); + collector.record("uid.ids-used", used_uids.get(TAG_VALUE_QUAL), + "kind=" + TAG_VALUE_QUAL); + collector.record("uid.ids-available", + (Internal.getMaxUnsignedValueOnBytes(tag_values.width()) - + used_uids.get(TAG_VALUE_QUAL)), "kind=" + TAG_VALUE_QUAL); + + } catch (Exception e) { + throw new RuntimeException("Shouldn't be here", e); + } + + collector.record("uid.filter.rejected", rejected_dps.get(), "kind=raw"); + collector.record("uid.filter.rejected", rejected_aggregate_dps.get(), + "kind=aggregate"); + + { + final Runtime runtime = Runtime.getRuntime(); + collector.record("jvm.ramfree", runtime.freeMemory()); + collector.record("jvm.ramused", runtime.totalMemory()); } + collector.addExtraTag("class", "IncomingDataPoints"); + try { + collector.record("hbase.latency", IncomingDataPoints.putlatency, "method=put"); + } finally { + collector.clearExtraTag("class"); + } + + collector.addExtraTag("class", "TSDB"); + try { + collector.record("datapoints.added", datapoints_added, "type=all"); + } finally { + collector.clearExtraTag("class"); + } + + collector.addExtraTag("class", "TsdbQuery"); + try { + collector.record("hbase.latency", TsdbQuery.scanlatency, "method=scan"); + } finally { + collector.clearExtraTag("class"); + } + final ClientStats stats = client.stats(); + collector.record("hbase.root_lookups", stats.rootLookups()); + collector.record("hbase.meta_lookups", + stats.uncontendedMetaLookups(), "type=uncontended"); + collector.record("hbase.meta_lookups", + stats.contendedMetaLookups(), "type=contended"); + collector.record("hbase.rpcs", + stats.atomicIncrements(), "type=increment"); + collector.record("hbase.rpcs", stats.deletes(), "type=delete"); + collector.record("hbase.rpcs", stats.gets(), "type=get"); + collector.record("hbase.rpcs", stats.puts(), "type=put"); + collector.record("hbase.rpcs", stats.appends(), "type=append"); + collector.record("hbase.rpcs", stats.rowLocks(), "type=rowLock"); + collector.record("hbase.rpcs", stats.scannersOpened(), "type=openScanner"); + 
collector.record("hbase.rpcs", stats.scans(), "type=scan"); + collector.record("hbase.rpcs.batched", stats.numBatchedRpcSent()); + collector.record("hbase.flushes", stats.flushes()); + collector.record("hbase.connections.created", stats.connectionsCreated()); + collector.record("hbase.connections.idle_closed", stats.idleConnectionsClosed()); + collector.record("hbase.nsre", stats.noSuchRegionExceptions()); + collector.record("hbase.nsre.rpcs_delayed", + stats.numRpcDelayedDueToNSRE()); + collector.record("hbase.region_clients.open", + stats.regionClients()); + collector.record("hbase.region_clients.idle_closed", + stats.idleConnectionsClosed()); + + compactionq.collectStats(collector); + // Collect Stats from Plugins + if (startup != null) { + try { + collector.addExtraTag("plugin", "startup"); + startup.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } if (rt_publisher != null) { - if (isHistogram(qualifier)) { - rt_publisher.publishHistogramPoint(metric, timestamp, value, tags, tsuid); - } else { - rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags); - } - } - return result; - } - @Override - public String toString() { - return "addPointInternal Write Callback"; - } - } - - if (ts_filter != null && ts_filter.filterDataPoints()) { - if (isHistogram(qualifier)) { - return ts_filter.allowHistogramPoint(metric, timestamp, value, tags) - .addCallbackDeferring(new WriteCB()); - } else { - return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) - .addCallbackDeferring(new WriteCB()); - } - } - return Deferred.fromResult(true).addCallbackDeferring(new WriteCB()); - } - - private final void checkTimestampAndTags(final String metric, final long timestamp, - final byte[] value, - final Map tags, final short flags) { - // we only accept positive unix epoch timestamps in seconds or milliseconds - if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 && - timestamp > 9999999999999L)) { - throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad") - + " timestamp=" + timestamp - + " when trying to add value=" + Arrays.toString(value) + '/' + flags - + " to metric=" + metric + ", tags=" + tags); - } - - IncomingDataPoints.checkMetricAndTags(metric, tags); - } - - /** - * Adds a rolled up and/or groupby/pre-agged data point to the proper table. - *
- * WARNING: The tags map may be modified by this method without a lock. Give - * the method a copy if you plan to use it elsewhere. - *
- * If {@code interval} is null then the value will be directed to the - * pre-agg table. - * If the {@code is_groupby} flag is set, then the aggregate tag, defined in - * "tsd.rollups.agg_tag", will be added or overwritten with the {@code aggregator} - * value in uppercase as the value. - * @param metric A non-empty string. - * @param timestamp The timestamp associated with the value. - * @param value The value of the data point. - * @param tags The tags on this series. This map must be non-empty. - * @param is_groupby Whether or not the value is a pre-aggregate - * @param interval The interval the data reflects (may be null) - * @param rollup_aggregator The aggregator used to generate the data - * @param groupby_aggregator = The aggregator used for pre-aggregated data. - * @return A deferred to optionally wait on to be sure the value was stored - * @throws IllegalArgumentException if the timestamp is less than or equal - * to the previous timestamp added or 0 for the first timestamp, or if the - * difference with the previous timestamp is too large. - * @throws IllegalArgumentException if the metric name is empty or contains - * illegal characters. - * @throws IllegalArgumentException if the tags list is empty or one of the - * elements contains illegal characters. - * @throws HBaseException (deferred) if there was a problem while persisting - * data. - * @since 2.4 - */ - public Deferred addAggregatePoint(final String metric, - final long timestamp, - final long value, - final Map tags, - final boolean is_groupby, - final String interval, - final String rollup_aggregator, - final String groupby_aggregator) { - final byte[] val = Internal.vleEncodeLong(value); - - final short flags = (short) (val.length - 1); // Just the length. - - return addAggregatePointInternal(metric, timestamp, - val, tags, flags, is_groupby, interval, rollup_aggregator, - groupby_aggregator); - } - - /** - * Adds a rolled up and/or groupby/pre-agged data point to the proper table. - *
- * WARNING: The tags map may be modified by this method without a lock. Give - * the method a copy if you plan to use it elsewhere. - *
- * If {@code interval} is null then the value will be directed to the - * pre-agg table. - * If the {@code is_groupby} flag is set, then the aggregate tag, defined in - * "tsd.rollups.agg_tag", will be added or overwritten with the {@code aggregator} - * value in uppercase as the value. - * @param metric A non-empty string. - * @param timestamp The timestamp associated with the value. - * @param value The value of the data point. - * @param tags The tags on this series. This map must be non-empty. - * @param is_groupby Whether or not the value is a pre-aggregate - * @param interval The interval the data reflects (may be null) - * @param rollup_aggregator The aggregator used to generate the data - * @param groupby_aggregator = The aggregator used for pre-aggregated data. - * @return A deferred to optionally wait on to be sure the value was stored - * @throws IllegalArgumentException if the timestamp is less than or equal - * to the previous timestamp added or 0 for the first timestamp, or if the - * difference with the previous timestamp is too large. - * @throws IllegalArgumentException if the metric name is empty or contains - * illegal characters. - * @throws IllegalArgumentException if the tags list is empty or one of the - * elements contains illegal characters. - * @throws HBaseException (deferred) if there was a problem while persisting - * data. - * @since 2.4 - */ - public Deferred addAggregatePoint(final String metric, - final long timestamp, - final float value, - final Map tags, - final boolean is_groupby, - final String interval, - final String rollup_aggregator, - final String groupby_aggregator) { - if (Float.isNaN(value) || Float.isInfinite(value)) { - throw new IllegalArgumentException("value is NaN or Infinite: " + value - + " for metric=" + metric - + " timestamp=" + timestamp); - } - - final short flags = Const.FLAG_FLOAT | 0x3; // A float stored on 4 bytes. - - final byte[] val = Bytes.fromInt(Float.floatToRawIntBits(value)); - - return addAggregatePointInternal(metric, timestamp, - val, tags, flags, is_groupby, interval, rollup_aggregator, - groupby_aggregator); - } - - /** - * Adds a rolled up and/or groupby/pre-agged data point to the proper table. - * If {@code interval} is null then the value will be directed to the - * pre-agg table. - * If the {@code is_groupby} flag is set, then the aggregate tag, defined in - * "tsd.rollups.agg_tag", will be added or overwritten with the {@code aggregator} - * value in uppercase as the value. - * @param metric A non-empty string. - * @param timestamp The timestamp associated with the value. - * @param value The value of the data point. - * @param tags The tags on this series. This map must be non-empty. - * @param is_groupby Whether or not the value is a pre-aggregate - * @param interval The interval the data reflects (may be null) - * @param rollup_aggregator The aggregator used to generate the data - * @param groupby_aggregator = The aggregator used for pre-aggregated data. - * @return A deferred to optionally wait on to be sure the value was stored - * @throws IllegalArgumentException if the timestamp is less than or equal - * to the previous timestamp added or 0 for the first timestamp, or if the - * difference with the previous timestamp is too large. - * @throws IllegalArgumentException if the metric name is empty or contains - * illegal characters. - * @throws IllegalArgumentException if the tags list is empty or one of the - * elements contains illegal characters. 
- * @throws HBaseException (deferred) if there was a problem while persisting - * data. - * @since 2.4 - */ - public Deferred addAggregatePoint(final String metric, - final long timestamp, - final double value, - final Map tags, - final boolean is_groupby, - final String interval, - final String rollup_aggregator, - final String groupby_aggregator) { - if (Double.isNaN(value) || Double.isInfinite(value)) { - throw new IllegalArgumentException("value is NaN or Infinite: " + value - + " for metric=" + metric - + " timestamp=" + timestamp); - } - - final short flags = Const.FLAG_FLOAT | 0x7; // A float stored on 4 bytes. - - final byte[] val = Bytes.fromLong(Double.doubleToRawLongBits(value)); - - return addAggregatePointInternal(metric, timestamp, - val, tags, flags, is_groupby, interval, rollup_aggregator, - groupby_aggregator); - } - - Deferred addAggregatePointInternal(final String metric, - final long timestamp, - final byte[] value, - final Map tags, - final short flags, - final boolean is_groupby, - final String interval, - final String rollup_aggregator, - final String groupby_aggregator) { - - if (interval != null && !interval.isEmpty() && rollup_config == null) { - throw new IllegalArgumentException( - "No rollup or aggregations were configured"); - } - if (is_groupby && - (groupby_aggregator == null || groupby_aggregator.isEmpty())) { - throw new IllegalArgumentException("Cannot write a group by data point " - + "without specifying the aggregation function. Metric=" + metric - + " tags=" + tags); - } - - // we only accept positive unix epoch timestamps in seconds for rollups - // and allow milliseconds for pre-aggregates - if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0)) { - throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad") - + " timestamp=" + timestamp - + " when trying to add value=" + Arrays.toString(value) + '/' + flags - + " to metric=" + metric + ", tags=" + tags); - } - - String agg_tag_value = tags.get(agg_tag_key); - if (agg_tag_value == null) { - if (!is_groupby) { - // it's a rollup on "raw" data. - if (tag_raw_data) { - tags.put(agg_tag_key, raw_agg_tag_value); - } - agg_tag_value = raw_agg_tag_value; - } else { - // pre-agged so use the aggregator as the tag - agg_tag_value = groupby_aggregator.toUpperCase(); - tags.put(agg_tag_key, agg_tag_value); - } - } else { - // sanity check - if (!agg_tag_value.equalsIgnoreCase(groupby_aggregator)) { - throw new IllegalArgumentException("Given tag value for " + agg_tag_key - + " of " + agg_tag_value + " did not match the group by " - + "aggregator of " + groupby_aggregator + " for " + metric - + " " + tags); - } - // force upper case - agg_tag_value = groupby_aggregator.toUpperCase(); - tags.put(agg_tag_key, agg_tag_value); - } - - if (is_groupby) { - try { - Aggregators.get(groupby_aggregator.toLowerCase()); - } catch (NoSuchElementException e) { - throw new IllegalArgumentException("Invalid group by aggregator " - + groupby_aggregator + " with metric " + metric + " " + tags); - } - if (rollups_block_derived && - // TODO - create a better list of aggs to block - (agg_tag_value.equals("AVG") || - agg_tag_value.equals("DEV"))) { - throw new IllegalArgumentException("Derived group by aggregations " - + "are not allowed " + groupby_aggregator + " with metric " - + metric + " " + tags); - } - } - - IncomingDataPoints.checkMetricAndTags(metric, tags); - - final RollupInterval rollup_interval = (interval == null || interval.isEmpty() - ? 
null : rollup_config.getRollupInterval(interval)); - final int aggregator_id = rollup_interval == null ? -1 : - rollup_config.getIdForAggregator(rollup_aggregator); - final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); - final String rollup_agg = rollup_aggregator != null ? - rollup_aggregator.toUpperCase() : null; - if (rollup_agg!= null && rollups_block_derived && - // TODO - create a better list of aggs to block - (rollup_agg.equals("AVG") || - rollup_agg.equals("DEV"))) { - throw new IllegalArgumentException("Derived rollup aggregations " - + "are not allowed " + rollup_agg + " with metric " - + metric + " " + tags); - } - final int base_time = interval == null || interval.isEmpty() ? - (int)(timestamp - (timestamp % Const.MAX_TIMESPAN)) - : RollupUtils.getRollupBasetime(timestamp, rollup_interval); - final byte[] qualifier = interval == null || interval.isEmpty() ? - Internal.buildQualifier(timestamp, flags) - : RollupUtils.buildRollupQualifier( - timestamp, base_time, flags, aggregator_id, rollup_interval); - - /** Callback executed for chaining filter calls to see if the value - * should be written or not. */ - final class WriteCB implements Callback, Boolean> { - @Override - public Deferred call(final Boolean allowed) throws Exception { - if (!allowed) { - rejected_aggregate_dps.incrementAndGet(); - return Deferred.fromResult(null); - } - Internal.setBaseTime(row, base_time); - // NOTE: Do not modify the row key after calculating and applying the salt - RowKey.prefixKeyWithSalt(row); - - Deferred result; - - final PutRequest point; - if (interval == null || interval.isEmpty()) { - if (!is_groupby) { - throw new IllegalArgumentException("Interval cannot be null " - + "for a non-group by point"); - } - point = new PutRequest(default_interval.getGroupbyTable(), row, - FAMILY, qualifier, value); + try { + collector.addExtraTag("plugin", "publish"); + rt_publisher.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } + if (authentication != null) { + try { + collector.addExtraTag("plugin", "authentication"); + authentication.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } + if (search != null) { + try { + collector.addExtraTag("plugin", "search"); + search.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } + if (storage_exception_handler != null) { + try { + collector.addExtraTag("plugin", "storageExceptionHandler"); + storage_exception_handler.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } + if (ts_filter != null) { + try { + collector.addExtraTag("plugin", "timeseriesFilter"); + ts_filter.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } + if (uid_filter != null) { + try { + collector.addExtraTag("plugin", "uidFilter"); + uid_filter.collectStats(collector); + } finally { + collector.clearExtraTag("plugin"); + } + } + } + + /** + * Returns a latency histogram for Put RPCs used to store data points. + */ + public Histogram getPutLatencyHistogram() { + return IncomingDataPoints.putlatency; + } + + /** + * Returns a latency histogram for Scan RPCs used to fetch data points. + */ + public Histogram getScanLatencyHistogram() { + return TsdbQuery.scanlatency; + } + + /** + * Collects the stats for a {@link UniqueId}. + * + * @param uid The instance from which to collect stats. + * @param collector The collector to use. 
+ */ + private static void collectUidStats(final UniqueId uid, + final StatsCollector collector) { + collector.record("uid.cache-hit", uid.cacheHits(), "kind=" + uid.kind()); + collector.record("uid.cache-miss", uid.cacheMisses(), "kind=" + uid.kind()); + collector.record("uid.cache-size", uid.cacheSize(), "kind=" + uid.kind()); + collector.record("uid.random-collisions", uid.randomIdCollisions(), + "kind=" + uid.kind()); + collector.record("uid.rejected-assignments", uid.rejectedAssignments(), + "kind=" + uid.kind()); + } + + /** + * @return the width, in bytes, of metric UIDs + */ + public static short metrics_width() { + return METRICS_WIDTH; + } + + /** + * @return the width, in bytes, of tagk UIDs + */ + public static short tagk_width() { + return TAG_NAME_WIDTH; + } + + /** + * @return the width, in bytes, of tagv UIDs + */ + public static short tagv_width() { + return TAG_VALUE_WIDTH; + } + + /** + * Returns a new {@link Query} instance suitable for this TSDB. + */ + public Query newQuery() { + return new TsdbQuery(this); + } + + /** + * Returns a new {@link WritableDataPoints} instance suitable for this TSDB. + *
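+   * Example of batching points for one series (a minimal sketch; the
+   * metric, tags and timestamp values are illustrative):
+   * <pre>
+   * final WritableDataPoints dps = tsdb.newDataPoints();
+   * dps.setSeries("sys.cpu.user", tags);  // fix the series once
+   * dps.addPoint(1500000000L, 42L);       // then append points to it
+   * dps.addPoint(1500000060L, 45L);
+   * </pre>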
+ * If you want to add a single data-point, consider using {@link #addPoint} + * instead. + */ + public WritableDataPoints newDataPoints() { + return new IncomingDataPoints(this); + } + + /** + * Returns a new {@link BatchedDataPoints} instance suitable for this TSDB. + * + * @param metric Every data point that gets appended must be associated to this metric. + * @param tags The associated tags for all data points being added. + * @return data structure which can have data points appended. + */ + public WritableDataPoints newBatch(String metric, Map tags) { + return new BatchedDataPoints(this, metric, tags); + } + + /** + * Adds a single integer value data point in the TSDB. + *
+ * WARNING: The tags map may be modified by this method without a lock. Give + * the method a copy if you plan to use it elsewhere. + *
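+   * Example usage (a minimal sketch; the metric, tag and timestamp values
+   * are illustrative, and a copy of the map is passed per the warning above):
+   * <pre>
+   * final Map<String, String> tags = new HashMap<String, String>();
+   * tags.put("host", "web01");
+   * tsdb.addPoint("sys.cpu.user", 1500000000L, 42L,
+   *     new HashMap<String, String>(tags)).joinUninterruptibly();
+   * </pre>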
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param value The value of the data point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @return A deferred object that indicates the completion of the request.
+   * The {@link Object} has no special meaning and can be {@code null} (think
+   * of it as {@code Deferred<Void>}). But you probably want to attach at
+   * least an errback to this {@code Deferred} to handle failures.
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
+   */
+  public Deferred<Object> addPoint(final String metric,
+                                   final long timestamp,
+                                   final long value,
+                                   final Map<String, String> tags) {
+    final byte[] v;
+    if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
+      v = new byte[]{(byte) value};
+    } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
+      v = Bytes.fromShort((short) value);
+    } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
+      v = Bytes.fromInt((int) value);
+    } else {
+      v = Bytes.fromLong(value);
+    }
+
+    final short flags = (short) (v.length - 1); // Just the length.
+    return addPointInternal(metric, timestamp, v, tags, flags);
+  }
+
+  /**
+   * Adds a double precision floating-point value data point in the TSDB.
+   *
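+   * The value is persisted as its raw IEEE 754 bit pattern on 8 bytes
+   * ({@code Double.doubleToRawLongBits}) and flagged {@code FLAG_FLOAT | 0x7}.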
+ * WARNING: The tags map may be modified by this method without a lock. Give + * the method a copy if you plan to use it elsewhere. + *
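+   * Example of screening non-finite values before writing, since NaN and
+   * infinity are rejected with an {@code IllegalArgumentException} (sketch):
+   * <pre>
+   * if (!Double.isNaN(v) && !Double.isInfinite(v)) {
+   *   tsdb.addPoint("sys.load.avg", timestamp, v, tags);
+   * }
+   * </pre>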
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param value The value of the data point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @return A deferred object that indicates the completion of the request.
+   * The {@link Object} has no special meaning and can be {@code null} (think
+   * of it as {@code Deferred<Void>}). But you probably want to attach at
+   * least an errback to this {@code Deferred} to handle failures.
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the value is NaN or infinite.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
+   * @since 1.2
+   */
+  public Deferred<Object> addPoint(final String metric,
+                                   final long timestamp,
+                                   final double value,
+                                   final Map<String, String> tags) {
+    if (Double.isNaN(value) || Double.isInfinite(value)) {
+      throw new IllegalArgumentException("value is NaN or Infinite: " + value
+          + " for metric=" + metric
+          + " timestamp=" + timestamp);
+    }
+    final short flags = Const.FLAG_FLOAT | 0x7; // A double stored on 8 bytes.
+    return addPointInternal(metric, timestamp,
+        Bytes.fromLong(Double.doubleToRawLongBits(value)),
+        tags, flags);
+  }
+
+  /**
+   * Adds a single floating-point value data point in the TSDB.
+   *
+ * WARNING: The tags map may be modified by this method without a lock. Give + * the method a copy if you plan to use it elsewhere. + *
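+   * A float is stored on 4 bytes ({@code FLAG_FLOAT | 0x3}) versus 8 bytes
+   * for the double overload, so this variant can halve value storage when
+   * single precision is sufficient (sketch with illustrative names):
+   * <pre>
+   * tsdb.addPoint("sys.temp.celsius", timestamp, 21.5F, tags);
+   * </pre>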
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param value The value of the data point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @return A deferred object that indicates the completion of the request.
+   * The {@link Object} has no special meaning and can be {@code null} (think
+   * of it as {@code Deferred<Void>}). But you probably want to attach at
+   * least an errback to this {@code Deferred} to handle failures.
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the value is NaN or infinite.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
+   */
+  public Deferred<Object> addPoint(final String metric,
+                                   final long timestamp,
+                                   final float value,
+                                   final Map<String, String> tags) {
+    if (Float.isNaN(value) || Float.isInfinite(value)) {
+      throw new IllegalArgumentException("value is NaN or Infinite: " + value
+          + " for metric=" + metric
+          + " timestamp=" + timestamp);
+    }
+    final short flags = Const.FLAG_FLOAT | 0x3; // A float stored on 4 bytes.
+    return addPointInternal(metric, timestamp,
+        Bytes.fromInt(Float.floatToRawIntBits(value)),
+        tags, flags);
+  }
+
+  /**
+   * Adds an encoded Histogram data point in the TSDB.
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param raw_data The encoded data blob of the Histogram point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @return A deferred object that indicates the completion of the request.
+   * The {@link Object} has no special meaning and can be {@code null} (think
+   * of it as {@code Deferred<Void>}). But you probably want to attach at
+   * least an errback to this {@code Deferred} to handle failures.
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
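+   * A usage sketch; how the blob is produced depends on the histogram codecs
+   * registered via "tsd.core.histograms.config", so the encoder below is
+   * hypothetical:
+   * <pre>
+   * final byte[] blob = encodeWithConfiguredCodec(histo);  // codec-prefixed bytes
+   * tsdb.addHistogramPoint("rpc.latency.hist", timestamp, blob, tags);
+   * </pre>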
+ */ + public Deferred addHistogramPoint(final String metric, + final long timestamp, + final byte[] raw_data, + final Map tags) { + if (raw_data == null || raw_data.length < MIN_HISTOGRAM_BYTES) { + return Deferred.fromError(new IllegalArgumentException( + "The histogram raw data is invalid: " + Bytes.pretty(raw_data))); + } + + checkTimestampAndTags(metric, timestamp, raw_data, tags, (short) 0); + final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); + + final byte[] qualifier = Internal.getQualifier(timestamp, + HistogramDataPoint.PREFIX); + + return storeIntoDB(metric, timestamp, raw_data, tags, (short) 0, row, qualifier); + } + + final Deferred addPointInternal(final String metric, + final long timestamp, + final byte[] value, + final Map tags, + final short flags) { + + checkTimestampAndTags(metric, timestamp, value, tags, flags); + final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); + + final byte[] qualifier = Internal.buildQualifier(timestamp, flags); + + return storeIntoDB(metric, timestamp, value, tags, flags, row, qualifier); + } + + private final Deferred storeIntoDB(final String metric, + final long timestamp, + final byte[] value, + final Map tags, + final short flags, + final byte[] row, + final byte[] qualifier) { + final long base_time; + + if ((timestamp & Const.SECOND_MASK) != 0) { + // drop the ms timestamp to seconds to calculate the base timestamp + base_time = ((timestamp / 1000) - + ((timestamp / 1000) % Const.MAX_TIMESPAN)); } else { - point = new PutRequest( - is_groupby ? rollup_interval.getGroupbyTable() : - rollup_interval.getTemporalTable(), - row, FAMILY, qualifier, value); - } - - // TODO: Add a callback to time the latency of HBase and store the - // timing in a moving Histogram (once we have a class for this). - result = client.put(point); - - // TODO - figure out what we want to do with the real time publisher and - // the meta tracking. - return result; - } - } - - if (ts_filter != null) { - return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) - .addCallbackDeferring(new WriteCB()); - } - try { - return new WriteCB().call(true); - } catch (Exception e) { - return Deferred.fromError(e); - } - } - - /** - * Forces a flush of any un-committed in memory data including left over - * compactions. - *
- * For instance, any data point not persisted will be sent to HBase. - * @return A {@link Deferred} that will be called once all the un-committed - * data has been successfully and durably stored. The value of the deferred - * object return is meaningless and unspecified, and can be {@code null}. - * @throws HBaseException (deferred) if there was a problem sending - * un-committed data to HBase. Please refer to the {@link HBaseException} - * hierarchy to handle the possible failures. Some of them are easily - * recoverable by retrying, some are not. - */ - public Deferred flush() throws HBaseException { - final class HClientFlush implements Callback> { - public Object call(final ArrayList args) { - return client.flush(); - } - public String toString() { - return "flush HBase client"; - } - } - - return config.enable_compactions() && compactionq != null - ? compactionq.flush().addCallback(new HClientFlush()) - : client.flush(); - } - - /** - * Gracefully shuts down this TSD instance. - *
- * The method must call {@code shutdown()} on all plugins as well as flush the - * compaction queue. - * @return A {@link Deferred} that will be called once all the un-committed - * data has been successfully and durably stored, and all resources used by - * this instance have been released. The value of the deferred object - * return is meaningless and unspecified, and can be {@code null}. - * @throws HBaseException (deferred) if there was a problem sending - * un-committed data to HBase. Please refer to the {@link HBaseException} - * hierarchy to handle the possible failures. Some of them are easily - * recoverable by retrying, some are not. - */ - public Deferred shutdown() { - final ArrayList> deferreds = - new ArrayList>(); - - final class FinalShutdown implements Callback { - @Override - public Object call(Object result) throws Exception { - if (result instanceof Exception) { - LOG.error("A previous shutdown failed", (Exception)result); - } - final Set timeouts = timer.stop(); - // TODO - at some point we should clean these up. - if (timeouts.size() > 0) { - LOG.warn("There were " + timeouts.size() + " timer tasks queued"); - } - LOG.info("Completed shutting down the TSDB"); - return Deferred.fromResult(null); - } - } - - final class SEHShutdown implements Callback { - @Override - public Object call(Object result) throws Exception { - if (result instanceof Exception) { - LOG.error("Shutdown of the HBase client failed", (Exception)result); - } - LOG.info("Shutting down storage exception handler plugin: " + - storage_exception_handler.getClass().getCanonicalName()); - return storage_exception_handler.shutdown().addBoth(new FinalShutdown()); - } - @Override - public String toString() { - return "SEHShutdown"; - } - } - - final class HClientShutdown implements Callback, ArrayList> { - public Deferred call(final ArrayList args) { + base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); + } + + /** Callback executed for chaining filter calls to see if the value + * should be written or not. */ + final class WriteCB implements Callback, Boolean> { + @Override + public Deferred call(final Boolean allowed) throws Exception { + if (!allowed) { + rejected_dps.incrementAndGet(); + return Deferred.fromResult(null); + } + + Bytes.setInt(row, (int) base_time, metrics.width() + Const.SALT_WIDTH()); + RowKey.prefixKeyWithSalt(row); + + Deferred result = null; + if (!isHistogram(qualifier) && config.enable_appends()) { + if (config.use_otsdb_timestamp()) { + LOG.error("Cannot use Date Tiered Compaction with AppendPoints. Please turn off either of them."); + } + final AppendDataPoints kv = new AppendDataPoints(qualifier, value); + final AppendRequest point = new AppendRequest(table, row, FAMILY, + AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes()); + result = client.append(point); + } else if (!isHistogram(qualifier)) { + scheduleForCompaction(row, (int) base_time); + final PutRequest point = RequestBuilder.buildPutRequest(config, table, row, FAMILY, qualifier, value, timestamp); + result = client.put(point); + } else { + scheduleForCompaction(row, (int) base_time); + final PutRequest histo_point = new PutRequest(table, row, FAMILY, qualifier, value); + result = client.put(histo_point); + } + + // Count all added datapoints, not just those that came in through PUT rpc + // Will there be others? Well, something could call addPoint programatically right? 
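+        // This counter feeds the "datapoints.added" stat (type=all) reported
+        // by collectStats(), so it meters every ingest path, not only the
+        // HTTP/telnet put RPCs.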
+ datapoints_added.incrementAndGet(); + + // TODO(tsuna): Add a callback to time the latency of HBase and store the + // timing in a moving Histogram (once we have a class for this). + + if (!config.enable_realtime_ts() && !config.enable_tsuid_incrementing() && + !config.enable_tsuid_tracking() && rt_publisher == null) { + return result; + } + + final byte[] tsuid = UniqueId.getTSUIDFromKey(row, METRICS_WIDTH, + Const.TIMESTAMP_BYTES); + + // if the meta cache plugin is instantiated then tracking goes through it + if (meta_cache != null) { + meta_cache.increment(tsuid); + } else { + if (config.enable_tsuid_tracking()) { + if (config.enable_realtime_ts()) { + if (config.enable_tsuid_incrementing()) { + TSMeta.incrementAndGetCounter(TSDB.this, tsuid); + } else { + TSMeta.storeIfNecessary(TSDB.this, tsuid); + } + } else { + final PutRequest tracking = new PutRequest(meta_table, tsuid, + TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1)); + client.put(tracking); + } + } + } + + if (rt_publisher != null) { + if (isHistogram(qualifier)) { + rt_publisher.publishHistogramPoint(metric, timestamp, value, tags, tsuid); + } else { + rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags); + } + } + return result; + } + + @Override + public String toString() { + return "addPointInternal Write Callback"; + } + } + + if (ts_filter != null && ts_filter.filterDataPoints()) { + if (isHistogram(qualifier)) { + return ts_filter.allowHistogramPoint(metric, timestamp, value, tags) + .addCallbackDeferring(new WriteCB()); + } else { + return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) + .addCallbackDeferring(new WriteCB()); + } + } + return Deferred.fromResult(true).addCallbackDeferring(new WriteCB()); + } + + private final void checkTimestampAndTags(final String metric, final long timestamp, + final byte[] value, + final Map tags, final short flags) { + // we only accept positive unix epoch timestamps in seconds or milliseconds + if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 && + timestamp > 9999999999999L)) { + throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad") + + " timestamp=" + timestamp + + " when trying to add value=" + Arrays.toString(value) + '/' + flags + + " to metric=" + metric + ", tags=" + tags); + } + + IncomingDataPoints.checkMetricAndTags(metric, tags); + } + + /** + * Adds a rolled up and/or groupby/pre-agged data point to the proper table. + *
+ * WARNING: The tags map may be modified by this method without a lock. Give + * the method a copy if you plan to use it elsewhere. + *
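+   * Example usage (a minimal sketch; assumes the rollup configuration
+   * defines a "1h" interval and a "sum" aggregator, which is
+   * deployment-specific):
+   * <pre>
+   * // an hourly "sum" rollup of raw data
+   * tsdb.addAggregatePoint("sys.cpu.user", base_ts, 3600L, tags,
+   *     false, "1h", "sum", null);
+   * // a pre-aggregated (group by) point routed to the pre-agg table
+   * tsdb.addAggregatePoint("sys.cpu.user", base_ts, 3600L, tags,
+   *     true, null, null, "sum");
+   * </pre>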
+   * If {@code interval} is null then the value will be directed to the
+   * pre-agg table.
+   * If the {@code is_groupby} flag is set, then the aggregate tag, defined in
+   * "tsd.rollups.agg_tag", will be added or overwritten with the {@code aggregator}
+   * value in uppercase as the value.
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param value The value of the data point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @param is_groupby Whether or not the value is a pre-aggregate
+   * @param interval The interval the data reflects (may be null)
+   * @param rollup_aggregator The aggregator used to generate the data
+   * @param groupby_aggregator The aggregator used for pre-aggregated data.
+   * @return A deferred to optionally wait on to be sure the value was stored
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
+   * @since 2.4
+   */
+  public Deferred<Object> addAggregatePoint(final String metric,
+                                            final long timestamp,
+                                            final long value,
+                                            final Map<String, String> tags,
+                                            final boolean is_groupby,
+                                            final String interval,
+                                            final String rollup_aggregator,
+                                            final String groupby_aggregator) {
+    final byte[] val = Internal.vleEncodeLong(value);
+
+    final short flags = (short) (val.length - 1); // Just the length.
+
+    return addAggregatePointInternal(metric, timestamp,
+        val, tags, flags, is_groupby, interval, rollup_aggregator,
+        groupby_aggregator);
+  }
+
+  /**
+   * Adds a rolled up and/or groupby/pre-agged data point to the proper table.
+   *
+ * WARNING: The tags map may be modified by this method without a lock. Give + * the method a copy if you plan to use it elsewhere. + *
+   * If {@code interval} is null then the value will be directed to the
+   * pre-agg table.
+   * If the {@code is_groupby} flag is set, then the aggregate tag, defined in
+   * "tsd.rollups.agg_tag", will be added or overwritten with the {@code aggregator}
+   * value in uppercase as the value.
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param value The value of the data point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @param is_groupby Whether or not the value is a pre-aggregate
+   * @param interval The interval the data reflects (may be null)
+   * @param rollup_aggregator The aggregator used to generate the data
+   * @param groupby_aggregator The aggregator used for pre-aggregated data.
+   * @return A deferred to optionally wait on to be sure the value was stored
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
+   * @since 2.4
+   */
+  public Deferred<Object> addAggregatePoint(final String metric,
+                                            final long timestamp,
+                                            final float value,
+                                            final Map<String, String> tags,
+                                            final boolean is_groupby,
+                                            final String interval,
+                                            final String rollup_aggregator,
+                                            final String groupby_aggregator) {
+    if (Float.isNaN(value) || Float.isInfinite(value)) {
+      throw new IllegalArgumentException("value is NaN or Infinite: " + value
+          + " for metric=" + metric
+          + " timestamp=" + timestamp);
+    }
+
+    final short flags = Const.FLAG_FLOAT | 0x3; // A float stored on 4 bytes.
+
+    final byte[] val = Bytes.fromInt(Float.floatToRawIntBits(value));
+
+    return addAggregatePointInternal(metric, timestamp,
+        val, tags, flags, is_groupby, interval, rollup_aggregator,
+        groupby_aggregator);
+  }
+
+  /**
+   * Adds a rolled up and/or groupby/pre-agged data point to the proper table.
+   * If {@code interval} is null then the value will be directed to the
+   * pre-agg table.
+   * If the {@code is_groupby} flag is set, then the aggregate tag, defined in
+   * "tsd.rollups.agg_tag", will be added or overwritten with the {@code aggregator}
+   * value in uppercase as the value.
+   *
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param value The value of the data point.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @param is_groupby Whether or not the value is a pre-aggregate
+   * @param interval The interval the data reflects (may be null)
+   * @param rollup_aggregator The aggregator used to generate the data
+   * @param groupby_aggregator The aggregator used for pre-aggregated data.
+   * @return A deferred to optionally wait on to be sure the value was stored
+   * @throws IllegalArgumentException if the timestamp is less than or equal
+   * to the previous timestamp added or 0 for the first timestamp, or if the
+   * difference with the previous timestamp is too large.
+   * @throws IllegalArgumentException if the metric name is empty or contains
+   * illegal characters.
+   * @throws IllegalArgumentException if the tags list is empty or one of the
+   * elements contains illegal characters. 
+ * @throws HBaseException (deferred) if there was a problem while persisting + * data. + * @since 2.4 + */ + public Deferred addAggregatePoint(final String metric, + final long timestamp, + final double value, + final Map tags, + final boolean is_groupby, + final String interval, + final String rollup_aggregator, + final String groupby_aggregator) { + if (Double.isNaN(value) || Double.isInfinite(value)) { + throw new IllegalArgumentException("value is NaN or Infinite: " + value + + " for metric=" + metric + + " timestamp=" + timestamp); + } + + final short flags = Const.FLAG_FLOAT | 0x7; // A float stored on 4 bytes. + + final byte[] val = Bytes.fromLong(Double.doubleToRawLongBits(value)); + + return addAggregatePointInternal(metric, timestamp, + val, tags, flags, is_groupby, interval, rollup_aggregator, + groupby_aggregator); + } + + Deferred addAggregatePointInternal(final String metric, + final long timestamp, + final byte[] value, + final Map tags, + final short flags, + final boolean is_groupby, + final String interval, + final String rollup_aggregator, + final String groupby_aggregator) { + + if (interval != null && !interval.isEmpty() && rollup_config == null) { + throw new IllegalArgumentException( + "No rollup or aggregations were configured"); + } + if (is_groupby && + (groupby_aggregator == null || groupby_aggregator.isEmpty())) { + throw new IllegalArgumentException("Cannot write a group by data point " + + "without specifying the aggregation function. Metric=" + metric + + " tags=" + tags); + } + + // we only accept positive unix epoch timestamps in seconds for rollups + // and allow milliseconds for pre-aggregates + if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0)) { + throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad") + + " timestamp=" + timestamp + + " when trying to add value=" + Arrays.toString(value) + '/' + flags + + " to metric=" + metric + ", tags=" + tags); + } + + String agg_tag_value = tags.get(agg_tag_key); + if (agg_tag_value == null) { + if (!is_groupby) { + // it's a rollup on "raw" data. + if (tag_raw_data) { + tags.put(agg_tag_key, raw_agg_tag_value); + } + agg_tag_value = raw_agg_tag_value; + } else { + // pre-agged so use the aggregator as the tag + agg_tag_value = groupby_aggregator.toUpperCase(); + tags.put(agg_tag_key, agg_tag_value); + } + } else { + // sanity check + if (!agg_tag_value.equalsIgnoreCase(groupby_aggregator)) { + throw new IllegalArgumentException("Given tag value for " + agg_tag_key + + " of " + agg_tag_value + " did not match the group by " + + "aggregator of " + groupby_aggregator + " for " + metric + + " " + tags); + } + // force upper case + agg_tag_value = groupby_aggregator.toUpperCase(); + tags.put(agg_tag_key, agg_tag_value); + } + + if (is_groupby) { + try { + Aggregators.get(groupby_aggregator.toLowerCase()); + } catch (NoSuchElementException e) { + throw new IllegalArgumentException("Invalid group by aggregator " + + groupby_aggregator + " with metric " + metric + " " + tags); + } + if (rollups_block_derived && + // TODO - create a better list of aggs to block + (agg_tag_value.equals("AVG") || + agg_tag_value.equals("DEV"))) { + throw new IllegalArgumentException("Derived group by aggregations " + + "are not allowed " + groupby_aggregator + " with metric " + + metric + " " + tags); + } + } + + IncomingDataPoints.checkMetricAndTags(metric, tags); + + final RollupInterval rollup_interval = (interval == null || interval.isEmpty() + ? 
null : rollup_config.getRollupInterval(interval)); + final int aggregator_id = rollup_interval == null ? -1 : + rollup_config.getIdForAggregator(rollup_aggregator); + final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); + final String rollup_agg = rollup_aggregator != null ? + rollup_aggregator.toUpperCase() : null; + if (rollup_agg != null && rollups_block_derived && + // TODO - create a better list of aggs to block + (rollup_agg.equals("AVG") || + rollup_agg.equals("DEV"))) { + throw new IllegalArgumentException("Derived rollup aggregations " + + "are not allowed " + rollup_agg + " with metric " + + metric + " " + tags); + } + final int base_time = interval == null || interval.isEmpty() ? + (int) (timestamp - (timestamp % Const.MAX_TIMESPAN)) + : RollupUtils.getRollupBasetime(timestamp, rollup_interval); + final byte[] qualifier = interval == null || interval.isEmpty() ? + Internal.buildQualifier(timestamp, flags) + : RollupUtils.buildRollupQualifier( + timestamp, base_time, flags, aggregator_id, rollup_interval); + + /** Callback executed for chaining filter calls to see if the value + * should be written or not. */ + final class WriteCB implements Callback, Boolean> { + @Override + public Deferred call(final Boolean allowed) throws Exception { + if (!allowed) { + rejected_aggregate_dps.incrementAndGet(); + return Deferred.fromResult(null); + } + Internal.setBaseTime(row, base_time); + // NOTE: Do not modify the row key after calculating and applying the salt + RowKey.prefixKeyWithSalt(row); + + Deferred result; + + final PutRequest point; + if (interval == null || interval.isEmpty()) { + if (!is_groupby) { + throw new IllegalArgumentException("Interval cannot be null " + + "for a non-group by point"); + } + point = new PutRequest(default_interval.getGroupbyTable(), row, + FAMILY, qualifier, value); + } else { + point = new PutRequest( + is_groupby ? rollup_interval.getGroupbyTable() : + rollup_interval.getTemporalTable(), + row, FAMILY, qualifier, value); + } + + // TODO: Add a callback to time the latency of HBase and store the + // timing in a moving Histogram (once we have a class for this). + result = client.put(point); + + // TODO - figure out what we want to do with the real time publisher and + // the meta tracking. + return result; + } + } + + if (ts_filter != null) { + return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) + .addCallbackDeferring(new WriteCB()); + } + try { + return new WriteCB().call(true); + } catch (Exception e) { + return Deferred.fromError(e); + } + } + + /** + * Forces a flush of any un-committed in memory data including left over + * compactions. + *
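+   * Example (sketch): block the calling thread until everything buffered has
+   * been durably written.
+   * <pre>
+   * tsdb.flush().joinUninterruptibly();
+   * </pre>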

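+   * A typical caller blocks on the returned deferred; a minimal sketch:
+   * <pre>{@code
+   *   tsdb.flush().joinUninterruptibly();
+   * }</pre>
+   * <p>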
+   * For instance, any data point not persisted will be sent to HBase.
+   *
+   * @return A {@link Deferred} that will be called once all the un-committed
+   * data has been successfully and durably stored. The value of the deferred
+   * object returned is meaningless and unspecified, and can be {@code null}.
+   * @throws HBaseException (deferred) if there was a problem sending
+   * un-committed data to HBase. Please refer to the {@link HBaseException}
+   * hierarchy to handle the possible failures. Some of them are easily
+   * recoverable by retrying, some are not.
+   */
+  public Deferred<Object> flush() throws HBaseException {
+    final class HClientFlush implements Callback<Object, ArrayList<Object>> {
+      public Object call(final ArrayList<Object> args) {
+        return client.flush();
+      }
+
+      public String toString() {
+        return "flush HBase client";
+      }
+    }
+
+    return config.enable_compactions() && compactionq != null
+        ? compactionq.flush().addCallback(new HClientFlush())
+        : client.flush();
+  }
+
+  /**
+   * Gracefully shuts down this TSD instance.
+   * <p>
+ * The method must call {@code shutdown()} on all plugins as well as flush the + * compaction queue. + * + * @return A {@link Deferred} that will be called once all the un-committed + * data has been successfully and durably stored, and all resources used by + * this instance have been released. The value of the deferred object + * return is meaningless and unspecified, and can be {@code null}. + * @throws HBaseException (deferred) if there was a problem sending + * un-committed data to HBase. Please refer to the {@link HBaseException} + * hierarchy to handle the possible failures. Some of them are easily + * recoverable by retrying, some are not. + */ + public Deferred shutdown() { + final ArrayList> deferreds = + new ArrayList>(); + + final class FinalShutdown implements Callback { + @Override + public Object call(Object result) throws Exception { + if (result instanceof Exception) { + LOG.error("A previous shutdown failed", (Exception) result); + } + final Set timeouts = timer.stop(); + // TODO - at some point we should clean these up. + if (timeouts.size() > 0) { + LOG.warn("There were " + timeouts.size() + " timer tasks queued"); + } + LOG.info("Completed shutting down the TSDB"); + return Deferred.fromResult(null); + } + } + + final class SEHShutdown implements Callback { + @Override + public Object call(Object result) throws Exception { + if (result instanceof Exception) { + LOG.error("Shutdown of the HBase client failed", (Exception) result); + } + LOG.info("Shutting down storage exception handler plugin: " + + storage_exception_handler.getClass().getCanonicalName()); + return storage_exception_handler.shutdown().addBoth(new FinalShutdown()); + } + + @Override + public String toString() { + return "SEHShutdown"; + } + } + + final class HClientShutdown implements Callback, ArrayList> { + public Deferred call(final ArrayList args) { + if (storage_exception_handler != null) { + return client.shutdown().addBoth(new SEHShutdown()); + } + return client.shutdown().addBoth(new FinalShutdown()); + } + + public String toString() { + return "shutdown HBase client"; + } + } + + final class ShutdownErrback implements Callback { + public Object call(final Exception e) { + final Logger LOG = LoggerFactory.getLogger(ShutdownErrback.class); + if (e instanceof DeferredGroupException) { + final DeferredGroupException ge = (DeferredGroupException) e; + for (final Object r : ge.results()) { + if (r instanceof Exception) { + LOG.error("Failed to shutdown the TSD", (Exception) r); + } + } + } else { + LOG.error("Failed to shutdown the TSD", e); + } + return new HClientShutdown().call(null); + } + + public String toString() { + return "shutdown HBase client after error"; + } + } + + final class CompactCB implements Callback> { + public Object call(ArrayList compactions) throws Exception { + return null; + } + } + + if (config.enable_compactions()) { + LOG.info("Flushing compaction queue"); + deferreds.add(compactionq.flush().addCallback(new CompactCB())); + } + if (startup != null) { + LOG.info("Shutting down startup plugin: " + + startup.getClass().getCanonicalName()); + deferreds.add(startup.shutdown()); + } + if (authentication != null) { + LOG.info("Shutting down authentication plugin: " + + authentication.getClass().getCanonicalName()); + deferreds.add(authentication.shutdown()); + } + if (search != null) { + LOG.info("Shutting down search plugin: " + + search.getClass().getCanonicalName()); + deferreds.add(search.shutdown()); + } + if (rt_publisher != null) { + LOG.info("Shutting down RT plugin: " 
+ + rt_publisher.getClass().getCanonicalName()); + deferreds.add(rt_publisher.shutdown()); + } + if (meta_cache != null) { + LOG.info("Shutting down meta cache plugin: " + + meta_cache.getClass().getCanonicalName()); + deferreds.add(meta_cache.shutdown()); + } if (storage_exception_handler != null) { - return client.shutdown().addBoth(new SEHShutdown()); - } - return client.shutdown().addBoth(new FinalShutdown()); - } - public String toString() { - return "shutdown HBase client"; - } - } - - final class ShutdownErrback implements Callback { - public Object call(final Exception e) { - final Logger LOG = LoggerFactory.getLogger(ShutdownErrback.class); - if (e instanceof DeferredGroupException) { - final DeferredGroupException ge = (DeferredGroupException) e; - for (final Object r : ge.results()) { - if (r instanceof Exception) { - LOG.error("Failed to shutdown the TSD", (Exception) r); + LOG.info("Shutting down storage exception handler plugin: " + + storage_exception_handler.getClass().getCanonicalName()); + deferreds.add(storage_exception_handler.shutdown()); + } + if (ts_filter != null) { + LOG.info("Shutting down time series filter plugin: " + + ts_filter.getClass().getCanonicalName()); + deferreds.add(ts_filter.shutdown()); + } + if (uid_filter != null) { + LOG.info("Shutting down UID filter plugin: " + + uid_filter.getClass().getCanonicalName()); + deferreds.add(uid_filter.shutdown()); + } + + // wait for plugins to shutdown before we close the client + return deferreds.size() > 0 + ? Deferred.group(deferreds).addCallbackDeferring(new HClientShutdown()) + .addErrback(new ShutdownErrback()) + : new HClientShutdown().call(null); + } + + /** + * Given a prefix search, returns a few matching metric names. + * + * @param search A prefix to search. + */ + public List suggestMetrics(final String search) { + return metrics.suggest(search); + } + + /** + * Given a prefix search, returns matching metric names. + * + * @param search A prefix to search. + * @param max_results Maximum number of results to return. + * @since 2.0 + */ + public List suggestMetrics(final String search, + final int max_results) { + return metrics.suggest(search, max_results); + } + + /** + * Given a prefix search, returns a few matching tag names. + * + * @param search A prefix to search. + */ + public List suggestTagNames(final String search) { + return tag_names.suggest(search); + } + + /** + * Given a prefix search, returns matching tagk names. + * + * @param search A prefix to search. + * @param max_results Maximum number of results to return. + * @since 2.0 + */ + public List suggestTagNames(final String search, + final int max_results) { + return tag_names.suggest(search, max_results); + } + + /** + * Given a prefix search, returns a few matching tag values. + * + * @param search A prefix to search. + */ + public List suggestTagValues(final String search) { + return tag_values.suggest(search); + } + + /** + * Given a prefix search, returns matching tag values. + * + * @param search A prefix to search. + * @param max_results Maximum number of results to return. + * @since 2.0 + */ + public List suggestTagValues(final String search, + final int max_results) { + return tag_values.suggest(search, max_results); + } + + /** + * Discards all in-memory caches. 
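+   * <p>
+   * Primarily an administrative hook; e.g. after UID rows have been changed
+   * directly in storage, a hypothetical maintenance task could call:
+   * <pre>{@code
+   *   tsdb.dropCaches();  // subsequent lookups re-read the UID table
+   * }</pre>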
+ * + * @since 1.1 + */ + public void dropCaches() { + metrics.dropCaches(); + tag_names.dropCaches(); + tag_values.dropCaches(); + } + + /** + * Attempts to assign a UID to a name for the given type + * Used by the UniqueIdRpc call to generate IDs for new metrics, tagks or + * tagvs. The name must pass validation and if it's already assigned a UID, + * this method will throw an error with the proper UID. Otherwise if it can + * create the UID, it will be returned + * + * @param type The type of uid to assign, metric, tagk or tagv + * @param name The name of the uid object + * @return A byte array with the UID if the assignment was successful + * @throws IllegalArgumentException if the name is invalid or it already + * exists + * @since 2.0 + */ + public byte[] assignUid(final String type, final String name) { + Tags.validateString(type, name); + if (type.toLowerCase().equals("metric")) { + try { + final byte[] uid = this.metrics.getId(name); + throw new IllegalArgumentException("Name already exists with UID: " + + UniqueId.uidToString(uid)); + } catch (NoSuchUniqueName nsue) { + return this.metrics.getOrCreateId(name); + } + } else if (type.toLowerCase().equals("tagk")) { + try { + final byte[] uid = this.tag_names.getId(name); + throw new IllegalArgumentException("Name already exists with UID: " + + UniqueId.uidToString(uid)); + } catch (NoSuchUniqueName nsue) { + return this.tag_names.getOrCreateId(name); + } + } else if (type.toLowerCase().equals("tagv")) { + try { + final byte[] uid = this.tag_values.getId(name); + throw new IllegalArgumentException("Name already exists with UID: " + + UniqueId.uidToString(uid)); + } catch (NoSuchUniqueName nsue) { + return this.tag_values.getOrCreateId(name); + } + } else { + LOG.warn("Unknown type name: " + type); + throw new IllegalArgumentException("Unknown type name"); + } + } + + /** + * Attempts to delete the given UID name mapping from the storage table as + * well as the local cache. + * + * @param type The type of UID to delete. Must be "metrics", "tagk" or "tagv" + * @param name The name of the UID to delete + * @return A deferred to wait on for completion, or an exception if thrown + * @throws IllegalArgumentException if the type is invalid + * @since 2.2 + */ + public Deferred deleteUidAsync(final String type, final String name) { + final UniqueIdType uid_type = UniqueId.stringToUniqueIdType(type); + switch (uid_type) { + case METRIC: + return metrics.deleteAsync(name); + case TAGK: + return tag_names.deleteAsync(name); + case TAGV: + return tag_values.deleteAsync(name); + default: + throw new IllegalArgumentException("Unrecognized UID type: " + uid_type); + } + } + + /** + * Attempts to rename a UID from existing name to the given name + * Used by the UniqueIdRpc call to rename name of existing metrics, tagks or + * tagvs. The name must pass validation. If the UID doesn't exist, the method + * will throw an error. Chained IllegalArgumentException is directly exposed + * to caller. If the rename was successful, this method returns. 
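+   * <p>
+   * For example, a hypothetical metric rename:
+   * <pre>{@code
+   *   tsdb.renameUid("metric", "sys.cpu.usr", "sys.cpu.user");
+   * }</pre>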
+ * + * @param type The type of uid to rename, one of metric, tagk and tagv + * @param oldname The existing name of the uid object + * @param newname The new name to be used on the uid object + * @throws IllegalArgumentException if error happened + * @since 2.2 + */ + public void renameUid(final String type, final String oldname, + final String newname) { + Tags.validateString(type, oldname); + Tags.validateString(type, newname); + if (type.toLowerCase().equals("metric")) { + try { + this.metrics.getId(oldname); + this.metrics.rename(oldname, newname); + } catch (NoSuchUniqueName nsue) { + throw new IllegalArgumentException("Name(\"" + oldname + + "\") does not exist"); + } + } else if (type.toLowerCase().equals("tagk")) { + try { + this.tag_names.getId(oldname); + this.tag_names.rename(oldname, newname); + } catch (NoSuchUniqueName nsue) { + throw new IllegalArgumentException("Name(\"" + oldname + + "\") does not exist"); + } + } else if (type.toLowerCase().equals("tagv")) { + try { + this.tag_values.getId(oldname); + this.tag_values.rename(oldname, newname); + } catch (NoSuchUniqueName nsue) { + throw new IllegalArgumentException("Name(\"" + oldname + + "\") does not exist"); } - } } else { - LOG.error("Failed to shutdown the TSD", e); - } - return new HClientShutdown().call(null); - } - public String toString() { - return "shutdown HBase client after error"; - } - } - - final class CompactCB implements Callback> { - public Object call(ArrayList compactions) throws Exception { - return null; - } - } - - if (config.enable_compactions()) { - LOG.info("Flushing compaction queue"); - deferreds.add(compactionq.flush().addCallback(new CompactCB())); - } - if (startup != null) { - LOG.info("Shutting down startup plugin: " + - startup.getClass().getCanonicalName()); - deferreds.add(startup.shutdown()); - } - if (authentication != null) { - LOG.info("Shutting down authentication plugin: " + - authentication.getClass().getCanonicalName()); - deferreds.add(authentication.shutdown()); - } - if (search != null) { - LOG.info("Shutting down search plugin: " + - search.getClass().getCanonicalName()); - deferreds.add(search.shutdown()); - } - if (rt_publisher != null) { - LOG.info("Shutting down RT plugin: " + - rt_publisher.getClass().getCanonicalName()); - deferreds.add(rt_publisher.shutdown()); - } - if (meta_cache != null) { - LOG.info("Shutting down meta cache plugin: " + - meta_cache.getClass().getCanonicalName()); - deferreds.add(meta_cache.shutdown()); - } - if (storage_exception_handler != null) { - LOG.info("Shutting down storage exception handler plugin: " + - storage_exception_handler.getClass().getCanonicalName()); - deferreds.add(storage_exception_handler.shutdown()); - } - if (ts_filter != null) { - LOG.info("Shutting down time series filter plugin: " + - ts_filter.getClass().getCanonicalName()); - deferreds.add(ts_filter.shutdown()); - } - if (uid_filter != null) { - LOG.info("Shutting down UID filter plugin: " + - uid_filter.getClass().getCanonicalName()); - deferreds.add(uid_filter.shutdown()); - } - - // wait for plugins to shutdown before we close the client - return deferreds.size() > 0 - ? Deferred.group(deferreds).addCallbackDeferring(new HClientShutdown()) - .addErrback(new ShutdownErrback()) - : new HClientShutdown().call(null); - } - - /** - * Given a prefix search, returns a few matching metric names. - * @param search A prefix to search. 
- */ - public List suggestMetrics(final String search) { - return metrics.suggest(search); - } - - /** - * Given a prefix search, returns matching metric names. - * @param search A prefix to search. - * @param max_results Maximum number of results to return. - * @since 2.0 - */ - public List suggestMetrics(final String search, - final int max_results) { - return metrics.suggest(search, max_results); - } - - /** - * Given a prefix search, returns a few matching tag names. - * @param search A prefix to search. - */ - public List suggestTagNames(final String search) { - return tag_names.suggest(search); - } - - /** - * Given a prefix search, returns matching tagk names. - * @param search A prefix to search. - * @param max_results Maximum number of results to return. - * @since 2.0 - */ - public List suggestTagNames(final String search, - final int max_results) { - return tag_names.suggest(search, max_results); - } - - /** - * Given a prefix search, returns a few matching tag values. - * @param search A prefix to search. - */ - public List suggestTagValues(final String search) { - return tag_values.suggest(search); - } - - /** - * Given a prefix search, returns matching tag values. - * @param search A prefix to search. - * @param max_results Maximum number of results to return. - * @since 2.0 - */ - public List suggestTagValues(final String search, - final int max_results) { - return tag_values.suggest(search, max_results); - } - - /** - * Discards all in-memory caches. - * @since 1.1 - */ - public void dropCaches() { - metrics.dropCaches(); - tag_names.dropCaches(); - tag_values.dropCaches(); - } - - /** - * Attempts to assign a UID to a name for the given type - * Used by the UniqueIdRpc call to generate IDs for new metrics, tagks or - * tagvs. The name must pass validation and if it's already assigned a UID, - * this method will throw an error with the proper UID. Otherwise if it can - * create the UID, it will be returned - * @param type The type of uid to assign, metric, tagk or tagv - * @param name The name of the uid object - * @return A byte array with the UID if the assignment was successful - * @throws IllegalArgumentException if the name is invalid or it already - * exists - * @since 2.0 - */ - public byte[] assignUid(final String type, final String name) { - Tags.validateString(type, name); - if (type.toLowerCase().equals("metric")) { - try { - final byte[] uid = this.metrics.getId(name); - throw new IllegalArgumentException("Name already exists with UID: " + - UniqueId.uidToString(uid)); - } catch (NoSuchUniqueName nsue) { - return this.metrics.getOrCreateId(name); - } - } else if (type.toLowerCase().equals("tagk")) { - try { - final byte[] uid = this.tag_names.getId(name); - throw new IllegalArgumentException("Name already exists with UID: " + - UniqueId.uidToString(uid)); - } catch (NoSuchUniqueName nsue) { - return this.tag_names.getOrCreateId(name); - } - } else if (type.toLowerCase().equals("tagv")) { - try { - final byte[] uid = this.tag_values.getId(name); - throw new IllegalArgumentException("Name already exists with UID: " + - UniqueId.uidToString(uid)); - } catch (NoSuchUniqueName nsue) { - return this.tag_values.getOrCreateId(name); - } - } else { - LOG.warn("Unknown type name: " + type); - throw new IllegalArgumentException("Unknown type name"); - } - } - - /** - * Attempts to delete the given UID name mapping from the storage table as - * well as the local cache. - * @param type The type of UID to delete. 
Must be "metrics", "tagk" or "tagv" - * @param name The name of the UID to delete - * @return A deferred to wait on for completion, or an exception if thrown - * @throws IllegalArgumentException if the type is invalid - * @since 2.2 - */ - public Deferred deleteUidAsync(final String type, final String name) { - final UniqueIdType uid_type = UniqueId.stringToUniqueIdType(type); - switch (uid_type) { - case METRIC: - return metrics.deleteAsync(name); - case TAGK: - return tag_names.deleteAsync(name); - case TAGV: - return tag_values.deleteAsync(name); - default: - throw new IllegalArgumentException("Unrecognized UID type: " + uid_type); - } - } - - /** - * Attempts to rename a UID from existing name to the given name - * Used by the UniqueIdRpc call to rename name of existing metrics, tagks or - * tagvs. The name must pass validation. If the UID doesn't exist, the method - * will throw an error. Chained IllegalArgumentException is directly exposed - * to caller. If the rename was successful, this method returns. - * @param type The type of uid to rename, one of metric, tagk and tagv - * @param oldname The existing name of the uid object - * @param newname The new name to be used on the uid object - * @throws IllegalArgumentException if error happened - * @since 2.2 - */ - public void renameUid(final String type, final String oldname, - final String newname) { - Tags.validateString(type, oldname); - Tags.validateString(type, newname); - if (type.toLowerCase().equals("metric")) { - try { - this.metrics.getId(oldname); - this.metrics.rename(oldname, newname); - } catch (NoSuchUniqueName nsue) { - throw new IllegalArgumentException("Name(\"" + oldname + - "\") does not exist"); - } - } else if (type.toLowerCase().equals("tagk")) { - try { - this.tag_names.getId(oldname); - this.tag_names.rename(oldname, newname); - } catch (NoSuchUniqueName nsue) { - throw new IllegalArgumentException("Name(\"" + oldname + - "\") does not exist"); - } - } else if (type.toLowerCase().equals("tagv")) { - try { - this.tag_values.getId(oldname); - this.tag_values.rename(oldname, newname); - } catch (NoSuchUniqueName nsue) { - throw new IllegalArgumentException("Name(\"" + oldname + - "\") does not exist"); - } - } else { - LOG.warn("Unknown type name: " + type); - throw new IllegalArgumentException("Unknown type name"); - } - } - - /** @return the name of the UID table as a byte array for client requests */ - public byte[] uidTable() { - return this.uidtable; - } - - /** @return the name of the data table as a byte array for client requests */ - public byte[] dataTable() { - return this.table; - } - - /** @return the name of the tree table as a byte array for client requests */ - public byte[] treeTable() { - return this.treetable; - } - - /** @return the name of the meta table as a byte array for client requests */ - public byte[] metaTable() { - return this.meta_table; - } - - /** - * Index the given timeseries meta object via the configured search plugin - * @param meta The meta data object to index - * @since 2.0 - */ - public void indexTSMeta(final TSMeta meta) { - if (search != null) { - search.indexTSMeta(meta).addErrback(new PluginError()); - } - } - - /** - * Delete the timeseries meta object from the search index - * @param tsuid The TSUID to delete - * @since 2.0 - */ - public void deleteTSMeta(final String tsuid) { - if (search != null) { - search.deleteTSMeta(tsuid).addErrback(new PluginError()); - } - } - - /** - * Index the given UID meta object via the configured search plugin - * @param meta The 
meta data object to index - * @since 2.0 - */ - public void indexUIDMeta(final UIDMeta meta) { - if (search != null) { - search.indexUIDMeta(meta).addErrback(new PluginError()); - } - } - - /** - * Delete the UID meta object from the search index - * @param meta The UID meta object to delete - * @since 2.0 - */ - public void deleteUIDMeta(final UIDMeta meta) { - if (search != null) { - search.deleteUIDMeta(meta).addErrback(new PluginError()); - } - } - - /** - * Index the given Annotation object via the configured search plugin - * @param note The annotation object to index - * @since 2.0 - */ - public void indexAnnotation(final Annotation note) { - if (search != null) { - search.indexAnnotation(note).addErrback(new PluginError()); - } - if( rt_publisher != null ) { - rt_publisher.publishAnnotation(note); - } - } - - /** - * Delete the annotation object from the search index - * @param note The annotation object to delete - * @since 2.0 - */ - public void deleteAnnotation(final Annotation note) { - if (search != null) { - search.deleteAnnotation(note).addErrback(new PluginError()); - } - } - - /** - * Processes the TSMeta through all of the trees if configured to do so - * @param meta The meta data to process - * @since 2.0 - */ - public Deferred processTSMetaThroughTrees(final TSMeta meta) { - if (config.enable_tree_processing()) { - return TreeBuilder.processAllTrees(this, meta); - } - return Deferred.fromResult(false); - } - - /** - * Executes a search query using the search plugin - * @param query The query to execute - * @return A deferred object to wait on for the results to be fetched - * @throws IllegalStateException if the search plugin has not been enabled or - * configured - * @since 2.0 - */ - public Deferred executeSearch(final SearchQuery query) { - if (search == null) { - throw new IllegalStateException( - "Searching has not been enabled on this TSD"); - } - - return search.executeQuery(query); - } - - /** - * Simply logs plugin errors when they're thrown by attaching as an errorback. - * Without this, exceptions will just disappear (unless logged by the plugin) - * since we don't wait for a result. - */ - final class PluginError implements Callback { - @Override - public Object call(final Exception e) throws Exception { - LOG.error("Exception from Search plugin indexer", e); - return null; - } - } - - /** @return the rollup config object. May be null - * @since 2.4 */ - public RollupConfig getRollupConfig() { - return rollup_config; - } - - /** @return The default rollup interval config. May be null. - * @since 2.4 */ - public RollupInterval getDefaultInterval() { - return default_interval; - } - - /** - * Blocks while pre-fetching meta data from the data and uid tables - * so that performance improves, particularly with a large number of - * regions and region servers. 
- * @since 2.2 - */ - public void preFetchHBaseMeta() { - LOG.info("Pre-fetching meta data for all tables"); - final long start = System.currentTimeMillis(); - final ArrayList> deferreds = new ArrayList>(); - deferreds.add(client.prefetchMeta(table)); - deferreds.add(client.prefetchMeta(uidtable)); - - // TODO(cl) - meta, tree, etc - - try { - Deferred.group(deferreds).join(); - LOG.info("Fetched meta data for tables in " + - (System.currentTimeMillis() - start) + "ms"); - } catch (InterruptedException e) { - LOG.error("Interrupted", e); - Thread.currentThread().interrupt(); - return; - } catch (Exception e) { - LOG.error("Failed to prefetch meta for our tables", e); - } - } - - /** @return the timer used for various house keeping functions */ - public Timer getTimer() { - return timer; - } - - /** @return The aggregate tag key if set. May be null. - * @since 2.4 */ - public String getAggTagKey() { - return agg_tag_key; - } - - /** @return The raw tag value if set. May be null. - * @since 2.4 */ - public String getRawTagValue() { - return raw_agg_tag_value; - } - - /** @return The optional histogram manager registered to this TSD. - * @since 2.4 */ - public HistogramCodecManager histogramManager() { - return histogram_manager; - } - - /** @return The search plugin if configured and loaded. May be null. - * @since 2.4 */ - public SearchPlugin getSearchPlugin() { - return this.search; - } - - /** @return The byte limit class for queries */ - public QueryLimitOverride getQueryByteLimits() { - return query_limits; - } - - /** @return The mode of operation for this TSD. - * @since 2.4 */ - public OperationMode getMode() { - return mode; - } - - private final boolean isHistogram(final byte[] qualifier) { - return (qualifier.length & 0x1) == 1; - } - - // ------------------ // - // Compaction helpers // - // ------------------ // - - final KeyValue compact(final ArrayList row, - List annotations, - List histograms) { - return compactionq.compact(row, annotations, histograms); - } - - /** - * Schedules the given row key for later re-compaction. - * Once this row key has become "old enough", we'll read back all the data - * points in that row, write them back to HBase in a more compact fashion, - * and delete the individual data points. - * @param row The row key to re-compact later. Will not be modified. - * @param base_time The 32-bit unsigned UNIX timestamp. - */ - final void scheduleForCompaction(final byte[] row, final int base_time) { - if (config.enable_compactions()) { - compactionq.add(row); - } - } - - // ------------------------ // - // HBase operations helpers // - // ------------------------ // - - /** Gets the entire given row from the data table. */ - final Deferred> get(final byte[] key) { - return client.get(new GetRequest(table, key, FAMILY)); - } - - /** Puts the given value into the data table. */ - final Deferred put(final byte[] key, - final byte[] qualifier, - final byte[] value, - long timestamp) { - return client.put(RequestBuilder.buildPutRequest(config, table, key, FAMILY, qualifier, value, timestamp)); - } - - /** Deletes the given cells from the data table. 
*/ - final Deferred delete(final byte[] key, final byte[][] qualifiers) { - return client.delete(new DeleteRequest(table, key, FAMILY, qualifiers)); - } + LOG.warn("Unknown type name: " + type); + throw new IllegalArgumentException("Unknown type name"); + } + } + + /** + * @return the name of the UID table as a byte array for client requests + */ + public byte[] uidTable() { + return this.uidtable; + } + + /** + * @return the name of the data table as a byte array for client requests + */ + public byte[] dataTable() { + return this.table; + } + + /** + * @return the name of the tree table as a byte array for client requests + */ + public byte[] treeTable() { + return this.treetable; + } + + /** + * @return the name of the meta table as a byte array for client requests + */ + public byte[] metaTable() { + return this.meta_table; + } + + /** + * Index the given timeseries meta object via the configured search plugin + * + * @param meta The meta data object to index + * @since 2.0 + */ + public void indexTSMeta(final TSMeta meta) { + if (search != null) { + search.indexTSMeta(meta).addErrback(new PluginError()); + } + } + + /** + * Delete the timeseries meta object from the search index + * + * @param tsuid The TSUID to delete + * @since 2.0 + */ + public void deleteTSMeta(final String tsuid) { + if (search != null) { + search.deleteTSMeta(tsuid).addErrback(new PluginError()); + } + } + + /** + * Index the given UID meta object via the configured search plugin + * + * @param meta The meta data object to index + * @since 2.0 + */ + public void indexUIDMeta(final UIDMeta meta) { + if (search != null) { + search.indexUIDMeta(meta).addErrback(new PluginError()); + } + } + + /** + * Delete the UID meta object from the search index + * + * @param meta The UID meta object to delete + * @since 2.0 + */ + public void deleteUIDMeta(final UIDMeta meta) { + if (search != null) { + search.deleteUIDMeta(meta).addErrback(new PluginError()); + } + } + + /** + * Index the given Annotation object via the configured search plugin + * + * @param note The annotation object to index + * @since 2.0 + */ + public void indexAnnotation(final Annotation note) { + if (search != null) { + search.indexAnnotation(note).addErrback(new PluginError()); + } + if (rt_publisher != null) { + rt_publisher.publishAnnotation(note); + } + } + + /** + * Delete the annotation object from the search index + * + * @param note The annotation object to delete + * @since 2.0 + */ + public void deleteAnnotation(final Annotation note) { + if (search != null) { + search.deleteAnnotation(note).addErrback(new PluginError()); + } + } + + /** + * Processes the TSMeta through all of the trees if configured to do so + * + * @param meta The meta data to process + * @since 2.0 + */ + public Deferred processTSMetaThroughTrees(final TSMeta meta) { + if (config.enable_tree_processing()) { + return TreeBuilder.processAllTrees(this, meta); + } + return Deferred.fromResult(false); + } + + /** + * Executes a search query using the search plugin + * + * @param query The query to execute + * @return A deferred object to wait on for the results to be fetched + * @throws IllegalStateException if the search plugin has not been enabled or + * configured + * @since 2.0 + */ + public Deferred executeSearch(final SearchQuery query) { + if (search == null) { + throw new IllegalStateException( + "Searching has not been enabled on this TSD"); + } + + return search.executeQuery(query); + } + + /** + * Simply logs plugin errors when they're thrown by attaching as an 
errorback. + * Without this, exceptions will just disappear (unless logged by the plugin) + * since we don't wait for a result. + */ + final class PluginError implements Callback { + @Override + public Object call(final Exception e) throws Exception { + LOG.error("Exception from Search plugin indexer", e); + return null; + } + } + + /** + * @return the rollup config object. May be null + * @since 2.4 + */ + public RollupConfig getRollupConfig() { + return rollup_config; + } + + /** + * @return The default rollup interval config. May be null. + * @since 2.4 + */ + public RollupInterval getDefaultInterval() { + return default_interval; + } + + /** + * Blocks while pre-fetching meta data from the data and uid tables + * so that performance improves, particularly with a large number of + * regions and region servers. + * + * @since 2.2 + */ + public void preFetchHBaseMeta() { + LOG.info("Pre-fetching meta data for all tables"); + final long start = System.currentTimeMillis(); + final ArrayList> deferreds = new ArrayList>(); + deferreds.add(client.prefetchMeta(table)); + deferreds.add(client.prefetchMeta(uidtable)); + + // TODO(cl) - meta, tree, etc + + try { + Deferred.group(deferreds).join(); + LOG.info("Fetched meta data for tables in " + + (System.currentTimeMillis() - start) + "ms"); + } catch (InterruptedException e) { + LOG.error("Interrupted", e); + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + LOG.error("Failed to prefetch meta for our tables", e); + } + } + + /** + * @return the timer used for various house keeping functions + */ + public Timer getTimer() { + return timer; + } + + /** + * @return The aggregate tag key if set. May be null. + * @since 2.4 + */ + public String getAggTagKey() { + return agg_tag_key; + } + + /** + * @return The raw tag value if set. May be null. + * @since 2.4 + */ + public String getRawTagValue() { + return raw_agg_tag_value; + } + + /** + * @return The optional histogram manager registered to this TSD. + * @since 2.4 + */ + public HistogramCodecManager histogramManager() { + return histogram_manager; + } + + /** + * @return The search plugin if configured and loaded. May be null. + * @since 2.4 + */ + public SearchPlugin getSearchPlugin() { + return this.search; + } + + /** + * @return The optional normalize plugin + */ + public NormalizePlugin getNormalizePlugin() { + return this.normalize; + } + + /** + * @return The byte limit class for queries + */ + public QueryLimitOverride getQueryByteLimits() { + return query_limits; + } + + /** + * @return The mode of operation for this TSD. + * @since 2.4 + */ + public OperationMode getMode() { + return mode; + } + + private final boolean isHistogram(final byte[] qualifier) { + return (qualifier.length & 0x1) == 1; + } + + // ------------------ // + // Compaction helpers // + // ------------------ // + + final KeyValue compact(final ArrayList row, + List annotations, + List histograms) { + return compactionq.compact(row, annotations, histograms); + } + + /** + * Schedules the given row key for later re-compaction. + * Once this row key has become "old enough", we'll read back all the data + * points in that row, write them back to HBase in a more compact fashion, + * and delete the individual data points. + * + * @param row The row key to re-compact later. Will not be modified. + * @param base_time The 32-bit unsigned UNIX timestamp. 
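+   *        For a raw data row this is the row's hour-aligned base time, e.g.
+   *        {@code (int) (timestamp - (timestamp % Const.MAX_TIMESPAN))} as
+   *        computed when the row key was built.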
+ */ + final void scheduleForCompaction(final byte[] row, final int base_time) { + if (config.enable_compactions()) { + compactionq.add(row); + } + } + + // ------------------------ // + // HBase operations helpers // + // ------------------------ // + + /** + * Gets the entire given row from the data table. + */ + final Deferred> get(final byte[] key) { + return client.get(new GetRequest(table, key, FAMILY)); + } + + /** + * Puts the given value into the data table. + */ + final Deferred put(final byte[] key, + final byte[] qualifier, + final byte[] value, + long timestamp) { + return client.put(RequestBuilder.buildPutRequest(config, table, key, FAMILY, qualifier, value, timestamp)); + } + + /** + * Deletes the given cells from the data table. + */ + final Deferred delete(final byte[] key, final byte[][] qualifiers) { + return client.delete(new DeleteRequest(table, key, FAMILY, qualifiers)); + } } From e58dd54060e2ed40a4e3a323f0d8e494f37c6c6c Mon Sep 17 00:00:00 2001 From: gnydick Date: Wed, 23 Jan 2019 12:02:32 -0800 Subject: [PATCH 3/7] hopefully working --- configure.ac | 2 +- src/normalize/NormalizePlugin.java | 10 +++------- src/utils/PluginConfig.java | 19 ------------------- 3 files changed, 4 insertions(+), 27 deletions(-) delete mode 100644 src/utils/PluginConfig.java diff --git a/configure.ac b/configure.ac index 536f8e1115..6b28989b12 100644 --- a/configure.ac +++ b/configure.ac @@ -14,7 +14,7 @@ # along with this library. If not, see . # Semantic Versioning (see http://semver.org/). -AC_INIT([opentsdb], [2.4.0], [opentsdb@googlegroups.com]) +AC_INIT([opentsdb], [2.4.0-tnp], [opentsdb@googlegroups.com]) AC_CONFIG_AUX_DIR([build-aux]) AM_INIT_AUTOMAKE([foreign]) diff --git a/src/normalize/NormalizePlugin.java b/src/normalize/NormalizePlugin.java index 50091b1d1e..2c483db62f 100644 --- a/src/normalize/NormalizePlugin.java +++ b/src/normalize/NormalizePlugin.java @@ -3,20 +3,16 @@ import com.stumbleupon.async.Deferred; import net.opentsdb.core.TSDB; import net.opentsdb.stats.StatsCollector; -import net.opentsdb.utils.PluginConfig; public abstract class NormalizePlugin { - private PluginConfig pluginConfig; - public NormalizePlugin(PluginConfig pluginConfig){ - this.pluginConfig = pluginConfig; - }; - - private NormalizePlugin() {} public abstract void initialize(final TSDB tsdb); + public abstract Deferred shutdown(); + public abstract String version(); + public abstract void collectStats(final StatsCollector collector); diff --git a/src/utils/PluginConfig.java b/src/utils/PluginConfig.java deleted file mode 100644 index a7ad8d9230..0000000000 --- a/src/utils/PluginConfig.java +++ /dev/null @@ -1,19 +0,0 @@ -package net.opentsdb.utils; - -import java.util.Properties; - -public abstract class PluginConfig { - - protected String configUrl; - private PluginConfig(){} - protected Properties properties; - - public PluginConfig(String configUrl){ - this.configUrl = configUrl; - } - - public Properties getConfig() { - return properties; - } - -} From 8893ca734cdcad6ab182de34bd61f88102dda10a Mon Sep 17 00:00:00 2001 From: gnydick Date: Wed, 23 Jan 2019 12:23:01 -0800 Subject: [PATCH 4/7] added src/normalize/NormalizePlugin.java to the build system --- Makefile.am | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/Makefile.am b/Makefile.am index d3ce9287e7..a60c51e376 100644 --- a/Makefile.am +++ b/Makefile.am @@ -106,6 +106,7 @@ tsdb_SRC := \ src/meta/TSMeta.java \ src/meta/TSUIDQuery.java \ src/meta/UIDMeta.java \ + src/normalize/NormalizePlugin.java \ 
src/query/QueryUtil.java \ src/query/QueryLimitOverride.java \ src/query/expression/Absolute.java \ @@ -241,7 +242,7 @@ tsdb_SRC := \ src/utils/JSONException.java \ src/utils/Pair.java \ src/utils/PluginLoader.java \ - src/utils/Threads.java + src/utils/Threads.java tsdb_DEPS = \ $(ASM) \ @@ -428,7 +429,7 @@ test_SRC := \ test/utils/TestJSON.java \ test/utils/TestPair.java \ test/utils/TestPluginLoader.java - + test_plugin_SRC := \ test/plugin/DummyPluginA.java \ test/plugin/DummyPluginB.java \ @@ -448,7 +449,7 @@ test_plugin_SVCS := \ META-INF/services/net.opentsdb.tsd.RpcPlugin \ META-INF/services/net.opentsdb.tsd.RTPublisher \ META-INF/services/net.opentsdb.tsd.StorageExceptionHandler - + test_plugin_MF := \ test/META-INF/MANIFEST.MF @@ -676,7 +677,7 @@ install-data-tools: $(tsdb_DEPS) $(jar) $(INSTALL_SCRIPT) $$tools "$$destdatatoolsdir" || exit 1; \ tools="-r $(top_srcdir)/tools/*" ; \ echo " cp" $$tools "$$destdatatoolsdir" ; \ - cp $$tools "$$destdatatoolsdir" || exit 1; + cp $$tools "$$destdatatoolsdir" || exit 1; uninstall-data-tools: @$(NORMAL_UNINSTALL) @@ -746,7 +747,7 @@ uninstall-hook: get_runtime_dep_classpath = `for jar in $(test_DEPS); do $(find_jar); done | tr '\n' ':'` $(test_SRC): $(test_DEPS) @$(refresh_src) - + $(test_plugin_SRC): $(test_DEPS) @$(refresh_src) @@ -888,7 +889,7 @@ pom.xml: pom.xml.in Makefile } >$@-t mv $@-t ../$@ -# Generates a maven pom called fat-jar-pom.xml that builds a fat jar +# Generates a maven pom called fat-jar-pom.xml that builds a fat jar # containing all the dependencies required to run opentsdb fat-jar-pom.xml: ./fat-jar/fat-jar-pom.xml.in Makefile (cd $(top_srcdir) ; ./fat-jar/create-src-dir-overlay.sh) @@ -918,7 +919,7 @@ fat-jar-pom.xml: ./fat-jar/fat-jar-pom.xml.in Makefile -e 's/@ZOOKEEPER_VERSION@/$(ZOOKEEPER_VERSION)/' \ -e 's/@APACHE_MATH_VERSION@/$(APACHE_MATH_VERSION)/' \ -e 's/@JEXL_VERSION@/$(JEXL_VERSION)/' \ - -e 's/@JGRAPHT_VERSION@/$(JGRAPHT_VERSION)/' \ + -e 's/@JGRAPHT_VERSION@/$(JGRAPHT_VERSION)/' \ -e 's/@spec_title@/$(spec_title)/' \ -e 's/@spec_vendor@/$(spec_vendor)/' \ -e 's/@spec_version@/$(PACKAGE_VERSION)/' \ From c3fc2a909714f10a9f45947759d6508cfc53ff90 Mon Sep 17 00:00:00 2001 From: gnydick Date: Thu, 24 Jan 2019 18:12:08 -0800 Subject: [PATCH 5/7] fixed some normalize stuff --- src/core/TSDB.java | 9 ++--- src/normalize/NormalizePlugin.java | 4 +++ src/tsd/RTPublisher.java | 55 +++++++++++++++--------------- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/core/TSDB.java b/src/core/TSDB.java index 6abe9e5171..4563ad9fbd 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -1269,6 +1269,7 @@ private final Deferred storeIntoDB(final String metric, final byte[] row, final byte[] qualifier) { final long base_time; + final Map cleaned_tags = normalize.normalizeTags(tags); if ((timestamp & Const.SECOND_MASK) != 0) { // drop the ms timestamp to seconds to calculate the base timestamp @@ -1346,9 +1347,9 @@ public Deferred call(final Boolean allowed) throws Exception { if (rt_publisher != null) { if (isHistogram(qualifier)) { - rt_publisher.publishHistogramPoint(metric, timestamp, value, tags, tsuid); + rt_publisher.publishHistogramPoint(metric, timestamp, value, cleaned_tags, tsuid); } else { - rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags); + rt_publisher.sinkDataPoint(metric, timestamp, value, cleaned_tags, tsuid, flags); } } return result; @@ -1362,10 +1363,10 @@ public String toString() { if (ts_filter != null && ts_filter.filterDataPoints()) { if 
(isHistogram(qualifier)) { - return ts_filter.allowHistogramPoint(metric, timestamp, value, tags) + return ts_filter.allowHistogramPoint(metric, timestamp, value, cleaned_tags) .addCallbackDeferring(new WriteCB()); } else { - return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) + return ts_filter.allowDataPoint(metric, timestamp, value, cleaned_tags, flags) .addCallbackDeferring(new WriteCB()); } } diff --git a/src/normalize/NormalizePlugin.java b/src/normalize/NormalizePlugin.java index 2c483db62f..6f12526d70 100644 --- a/src/normalize/NormalizePlugin.java +++ b/src/normalize/NormalizePlugin.java @@ -4,6 +4,8 @@ import net.opentsdb.core.TSDB; import net.opentsdb.stats.StatsCollector; +import java.util.Map; + public abstract class NormalizePlugin { @@ -15,5 +17,7 @@ public abstract class NormalizePlugin { public abstract void collectStats(final StatsCollector collector); + public abstract Map normalizeTags(Map tags); + } diff --git a/src/tsd/RTPublisher.java b/src/tsd/RTPublisher.java index 194a36e353..209ff7c98a 100644 --- a/src/tsd/RTPublisher.java +++ b/src/tsd/RTPublisher.java @@ -28,7 +28,7 @@ * after they are queued for storage. In the future we may support publishing * meta data or other types of information as changes are made. *
 * <p>
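 * A publisher is enabled through the TSD config; a minimal sketch (the
 * plugin class name here is hypothetical):
 * <pre>{@code
 *   tsd.rtpublisher.enable = true
 *   tsd.rtpublisher.plugin = net.opentsdb.tsd.ExamplePublisher
 * }</pre>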
- * Note: Implementations must have a parameterless constructor. The + * Note: Implementations must have a parameterless constructor. The * {@link #initialize(TSDB)} method will be called immediately after the plugin is * instantiated and before any other methods are called. *

@@ -43,24 +43,24 @@ public abstract class RTPublisher { * Implementations are responsible for setting up any IO they need as well * as starting any required background threads. * Note: Implementations should throw exceptions if they can't start - * up properly. The TSD will then shutdown so the operator can fix the + * up properly. The TSD will then shutdown so the operator can fix the * problem. Please use IllegalArgumentException for configuration issues. * @param tsdb The parent TSDB object - * @throws IllegalArgumentException if required configuration parameters are + * @throws IllegalArgumentException if required configuration parameters are * missing * @throws RuntimeException if something else goes wrong */ public abstract void initialize(final TSDB tsdb); - + /** - * Called to gracefully shutdown the plugin. Implementations should close + * Called to gracefully shutdown the plugin. Implementations should close * any IO they have open * @return A deferred object that indicates the completion of the request. * The {@link Object} has not special meaning and can be {@code null} * (think of it as {@code Deferred}). */ public abstract Deferred shutdown(); - + /** * Should return the version of this plugin in the format: * MAJOR.MINOR.MAINT, e.g. 2.0.1. The MAJOR version should match the major @@ -68,7 +68,7 @@ public abstract class RTPublisher { * @return A version string used to log the loaded version */ public abstract String version(); - + /** * Called by the TSD when a request for statistics collection has come in. The * implementation may provide one or more statistics. If no statistics are @@ -76,11 +76,11 @@ public abstract class RTPublisher { * @param collector The collector used for emitting statistics */ public abstract void collectStats(final StatsCollector collector); - + /** * Called by the TSD when a new, raw data point is published. Because this * is called after a data point is queued, the value has been converted to a - * byte array so we need to convert it back to an integer or floating point + * byte array so we need to convert it back to an integer or floating point * value. Instead of requiring every implementation to perform the calculation * we perform it here and let the implementer deal with the integer or float. * @param metric The name of the metric associated with the data point @@ -91,22 +91,23 @@ public abstract class RTPublisher { * @param tsuid Time series UID for the value * @param flags Indicates if the byte array is an integer or floating point * value - * @return A deferred without special meaning to wait on if necessary. The + * @return A deferred without special meaning to wait on if necessary. The * value may be null but a Deferred must be returned. 
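+   * <p>
+   * The low three bits of {@code flags} encode the stored value length minus
+   * one, and {@code Const.FLAG_FLOAT} marks a floating point payload; e.g. a
+   * {@code double} is written with {@code Const.FLAG_FLOAT | 0x7} (8 bytes),
+   * as in {@code TSDB.addAggregatePoint()}.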
*/ - public final Deferred sinkDataPoint(final String metric, - final long timestamp, final byte[] value, final Map tags, + public final Deferred sinkDataPoint(final String metric, + final long timestamp, final byte[] value, final Map tags, final byte[] tsuid, final short flags) { if ((flags & Const.FLAG_FLOAT) != 0x0) { - return publishDataPoint(metric, timestamp, - Internal.extractFloatingPointValue(value, 0, (byte) flags), + + return publishDataPoint(metric, timestamp, + Internal.extractFloatingPointValue(value, 0, (byte) flags), tags, tsuid); } else { - return publishDataPoint(metric, timestamp, + return publishDataPoint(metric, timestamp, Internal.extractIntegerValue(value, 0, (byte) flags), tags, tsuid); } } - + /** * Called any time a new data point is published * @param metric The name of the metric associated with the data point @@ -115,13 +116,13 @@ public final Deferred sinkDataPoint(final String metric, * @param value Value for the data point * @param tags Tagk/v pairs * @param tsuid Time series UID for the value - * @return A deferred without special meaning to wait on if necessary. The + * @return A deferred without special meaning to wait on if necessary. The * value may be null but a Deferred must be returned. */ - public abstract Deferred publishDataPoint(final String metric, - final long timestamp, final long value, final Map tags, + public abstract Deferred publishDataPoint(final String metric, + final long timestamp, final long value, final Map tags, final byte[] tsuid); - + /** * Called any time a new data point is published * @param metric The name of the metric associated with the data point @@ -130,21 +131,21 @@ public abstract Deferred publishDataPoint(final String metric, * @param value Value for the data point * @param tags Tagk/v pairs * @param tsuid Time series UID for the value - * @return A deferred without special meaning to wait on if necessary. The + * @return A deferred without special meaning to wait on if necessary. The * value may be null but a Deferred must be returned. */ - public abstract Deferred publishDataPoint(final String metric, - final long timestamp, final double value, final Map tags, + public abstract Deferred publishDataPoint(final String metric, + final long timestamp, final double value, final Map tags, final byte[] tsuid); - + /** * Called any time a new annotation is published * @param annotation The published annotation - * @return A deferred without special meaning to wait on if necessary. The + * @return A deferred without special meaning to wait on if necessary. The * value may be null but a Deferred must be returned. */ public abstract Deferred publishAnnotation(Annotation annotation); - + /** * Called any time a new histogram point is published * @param metric The name of the metric associated with the data point @@ -162,5 +163,5 @@ public Deferred publishHistogramPoint(final String metric, final byte[] tsuid) { throw new UnsupportedOperationException("Not yet implemented"); } - + } From 4b3e7be0062882857d5ac99f7bd899f71240cbaf Mon Sep 17 00:00:00 2001 From: "Gabe E. 
Nydick" Date: Fri, 25 Jan 2019 11:15:52 -0800 Subject: [PATCH 6/7] updated TSDB to detect whether or not to use normalizer --- src/core/TSDB.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/core/TSDB.java b/src/core/TSDB.java index 4563ad9fbd..c82e1444ce 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -1269,7 +1269,13 @@ private final Deferred storeIntoDB(final String metric, final byte[] row, final byte[] qualifier) { final long base_time; - final Map cleaned_tags = normalize.normalizeTags(tags); + final Map cleaned_tags = new HashMap(tags.size()); + + if (normalize != null) { + cleaned_tags.putAll(normalize.normalizeTags(tags)); + } else { + cleaned_tags.putAll(tags); + } if ((timestamp & Const.SECOND_MASK) != 0) { // drop the ms timestamp to seconds to calculate the base timestamp From fce4d5158af37802e5dccbebb9c7405bf3e7e8b5 Mon Sep 17 00:00:00 2001 From: "Gabe E. Nydick" Date: Fri, 25 Jan 2019 13:53:14 -0800 Subject: [PATCH 7/7] fixes --- pom.xml.in | 21 ++++++++++++++++++++- src/core/TSDB.java | 28 ++++++++++++++-------------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/pom.xml.in b/pom.xml.in index 18f46f539a..f77f2e596e 100644 --- a/pom.xml.in +++ b/pom.xml.in @@ -348,7 +348,26 @@ - + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.1 + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/ + false + false + true + + + + diff --git a/src/core/TSDB.java b/src/core/TSDB.java index c82e1444ce..fe4685a386 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -1252,13 +1252,18 @@ final Deferred addPointInternal(final String metric, final byte[] value, final Map tags, final short flags) { - - checkTimestampAndTags(metric, timestamp, value, tags, flags); - final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); + final Map cleaned_tags = new HashMap(tags.size()); + if (normalize != null) { + cleaned_tags.putAll(normalize.normalizeTags(tags)); + } else { + cleaned_tags.putAll(tags); + } + checkTimestampAndTags(metric, timestamp, value, cleaned_tags, flags); + final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, cleaned_tags); final byte[] qualifier = Internal.buildQualifier(timestamp, flags); - return storeIntoDB(metric, timestamp, value, tags, flags, row, qualifier); + return storeIntoDB(metric, timestamp, value, cleaned_tags, flags, row, qualifier); } private final Deferred storeIntoDB(final String metric, @@ -1269,13 +1274,8 @@ private final Deferred storeIntoDB(final String metric, final byte[] row, final byte[] qualifier) { final long base_time; - final Map cleaned_tags = new HashMap(tags.size()); - if (normalize != null) { - cleaned_tags.putAll(normalize.normalizeTags(tags)); - } else { - cleaned_tags.putAll(tags); - } + if ((timestamp & Const.SECOND_MASK) != 0) { // drop the ms timestamp to seconds to calculate the base timestamp @@ -1353,9 +1353,9 @@ public Deferred call(final Boolean allowed) throws Exception { if (rt_publisher != null) { if (isHistogram(qualifier)) { - rt_publisher.publishHistogramPoint(metric, timestamp, value, cleaned_tags, tsuid); + rt_publisher.publishHistogramPoint(metric, timestamp, value, tags, tsuid); } else { - rt_publisher.sinkDataPoint(metric, timestamp, value, cleaned_tags, tsuid, flags); + rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags); } } return result; @@ -1369,10 +1369,10 @@ public String toString() { if (ts_filter != null && ts_filter.filterDataPoints()) { if 
(isHistogram(qualifier)) { - return ts_filter.allowHistogramPoint(metric, timestamp, value, cleaned_tags) + return ts_filter.allowHistogramPoint(metric, timestamp, value, tags) .addCallbackDeferring(new WriteCB()); } else { - return ts_filter.allowDataPoint(metric, timestamp, value, cleaned_tags, flags) + return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) .addCallbackDeferring(new WriteCB()); } }
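
Taken together, the patches above add the plugin hook, the build wiring, and the
write-path integration. For reference, a minimal sketch of a concrete normalizer
is shown below. It assumes the generic types stripped from the diffs were
Map<String, String> and Deferred<Object>; the class name and the lower-casing
rule are hypothetical, not part of the patch series.

package net.opentsdb.normalize;

import com.stumbleupon.async.Deferred;
import net.opentsdb.core.TSDB;
import net.opentsdb.stats.StatsCollector;

import java.util.HashMap;
import java.util.Map;

/** Hypothetical example: lower-cases tag keys so differently-cased
 * duplicates ("Host" vs "host") collapse into a single series. */
public class LowerCaseTagKeyNormalizer extends NormalizePlugin {

  @Override
  public void initialize(final TSDB tsdb) {
    // nothing to set up for this sketch
  }

  @Override
  public Deferred<Object> shutdown() {
    return Deferred.fromResult(null);
  }

  @Override
  public String version() {
    return "2.4.0-tnp";
  }

  @Override
  public void collectStats(final StatsCollector collector) {
    // no stats emitted by this sketch
  }

  @Override
  public Map<String, String> normalizeTags(final Map<String, String> tags) {
    // Return a fresh map rather than mutating the argument; the call site in
    // addPointInternal copies the result into cleaned_tags before the row
    // key is built.
    final Map<String, String> cleaned = new HashMap<String, String>(tags.size());
    for (final Map.Entry<String, String> tag : tags.entrySet()) {
      cleaned.put(tag.getKey().toLowerCase(), tag.getValue());
    }
    return cleaned;
  }
}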