Skip to content

Commit

Permalink
Rewrite delta bitpack reader (#912)
Browse files Browse the repository at this point in the history
Allows for int64 output
  • Loading branch information
martindurant authored Dec 22, 2023
1 parent 8682784 commit fb545a5
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
56 changes: 43 additions & 13 deletions fastparquet/cencoding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,30 @@ cpdef void read_rle_bit_packed_hybrid(NumpyIO io_obj, int32_t width, uint32_t le


cdef void delta_read_bitpacked(NumpyIO file_obj, uint8_t bitwidth,
NumpyIO o, uint64_t count, uint8_t itemsize=4):
NumpyIO o, uint64_t count, uint8_t longval=0):
cdef:
uint64_t data = 0
int8_t stop = -bitwidth
int8_t left = 0
int8_t right = 0
uint64_t mask = 0XFFFFFFFFFFFFFFFF >> (64 - bitwidth)
while count > 0:
if stop < 0:
data = ((data & 0X00FFFFFFFFFFFFFF) << 8) | file_obj.read_byte()
stop += 8
if (left - right) < bitwidth:
data = data | (<uint64_t>file_obj.read_byte() << left)
left += 8
elif right > 8:
data >>= 8
left -= 8
right -= 8
else:
o.write_int((data >> stop) & mask)
stop -= bitwidth
if longval:
o.write_long((data >> right) & mask)
else:
o.write_int((data >> right) & mask)
right += bitwidth
count -= 1


cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o):
cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o, uint8_t longval=0):
cdef:
uint64_t block_size = read_unsigned_var_int(file_obj)
uint64_t miniblock_per_block = read_unsigned_var_int(file_obj)
Expand All @@ -248,19 +256,27 @@ cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o):
temp = o.loc
if count > 1:
# no more diffs if on last value
delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, count)
delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, longval)
o.loc = temp
for j in range(values_per_miniblock):
temp = o.read_int()
o.loc -= 4
o.write_int(value)
if longval:
temp = o.read_long()
o.loc -= 8
o.write_long(value)
else:
temp = o.read_int()
o.loc -= 4
o.write_int(value)
value += min_delta + temp
count -= 1
if count <= 0:
return
else:
for j in range(values_per_miniblock):
o.write_int(value)
if longval:
o.write_long(value)
else:
o.write_int(value)
value += min_delta
count -= 1
if count <= 0:
Expand Down Expand Up @@ -372,6 +388,20 @@ cdef class NumpyIO(object):
(<int32_t*> self.get_pointer())[0] = i
self.loc += 4

# Write `i` as a 64-bit signed integer at the location returned by
# get_pointer(), then advance `loc` by 8 bytes.
# If fewer than 8 bytes remain (nbytes - loc < 8) nothing is written, `loc`
# is left unchanged, and no error is reported to the caller.
cdef void write_long(self, int64_t i):
if self.nbytes - self.loc < 8:
# not enough space left for 8 bytes: return without writing
return
# NOTE(review): presumably get_pointer() addresses the current `loc` -- confirm
(<int64_t*> self.get_pointer())[0] = i
self.loc += 8

# Read a 64-bit signed integer from the location returned by get_pointer(),
# advance `loc` by 8 bytes, and return the value.
# If fewer than 8 bytes remain (nbytes - loc < 8) this returns 0 without
# moving `loc`; callers cannot tell that case apart from a stored zero.
cdef int64_t read_long(self):
cdef int64_t i
if self.nbytes - self.loc < 8:
# not enough bytes left for an int64: return 0 and leave `loc` alone
return 0
# NOTE(review): presumably get_pointer() addresses the current `loc` -- confirm
i = (<int64_t*> self.get_pointer())[0]
self.loc += 8
return i

cdef void write_many(self, char b, int32_t count):
cdef int32_t i
for i in range(count):
Expand Down
5 changes: 3 additions & 2 deletions fastparquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False,
else:
values = np.zeros(nval, dtype=np.int8)
elif daph.encoding == parquet_thrift.Encoding.DELTA_BINARY_PACKED:
values = np.empty(daph.num_values - num_nulls, dtype=np.int32)
values = np.empty(daph.num_values - num_nulls,
dtype=np.int64 if metadata.type == 2 else np.int32)
o = encoding.NumpyIO(values.view('uint8'))
encoding.delta_binary_unpack(io_obj, o)
encoding.delta_binary_unpack(io_obj, o, longval=metadata.type == 2)
else:
raise NotImplementedError('Encoding %s' % daph.encoding)
return definition_levels, repetition_levels, values[:nval]
Expand Down
4 changes: 2 additions & 2 deletions fastparquet/test/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ def test_delta_from_def_2():
# one and only miniblock
cencoding.encode_unsigned_varint(zigzag(-2), o) # minimum delta (zigzag)
o.write_byte(2) # bit-width list (only one)
o.write_byte(0b00000011) # [0, 0, 0, 3]
o.write_byte(0b11111100) # [3, 3, 3, pad]
o.write_byte(0b11000000) # rev([0, 0, 0, 3])
o.write_byte(0b00111111) # rev([3, 3, 3, pad])

o.seek(0)

Expand Down

0 comments on commit fb545a5

Please sign in to comment.