Skip to content

Commit

Permalink
Merge branch 'branch-23.12' into unskip-dask-cudf-tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
galipremsagar authored Oct 25, 2023
2 parents 04a4bcf + 91aeec8 commit bb95df7
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 395 deletions.
1 change: 0 additions & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ dependencies:
- ptxcompiler
- pyarrow==12.0.1.*
- pydata-sphinx-theme
- pyorc
- pytest
- pytest-benchmark
- pytest-cases
Expand Down
1 change: 0 additions & 1 deletion conda/environments/all_cuda-120_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ dependencies:
- protobuf>=4.21,<5
- pyarrow==12.0.1.*
- pydata-sphinx-theme
- pyorc
- pytest
- pytest-benchmark
- pytest-cases
Expand Down
14 changes: 5 additions & 9 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,20 +1299,16 @@ TEST_F(OrcStatisticsTest, Overflow)

TEST_F(OrcStatisticsTest, HasNull)
{
// This test can now be implemented with libcudf; keeping the pyorc version to keep the test
// This test can now be implemented with libcudf; keeping the pandas version to keep the test
// inputs diversified
// Method to create file:
// >>> import pyorc
// >>> output = open("./temp.orc", "wb")
// >>> writer = pyorc.Writer(output, pyorc.Struct(a=pyorc.BigInt(), b=pyorc.BigInt()))
// >>> writer.write((1, 3))
// >>> writer.write((2, 4))
// >>> writer.write((None, 5))
// >>> writer.close()
// >>> import pandas as pd
// >>> df = pd.DataFrame({'a':pd.Series([1, 2, None], dtype="Int64"), 'b':[3, 4, 5]})
// >>> df.to_orc("temp.orc")
//
// Contents of file:
// >>> import pyarrow.orc as po
// >>> po.ORCFile('new.orc').read()
// >>> po.ORCFile('temp.orc').read()
// pyarrow.Table
// a: int64
// b: int64
Expand Down
1 change: 0 additions & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ dependencies:
- fastavro>=0.22.9
- hypothesis
- mimesis>=4.1.0
- pyorc
- pytest-benchmark
- pytest-cases
- python-snappy>=0.6.0
Expand Down
1 change: 1 addition & 0 deletions docs/cudf/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"twitter_url": "https://twitter.com/rapidsai",
"show_toc_level": 1,
"navbar_align": "right",
"navigation_with_keys": True,
}
include_pandas_compat = True

Expand Down
1 change: 1 addition & 0 deletions docs/dask_cudf/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"twitter_url": "https://twitter.com/rapidsai",
"show_toc_level": 1,
"navbar_align": "right",
"navigation_with_keys": True,
}
include_pandas_compat = True

Expand Down
18 changes: 6 additions & 12 deletions python/cudf/cudf/_fuzz_testing/orc.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

import copy
import io
import logging
import random

import numpy as np
import pyorc
import pyarrow as pa

import cudf
from cudf._fuzz_testing.io import IOFuzz
from cudf._fuzz_testing.utils import (
ALL_POSSIBLE_VALUES,
_generate_rand_meta,
pandas_to_orc,
pyarrow_to_pandas,
)
from cudf.testing import dataset_generator as dg
Expand Down Expand Up @@ -82,12 +81,7 @@ def generate_input(self):
logging.info(f"Shape of DataFrame generated: {table.shape}")
self._df = df
file_obj = io.BytesIO()
pandas_to_orc(
df,
file_io_obj=file_obj,
stripe_size=self._rand(len(df)),
arrow_table_schema=table.schema,
)
pa.orc.write_table(table, file_obj, stripe_size=self._rand(len(df)))
file_obj.seek(0)
buf = file_obj.read()
self._current_buffer = copy.copy(buf)
Expand All @@ -109,8 +103,8 @@ def set_rand_params(self, params):
)
elif param == "stripes":
f = io.BytesIO(self._current_buffer)
reader = pyorc.Reader(f)
stripes = [i for i in range(reader.num_of_stripes)]
orcFile = pa.orc.ORCFile(f)
stripes = list(range(orcFile.nstripes))
params_dict[param] = np.random.choice(
[
None,
Expand All @@ -119,7 +113,7 @@ def set_rand_params(self, params):
int,
np.unique(
np.random.choice(
stripes, reader.num_of_stripes
stripes, orcFile.nstripes
)
),
)
Expand Down
160 changes: 5 additions & 155 deletions python/cudf/cudf/_fuzz_testing/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

import random
from collections import OrderedDict

import fastavro
import numpy as np
import pandas as pd
import pyarrow as pa
import pyorc

import cudf
from cudf.testing._utils import assert_eq
Expand Down Expand Up @@ -41,40 +39,6 @@
cudf.dtype("<M8[us]"): {"type": "long", "logicalType": "timestamp-micros"},
}

PANDAS_TO_ORC_TYPES = {
cudf.dtype("int8"): pyorc.TinyInt(),
pd.Int8Dtype(): pyorc.TinyInt(),
pd.Int16Dtype(): pyorc.SmallInt(),
pd.Int32Dtype(): pyorc.Int(),
pd.Int64Dtype(): pyorc.BigInt(),
pd.Float32Dtype(): pyorc.Float(),
pd.Float64Dtype(): pyorc.Double(),
pd.BooleanDtype(): pyorc.Boolean(),
cudf.dtype("bool_"): pyorc.Boolean(),
cudf.dtype("int16"): pyorc.SmallInt(),
cudf.dtype("int32"): pyorc.Int(),
cudf.dtype("int64"): pyorc.BigInt(),
cudf.dtype("O"): pyorc.String(),
pd.StringDtype(): pyorc.String(),
cudf.dtype("float32"): pyorc.Float(),
cudf.dtype("float64"): pyorc.Double(),
cudf.dtype("<M8[ns]"): pyorc.Timestamp(),
cudf.dtype("<M8[ms]"): pyorc.Timestamp(),
cudf.dtype("<M8[us]"): pyorc.Timestamp(),
}

ORC_TO_PANDAS_TYPES = {
pyorc.TinyInt().name: pd.Int8Dtype(),
pyorc.Int().name: pd.Int32Dtype(),
pyorc.Boolean().name: pd.BooleanDtype(),
pyorc.SmallInt().name: pd.Int16Dtype(),
pyorc.BigInt().name: pd.Int64Dtype(),
pyorc.String().name: pd.StringDtype(),
pyorc.Float().name: pd.Float32Dtype(),
pyorc.Double().name: pd.Float64Dtype(),
pyorc.Timestamp().name: cudf.dtype("<M8[ns]"),
}


def _generate_rand_meta(obj, dtypes_list, null_frequency_override=None):
obj._current_params = {}
Expand Down Expand Up @@ -213,24 +177,6 @@ def get_avro_dtype_info(dtype):
)


def get_orc_dtype_info(dtype):
if dtype in PANDAS_TO_ORC_TYPES:
return PANDAS_TO_ORC_TYPES[dtype]
else:
raise TypeError(
f"Unsupported dtype({dtype}) according to orc spec:"
f" https://orc.apache.org/specification/"
)


def get_arrow_dtype_info_for_pyorc(dtype):
if isinstance(dtype, pa.StructType):
return get_orc_schema(df=None, arrow_table_schema=dtype)
else:
pd_dtype = cudf.dtype(dtype.to_pandas_dtype())
return get_orc_dtype_info(pd_dtype)


def get_avro_schema(df):
fields = [
{"name": col_name, "type": get_avro_dtype_info(col_dtype)}
Expand All @@ -240,22 +186,6 @@ def get_avro_schema(df):
return schema


def get_orc_schema(df, arrow_table_schema=None):
if arrow_table_schema is None:
ordered_dict = OrderedDict(
(col_name, get_orc_dtype_info(col_dtype))
for col_name, col_dtype in df.dtypes.items()
)
else:
ordered_dict = OrderedDict(
(field.name, get_arrow_dtype_info_for_pyorc(field.type))
for field in arrow_table_schema
)

schema = pyorc.Struct(**ordered_dict)
return schema


def convert_nulls_to_none(records, df):
columns_with_nulls = {col for col in df.columns if df[col].isnull().any()}
scalar_columns_convert = [
Expand Down Expand Up @@ -296,99 +226,19 @@ def pandas_to_avro(df, file_name=None, file_io_obj=None):
fastavro.writer(file_io_obj, avro_schema, records)


def _preprocess_to_orc_tuple(df, arrow_table_schema):
def _null_to_None(value):
if value is pd.NA or value is pd.NaT:
return None
else:
return value

def sanitize(value, struct_type):
if value is None:
return None

values_list = []
for name, sub_type in struct_type.fields.items():
if isinstance(sub_type, cudf.StructDtype):
values_list.append(sanitize(value[name], sub_type))
else:
values_list.append(value[name])
return tuple(values_list)

has_nulls_or_nullable_dtype = any(
(col := df[colname]).dtype in pandas_dtypes_to_np_dtypes
or col.isnull().any()
for colname in df.columns
)
pdf = df.copy(deep=True)
for field in arrow_table_schema:
if isinstance(field.type, pa.StructType):
pdf[field.name] = pdf[field.name].apply(
sanitize, args=(cudf.StructDtype.from_arrow(field.type),)
)
else:
pdf[field.name] = pdf[field.name]

tuple_list = [
tuple(map(_null_to_None, tup)) if has_nulls_or_nullable_dtype else tup
for tup in pdf.itertuples(index=False, name=None)
]

return tuple_list, pdf, df


def pandas_to_orc(
df,
file_name=None,
file_io_obj=None,
stripe_size=67108864,
arrow_table_schema=None,
):
schema = get_orc_schema(df, arrow_table_schema=arrow_table_schema)

tuple_list, pdf, df = _preprocess_to_orc_tuple(
df, arrow_table_schema=arrow_table_schema
)

if file_name is not None:
with open(file_name, "wb") as data:
with pyorc.Writer(data, schema, stripe_size=stripe_size) as writer:
writer.writerows(tuple_list)
elif file_io_obj is not None:
with pyorc.Writer(
file_io_obj, schema, stripe_size=stripe_size
) as writer:
writer.writerows(tuple_list)


def orc_to_pandas(file_name=None, file_io_obj=None, stripes=None):
if file_name is not None:
f = open(file_name, "rb")
elif file_io_obj is not None:
f = file_io_obj

reader = pyorc.Reader(f)

dtypes = {
col: ORC_TO_PANDAS_TYPES[pyorc_type.name]
for col, pyorc_type in reader.schema.fields.items()
}

if stripes is None:
df = pd.DataFrame.from_records(
reader, columns=reader.schema.fields.keys()
)
df = pd.read_orc(f)
else:
records = [
record for i in stripes for record in list(reader.read_stripe(i))
]
df = pd.DataFrame.from_records(
records, columns=reader.schema.fields.keys()
)

# Need to type-cast to extracted `dtypes` from pyorc schema because
# a fully empty/ full <NA> can result in incorrect dtype by pandas.
df = df.astype(dtypes)
orc_file = pa.orc.ORCFile(f)
records = [orc_file.read_stripe(i) for i in stripes]
pa_table = pa.Table.from_batches(records)
df = pa_table.to_pandas()

return df

Expand Down
Loading

0 comments on commit bb95df7

Please sign in to comment.