A plugin for ElasticSearch to use approximate methods for certain queries, to greatly reduce memory usage and network traffic.
Approximate counting is performed via the excellent probabilistic data structures from stream-lib.
This work is inspired in part by elasticsearch-ls-plugins by Lovely Systems -- some of their code has been reused here.
Recent versions of Elasticsearch now include cardinality estimation using HyperLogLog out of the box -- this can be combined with the date histogram aggregation to achieve similar functionality to that which is provided by this plugin.
They also provide many more ways of controlling the performance tradeoffs involved in term aggregation.
This project has been left here largely for educational reasons.
Plugin < 1.3.0: ElasticSearch 0.19.X, tested on 0.19.11
Plugin 1.3.X: ElasticSearch 0.20.X, tested on 0.20.6
Plugin 2.1.6: ElasticSearch 0.90.2, plus significant feature and performance improvements, and breaking API changes, compared to 1.3.X branch
ElasticSearch 0.90.3 is not supported yet.
N.B. If you are upgrading from a previous version to 2.1.X, please read the following carefully, as the syntax (and semantics) have changed in several places.
This is an alternative to ElasticSearch's date histogram facet which can be used in several modes.
-
Counting records per time interval (like the built-in date histogram)
-
Counting occurrences of a field, per time interval
-
Counting unique values of a field, per time interval
-
Counting records per time interval, per value of another field
-
Counting occurrences of one field per time interval, per value of another field
-
COMING SOON: Counting unique values of one field per time interval, per value of another field
It can be used to answer analytical queries like "how many distinct users have I seen per day?" or "how many logins have occurred per day, broken down by country?".
Unique value counting uses a probabilistic algorithm called HyperLogLog to provide estimates of the number of distinct values without needing to store all values in memory or transfer them across the network between shards. This provideds both memory and speed improvements in most circumstances. This implementation is hardcoded to a relative standard deviation of 0.0025, which uses about 80KB of memory per bucket per shard, and in tests, provides estimates within 1% of the true count reliably.
The API for approximate counting also provides an exact_threshold
parameter.
Each bucket will use an exact counting method (keeping all values in a HashSet)
until this point is reached. Then it will fall back to using HyperLogLog. If
you set this value to -1 it will never use approximate counting -- don't do
this on very large data sets as you will probably get out-of-memory errors. If
you set it to 0, it will never store any values in sets, instead using
HyperLogLog from the start.
{
"query" : {
"match_all" : {}
},
"facets" : {
"histo1" : {
"date_facet" : {
"key_field" : "my_date_time",
"distinct_field" : "user_id",
"interval" : "day"
}
}
}
}
This example will count the number of distinct user IDs per day.
Parameters:
-
key_field
: The datetime field to facet on -
value_field
: A field to count occurrences of -
distinct_field
: A field to count distinct occurrences of
N.B. You can't use value_field
and distinct_field
at the same time.
-
slice_field
: A field to further subdivide the results by -
exact_threshold
: See above -
interval
,time_zone
,pre_zone
,post_zone
,pre_zone_adjust_large_interval
,pre_offset
,post_offset
,factor
: See docs for the date histogram facet.
Of these, only key_field
and interval
are required -- this will perform the
simplest kind of record count.
All of the field parameters support tokenized/multi-valued fields as well as
single-valued ones. (The usual caveats about memory use apply.) e.g. If
distinct_field
is tokenized, the result will indicate the number of distinct
tokens found in that field (post-analysis).
This is very similar to the standard date histogram. Each time period (and the
facet overall) has a COUNT
attribute, and if appropriate, a DISTINCT_COUNT
attribute too. If you are using slice_field
, these are provided for each time
period and for each slice within that time period.
-
Using
slice_field
anddistinct_field
together is not yet tested -
Floating-point fields are not officially supported (they may work but we haven't really tested them thoroughly enough)
-
Script fields are not yet supported
This is a simple facet to quickly retrieve an unsorted term list for a field,
if you don't care about counts or ordering etc. It allows you to set a
max_per_shard
cutoff similar to the previous facet (defaults to 1000).
You can also set a sample
parameter which is a float greater than zero and
less than or equal to 1. This causes the plugin to visit roughly that
proportion of documents matched by your query when gathering the terms list.
For example, sample=0.5 would mean only half the documents, selected randomly,
would be taken into account.
In some circumstances, a sample rate as low as 0.1 (10% of documents) can yield the exact same results as a full exhaustive scan (the default), but much faster. You'll need to experiment on your own data to find the sweet spot.
{
"query": {
"match_all" : {}
},
"facets" : {
"term_list_facet" : {
"term_list" : {
"key_field" : "txt1",
"sample" : 0.25,
"max_per_shard" : 100
}
}
}
}
Returns something like:
{
"took" : 45,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"failed" : 0
},
"hits" : {
"total" : 132,
"max_score" : 1.0,
"hits" : [ ]
},
"facets" : {
"term_list_facet" : {
"_type" : "term_list",
"entries" : [ "trdq", "amrqkke", "ebztmm", "pja", "qobmepbor", "bxpoh", "krsm", "kpgz", "hotodwfq", "qbpxxlfin", "lsnosgx", "qyznyhrqcu", "poekzt", "qbsmks", "adbazy", "swnjdvziqh", "eqabkxb", "xdz", "jlg", "scn", "jdn" ]
}
}
}
N.B. The use_field_data
/read_from_cache
option from previous versions
is no longer supported. However, we now support a mode
parameter that takes
two alternative values, "collector"
and "post"
. In collector mode (the
default), the plugin iterates over documents and retrieves the terms that
appear in them. In post mode, the plugin iterates over terms from the
Lucene index, only keeping those that appear in at least one document.
The mode can be set as follows:
{
"query": {
"match_all" : {}
},
"facets" : {
"term_list_facet" : {
"term_list" : {
"key_field" : "txt1",
"max_per_shard" : 100
},
"mode" : "post"
}
}
}
Post mode is recommended when there are a small number of terms compared to the number of documents (low cardinality). The sample parameter is not supported in post mode.
It's all done via Maven, so just mvn test
to build the plugin and run the
tests. Amongst other things, they check that the approximate distinct counts
are within a tolerance of 1% of the expected values.
To target Java 7 in the compiler (assuming you're running under JDK7), append
-Pjdk7
to the mvn
command line, e.g.:
mvn clean test -Pjdk7
To run the full test suite, you will need to download a fairly large archive of ElasticSearch index data from here:
https://pearson.app.box.com/s/uvsz0gv8rhgex0aacc2u
Download MediumDataSetTest.tar.bz2
and unpack it in src/test/resources/data
.
The tests use quite a lot of memory and take several minutes to run. This is because they use several iterations of randomly generated data and queries, in order to verify the accuracy of the approximate counts in the date histogram, among other things. One run puts over a million distinct values in each bucket.
If you get any out-of-memory errors, you'll need to raise the amount of memory
you allocate to the mvn process. From the command line, the pom takes care of
this via the argLine parameter. If you're an Eclipse user, put -Xms4G -Xmx4G
in the VM Arguments box of the Arguments tab in Run Configurations for that
test run.
You can always build the package with -DskipTests
if this is a problem
(assuming you trust us to have tested before checking in).
Because the error rate for HyperLogLog is a distribution rather than a hard bound, you may in rare circumstances get tests failing due to results being just outside the 1% tolerance. If this happens, re-run the test. It's only a problem if it happens consistently...
NB Github no longer supports file downloads. For now, you'll have to build from source. We will find a location for binary distributions as soon as we can... Promise :-)
Type
mvn package
to rebuild the zipfile (in target/releases
). You may want to add
-DskipTests
if you've just run the tests and haven't changed anything,
since they take a while.
Then create a plugins/approx
directory in your ElasticSearch install dir,
and unzip the zipfile into there.
Each facet has a Builder class which is used in the Java API to build an XContent message (e.g. JSON) representing the facet clause of a query. If you are constructing a query from Java, you can use this class yourself, exactly like the builders for the facets provided with ElasticSearch. Just include the jar file in your client project.
On the server side, each facet has a Parser class which parses the XContent of the facet clause, and invokes an Executor to actually perform the facet computation. This happens in a single thread on each shard separately. The Executors use Collector classes to iterate through the field data supplied by ElasticSearch -- these are invoked directly by ElasticSearch itself. After this collection phase is complete, ElasticSearch calls the buildFacet() method on the Executor to retrieve an InternalFacet object.
Each shard yields a single internal facet. This is responsible for serialization and deserialization, and supplies a reduce() method which enables ElasticSearch to merge the internal facets from multiple shards into a single object.
The internal facet objects are specializations of Facets, which are what are actually returned to the client. Currently, in the term list facet, the public Facet is an interface, while in the date facet, the public Facets are abstract classes. This difference may disappear in the future.
This project was developed by the Data Analytics & Visualization team at Pearson Technology in London.
Copyright 2012-2013 Pearson PLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.