Merge branch 'main' into graph_vis
mergify[bot] authored Sep 8, 2023
2 parents f89460e + d6aeeb5 commit 090614e
Showing 8 changed files with 87 additions and 367 deletions.
@@ -1413,6 +1413,27 @@ def test_read_parquet_arrow(setup, engine):
)


@pytest.mark.skipif(
len(parquet_engines) == 1, reason="pyarrow and fastparquet are not installed"
)
@pytest.mark.parametrize("engine", parquet_engines)
def test_read_parquet_with_getting_index(setup, engine):
test_df = pd.DataFrame(
{
"a": np.arange(10).astype(np.int64, copy=False),
"b": [f"s{i}" for i in range(10)],
"c": np.random.rand(10),
}
)
with tempfile.TemporaryDirectory() as tempdir:
file = f"{tempdir}/test.pq"
test_df.to_parquet(file)
mdf = md.read_parquet(file, engine=engine)
res = mdf["a"].mean().execute().fetch()
assert res == test_df["a"].mean()
pd.testing.assert_index_equal(mdf.keys().execute().fetch(), test_df.keys())


@pytest.mark.skipif(
len(parquet_engines) == 1, reason="pyarrow and fastparquet are not installed"
)
2 changes: 1 addition & 1 deletion python/xorbits/_mars/dataframe/indexing/align.py
@@ -101,7 +101,7 @@ def _call_dataframe_dataframe(self, lhs: TileableType, rhs: TileableType):
if self.axis is None or self.axis == 1:
l_empty = build_empty_df(lhs.dtypes)
r_empty = build_empty_df(rhs.dtypes)
aligned, _ = l_empty.align(r_empty, axis=1)
aligned, _ = l_empty.align(r_empty, join=self.join, axis=self.axis)
l_dtypes = r_dtypes = aligned.dtypes
l_col_val = r_col_val = parse_index(aligned.columns, store_data=True)
l_shape[1] = r_shape[1] = len(l_dtypes)
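
The one-line fix above passes the operand's own join mode and axis through to pandas instead of always using the default outer join on axis 1. A minimal pandas-only sketch (with assumed column names, not taken from this commit) of why the join mode changes the derived columns:

import pandas as pd

# Two empty frames standing in for build_empty_df(lhs.dtypes) / build_empty_df(rhs.dtypes).
left = pd.DataFrame(columns=["a", "b"], dtype="float64")
right = pd.DataFrame(columns=["b", "c"], dtype="float64")

outer, _ = left.align(right, join="outer", axis=1)
inner, _ = left.align(right, join="inner", axis=1)

print(list(outer.columns))  # ['a', 'b', 'c'] -- the old hard-coded default (outer join)
print(list(inner.columns))  # ['b'] -- an inner join keeps only the shared columns
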
@@ -1037,10 +1037,7 @@ def _wrap_execute_data_source_usecols(usecols, op_cls):
def _execute_data_source(ctx, op): # pragma: no cover
op_cls.execute(ctx, op)
result = ctx[op.outputs[0].key]
if not isinstance(usecols, list):
if not isinstance(result, pd.Series):
raise RuntimeError(f"Out data should be a Series, got {type(result)}")
elif len(result.columns) > len(usecols):
if isinstance(usecols, list) and len(result.columns) > len(usecols):
params = dict(
(k, getattr(op, k, None))
for k in op._keys_
@@ -1058,10 +1055,8 @@ def _wrap_execute_data_source_mixed(limit, usecols, op_cls):
def _execute_data_source(ctx, op): # pragma: no cover
op_cls.execute(ctx, op)
result = ctx[op.outputs[0].key]
if not isinstance(usecols, list):
if not isinstance(result, pd.Series):
raise RuntimeError("Out data should be a Series")
elif len(result.columns) > len(usecols):

if isinstance(usecols, list) and len(result.columns) > len(usecols):
raise RuntimeError("have data more than expected")
if len(result) > limit:
raise RuntimeError("have data more than expected")
@@ -1127,16 +1122,6 @@ def test_optimization(setup):
result.reset_index(drop=True, inplace=True)
pd.testing.assert_series_equal(result, expected)

r = df["d"].head(3)
operand_executors = {
DataFrameReadCSV: _wrap_execute_data_source_mixed(3, "d", DataFrameReadCSV)
}
result = r.execute(
extra_config={"operand_executors": operand_executors}
).fetch()
expected = pd_df["d"].head(3)
pd.testing.assert_series_equal(result, expected)

# test DataFrame.head
r = df.head(3)
operand_executors = {
73 changes: 62 additions & 11 deletions python/xorbits/_mars/deploy/oscar/session.py
@@ -76,6 +76,29 @@ class Profiling:
result: dict = None


class TileableWrapper:
"""
This class gives the Tileable object an additional attribute indicating whether it still needs an attached session.
The attribute is used in the IPython environment to decide whether the Tileable object needs to be re-executed.
"""

def __init__(self, tileable: TileableType):
self._data = ref(tileable)
self._attach_session = True

@property
def tileable(self):
return self._data

@property
def attach_session(self) -> bool:
return self._attach_session

@attach_session.setter
def attach_session(self, value):
self._attach_session = value


class ExecutionInfo:
def __init__(
self,
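
Aside from the diff: a short, hypothetical usage sketch of the TileableWrapper class added above, which stores only a weak reference to the tileable plus a flag saying whether a session should be attached after execution. It assumes the class definition above is in scope and is not part of the commit:

class FakeTileable:  # stand-in target; any weak-referenceable object works
    pass

t = FakeTileable()
wrapper = TileableWrapper(t)
assert wrapper.tileable() is t   # the property returns the weakref, so call it to dereference
assert wrapper.attach_session    # defaults to True
wrapper.attach_session = False   # cleared for tileables that must be re-executed
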
@@ -89,8 +112,7 @@ def __init__(
self._progress = progress
self._profiling = profiling
self._loop = loop
self._to_execute_tileables = [ref(t) for t in to_execute_tileables]

self._to_execute_tileables = to_execute_tileables
self._future_local = threading.local()

def _ensure_future(self):
@@ -119,7 +141,7 @@ def progress(self) -> float:

@property
def to_execute_tileables(self) -> List[TileableType]:
return [t() for t in self._to_execute_tileables]
return self._to_execute_tileables

def profiling_result(self) -> dict:
return self._profiling.result
@@ -797,7 +819,10 @@ async def _run_in_background(
progress: Progress,
profiling: Profiling,
):
from ...dataframe.core import DataFrameData

with enter_mode(build=True, kernel=True):
exec_tileables = [t.tileable() for t in tileables]
# wait for task to finish
cancelled = False
progress_task = asyncio.create_task(
@@ -849,12 +874,35 @@ async def _run_in_background(
if cancelled:
return
fetch_tileables = await self._task_api.get_fetch_tileables(task_id)
assert len(tileables) == len(fetch_tileables)
assert len(exec_tileables) == len(fetch_tileables)

for tileable, fetch_tileable in zip(tileables, fetch_tileables):
self._tileable_to_fetch[tileable] = fetch_tileable
# update meta, e.g. unknown shape
tileable.params = fetch_tileable.params
re_execution_indexes = []
for i, (tileable, fetch_tileable) in enumerate(
zip(exec_tileables, fetch_tileables)
):
tileable_shape = tileable.params.get("shape", None)
fetch_tileable_shape = fetch_tileable.params.get("shape", None)
# The shape inconsistency is usually due to column pruning,
# which in ipython can't be fetched directly and needs to be recalculated.
if (
tileable_shape is not None
and fetch_tileable_shape is not None
and isinstance(tileable, DataFrameData)
and len(tileable_shape) == 2
and 0
< tileable_shape[1]
!= fetch_tileable_shape[1]
> 0 # `>0` condition is for possible `nan`
):
tileable._executed_sessions.clear()
re_execution_indexes.append(i)
else:
self._tileable_to_fetch[tileable] = fetch_tileable
# update meta, e.g. unknown shape
tileable.params = fetch_tileable.params

for idx in re_execution_indexes:
tileables[idx].attach_session = False

async def execute(self, *tileables, **kwargs) -> ExecutionInfo:
if self._closed:
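
For readers of the chained comparison `0 < tileable_shape[1] != fetch_tileable_shape[1] > 0` above, here is an illustrative plain-Python restatement of just the shape part of the re-execution test (names are assumed and the DataFrameData type check is omitted); it is not code from this commit:

def needs_re_execution(tileable_shape, fetch_shape):
    # True only when both column counts are known (NaN fails the `> 0`
    # checks), positive, and different -- e.g. after column pruning.
    if tileable_shape is None or fetch_shape is None:
        return False
    if len(tileable_shape) != 2 or len(fetch_shape) != 2:
        return False
    cols, fetched_cols = tileable_shape[1], fetch_shape[1]
    return cols > 0 and fetched_cols > 0 and cols != fetched_cols

print(needs_re_execution((10, 3), (10, 1)))  # True: columns were pruned away
print(needs_re_execution((10, 3), (10, 3)))  # False: shapes agree
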
@@ -895,16 +943,18 @@ async def execute(self, *tileables, **kwargs) -> ExecutionInfo:

progress = Progress()
profiling = Profiling()

tileable_wrappers = [TileableWrapper(t) for t in to_execute_tileables]
# create asyncio.Task
aio_task = asyncio.create_task(
self._run_in_background(to_execute_tileables, task_id, progress, profiling)
self._run_in_background(tileable_wrappers, task_id, progress, profiling)
)
return ExecutionInfo(
aio_task,
progress,
profiling,
asyncio.get_running_loop(),
to_execute_tileables,
tileable_wrappers,
)

def _get_to_fetch_tileable(
@@ -1663,7 +1713,8 @@ async def _execute(
def _attach_session(future: asyncio.Future):
if future.exception() is None:
for t in execution_info.to_execute_tileables:
t._attach_session(session)
if t.attach_session:
t.tileable()._attach_session(session)

execution_info.add_done_callback(_attach_session)
cancelled = cancelled or asyncio.Event()
@@ -13,6 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .column_pruning import ChunkGetitemPruneDataSource
from .core import optimize
from .head import ChunkHeadPushDown
25 changes: 0 additions & 25 deletions python/xorbits/_mars/optimization/logical/chunk/column_pruning.py

This file was deleted.

This file was deleted.

