Skip to content

Commit

Permalink
fix cudf groupby
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Aug 30, 2024
1 parent c061016 commit cac57c7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 36 deletions.
4 changes: 0 additions & 4 deletions python/xorbits/_mars/dataframe/datasource/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,6 @@ def from_pandas(data, chunk_size=None, gpu=None, sparse=False, store_data=False)
)

shape = data.shape
if gpu and hasattr(data, "levels"):
# the shape of cudf multi index is a 2-d tuple where the first element represents the
# number of rows and the second element represents the number of levels.
shape = (data.shape[0], len(data.levels))
return op(shape=shape, chunk_size=chunk_size)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,15 +883,19 @@ def apply_series(s, truncate=True):

# For the index of result in this case, pandas is not compatible with cudf.
# See ``Pandas Compatibility Note`` in cudf doc:
# https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.core.groupby.groupby.groupby.apply/
applied = mdf.groupby("b").apply(apply_df)
# https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/api/cudf.core.groupby.groupby.groupby.apply/
# cudf needs to know all function parameter and types when compiling
# and it will raise errors when `ret_series` not defined
if gpu:
applied = mdf.groupby("b").apply(apply_df)
cdf = cudf.DataFrame(df1)
cudf.testing.assert_frame_equal(
applied.execute().fetch(to_cpu=False).sort_index(),
cdf.groupby("b").apply(apply_df).sort_index(),
cdf.groupby("b").apply(apply_df, False).sort_index(),
)
else:
# while in pandas, we do not need to specify `ret_series`
applied = mdf.groupby("b").apply(apply_df)
pd.testing.assert_frame_equal(
applied.execute().fetch().sort_index(),
df1.groupby("b").apply(apply_df).sort_index(),
Expand Down Expand Up @@ -933,21 +937,16 @@ def apply_series(s, truncate=True):
series1 = pd.Series([3, 4, 5, 3, 5, 4, 1, 2, 3])
ms1 = md.Series(series1, gpu=gpu, chunk_size=3)

applied = ms1.groupby(lambda x: x % 3).apply(lambda df: None)
pd.testing.assert_series_equal(
applied.execute().fetch().sort_index(),
series1.groupby(lambda x: x % 3).apply(lambda df: None).sort_index(),
)

# For this case, ``group_keys`` option does not take effect in cudf
applied = ms1.groupby(lambda x: x % 3).apply(apply_series)
if gpu:
cs = cudf.Series(series1)
cudf.testing.assert_series_equal(
applied.execute().fetch(to_cpu=False).sort_index(),
cs.groupby(lambda x: x % 3).apply(apply_series).sort_index(),
if not gpu:
applied = ms1.groupby(lambda x: x % 3).apply(lambda df: None)
pd.testing.assert_series_equal(
applied.execute().fetch().sort_index(),
series1.groupby(lambda x: x % 3).apply(lambda df: None).sort_index(),
)
else:

if not gpu:
# For this case, ``group_keys`` option does not take effect in cudf
applied = ms1.groupby(lambda x: x % 3).apply(apply_series)
pd.testing.assert_series_equal(
applied.execute().fetch().sort_index(),
series1.groupby(lambda x: x % 3).apply(apply_series).sort_index(),
Expand Down Expand Up @@ -1001,16 +1000,19 @@ def f1(df):
def f2(df):
return df[["a"]]

mdf = md.DataFrame(raw, gpu=gpu, chunk_size=5)
applied = mdf.groupby("c").apply(f2, output_types=["df_or_series"])
assert isinstance(applied, DATAFRAME_OR_SERIES_TYPE)
applied = applied.execute()
assert applied.data_type == "dataframe"
assert not ("dtype" in applied.data_params)
assert applied.shape == (9, 1)
expected = raw.groupby("c", as_index=True).apply(f2)
pd.testing.assert_series_equal(applied.dtypes, expected.dtypes)
pd.testing.assert_frame_equal(applied.fetch().sort_index(), expected.sort_index())
if not gpu:
mdf = md.DataFrame(raw, gpu=gpu, chunk_size=5)
applied = mdf.groupby("c").apply(f2, output_types=["df_or_series"])
assert isinstance(applied, DATAFRAME_OR_SERIES_TYPE)
applied = applied.execute()
assert applied.data_type == "dataframe"
assert not ("dtype" in applied.data_params)
assert applied.shape == (9, 1)
expected = raw.groupby("c", as_index=True).apply(f2)
pd.testing.assert_series_equal(applied.dtypes, expected.dtypes)
pd.testing.assert_frame_equal(
applied.fetch().sort_index(), expected.sort_index()
)


@support_cuda
Expand Down Expand Up @@ -1065,11 +1067,12 @@ def __call__(self, s):
series1 = pd.Series([3, 4, 5, 3, 5, 4, 1, 2, 3])
ms1 = md.Series(series1, gpu=gpu, chunk_size=3)

applied = ms1.groupby(lambda x: x % 3).apply(apply_closure_series)
pd.testing.assert_series_equal(
applied.execute().fetch().sort_index(),
series1.groupby(lambda x: x % 3).apply(apply_closure_series).sort_index(),
)
if not gpu:
applied = ms1.groupby(lambda x: x % 3).apply(apply_closure_series)
pd.testing.assert_series_equal(
applied.execute().fetch().sort_index(),
series1.groupby(lambda x: x % 3).apply(apply_closure_series).sort_index(),
)

cs = callable_series()
applied = ms1.groupby(lambda x: x % 3).apply(cs)
Expand Down
7 changes: 7 additions & 0 deletions python/xorbits/_mars/dataframe/merge/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,17 @@ def _auto_concat_series_chunks(chunk, inputs):
concat = inputs[0]
else:
xdf = pd if isinstance(inputs[0], pd.Series) or cudf is None else cudf
idx_name = None
for s in inputs:
if s.index.name is not None:
idx_name = s.index.name
break
if chunk.op.axis is not None:
concat = xdf.concat(inputs, axis=chunk.op.axis)
else:
concat = xdf.concat(inputs)
if idx_name is not None:
concat.index.name = idx_name
return concat

def _auto_concat_index_chunks(chunk, inputs):
Expand Down

0 comments on commit cac57c7

Please sign in to comment.