Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sliced string cast for hash column generator #249

Merged
merged 4 commits into from
Dec 1, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion deltacat/compute/compactor_v2/utils/primary_key_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,40 @@ def group_by_pk_hash_bucket(
return result


def _sliced_string_cast(array: pa.ChunkedArray) -> pa.ChunkedArray:
rkenmi marked this conversation as resolved.
Show resolved Hide resolved
"""performs slicing of a pyarrow array prior casting to a string.
This prevents a pyarrow from allocating too large of an array causing a failure.
rkenmi marked this conversation as resolved.
Show resolved Hide resolved
"""
dtype = array.type
MAX_BYTES = 2147483646
max_str_len = None
if pa.types.is_integer(dtype):
rkenmi marked this conversation as resolved.
Show resolved Hide resolved
max_str_len = 21 # -INT_MAX
elif pa.types.is_floating(dtype):
max_str_len = 24
elif pa.types.is_decimal128(dtype):
max_str_len = 39
elif pa.types.is_decimal256(dtype):
max_str_len = 77
rkenmi marked this conversation as resolved.
Show resolved Hide resolved

if max_str_len is not None:
max_elems_per_chunk = MAX_BYTES // (2 * max_str_len) # safety factor of 2
all_chunks = []
for chunk in array.chunks:
if len(chunk) < max_elems_per_chunk:
all_chunks.append(chunk)
else:
curr_pos = 0
total_len = len(chunk)
while curr_pos < total_len:
sliced = chunk.slice(curr_pos, max_elems_per_chunk)
curr_pos += len(sliced)
all_chunks.append(sliced)
array = pa.chunked_array(all_chunks, type=dtype)

return pc.cast(array, pa.string())


def generate_pk_hash_column(
tables: List[pa.Table],
primary_keys: Optional[List[str]] = None,
Expand All @@ -182,7 +216,7 @@ def generate_pk_hash_column(
def _generate_pk_hash(table: pa.Table) -> pa.Array:
pk_columns = []
for pk_name in primary_keys:
pk_columns.append(pc.cast(table[pk_name], pa.string()))
pk_columns.append(_sliced_string_cast(table[pk_name]))

pk_columns.append(PK_DELIMITER)
hash_column = pc.binary_join_element_wise(*pk_columns)
Expand Down
Loading