Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Re-opened elsewhere] Handle nullable types and empty partitions before Dask-ML predict #783

Closed

Conversation

sarahyurick
Copy link
Collaborator

Tagging @randerzander and @VibhuJawa

Changes in create_model.py handle nullable types, such as for this example:

df = pd.DataFrame({
    "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype='Int32'),
    "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype='float32'),
    "rained": pd.Series([False, False, False, True])
})
c.create_table("train_set", df)

model_class = ".linear_model.LogisticRegression'"
if GPU:
    model_class = "'cuml" + model_class
else:
    model_class = "'sklearn" + model_class

c.sql(f"""
CREATE OR REPLACE MODEL model WITH (
    model_class = {model_class},
     wrap_predict = True,
     wrap_fit = False,
    target_column = 'rained'
) AS (
    SELECT *
    FROM train_set
)
""")

c.sql("""
SELECT * FROM PREDICT(
  MODEL model,
  SELECT * FROM train_set
)
""").compute()

Changes in predict.py handle empty partitions, modeled based on this Dask-ML PR: dask/dask-ml#912

@@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
output_meta = np.array([])
Copy link
Collaborator Author

@sarahyurick sarahyurick Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this, output_meta is always []. Should this maybe be in some sort of try/except block since we're only handling the CPU case?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we can just hardcode the meta to be output_meta to be np.array([]) . We also use cuML for this case and that outputs a cuDF Series.

prediction = model.predict(df[training_columns])
part = df[training_columns]
output_meta = model.predict_meta
if part.shape[0].compute() == 0 and output_meta is not None:
Copy link
Collaborator Author

@sarahyurick sarahyurick Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compute() is needed on the Delayed object to get the number of rows in the partition. I believe that right now, output_meta will always be []?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You dont need to compute for this to do this, we can do it lazily too.

@@ -59,7 +60,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

model, training_columns = context.schema[schema_name].models[model_name]
df = context.sql(sql_select)
prediction = model.predict(df[training_columns])
part = df[training_columns]
output_meta = model.predict_meta
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AttributeError: 'KMeans' object has no attribute 'predict_meta'

Copy link
Collaborator

@VibhuJawa VibhuJawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not hard code any meta values and should only handle cases when model is ParallelPostFit .

@@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
output_meta = np.array([])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we can just hardcode the meta to be output_meta to be np.array([]) . We also use cuML for this case and that outputs a cuDF Series.

prediction = model.predict(df[training_columns])
part = df[training_columns]
output_meta = model.predict_meta
if part.shape[0].compute() == 0 and output_meta is not None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You dont need to compute for this to do this, we can do it lazily too.

empty_output = self.handle_empty_partitions(output_meta)
if empty_output is not None:
return empty_output
prediction = model.predict(part)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should wrap the predict like the following for cases only for when we have a ParallelPostFit model.

  if isinstance(model, ParallelPostFit):
      output_meta = model.predict_meta
      if predict_meta is None:
         predict_meta = model.estimator.predict(part._meta_nonempty)
         
      prediction  = part.map_partitions(_predict, predict_meta, model.estimator, meta=predict_meta)

def _pedict(part, predict_meta, estimator):
    if part.shape[0] == 0 and predict_meta is not None:
        empty_output = handle_empty_partitions(output_meta)
        return empty_output
    return estimator.predict(part)

Copy link
Collaborator

@VibhuJawa VibhuJawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more tests

@charlesbluca charlesbluca deleted the branch dask-contrib:datafusion-sql-planner September 21, 2022 20:57
@sarahyurick sarahyurick changed the title Handle nullable types and empty partitions before Dask-ML predict [Re-opened elsewhere] Handle nullable types and empty partitions before Dask-ML predict Sep 22, 2022
@sarahyurick sarahyurick deleted the predict_bug branch May 26, 2023 22:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants