2021
All algorithms operate on some data. Traditional algorithms assume that they have full access to the data they need in order to solve the problem at hand. For instance, algorithms for sorting a collection of items naturally assume that they have access to a buffer holding all the items. Similarly, procedures for computing the convex hull of a set of points assume that they have all the points. In some applications, however, the full set of data is not known a priori. In this online setting, algorithms have to provide useful answers to queries before having the chance to see all the data.
The way such algorithms provide their answers depends on the kind of query being asked. Queries can be ad hoc
meaning that we do not know the exact query we'd like to run a priori. In this case, the most common technique used is random sampling
. We can also have standing queries
. In this case, we know exactly the queries that we'd like to answer. Because we know the queries beforehand, we can design specialized data structures capable of providing useful answers to our queries.
These two cases share two key characteristics:
- The answers that they produce will be approximate — not exact.
- They do not seek to store and process all the incoming data. Instead, their goal is to quickly process each observation to create a summary that they can use to answer either standing or ad hoc queries. This summary is often referred to as a Sketch of the data.
In this note, we discuss and implement several ideas that are useful when designing online algorithms. We begin by discussing a classic method for selecting a representative sample from a stream of data. We then discuss the idea that is foundational to most sketching algorithms and show how it's employed to improve the accuracy of approximate online methods. Finally, we discuss a few key algorithms for answering some standing queries.
Reservoir Sampling is a simple procedure for picking a random uniform sample of size k
from a datastream whose size is unknown (it could be infinite). This scheme works as follows: We store the first k
items into a buffer of size k
. Then for each element that arrives afterwards, with probability we decide to keep the sample. If we keep it, we pick an element, uniformly at random, i.e with probability to evict from the buffer and replace it with the new sample. We implement this scheme below.
First, since we may be implementing other samplers later on, we introduce a trait that encapsulates the behavior that we expect from any sampler. Put simply, a sampler is anything that can observe a stream of events and produce a sample from that stream at any time
use rand::{thread_rng, Rng};
/// A sampler will be anything that can observe a possibly infinite
/// number of items and produce a finite random sample from that
/// stream
pub trait Sampler<T> {
/// We observe each item as it comes in
fn observe(&mut self, item: T);
/// Produce a random uniform sample of the all the items that
/// have been observed so far.
fn sample(&self) -> &[T];
}
With that out of the way, we can now introduce the reservoir sampling data structures and procedures. The reservoir sampler only needs to keep track of two values: the number of events that have been observed so far and the requested sample size.
#[derive(Debug)]
pub struct ReservoirSampler<T> {
/// This is what we produce whenever someone calls sample. It
/// maintains this invariant: at any time-step `t`, reservoir
/// contains a uniform random sample of the elements seen thus far
reservoir: Vec<T>,
/// This determines the size of the reservoir. The
/// client sets this value
sample_size: usize,
/// The number of items we have seen so far. We use this
/// to efficiently update the reservoir in constant time
/// while maintaining its invariant.
count: usize,
}
impl<T> ReservoirSampler<T> {
/// Create a new reservoir sampler that will produce a random sample
/// of the given size.
pub fn new(sample_size: usize) -> Self {
ReservoirSampler {
reservoir: Vec::with_capacity(sample_size),
sample_size,
count: 0,
}
}
}
Finally, we turn our ReservoirSampler
object into a sampler by implementing the appropriate trait
impl<T> Sampler<T> for ReservoirSampler<T> {
fn observe(&mut self, item: T) {
// To make sure we that maintain the reservoir invariant,
// we have to ensure that each incoming item has an equal
// probability of being included in the sample. We do so by
// generating a random index `k`, and if `k` falls within
// our reservoir, we replace the item at `k` with the
// new item
if self.reservoir.len() == self.sample_size {
let rand_idx = thread_rng().gen_range(0..self.count);
if rand_idx < self.sample_size {
self.reservoir[rand_idx] = item;
}
} else {
// If the reservoir is not full, no need
// to evict items
self.reservoir.push(item)
}
self.count += 1;
}
fn sample(&self) -> &[T] {
// Since we always have a random sample ready to go, we
// simply return it
self.reservoir.as_ref()
}
}
You can find runnable code for the reservoir sampling procedure in the playground. One final note: The sampling scheme we implemented, while easy to understand, can be a bottleneck in applications where speed is a priority. The slowdown comes from the fact that, hashing is an expensive operation, and our procedure hashes each item it encounters — even those that it eventually discards. Take a look at the wikipedia page for sampling schemes that improve upon the procedure above.
Before we begin discussing strategies for answering standing queries on streaming data, let's first lay the foundation by exploring two key ideas. These ideas are the major motifs that you'll encounter in almost all sketching algorithms.
Although hashing is mostly known as the first key ingredient in the design of hash tables (the other half being techniques for resolving collisions), it is also quite useful when designing probabilistic sketching algorithms. In this setting, we use hashing to introduce randomness. To see why randomness is important, consider an alternate method for sampling k
items from a possibly infinite data stream. We could accomplish this by tagging each incoming item with a random number then storing the items and their tags in a bounded heap. Once the heap reaches capacity, we evict the item with the smallest tag before adding in a new <item, tag>
pair.
At this point, we have a vague idea of why hashing is a useful idea. But, what is hashing — really? Put simply, hashing takes an arbitrary input (often as a collection of bytes) and transforms it, using a hash function, into a random integer. How is this different from tagging each input with a random number as we did above? Well, if the hash function is well constructed, and the functions co-domain is of size m
, then the probability that two distinct items receive the same tag is . A hash function that provides this guarantee is called a universal hash function. In this note, we assume that all our hash functions are universal.
In addition to universality, there are other details that are cool to know about hash functions. First, there exists a tradeoff between the speed of a hash function and its security guarantees. Second, when hashing is used as a component of systems that have exposure to potential adversaries, it as an attack vector — via hash-flooding attacks. This is why we almost always use a randomly initialized hash function. We won't make use of these last two ideas in this note, so we don't discuss them further. For a much more detailed examination fee free to take a look at this write-up and this other rough write up
Below, we explore the hashing interface provided by the rust programming language.
//! Rust exposes hashing utilities via a Streaming Hasher interface. That is, you
//! feed arbitrary bytes into the hash function, then, when done, you ask
//! the hasher to produce a hash value.
//!
//! use std::hash::{Hash, Hasher};
/// The Hasher trait from the standard library. This is implemented
/// by hash functions
pub trait Hasher {
/// Writes the given list of bytes into this hasher. This usually
/// updates some internal state. Because the hasher is stateful,
/// we can keep on calling `write` until we've exhausted all our
/// bytes
fn write(&mut self, bytes: &[u8]);
/// Produces the hash value of the bytes fed into this hasher so far
/// without resetting the hasher's internal state. This means that
/// if we want a hash value for a different stream of bytes, we have to
/// create a new hasher instance.
fn finish(&self) -> u64;
// ... snip
}
/// The Hash trait from the standard library. This is implemented by hashable
/// types
pub trait Hash {
/// Feed the value into a given Hasher
fn hash<H: Hasher>(&self, state: &mut H);
// ... snip
}
The long and short of it is, we create a hasher
from some random state — random_state.build_hasher()
. Then, given an item that implements the Hash
trait, we call its hash
function passing in the hasher — item.hash(hasher)
. Then to extract the hash value, we call the finish
method of the hasher. The tricky bit to remember is that calling finish does not reset the hasher's internal state. As we'll see later on, this means that we have to create a new hasher
instance to hash a fresh item.
Once we apply a hash function to an item, we can use the generated hash value to almost uniquely identify that object. The hash value can thus be thought of as a fingerprint of the initial object in that it is a relatively unique and lightweight identifier of the object -- just like human fingerprints. Because of hash collisions, however, it is not fully unique. When designing probabilistic data structures, we strive to reduce the likelihood of two distinct items colliding. That is where probability amplification comes in. Instead of simply hashing an item once, we rerun the hashing experiment k
times, each time with a different, independent, random hash function.
Now, the fingerprint is composed of the k
hash values. This scheme dramatically reduces the probability of fingerprint collision. In particular, the probability that two fingerprints from two different objects are the same is . where m
is the size of the co-domain of our hash functions (this is often the number of buckets we're hashing our items into).
To recapitulate, we can represent arbitrary objects using their hash values. These values are often smaller (e.g 8 bytes) than the underlying objects. Furthermore, instead of simply using a single hash value, we can use a collection of k
hash values each produced by k
independent hash functions.
A Bloom Filter 🥦 is a compact data structure that summarizes a set of items, allowing us to answer membership questions — has a certain item been seen before? Unlike other set structures — such as hash sets and search trees — a bloom-filter’s answer to a membership query is either a definite NO
or a Probable Yes
. That is, it can have false positives — telling us that an item has been seen when in fact it has never been observed. Therefore, Bloom filters are best suited for cases where false positives can be tolerated and mitigated. Cases where the effect of a false positive is not a wrong program state but extra work.
Below, we introduce the interface shared across all filters capable of answering approximate membership queries.
/// Indicates that the filter has probably seen a given
/// item before
pub struct ProbablyYes;
/// Indicates that a filter hasn't seen a given item before.
pub struct DefinitelyNot;
/// A filter will be any object that is able to observe a possibly
/// infinite stream of items and, at any point, answer if a given
/// item has been seen before
pub trait Filter<T> {
/// We observe each item as it comes in. We do not use terminology such as
/// `insert` because we do not store any of the items.
fn observe(&mut self, item: T);
/// Tells us whether we've seen the given item before. This method
/// can produce false positives. That is why instead of returning a
/// boolean, it returns `Result<ProbablyYes, DefinitelyNot>`
fn has_been_observed_before(&self, item: &T) -> Result<ProbablyYes, DefinitelyNot>;
}
A Bloom Filter is parameterized by two values: a bit array buckets
with m
buckets in it, and a set of k
hash functions . Given an input item
, we apply each of our k
hash functions to it and then set all the indexes that item
hashed to to true
. To check if an item has already been seen, we again first apply each of our k
hash functions and check if all corresponding locations to are true
.
Notice how a bloom filter is simply a direct application of object fingerprinting and probability amplification. We hash each item using multiple hash functions to reduce the chances of getting a false positive.
Below, we provide an implementation of a bloom filter
#[derive(Debug)]
pub struct BloomFilter<T: Hash> {
/// The bit vector. The number of buckets is determined
/// by the client
buckets: Vec<bool>,
/// The list of hash functions. Again, the number of
/// hash functions is determined by the client
hash_functions: Vec<RandomState>,
/// This is here to make the compiler happy. We'd
/// like for our filter to be parameterized by
/// the items it's monitoring. However, we do not
/// store those items in the filter.
_marker: PhantomData<T>,
}
impl<T: Hash> BloomFilter<T> {
/// Creates a new bloom filter with `m` buckets and `k` hash functions.
/// Each hash function is randomly initialized and is independent
/// of the other hash functions
pub fn new(m: usize, k: usize) -> Self {
// initialize all the bucket locations to false
let mut buckets = Vec::with_capacity(m);
for _ in 0..m {
buckets.push(false);
}
// initialize the hash functions randomly.
let mut hash_functions = Vec::with_capacity(k);
for _ in 0..k {
hash_functions.push(RandomState::new());
}
BloomFilter {
buckets,
hash_functions,
_marker: PhantomData,
}
}
/// This performs the actual hashing.
fn get_index(&self, state: &RandomState, item: &T) -> usize {
let mut hasher = state.build_hasher();
item.hash(&mut hasher);
let idx = hasher.finish() % self.buckets.len() as u64;
idx as usize
}
}
With the above abstractions in place, we can go ahead and implement the core filter procedures.
impl<T: Hash> Filter<T> for BloomFilter<T> {
/// As already explained, we apply each of our `k` hash functions to the
/// given item and set the locations it hashes to to true
fn observe(&mut self, item: T) {
for state in &self.hash_functions {
let index = self.get_index(state, &item);
self.buckets[index] = true;
}
}
/// Again, we start by applying each of our `k` hash functions
/// to the given item then. If all resultant `k` locations are `true`,
/// we conclude that we MIGHT have observed this item. If any location
/// is false, we immediately conclude that we NEVER saw this item
fn has_been_observed_before(&self, item: &T) -> Result<ProbablyYes, DefinitelyNot> {
for state in &self.hash_functions {
let index = self.get_index(state, &item);
if !self.buckets[index] {
return Err(DefinitelyNot);
}
}
Ok(ProbablyYes)
}
}
Count-Min Sketch 🍅 is a compact structure for estimating the counts of each type of items in our set. A naive way of solving the count each problem would be to allocate an integer counter for each class of items. However, this may not be practical when the number of item types grows huge. The CMSketch provides a tradeoff between count accuracy and low memory usage — it encodes a potentially massive number of item types in a small array, guaranteeing that large counts will be preserved fairly accurately while small counts may incur greater relative error. Count-Min sketches are thus best suited to cases where a slight inflation in frequency does not lead to an illegal program state.
As with the bloom filter, we use multiple hash functions. However, unlike with the bloom filter where all the functions hashed to the same array, each function in this case has its own dedicated set of buckets. A CMSketch is thus a matrix with as many rows as the number of hash functions and as many columns as the number or buckets. When we see an item, we apply our k
hash functions to it and increment the slots that it maps to in each row of the sketch. To estimate the count of an item, we apply our hash functions and read the values at the slots that it maps to and then select the minimum out of these. One cool thing to note is that the accuracy guarantees of the CMSketch are not in any way related to the size of the dataset — they depend entirely on the number of hash functions k
and the number of buckets m
.
#[derive(Debug)]
pub struct CountMinSketch<T: Hash, const M: usize, const N: usize> {
/// A count sketch is defined by `N` hash functions
/// and `M` bucket groups. Each of the `N` hash functions
/// maps an item into a single slot in a corresponding bucket group
sketch_matrix: [[u64; M]; N],
///
hash_functions: [RandomState; N],
/// As with the Bloom Filter, we'd like for our sketch to be
/// parameterized by the type of objects it is counting
_marker: PhantomData<T>
}
impl <T: Hash, const M: usize, const N: usize> CountMinSketch<T, M, N> {
/// Create a new instance of the CMSketch
pub fn new() -> Self {
let hash_functions = unsafe {
let mut hash_function_slots: [MaybeUninit<RandomState>; N] = MaybeUninit::uninit().assume_init();
for slot in &mut hash_function_slots {
slot.write(RandomState::new());
}
mem::transmute_copy::<_, [RandomState; N]>(&hash_function_slots)
};
CountMinSketch {
sketch_matrix: [[0; M]; N],
hash_functions,
_marker: PhantomData::default(),
}
}
/// Increment the count of the given item
pub fn inc(&mut self, item: &T) {
for (i, state) in self.hash_functions.iter().enumerate() {
let idx = self.get_index(state, item);
self.sketch_matrix[i][idx] += 1;
}
}
/// Retrieve the approximate count of the given item
pub fn count(&mut self, item: &T) -> u64{
let mut cur_min = u64::MIN;
for (i, state) in self.hash_functions.iter().enumerate() {
let idx = self.get_index(state, item);
let cur_value = self.sketch_matrix[i][idx];
if cur_value < cur_min {
cur_min = cur_value;
}
}
cur_min
}
/// Hashes the given item and maps it to the appropriate index location within
/// a single bucket
fn get_index(&self, state: &RandomState, item: &T) -> usize {
let mut hasher = state.build_hasher();
item.hash(&mut hasher);
let idx = hasher.finish() % self.sketch_matrix[0].len() as u64;
idx as usize
}
}
When in doubt, count. But, what if we do not have enough scratch paper? What should we do in that case? Well, in that case, we still count, but do so while being probably approximately correct.
The hyper log log (HLL) data structure is used to estimate the number of distinct/unique items present in some large collection of items with duplicates. That is, it is used to estimate the Cardinality of a Multiset.
If our data is such the number of unique items can fit in memory, then this so called, Count Distinct
problem can easily be solved by maintaining a counter and a HashSet
of elements that have already been observed.
In the streaming setting, we cannot afford to do that as the incoming data could even have infinite cardinality. Consider this:
You are running a popular web service and you'd like to keep track of the number of unique visitors to your site. Each visitor is uniquely identified by their IP address. Suppose your service has
$10^9$ unique visitors. Using theHashSet
approach, you'd need between 4GB forIPV4
addresses and16GB
forIPV6
addresses.
the HLL is able to solve this problem in a much more space efficient manner by trading off accuracy. In the following sections, we'll demonstrate how to construct the HLL structure using Rust.
As usual, we begin by defining our interface.
/// A cardinality estimator will be any object that is able to observe a possibly
/// infinite stream of items and, at any point, estimate the number of unique items
/// seen so far.
pub trait CardinalityEstimator<T> {
/// We observe each item as it comes in.
fn observe(&mut self, item: &T);
/// Return an estimation of the number of unique items see thus far
fn current_estimate(&self) -> u64;
}
We are going to implement three versions of probabilistic cardinality estimators. The first one, the naïve estimator, although easy to implement, will result in estimates with fairly high variance. The second one will improve upon this by making the observation that "interesting" patterns in our hash values (such as a trail of all zeroes) are rare enough that we can use their presence as a proxy for how many unique hash values we've observed. The final estimator will be based on the second. It'll simply introduce key optimization to improve teh second's efficiency.
/// A naïve estimator that estimates the cardinality
/// to simply be 1 divided by the smallest hash value
/// observed thus far, after mapping it to number
/// between 0 and 1
///
pub struct NaiveEstimator<T> {
// The smallest hash value seen so far.
cur_hash_value_min: Option<u64>,
/// We want our estimator to be parameterized by the type
/// it is estimating, but we don't need to store anything
/// so we never use the provided type. To keep the compiler
/// happy, we sacrifice a single ghost as an offering
_marker: PhantomData<T>,
}
/// Estimate the number of distinct items in the stream as
/// `2^k` where `k` is the longest trail of leading zeroes
/// among the hash values of all the items observed so far
pub struct MaxTrailEstimator<T> {
// The smallest hash value seen so far.
cur_max_trail_length: Option<u64>,
/// We want our estimator to be parameterized by the type
/// it is estimating, but we don't need to store anything
/// so we never use the provided type. To keep the compiler
/// happy, we sacrifice a single ghost as an offering to the
/// compiler gods
_marker: PhantomData<T>,
}
/// WIP
/// WIP
/// WIP
In the preceding sections, we cursorily talked about the "guarantees" of our different methods. In order to fully formalize the correctness and efficiency guarantees of probabilistic algorithms, we often turn to two basic tools:
Markov’s Inequality
tells us that the probability that a random variable, , is times the average value is at most . That is
Chebyshev’s Inequality
tells us that the probability that a random variable is more than standard deviations from its mean is at most . That is