Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RLE and dictionary bugs #81

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Force LF line endings for CSV files; otherwise the unit tests fail on a Windows box.
*.csv text eol=lf
21 changes: 16 additions & 5 deletions lib/codec/rle.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// For questions about RLE encoding, see the spec:
//
// https://github.com/apache/parquet-format/blob/master/Encodings.md

const varint = require('varint')

function encodeRunBitpacked(values, opts) {
Expand All @@ -20,10 +24,13 @@ function encodeRunBitpacked(values, opts) {

function encodeRunRepeated(value, count, opts) {
let buf = Buffer.alloc(Math.ceil(opts.bitWidth / 8));
let remainingValue = value

// This is encoded LSB to MSB, so we pick off the least
// significant byte and shift to get the next one.
for (let i = 0; i < buf.length; ++i) {
buf.writeUInt8(value & 0xff, i);
value >> 8;
buf.writeUInt8(remainingValue & 0xff, i);
remainingValue = remainingValue >> 8;
}

return Buffer.concat([
Expand Down Expand Up @@ -109,10 +116,14 @@ function decodeRunBitpacked(cursor, count, opts) {
}

function decodeRunRepeated(cursor, count, opts) {
var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8);
let value = 0;
for (let i = 0; i < Math.ceil(opts.bitWidth / 8); ++i) {
value << 8;
value += cursor.buffer[cursor.offset];

for (let i = 0; i < bytesNeededForFixedBitWidth; ++i) {
const byte = cursor.buffer[cursor.offset]
// Bytes are stored LSB to MSB, so we need to shift
// each new byte appropriately.
value += byte << (i * 8);
cursor.offset += 1;
}

Expand Down
12 changes: 9 additions & 3 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,11 @@ function decodePages(buffer, opts) {
continue;
}

if (opts.dictionary) {
// It's possible to have a column chunk where some pages should use
// the dictionary (PLAIN_DICTIONARY for example) and others should
// not (PLAIN for example).

if (opts.dictionary && pageData.useDictionary) {
pageData.values = pageData.values.map(d => opts.dictionary[d]);
}

Expand Down Expand Up @@ -862,7 +866,8 @@ function decodeDataPage(cursor, header, opts) {
dlevels: dLevels,
rlevels: rLevels,
values: values,
count: valueCount
count: valueCount,
useDictionary: valueEncoding === 'PLAIN_DICTIONARY' || valueEncoding === 'RLE_DICTIONARY'
};
}

Expand Down Expand Up @@ -938,7 +943,8 @@ function decodeDataPageV2(cursor, header, opts) {
dlevels: dLevels,
rlevels: rLevels,
values: values,
count: valueCount
count: valueCount,
useDictionary: valueEncoding === 'PLAIN_DICTIONARY' || valueEncoding === 'RLE_DICTIONARY'
};
}

Expand Down
12 changes: 6 additions & 6 deletions test/codec_rle.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,29 @@ describe('ParquetCodec::RLE', function() {
it('should encode repeated values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
[42, 42, 42, 42, 42, 42, 42, 42],
[1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567],
{
disableEnvelope: true,
bitWidth: 6
bitWidth: 21
});

assert.deepEqual(buf, Buffer.from([0x10, 0x2a]));
assert.deepEqual(buf, Buffer.from([0x10, 0x87, 0xD6, 0x12]));
});

it('should decode repeated values', function() {
let vals = parquet_codec_rle.decodeValues(
'INT32',
{
buffer: Buffer.from([0x10, 0x2a]),
buffer: Buffer.from([0x10, 0x87, 0xD6, 0x12]),
offset: 0,
},
8,
{
disableEnvelope: true,
bitWidth: 3
bitWidth: 21
});

assert.deepEqual(vals, [42, 42, 42, 42, 42, 42, 42, 42]);
assert.deepEqual(vals, [1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567]);
});

it('should encode mixed runs', function() {
Expand Down