Skip to content

HashMap Index

vinoth chandar edited this page Mar 27, 2017 · 4 revisions

Goal

Use HFile (or Voldemort's RO File Format or TFile), something that offers random key lookups, to build a persistent Hashmap to store a mapping from recordKey => fileId, such that

  • Index should support all commit/rollback semantics that Hoodie does (BloomIndex accomplishes this trivially)
  • Global lookup i.e even though the hoodieKey provided has both a recordKey and partitionPath, the lookup/update happens purely on recordKey
  • Is reasonably fast & can handle billions of keys

Going forward, we will use the term hashmap to denote such a persistent hashmap on the backing filesystem.

Basic Idea

  1. We hash recordKey into buckets (statically over-provision at say 1000).
  2. Each bucket has a X hashmaps, contains all keys mapped to the bucket.
  3. tagLocation looks up all hashmaps within each bucket
  4. updateLocation will generate a new hashmap into bucket, with new keys for the bucket
  5. Periodically, all hashmaps are merged back into 1, bound lookup time in #3

tagLocation

The Spark DAG here looks like below.

_____________________    ____________________________    __________________________________________   
| RDD[HoodieRecord] | => |Hash(recordKey) to buckets| => | Check against all HFiles within bucket | => insert or update 
_____________________    ____________________________    __________________________________________  

updateLocation

Spark DAG for updating location.

_____________________   _________________________________________________    ________________________________  
| RDD[WriteStatus] | => |Filter out updates & hash(recordkey) to buckets| => | Add new hashmap into bucket | 
_____________________   _________________________________________________    _________________________________ 

Optionally, if the number of hashmaps exceeds a set threshold (maxMapsPerBucket), merge enough to bring it back to an acceptable threshold (minMapsPerBucket). The number of Hashmaps in each bucket, can keep growing back and forth between these two limits.. We also need another global variable to limit the number of compactions per batch, to amortize these over multiple runs.

rollbackCommit

As long as the hashmaps are stamped with the commits, we should be able to rollback effects of a failed updateLocation call, including a compaction.. Ultimately, we tie back to atomicity of the commit itself.

How do we expect this to perform?

  • Given the index does not treat time as a first class citizen, the compaction cost would keep growing over time, compared to BloomIndex (which only compares incoming records against its target partitions). This is the extra cost paid for global index property.

  • Back of the envelope to store 100 billion records. Assuming each entry is worth 100 bytes (compression offsets extra metadata), across 1000 buckets, minMapsPerBucket = 2, maxMapsPerBucket = 10, each bucket stores ~10GB

  • Assuming random seek takes 10ms & each batch has 20M records (1B records upserted over a day, at 30 min batches). Best case lookup time is with numMapsPerBucket = minMapsPerBucket, i.e 20M/1000 = 20000 records per bucket or 20K * 2 * 10ms of lookups = 400 seconds, worst case is 20K * 10 * 10ms = 2000 seconds. So we should really hope the random lookup is not 10ms.

  • Assuming 50MB/sec of read/write performance(reduce by half for processing overhead), to merge two 1GB files into one, it would take : 10 seconds (read in parallel) + 20 seconds (merge and write)

Open questions

Q: Given our key and value is fixed length (100 bytes total), would a more simpler implementation work better? **A: **

Q: Do we still need to introduce a notion of caching? How effective is the caching on Filesystem? **A: ** From numbers above, it feels like caching can significantly speed up things for lookups and give back a lot of time for compactions.

Q: Should the hashmap be sorted, would it help us take advantage of key ranges? A: With uuid based keys, it does not matter. But if the recordKey is timebased, we can significantly cut down comparisions. So we should do it if possible.

Clone this wiki locally