Skip to content

Commit

Permalink
Prototype polars local engine
Browse files Browse the repository at this point in the history
  • Loading branch information
TrevorBergeron committed Sep 20, 2024
1 parent cc48f58 commit ee86291
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 0 deletions.
8 changes: 8 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import bigframes.core.guid as guid
import bigframes.core.identifiers
import bigframes.core.join_def as join_defs
import bigframes.core.local
import bigframes.core.ordering as ordering
import bigframes.core.schema as bf_schema
import bigframes.core.sql as sql
Expand Down Expand Up @@ -603,6 +604,13 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
df.index.names = self.index.names # type: ignore
df.columns = self.column_labels

def _execute_local_engine(self) -> pd.DataFrame:
    """Evaluate this block's expression with the polars local executor.

    The expression is run through ``PolarsLocalExecutor.execute_local``, the
    resulting frame is passed to ``_copy_index_to_pandas``, and its columns
    are relabelled with ``self.column_labels``.

    Returns:
        The locally computed result as a pandas DataFrame.
    """
    executor = bigframes.core.local.PolarsLocalExecutor()
    df = executor.execute_local(self.expr)
    self._copy_index_to_pandas(df)
    # ``set_axis`` returns the relabelled frame rather than mutating ``df``,
    # so the result has to be kept; discarding it made the call a no-op.
    df = df.set_axis(self.column_labels, axis=1, copy=False)
    return df

def _materialize_local(
self, materialize_options: MaterializationOptions = MaterializationOptions()
) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
Expand Down
149 changes: 149 additions & 0 deletions bigframes/core/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import dataclasses
import functools

import pandas as pd
import polars as pl

import bigframes.core
import bigframes.core.expression as ex
import bigframes.core.nodes as nodes
import bigframes.operations as ops

# Node classes the polars local executor accepts. ``can_execute`` checks a
# node (and, recursively, its children) against this tuple with ``isinstance``.
SUPPORTED_NODES = (
nodes.ReadLocalNode,
nodes.SelectionNode,
nodes.ProjectionNode,
nodes.OrderByNode,
nodes.ReversedNode,
nodes.ReprojectOpNode,
nodes.FilterNode,
nodes.RowCountNode,
)


@dataclasses.dataclass(frozen=True)
class PolarsExpressionCompiler:
    """Translate scalar expressions into polars expressions.

    ``compile_expression`` dispatches on the expression's type; only
    constants, column references, and a small set of operations are handled.
    """

    @functools.singledispatchmethod
    def compile_expression(self, expression: ex.Expression) -> pl.Expr:
        """Compile ``expression`` into a ``pl.Expr``.

        This is the dispatch fallback, reached only for expression types with
        no registered handler.

        Raises:
            NotImplementedError: Always, since the expression type is not
                supported. Failing here avoids silently returning ``None``.
        """
        raise NotImplementedError(
            f"Polars compiler can't compile expression: {expression!r}"
        )

    @compile_expression.register
    def _(
        self,
        expression: ex.ScalarConstantExpression,
    ) -> pl.Expr:
        # A constant becomes a polars literal.
        return pl.lit(expression.value)

    @compile_expression.register
    def _(
        self,
        expression: ex.UnboundVariableExpression,
    ) -> pl.Expr:
        # A free variable is resolved as a column reference by its id.
        return pl.col(expression.id)

    @compile_expression.register
    def _(
        self,
        expression: ex.OpExpression,
    ) -> pl.Expr:
        # TODO: Complete the implementation
        op = expression.op
        # Compile the operands first; each is itself dispatched by type.
        operands = tuple(map(self.compile_expression, expression.inputs))
        if isinstance(op, type(ops.invert_op)):
            # Invert is logical/bitwise NOT (``~x``), not arithmetic negation.
            return ~operands[0]
        if isinstance(op, type(ops.add_op)):
            return operands[0] + operands[1]
        raise NotImplementedError("Polars compiler hasn't implemented this operation")


@dataclasses.dataclass(frozen=True)
class PolarsLocalExecutor:
    """
    A simple local executor for a subset of node types.

    Each supported node is translated into a ``pl.LazyFrame``;
    ``execute_local`` collects the final frame and converts it to pandas.
    """

    # Unannotated, so this is a plain class attribute shared by all instances
    # rather than a dataclass field.
    expr_compiler = PolarsExpressionCompiler()

    # TODO: Support more node types
    def can_execute(self, node: nodes.BigFrameNode) -> bool:
        """Return True if ``node`` and all of its descendants are supported."""
        if not isinstance(node, SUPPORTED_NODES):
            return False
        return all(map(self.can_execute, node.child_nodes))

    def execute_local(self, array_value: bigframes.core.ArrayValue) -> pd.DataFrame:
        """Execute the plan behind ``array_value`` and return a pandas DataFrame."""
        return self.execute_node(array_value.node).collect().to_pandas()

    def execute_node(self, node: nodes.BigFrameNode) -> pl.LazyFrame:
        """Translate ``node`` into a lazy polars frame. Results are not cached."""
        return self._execute_node(node)

    @functools.singledispatchmethod
    def _execute_node(self, node: nodes.BigFrameNode) -> pl.LazyFrame:
        """Dispatch fallback for node types without a registered handler.

        Raises:
            ValueError: Always, since the node type is not recognized.
        """
        raise ValueError(f"Can't compile unrecognized node: {node}")

    @_execute_node.register
    def compile_readlocal(self, node: nodes.ReadLocalNode):
        # The node carries its data as feather (Arrow IPC) bytes.
        return pl.read_ipc(node.feather_bytes).lazy()

    @_execute_node.register
    def compile_filter(self, node: nodes.FilterNode):
        predicate = self.expr_compiler.compile_expression(node.predicate)
        return self.execute_node(node.child).filter(predicate)

    @_execute_node.register
    def compile_orderby(self, node: nodes.OrderByNode):
        frame = self.execute_node(node.child)
        # Apply stable sorts from the least- to the most-significant key so
        # the first entry of ``node.by`` ends up as the primary sort key
        # (assumes ``node.by`` lists keys most-significant first).
        for by in reversed(tuple(node.by)):
            frame = frame.sort(
                # Compile this key's own expression, not the whole ``node.by``.
                self.expr_compiler.compile_expression(by.scalar_expression),
                descending=not by.direction.is_ascending,
                nulls_last=by.na_last,
                maintain_order=True,
            )
        return frame

    @_execute_node.register
    def compile_reversed(self, node: nodes.ReversedNode):
        return self.execute_node(node.child).reverse()

    @_execute_node.register
    def compile_selection(self, node: nodes.SelectionNode):
        # Keep only the requested input columns, renamed to their output ids.
        selected = [
            pl.col(orig).alias(new) for orig, new in node.input_output_pairs
        ]
        return self.execute_node(node.child).select(selected)

    @_execute_node.register
    def compile_projection(self, node: nodes.ProjectionNode):
        # ``expression`` (not ``ex``) avoids shadowing the module alias.
        new_cols = [
            self.expr_compiler.compile_expression(expression).alias(name)
            for expression, name in node.assignments
        ]
        return self.execute_node(node.child).with_columns(new_cols)

    @_execute_node.register
    def compile_rowcount(self, node: nodes.RowCountNode):
        # Stay lazy: every handler must return a LazyFrame because
        # ``execute_local`` calls ``.collect()`` on the result.
        return self.execute_node(node.child).select(pl.len().alias("count"))

    @_execute_node.register
    def compile_reproject(self, node: nodes.ReprojectOpNode):
        # NOOP
        return self.execute_node(node.child)
4 changes: 4 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import bigframes.core.guid
import bigframes.core.indexers as indexers
import bigframes.core.indexes as indexes
import bigframes.core.local
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
Expand Down Expand Up @@ -1222,6 +1223,7 @@ def to_pandas(
random_state: Optional[int] = None,
*,
ordered: bool = True,
local_engine: bool = False,
) -> pandas.DataFrame:
"""Write DataFrame to pandas DataFrame.
Expand Down Expand Up @@ -1251,6 +1253,8 @@ def to_pandas(
downsampled rows and all columns of this DataFrame.
"""
# TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job
if local_engine is True:
return self._block._execute_local_engine()
df, query_job = self._block.to_pandas(
max_download_size=max_download_size,
sampling_method=sampling_method,
Expand Down
6 changes: 6 additions & 0 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def to_pandas(
random_state: Optional[int] = None,
*,
ordered: bool = True,
local_engine: bool = False,
) -> pandas.Series:
"""Writes Series to pandas Series.
Expand Down Expand Up @@ -363,6 +364,11 @@ def to_pandas(
pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb
is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame.
"""
if local_engine is True:
df = self._block._execute_local_engine()
series = df.squeeze(axis=1)
series.name = self._name
return series
df, query_job = self._block.to_pandas(
max_download_size=max_download_size,
sampling_method=sampling_method,
Expand Down
27 changes: 27 additions & 0 deletions tests/system/small/test_local_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import pandas.testing

import bigframes.pandas as bpd


def test_polars_local_engine():
    """Adding two columns via the local engine matches plain pandas."""
    source = pd.DataFrame({"colA": [1, 2, 3], "colB": [10, 20, 30]})
    frame = bpd.DataFrame(source)

    # Evaluate the sum through the local engine instead of the default path.
    total = frame["colA"] + frame["colB"]
    actual = total.to_pandas(local_engine=True)

    expected = source["colA"] + source["colB"]
    pandas.testing.assert_series_equal(actual, expected)

0 comments on commit ee86291

Please sign in to comment.