Vector Store
AresDB partitions data records into batches, with each record identified by a batch ID and its record index within the batch. The values of each column within a batch are stored as a columnar vector. The validity (nullness) of the values in each value vector is stored in a separate null vector, with one bit per value. Here's an example:
city | status | fare |
---|---|---|
SF | completed | 11.0 |
SF | cancelled | NULL |
LA | completed | 12.0 |
NY | completed | 15.0 |
OC | completed | 16.0 |
The table is partitioned into two batches of records:
batch 0:

record | city | status |
---|---|---|
0 | SF | completed |
1 | SF | cancelled |
2 | LA | completed |

batch 1:

record | city | status |
---|---|---|
0 | NY | completed |
1 | OC | completed |
Each batch is then represented by value vectors and null vectors. Here's an example for batch 0, fare column:
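value vector | null vector (validity) |
---|---|
11.0 | 1 |
0.0 | 0 |
12.0 | 1 |

Record 1 is null, so its validity bit is 0 and its slot in the value vector only holds the placeholder 0.0.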
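A minimal Go sketch of the layout above. It assumes a float32 fare column and least-significant-bit-first ordering within each null-vector byte (the convention Apache Arrow uses). The `columnVector` type and its methods are hypothetical illustrations, not AresDB's API.

```go
package main

import "fmt"

// columnVector is a hypothetical, simplified stand-in for one column of a batch:
// a dense value vector plus a null (validity) vector holding one bit per record.
type columnVector struct {
	values []float32 // value slots; a null record keeps the zero value
	nulls  []byte    // validity bits, LSB first: 1 = valid, 0 = null
}

func newColumnVector(size int) *columnVector {
	return &columnVector{
		values: make([]float32, size),
		nulls:  make([]byte, (size+7)/8),
	}
}

// set stores a valid value at record index i and sets its validity bit.
func (c *columnVector) set(i int, v float32) {
	c.values[i] = v
	c.nulls[i/8] |= 1 << uint(i%8)
}

// isValid reports whether record i holds a non-null value.
func (c *columnVector) isValid(i int) bool {
	return c.nulls[i/8]&(1<<uint(i%8)) != 0
}

func main() {
	// Batch 0, fare column: 11.0, NULL, 12.0.
	fare := newColumnVector(3)
	fare.set(0, 11.0)
	fare.set(2, 12.0) // record 1 is never set, so it stays null

	for i := range fare.values {
		fmt.Println(i, fare.values[i], fare.isValid(i))
	}
}
```

Storing validity as bits keeps the null vector at one eighth of a byte per record, independent of the value type.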
The purpose of batching is to enable pipelined data transfer and processing, and to optimize storage space allocation. There are two types of batches: live batches and archive batches. Live batches are used for dimension tables and for fact-table records that have not yet been archived. Archive batches are used for archived fact-table records.
IDs for live batches are negative integers, starting at INT32_MIN and incrementing by one. With a properly chosen batch size, we do not need to handle ID overflow or rotation.
Live batch capacities are configured at the table level as part of the schema. For fact tables, the goal is to divide a day of records into 10-15 batches so that:
- A small portion of the live data is archived periodically;
- The overhead of unused space is reasonably bounded;
- The number of vectors for live batches transferred to GPU memory at query time is bounded.
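As a rough illustration, a capacity that meets the 10-15 batches-per-day goal can be derived from an expected daily record volume. The helper name and the one-million-records-per-day figure are assumptions, not part of the schema API.

```go
package main

import "fmt"

// liveBatchCapacity is a hypothetical helper: it picks a per-batch record
// capacity so that one day of records fills roughly batchesPerDay live batches.
func liveBatchCapacity(recordsPerDay, batchesPerDay int) int {
	if batchesPerDay <= 0 {
		panic("batchesPerDay must be positive")
	}
	// Ceiling division, so batchesPerDay batches always hold a full day.
	return (recordsPerDay + batchesPerDay - 1) / batchesPerDay
}

func main() {
	daily := 1_000_000 // assumed daily volume, for illustration only
	for _, n := range []int{10, 12, 15} {
		fmt.Printf("%d batches/day -> capacity %d\n", n, liveBatchCapacity(daily, n))
	}
}
```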
New records are appended to the end of the last live batch. Existing records are updated in place. They are located via the primary key, which stores [batch ID, record index] for each key. After a record in a fact table is archived or deleted, its bit in a deletion marker (a bit vector of record validities, similar to a null vector) is set to 0. This indicates that the record no longer exists and should be ignored by the query engine.
When all records in a batch are archived or deleted, the batch space is freed.
Late arrivals (from a backfill, for instance) must be identified by maintaining an event-time archiving cutoff, and then merged as soon as possible. They should not be queried until they are merged, to avoid overcounting. Late arrivals caused by recovery can be handled by triggering an archiving process right after log replay.
Archive batches are created by partitioning stable fact records into UTC days using the designated time series column. Each batch is identified by the (positive) number of days from the epoch to its start time. The trade-offs of using a day as the batch size include:
- The total number of batches that need to be transferred to GPU memory stays in a reasonable range, e.g. 7-180. This is large enough to allow sufficient pipelining, yet small enough that each batch carries a non-trivial amount of work and the overhead of initiating each transfer stays low.
- The resulting number of files on disk (in the hundreds) is manageable for both humans and the OS.
- Many queries process multiple days of data. Queries that process less than a day are usually fast enough that over-processing a day of data is affordable.
Records in an archive batch are not sorted by time. Instead, they are sorted by a list of columns specified in the schema. The sort order is chosen to:
- Enable processing only a sub-range of the batch for common filters. For instance, most of our queries filter by a single city_id, so sorting by city_id first lets such queries process only a sub-range of the batch.
- Maximize compression ratios on certain columns by putting low-cardinality columns earlier in the sort order.
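Here is a sketch of the sort step. It assumes (city, status) as the schema's sort columns and uses plain strings in place of the enum-encoded values a real column would hold. `sort.SliceStable` preserves the arrival order of records with equal sort keys.

```go
package main

import (
	"fmt"
	"sort"
)

type record struct {
	city   string
	status string
	fare   float64
}

// sortForArchive orders records by the assumed sort columns (city, status).
func sortForArchive(rs []record) {
	sort.SliceStable(rs, func(i, j int) bool {
		if rs[i].city != rs[j].city {
			return rs[i].city < rs[j].city
		}
		return rs[i].status < rs[j].status
	})
}

func main() {
	rs := []record{
		{"SF", "completed", 11.0},
		{"SF", "cancelled", 0}, // null fare shown as 0 in this sketch
		{"LA", "completed", 12.0},
		{"NY", "completed", 15.0},
		{"OC", "completed", 16.0},
	}
	sortForArchive(rs)
	for _, r := range rs {
		fmt.Println(r.city, r.status, r.fare)
	}
}
```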
After sorting, a count vector is created from runs of adjacent repeating values using run-length encoding. Note that the counts stored in the vector are cumulative, i.e., each entry is the running total of run lengths from the beginning of the vector up to that position. The value and null vectors are then compressed by consolidating adjacent repeating values. **The count vector is only created for the sort columns specified by the user.** Here's the corresponding compressed city column in batch 0 for the above example:
value | null | count |
---|---|---|
SF | 1 | 0 |
LA | 1 | 2 |
| | | 3 |
Note that the count vector is offset by one position from the value and null vectors. Its first entry is always 0 and its last entry is the total number of records, so it has one more entry than the value vector. The count for value[i] is count[i+1] - count[i].
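The encoding can be sketched in Go as follows. The function signature is hypothetical. It assumes the following:
- the input column has already been sorted;
- adjacent nulls form a single run regardless of their placeholder values;
- counts are uint32, consistent with the 12-byte count vector for 3 entries in the next paragraph.

```go
package main

import "fmt"

// runLengthEncode compresses a column (normally already sorted) into value,
// null and cumulative count vectors. valid[i] is false when record i is null.
// Adjacent records form one run when both are null, or both are valid with
// equal values.
func runLengthEncode(vals []string, valid []bool) (outVals []string, outValid []bool, counts []uint32) {
	counts = []uint32{0} // the first cumulative count is always 0
	for i := range vals {
		n := len(outVals)
		if n > 0 && outValid[n-1] == valid[i] && (!valid[i] || outVals[n-1] == vals[i]) {
			counts[n] = uint32(i + 1) // extend the current run
			continue
		}
		outVals = append(outVals, vals[i])
		outValid = append(outValid, valid[i])
		counts = append(counts, uint32(i+1)) // start a new run
	}
	return outVals, outValid, counts
}

// runLength returns how many records value[i] stands for.
func runLength(counts []uint32, i int) uint32 {
	return counts[i+1] - counts[i]
}

func main() {
	// City column of batch 0 as in the example: SF, SF, LA.
	vals, valid, counts := runLengthEncode([]string{"SF", "SF", "LA"}, []bool{true, true, true})
	fmt.Println(vals, valid, counts) // [SF LA] [true true] [0 2 3]
	for i := range vals {
		fmt.Println(vals[i], runLength(counts, i))
	}
}
```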
The size of the archive batch is still 3 (records), while the (compressed) length of the city column in this batch is 2. Assume the city column is a small enum whose values each occupy 1 byte. The value, null, and count vectors for this column in this batch then occupy 2 + 62 (padding), 1 + 63 (padding), and 12 + 52 (padding) bytes respectively, the count vector holding three 4-byte counts. This vector party therefore takes 192 bytes in total.
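The byte arithmetic above is reproduced in the sketch below for a run-length compressed (mode 3) vector party. It assumes every vector is rounded up to a 64-byte multiple and that counts are 4 bytes wide (inferred from the 12-byte count vector for 3 entries). The function names are hypothetical.

```go
package main

import "fmt"

const alignment = 64 // bytes; every vector is padded to a multiple of this

// paddedBytes rounds n up to the next multiple of 64 bytes.
func paddedBytes(n int) int {
	return (n + alignment - 1) / alignment * alignment
}

// vectorPartyBytes returns the padded sizes of the value, null and count
// vectors for a mode-3 vector party of the given compressed length, assuming
// valueBits bits per value, one validity bit per entry and 4-byte counts.
func vectorPartyBytes(length, valueBits int) (values, nulls, counts int) {
	values = paddedBytes((length*valueBits + 7) / 8)
	nulls = paddedBytes((length + 7) / 8)
	counts = paddedBytes((length + 1) * 4) // one more count than values
	return values, nulls, counts
}

func main() {
	v, n, c := vectorPartyBytes(2, 8) // city column in batch 0: 2 entries, 1-byte enum
	fmt.Println(v, n, c, v+n+c)       // 64 64 64 192
}
```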
A vector party represents the (compressed) data of a column in an archive batch. It can be in one of the following modes:
Mode | Value Vector Presence | Null Vector Presence | Count Vector Presence | Meaning |
---|---|---|---|---|
0 | no | no | no | all values are default values |
1 | yes | no | no | all values are present and not null, no run-length compression |
2 | yes | yes | no | no run-length compression |
3 | yes | yes | yes | run-length compressed |
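The presence columns of this table reduce to simple comparisons on the mode number. The constant and method names below are hypothetical labels for modes 0-3.

```go
package main

import "fmt"

// ColumnMode numbers follow the mode table; the constant names are made up.
type ColumnMode uint16

const (
	AllValuesDefault ColumnMode = iota // mode 0: no vectors stored
	AllValuesPresent                   // mode 1: value vector only
	HasNullVector                      // mode 2: value and null vectors
	HasCountVector                     // mode 3: value, null and count vectors
)

func (m ColumnMode) hasValues() bool { return m >= AllValuesPresent }
func (m ColumnMode) hasNulls() bool  { return m >= HasNullVector }
func (m ColumnMode) hasCounts() bool { return m == HasCountVector }

func main() {
	for m := AllValuesDefault; m <= HasCountVector; m++ {
		fmt.Println(m, m.hasValues(), m.hasNulls(), m.hasCounts())
	}
}
```

Because each mode adds exactly one vector to the previous one, a reader can decide which data sections to expect from the mode alone.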
A vector party is stored on disk as a file in the following format (little-endian):
Section | Field | Bytes | Starting Offset |
---|---|---|---|
header | magic number 0xFADEFACE | 4 | 0 |
header | length | 4 | 4 |
header | data type* | 4 | 8 |
header | non default value count | 4 | 12 |
header | mode | 2 | 16 |
header | unused | 6 | 18 |
data | value vector | vb = bytes(value vector) for mode 1+ | 24 |
data | null vector | nb = bytes(null vector) for mode 2+ | 24 + vb |
data | count vector | bytes(count vector) for mode 3 | 24 + vb + nb |
- Data type*: see the column_data_type serialization format for details.
- All vectors (value, null, and count) are padded to a multiple of 64 bytes (512 bits), per the Apache Arrow recommendation.
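The sketch below encodes and decodes the 24-byte header with Go's `encoding/binary`, which writes struct fields in declaration order without inserting padding. The struct and field names are assumptions. The data type code is left at zero because its values are defined by the column_data_type format, not here.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const magicNumber uint32 = 0xFADEFACE

// vectorPartyHeader mirrors the header table: fields appear in file order and
// encoding/binary adds no padding between them (24 bytes total).
type vectorPartyHeader struct {
	Magic                uint32  // offset 0
	Length               uint32  // offset 4
	DataType             uint32  // offset 8
	NonDefaultValueCount uint32  // offset 12
	Mode                 uint16  // offset 16
	Unused               [6]byte // offset 18
}

func encodeHeader(h vectorPartyHeader) ([]byte, error) {
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.LittleEndian, h); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func decodeHeader(b []byte) (vectorPartyHeader, error) {
	var h vectorPartyHeader
	err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &h)
	return h, err
}

func main() {
	// A mode-0 header for a vector party of 100 default values.
	b, err := encodeHeader(vectorPartyHeader{Magic: magicNumber, Length: 100})
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", b)
	h, err := decodeHeader(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(h.Length, h.Mode)
}
```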
The vector party file can be placed as {root_path}/data/{table_name}{shard_id}/archive_batches/{batch_id}{batch_version}/{column}.data
When there is no run-length compression, the size and length of the vector party are the same. The simplest way to represent 100 nulls is length = 100, mode = 0. This is the default state when a new column is created without values being populated.
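A vector party file is loaded into the vector store in memory as the structure below. The struct name and the placeholder type declarations are illustrative, added only so that the snippet compiles.

```go
package vectorstore
// Placeholder declarations (illustrative only) so the struct below compiles.
type (
	ColumnMode uint16
	DataType   uint32
	DataValue  struct{}
	Vector     struct{}
)

// VectorParty (hypothetical name) is the in-memory form of a vector party file.
type VectorParty struct {
	Length               int
	NonDefaultValueCount int
	ColumnMode           ColumnMode
	DataType             DataType
	DefaultValue         DataValue
	Values               *Vector // nil in mode 0
	Nulls                *Vector // nil in modes 0 and 1
	Counts               *Vector // nil unless mode 3
}
```

Depending on the mode, the Values, Nulls, and Counts vectors can be nil, as listed in the mode table above.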
Archiving can happen more than once a day, which requires merging newly archived records into the existing vector parties. This can be achieved in three steps:
- Load the existing vector party files into memory.
- Create an archive batch and compressed vector parties from the newly archived records.
- Recursively merge the two sources along the sort order defined in the schema.

The vector party file on disk is then replaced with the new merged copy.
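The core of such a merge is a two-way merge of two inputs that are each already sorted by the schema's sort order. The sketch below operates on decompressed rows with an assumed (city, status) order. It is a simplified stand-in, not the recursive merge over compressed vector parties described above.

```go
package main

import "fmt"

type row struct {
	city   string
	status string
}

// less compares rows along the assumed sort order: city, then status.
func less(a, b row) bool {
	if a.city != b.city {
		return a.city < b.city
	}
	return a.status < b.status
}

// mergeSorted merges two inputs that are each sorted by less. On ties, rows
// from existing come first, which keeps the merge stable.
func mergeSorted(existing, incoming []row) []row {
	out := make([]row, 0, len(existing)+len(incoming))
	i, j := 0, 0
	for i < len(existing) && j < len(incoming) {
		if less(incoming[j], existing[i]) {
			out = append(out, incoming[j])
			j++
		} else {
			out = append(out, existing[i])
			i++
		}
	}
	out = append(out, existing[i:]...)
	return append(out, incoming[j:]...)
}

func main() {
	existing := []row{{"LA", "completed"}, {"SF", "completed"}}
	incoming := []row{{"NY", "completed"}, {"SF", "cancelled"}}
	for _, r := range mergeSorted(existing, incoming) {
		fmt.Println(r.city, r.status)
	}
}
```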
At query time, only a subset of a vector party may be transferred to GPU memory. For instance, suppose the query has a filter of city_id = 1 and the archive batch is sorted by city_id first. In that case, only the corresponding range of the value/null/count vectors is transferred to GPU memory.
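Locating that range requires only the first sort column's compressed values and its cumulative count vector. The sketch below assumes integer city IDs and no null entries in the sort column. `recordRange` is a hypothetical helper that returns a half-open record range.

```go
package main

import (
	"fmt"
	"sort"
)

// recordRange returns the half-open record range [start, end) covering key in
// a run-length compressed sort column. values must be ascending with no nulls,
// and counts must be its cumulative count vector (len(values)+1 entries).
func recordRange(values []int, counts []uint32, key int) (start, end uint32) {
	lo := sort.SearchInts(values, key)   // first run with value >= key
	hi := sort.SearchInts(values, key+1) // first run with value > key
	return counts[lo], counts[hi]
}

func main() {
	// Hypothetical city_id sort column: city 1 x4, city 2 x2, city 5 x3.
	values := []int{1, 2, 5}
	counts := []uint32{0, 4, 6, 9}
	start, end := recordRange(values, counts, 1)
	fmt.Println(start, end) // 0 4
}
```

A key that is absent yields an empty range (start == end), so nothing needs to be transferred for it.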