BUG: DataFrame.agg with kwargs failed #703

Closed
qinxuye opened this issue Sep 18, 2023 · 1 comment · Fixed by #707
Labels: bug (Something isn't working), gpu
Milestone: v0.6.2

qinxuye (Contributor) commented Sep 18, 2023

Describe the bug

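`DataFrame.agg` fails when called with named-aggregation keyword arguments such as `df.agg(a=('a', 'sum'))`. The keywords never reach pandas: the worker executes `in_data.agg(op.raw_func, axis=op.axis)` with `op.raw_func` set to `None`, so pandas raises `TypeError: Must provide 'func' or tuples of '(column, aggfunc).`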
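For comparison, plain pandas accepts this named-aggregation form: each keyword becomes a row label of the result, and its value is a `(column, aggfunc)` tuple. A minimal sketch, assuming a recent pandas release and no Xorbits involved:

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'a']})

# Named aggregation: the keyword "a" becomes the row label of the result,
# and the tuple names the column to reduce and the reduction to apply.
result = df.agg(a=('a', 'sum'))
print(result)
```

In pandas this returns a one-cell `DataFrame` holding 6 (the sum of column `a`) under row label `a`; Xorbits fails on the same call.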

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Xorbits you use
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack trace of the error.
  5. Minimal code to reproduce the error.
In [14]: df = pd.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'a']})

In [15]: df.agg(a=('a', 'sum'))
2023-09-18 20:21:17,244 xorbits._mars.services.scheduling.worker.execution 27032 ERROR    Failed to run subtask JSeEBRinomiACXfUfWrNRc4o on band numa-0
Traceback (most recent call last):
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py", line 212, in _execute_operand
    return execute(ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/core/operand/core.py", line 492, in execute
    result = executor(results, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/core/custom_log.py", line 95, in wrap
    return func(cls, ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/utils.py", line 1234, in wrapped
    result = func(cls, ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/dataframe/reduction/aggregation.py", line 955, in execute
    result = in_data.agg(op.raw_func, axis=op.axis)
  File "/Users/xuyeqin/miniconda3/lib/python3.9/site-packages/pandas/core/frame.py", line 9193, in aggregate
    relabeling, func, columns, order = reconstruct_func(func, **kwargs)
  File "/Users/xuyeqin/miniconda3/lib/python3.9/site-packages/pandas/core/apply.py", line 1198, in reconstruct_func
    raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")
TypeError: Must provide 'func' or tuples of '(column, aggfunc).
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~/miniconda3/lib/python3.9/site-packages/IPython/core/formatters.py:706, in PlainTextFormatter.__call__(self, obj)
    699 stream = StringIO()
    700 printer = pretty.RepresentationPrinter(stream, self.verbose,
    701     self.max_width, self.newline,
    702     max_seq_length=self.max_seq_length,
    703     singleton_pprinters=self.singleton_printers,
    704     type_pprinters=self.type_printers,
    705     deferred_pprinters=self.deferred_printers)
--> 706 printer.pretty(obj)
    707 printer.flush()
    708 return stream.getvalue()

File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:410, in RepresentationPrinter.pretty(self, obj)
    407                         return meth(obj, self, cycle)
    408                 if cls is not object \
    409                         and callable(cls.__dict__.get('__repr__')):
--> 410                     return _repr_pprint(obj, self, cycle)
    412     return _default_pprint(obj, self, cycle)
    413 finally:

File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:778, in _repr_pprint(obj, p, cycle)
    776 """A pprint that just redirects to the normal repr function."""
    777 # Find newlines and replace them with p.break_()
--> 778 output = repr(obj)
    779 lines = output.splitlines()
    780 with p.group():

File ~/Workspace/xorbits/python/xorbits/utils.py:38, in safe_repr_str.<locals>.inn(self, *args, **kwargs)
     36     return getattr(object, f.__name__)(self)
     37 else:
---> 38     return f(self, *args, **kwargs)

File ~/Workspace/xorbits/python/xorbits/core/data.py:310, in DataRef.__repr__(self)
    308     return self.data._mars_entity.op.data.__repr__()
    309 else:
--> 310     run(self)
    311     return self.data.__repr__()

File ~/Workspace/xorbits/python/xorbits/core/execution.py:55, in run(obj, **kwargs)
     53 mars_tileables = [_get_mars_entity(ref) for ref in refs_to_execute.values()]
     54 if mars_tileables:
---> 55     mars_execute(mars_tileables, **kwargs)

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1760, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
   1758     session = get_default_or_create(**(new_session_kwargs or dict()))
   1759 session = _ensure_sync(session)
-> 1760 return session.execute(
   1761     tileable,
   1762     *tileables,
   1763     wait=wait,
   1764     show_progress=show_progress,
   1765     progress_update_interval=progress_update_interval,
   1766     **kwargs,
   1767 )

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1576, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
   1574 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
   1575 try:
-> 1576     execution_info: ExecutionInfo = fut.result(
   1577         timeout=self._isolated_session.timeout
   1578     )
   1579 except KeyboardInterrupt:  # pragma: no cover
   1580     logger.warning("Cancelling running task")

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1740, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
   1737     else:
   1738         # set cancelled to avoid wait task leak
   1739         cancelled.set()
-> 1740     await execution_info
   1741 else:
   1742     return execution_info

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:124, in ExecutionInfo._ensure_future.<locals>.wait()
    123 async def wait():
--> 124     return await self._aio_task

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:873, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
    867         logger.warning(
    868             "Profile task %s execution result:\n%s",
    869             task_id,
    870             json.dumps(task_result.profiling, indent=4),
    871         )
    872     if task_result.error:
--> 873         raise task_result.error.with_traceback(task_result.traceback)
    874 if cancelled:
    875     return

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:388, in TaskProcessor.run(self)
    386 async with self._executor:
    387     async for stage_args in self._iter_stage_chunk_graph():
--> 388         await self._process_stage_chunk_graph(*stage_args)
    389 await self._task_info_collector.collect_result_nodes(
    390     self._task, self._subtask_graphs
    391 )
    392 await self._task_info_collector.collect_tileable_structure(
    393     self._task, self.get_tileable_to_subtasks()
    394 )

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:265, in TaskProcessor._process_stage_chunk_graph(self, stage_id, stage_profiler, chunk_graph)
    259 tile_context = await asyncio.to_thread(
    260     self._get_stage_tile_context,
    261     {c for c in chunk_graph.result_chunks if not isinstance(c.op, Fetch)},
    262 )
    264 with Timer() as timer:
--> 265     chunk_to_result = await self._executor.execute_subtask_graph(
    266         stage_id, subtask_graph, chunk_graph, tile_context
    267     )
    268 stage_profiler.set("run", timer.duration)
    270 self._preprocessor.post_chunk_graph_execution()

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/executor.py:203, in MarsTaskExecutor.execute_subtask_graph(self, stage_id, subtask_graph, chunk_graph, tile_context, context)
    201 curr_tile_progress = self._tile_context.get_all_progress() - prev_progress
    202 self._stage_tile_progresses.append(curr_tile_progress)
--> 203 return await stage_processor.run()

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:233, in TaskStageProcessor.run(self)
    229     if self.subtask_graph.num_shuffles() > 0:
    230         # disable scale-in when shuffle is executing so that we can skip
    231         # store shuffle meta in supervisor.
    232         await self._scheduling_api.disable_autoscale_in()
--> 233     return await self._run()
    234 finally:
    235     if self.subtask_graph.num_shuffles() > 0:

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:253, in TaskStageProcessor._run(self)
    251 if self.error_or_cancelled():
    252     if self.result.error is not None:
--> 253         raise self.result.error.with_traceback(self.result.traceback)
    254     else:
    255         raise asyncio.CancelledError()

File ~/Workspace/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py:212, in _execute_operand()
    207 @enter_mode(build=False, kernel=True)
    208 def _execute_operand(
    209     self, ctx: Dict[str, Any], op: OperandType
    210 ):  # noqa: R0201  # pylint: disable=no-self-use
    211     try:
--> 212         return execute(ctx, op)
    213     except BaseException as ex:
    214         # wrap exception in execution to avoid side effects
    215         raise ExecutionError(ex).with_traceback(ex.__traceback__) from None

File ~/Workspace/xorbits/python/xorbits/_mars/core/operand/core.py:492, in execute()
    488 else:
    489     # Cast `UFuncTypeError` to `TypeError` since subclasses of the former is unpickleable.
    490     # The `UFuncTypeError` was introduced by numpy#12593 since v1.17.0.
    491     try:
--> 492         result = executor(results, op)
    493         succeeded = True
    494         if op.stage is not None:

File ~/Workspace/xorbits/python/xorbits/_mars/core/custom_log.py:95, in wrap()
     92 custom_log_dir = ctx.new_custom_log_dir()
     94 if custom_log_dir is None:
---> 95     return func(cls, ctx, op)
     97 log_path = os.path.join(custom_log_dir, op.key)
     99 with _LogWrapper(ctx, op, log_path):

File ~/Workspace/xorbits/python/xorbits/_mars/utils.py:1234, in wrapped()
   1231     _enter_counter += 1
   1233 try:
-> 1234     result = func(cls, ctx, op)
   1235 finally:
   1236     with AbstractSession._lock:

File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/reduction/aggregation.py:955, in execute()
    953     result = cls._cudf_agg(op, in_data)
    954 else:
--> 955     result = in_data.agg(op.raw_func, axis=op.axis)
    956 if op.outputs[0].ndim == 1:
    957     result = result.astype(op.outputs[0].dtype, copy=False)

File ~/miniconda3/lib/python3.9/site-packages/pandas/core/frame.py:9193, in aggregate()
   9189 from pandas.core.apply import frame_apply
   9191 axis = self._get_axis_number(axis)
-> 9193 relabeling, func, columns, order = reconstruct_func(func, **kwargs)
   9195 op = frame_apply(self, func=func, axis=axis, args=args, kwargs=kwargs)
   9196 result = op.agg()

File ~/miniconda3/lib/python3.9/site-packages/pandas/core/apply.py:1198, in reconstruct_func()
   1192         raise SpecificationError(
   1193             "Function names must be unique if there is no new column names "
   1194             "assigned"
   1195         )
   1196     if func is None:
   1197         # nicer error message
-> 1198         raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")
   1200 if relabeling:
   1201     func, columns, order = normalize_keyword_aggregation(kwargs)

TypeError: Must provide 'func' or tuples of '(column, aggfunc).
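The worker frame at `aggregation.py` line 955 calls `in_data.agg(op.raw_func, axis=op.axis)` without any keyword arguments, and pandas' `reconstruct_func` only raises this `TypeError` when `func is None` and no `(column, aggfunc)` keywords are present. The sketch below reproduces that in plain pandas and shows one way to forward the keywords. `run_agg` and its `raw_kwargs` parameter are hypothetical names used for illustration only; this is not the actual change made in #707.

```python
import pandas as pd


def run_agg(in_data, raw_func, axis=0, raw_kwargs=None):
    """Hypothetical executor step that forwards named-aggregation kwargs."""
    # Without **raw_kwargs this is the failing call from the traceback:
    # in_data.agg(None, axis=0) raises TypeError inside reconstruct_func.
    return in_data.agg(raw_func, axis=axis, **(raw_kwargs or {}))


df = pd.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'a']})

# Dropping the keywords reproduces the reported error.
try:
    df.agg(None, axis=0)
except TypeError as exc:
    print("kwargs dropped:", exc)

# Forwarding them lets pandas perform the named aggregation itself.
print(run_agg(df, None, axis=0, raw_kwargs={'a': ('a', 'sum')}))
```

Passing the keywords through unchanged keeps the executor a thin wrapper over pandas, so pandas alone handles the relabeling of the output.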
XprobeBot added the bug (Something isn't working) and gpu labels Sep 18, 2023
XprobeBot added this to the v0.6.2 milestone Sep 18, 2023
ChengjieLi28 (Contributor) commented:

take

ChengjieLi28 self-assigned this Sep 19, 2023