
Support corr in GroupBy.apply through the jit engine #13767

Merged
merged 13 commits on Aug 2, 2023
58 changes: 57 additions & 1 deletion python/cudf/cudf/core/udf/groupby_lowering.py
@@ -2,7 +2,7 @@

from functools import partial

-from numba import types
+from numba import cuda, types
from numba.core import cgutils
from numba.core.extending import lower_builtin
from numba.core.typing import signature as nb_signature
@@ -55,6 +55,62 @@ def group_reduction_impl_basic(context, builder, sig, args, function):
)


block_corr = cuda.declare_device(
"BlockCorr",
types.float64(
types.CPointer(types.int64),
types.CPointer(types.int64),
group_size_type,
),
)


def _block_corr(lhs_ptr, rhs_ptr, size):
return block_corr(lhs_ptr, rhs_ptr, size)
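Here cuda.declare_device binds the Python-visible name block_corr to the extern "C" symbol BlockCorr compiled into the shim; under Numba's device-function ABI the C symbol returns an int status code and writes its result through the leading numba_return_value pointer (see BlockCorr in shim.cu below), while the Python side sees the plain float64(...) signature. The _block_corr wrapper exists because compile_internal needs an ordinary Python function to compile. A minimal sketch of the same pattern, using a hypothetical MyDeviceFn symbol:

from numba import cuda, types

# Hypothetical extern "C" __device__ symbol; its C-side signature would be
# `int MyDeviceFn(double* numba_return_value, int64_t* data, int64_t size)`.
my_device_fn = cuda.declare_device(
    "MyDeviceFn",
    types.float64(types.CPointer(types.int64), types.int64),
)

def _my_device_fn(data_ptr, size):
    # Plain Python wrapper that lowering code can hand to compile_internal.
    return my_device_fn(data_ptr, size)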


@cuda_lower(
"GroupType.corr",
GroupType(types.int64, types.int64),
GroupType(types.int64, types.int64),
)
def group_corr(context, builder, sig, args):
"""
Instruction boilerplate used for calling a groupby correlation
"""
lhs_grp = cgutils.create_struct_proxy(sig.args[0])(
context, builder, value=args[0]
)
rhs_grp = cgutils.create_struct_proxy(sig.args[1])(
context, builder, value=args[1]
)

# logically take the address of the group's data pointer
lhs_group_data_ptr = builder.alloca(lhs_grp.group_data.type)
builder.store(lhs_grp.group_data, lhs_group_data_ptr)

# logically take the address of the group's data pointer
rhs_group_data_ptr = builder.alloca(rhs_grp.group_data.type)
builder.store(rhs_grp.group_data, rhs_group_data_ptr)

result = context.compile_internal(
builder,
_block_corr,
nb_signature(
types.float64,
types.CPointer(types.int64),
types.CPointer(types.int64),
group_size_type,
),
(
builder.load(lhs_group_data_ptr),
builder.load(rhs_group_data_ptr),
lhs_grp.size,
),
)
return result


@lower_builtin(Group, types.Array, group_size_type, types.Array)
def group_constructor(context, builder, sig, args):
"""
12 changes: 12 additions & 0 deletions python/cudf/cudf/core/udf/groupby_typing.py
@@ -167,6 +167,13 @@ def generic(self, args, kws):
return nb_signature(self.this.index_type, recvr=self.this)


class GroupCorr(AbstractTemplate):
key = "GroupType.corr"

def generic(self, args, kws):
return nb_signature(types.float64, args[0], recvr=self.this)


@cuda_registry.register_attr
class GroupAttr(AttributeTemplate):
key = GroupType
@@ -197,6 +204,11 @@ def resolve_idxmin(self, mod):
GroupIdxMin, GroupType(mod.group_scalar_type, mod.index_type)
)

def resolve_corr(self, mod):
return types.BoundFunction(
GroupCorr, GroupType(mod.group_scalar_type, mod.index_type)
)
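Together with the lowering registered in groupby_lowering.py, this typing entry is what lets corr appear inside a JIT groupby UDF: resolve_corr gives GroupType a bound corr method, and GroupCorr.generic types a call on two groups as returning float64. A small usage sketch mirroring the test added below (the frame and column names are placeholders):

import cudf

def udf(group):
    # Typed by GroupCorr: corr over two int64 group columns yields a float64.
    return group["x"].corr(group["y"])

df = cudf.DataFrame({"key": [0, 0, 1, 1], "x": [1, 2, 3, 4], "y": [2, 4, 5, 7]})
result = df.groupby("key").apply(udf, engine="jit")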


for ty in SUPPORTED_GROUPBY_NUMBA_TYPES:
_register_cuda_reduction_caller("Max", ty, ty)
1 change: 0 additions & 1 deletion python/cudf/cudf/core/udf/groupby_utils.py
@@ -124,7 +124,6 @@ def _get_groupby_apply_kernel(frame, func, args):
"types": types,
}
kernel_string = _groupby_apply_kernel_string_from_template(frame, args)

kernel = _get_kernel(kernel_string, global_exec_context, None, func)

return kernel, return_type
24 changes: 24 additions & 0 deletions python/cudf/cudf/tests/test_groupby.py
@@ -3302,3 +3302,27 @@ def test_group_by_pandas_sort_order(groups, sort):
pdf.groupby(groups, sort=sort).sum(),
df.groupby(groups, sort=sort).sum(),
)


def test_corr_jit():
def func(group):
return group["b"].corr(group["c"])

size = 1_000_000
gdf = cudf.DataFrame(
{
"a": np.random.randint(0, 10000, size),
"b": np.random.randint(0, 1000, size),
"c": np.random.randint(0, 1000, size),
}
)
gdf = gdf.sort_values("a")
pdf = gdf.to_pandas()

gdf_grouped = gdf.groupby("a")
pdf_grouped = pdf.groupby("a", as_index=False)

expect = pdf_grouped.apply(func)
got = gdf_grouped.apply(func, engine="jit")

assert_eq(expect, got)
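A possible hardening of this test (an assumption, not something the PR does): seed the random generator so a numeric mismatch between the pandas and JIT paths can be reproduced exactly:

rng = np.random.default_rng(42)
gdf = cudf.DataFrame(
    {
        "a": rng.integers(0, 10_000, size),
        "b": rng.integers(0, 1_000, size),
        "c": rng.integers(0, 1_000, size),
    }
)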
44 changes: 44 additions & 0 deletions python/cudf/udf_cpp/shim.cu
@@ -662,3 +662,47 @@ make_definition_idx(BlockIdxMax, int64, int64_t);
make_definition_idx(BlockIdxMax, float64, double);
#undef make_definition_idx
}

extern "C" __device__ int BlockCorr(double* numba_return_value,
int64_t* const lhs_ptr,
int64_t* rhs_ptr,
int64_t size)
{
double lhs_mean = BlockMean(lhs_ptr, size);
double rhs_mean = BlockMean(rhs_ptr, size);

__shared__ double numerators[1024];
__shared__ double sum_sq_ls[1024];
__shared__ double sum_sq_rs[1024];

numerators[threadIdx.x] = 0.0;
sum_sq_ls[threadIdx.x] = 0.0;
sum_sq_rs[threadIdx.x] = 0.0;

auto block = cooperative_groups::this_thread_block();

  for (int64_t idx = block.thread_rank(); idx < size; idx += block.size()) {
    double delta_l = lhs_ptr[idx] - lhs_mean;
    double delta_r = rhs_ptr[idx] - rhs_mean;

    // Accumulate per-thread partial sums; indexing by threadIdx.x keeps the
    // writes inside the 1024-element shared arrays even when the group size
    // exceeds the block size.
    numerators[threadIdx.x] += delta_l * delta_r;

[Review comment — Contributor] We should reimplement BlockVar to be a special case of a new function for computing Covariance (where the variance is the covariance of the data with itself). Then I think you can use the covariance function to compute the numerators and the variance function for the terms in the denominator.

    sum_sq_ls[threadIdx.x] += delta_l * delta_l;
    sum_sq_rs[threadIdx.x] += delta_r * delta_r;
  }
__syncthreads();

double numerator = BlockSum(numerators, block.size());
double denominator =
sqrt(BlockSum(sum_sq_ls, block.size())) * sqrt(BlockSum(sum_sq_rs, block.size()));

block.sync();

  if (denominator == 0.0) {
    *numba_return_value = 0.0;
  } else {
    *numba_return_value = numerator / denominator;
  }
__syncthreads();
return 0;
}
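The refactor suggested in the review comment collapses everything onto one primitive, since var(x) = cov(x, x) and corr(x, y) = cov(x, y) / sqrt(var(x) * var(y)). A host-side NumPy sketch of that structure (illustration only, not the CUDA implementation):

import numpy as np

def covar(lhs, rhs):
    # Population covariance; variance is the special case covar(x, x).
    return np.mean((lhs - lhs.mean()) * (rhs - rhs.mean()))

def corr(lhs, rhs):
    return covar(lhs, rhs) / np.sqrt(covar(lhs, lhs) * covar(rhs, rhs))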