[Re-opened elsewhere] Handle nullable types and empty partitions before Dask-ML predict #783

Closed
9 changes: 8 additions & 1 deletion dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
import logging
from typing import TYPE_CHECKING

+import numpy as np
from dask import delayed

from dask_sql.datacontainer import DataContainer
@@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:

delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
-model = ParallelPostFit(estimator=model)
+output_meta = np.array([])
@sarahyurick (Collaborator, Author) commented on Sep 21, 2022:

With this, output_meta is always []. Should this maybe be in some sort of try/except block, since we're only handling the CPU case?

A collaborator replied:

I don't think we can just hardcode output_meta to np.array([]); we also use cuML for this case, and that outputs a cuDF Series.
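
A hedged sketch of the alternative this reply points toward: infer the meta from the fitted estimator itself, so it matches the compute backend (a NumPy array on CPU, a cuDF Series or cupy array with cuML) rather than hardcoding np.array([]). The helper name and x_sample argument are illustrative, not part of the PR:

    # Illustrative only: mirrors dask-ml's estimator.predict(X._meta_nonempty)
    # trick for deriving prediction meta.
    def infer_output_meta(estimator, x_sample):
        # Predict on a single row; the result's container type and dtype are
        # what ParallelPostFit needs for predict_meta.
        return estimator.predict(x_sample.head(1))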

+model = ParallelPostFit(
+    estimator=model,
+    predict_meta=output_meta,
+    predict_proba_meta=output_meta,
+    transform_meta=output_meta,
+)

else:
    model.fit(X, y, **fit_kwargs)
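For context, a minimal sketch of the dask-ml API used above, with an illustrative scikit-learn estimator; the three meta keyword arguments are real ParallelPostFit parameters, while the concrete meta values here are assumptions:

    import numpy as np
    from dask_ml.wrappers import ParallelPostFit
    from sklearn.linear_model import LogisticRegression

    # The meta kwargs tell Dask what each post-fit method returns per
    # partition, so it never has to call the estimator on empty data.
    wrapped = ParallelPostFit(
        estimator=LogisticRegression(),
        predict_meta=np.array([], dtype="int64"),
        predict_proba_meta=np.zeros((0, 2), dtype="float64"),
        transform_meta=np.zeros((0, 2)),
    )
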
42 changes: 41 additions & 1 deletion dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,8 @@
import uuid
from typing import TYPE_CHECKING

+import numpy as np

from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin

@@ -59,7 +61,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:

model, training_columns = context.schema[schema_name].models[model_name]
df = context.sql(sql_select)
-prediction = model.predict(df[training_columns])
+part = df[training_columns]
+try:
+    output_meta = model.predict_meta
+except AttributeError:
+    output_meta = None
+if part.shape[0].compute() == 0 and output_meta is not None:
@sarahyurick (Collaborator, Author) commented on Sep 21, 2022:

compute() is needed on the Delayed object to get the number of rows in the partition. I believe that right now, output_meta will always be []?

A collaborator replied:

You don't need to compute to do this; we can do it lazily too.

+    empty_output = self.handle_empty_partitions(output_meta)
+    if empty_output is not None:
+        return empty_output
+prediction = model.predict(part)
A collaborator commented:

We should wrap the predict call like the following, but only for cases where we have a ParallelPostFit model:

    if isinstance(model, ParallelPostFit):
        predict_meta = model.predict_meta
        if predict_meta is None:
            predict_meta = model.estimator.predict(part._meta_nonempty)

        prediction = part.map_partitions(
            _predict, predict_meta, model.estimator, meta=predict_meta
        )

    def _predict(part, predict_meta, estimator):
        if part.shape[0] == 0 and predict_meta is not None:
            return handle_empty_partitions(predict_meta)
        return estimator.predict(part)
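
The design point of this suggestion: moving the emptiness check into the per-partition function keeps everything lazy. Inside map_partitions, part.shape[0] is an ordinary integer evaluated at execution time, so no eager compute() of the row count is needed on the driver.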

predicted_df = df.assign(target=prediction)

# Create a temporary context, which includes the
@@ -79,3 +90,32 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
dc = DataContainer(predicted_df, cc)

return dc

+def handle_empty_partitions(self, output_meta):
+    if hasattr(output_meta, "__array_function__"):
+        if len(output_meta.shape) == 1:
+            shape = 0
+        else:
+            shape = list(output_meta.shape)
+            shape[0] = 0
+        ar = np.zeros(
+            shape=shape,
+            dtype=output_meta.dtype,
+            like=output_meta,
+        )
+        return ar
+    elif "scipy.sparse" in type(output_meta).__module__:
+        # Sparse matrices don't support `like`, because they do not
+        # implement __array_function__.
+        # See https://github.com/scipy/scipy/issues/10362
+        # Note: the below works for both cupy and scipy sparse matrices.
+        if len(output_meta.shape) == 1:
+            shape = 0
+        else:
+            shape = list(output_meta.shape)
+            shape[0] = 0
+
+        ar = type(output_meta)(shape, dtype=output_meta.dtype)
+        return ar
+    elif hasattr(output_meta, "iloc"):
+        return output_meta.iloc[:0, :]
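
For reference, a hedged illustration of what handle_empty_partitions returns for the two most common meta types; plugin is a hypothetical instance of the plugin class that owns the method:

    import numpy as np
    import pandas as pd

    # ndarray branch: zero rows, trailing dimensions and dtype preserved.
    plugin.handle_empty_partitions(np.zeros((5, 3)))            # -> shape (0, 3)

    # iloc branch (pandas/cuDF): zero-row frame with the same columns.
    plugin.handle_empty_partitions(pd.DataFrame({"a": [1.0]}))  # -> 0 rows x 1 column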