diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 99358f9b..5acc7424 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -48,7 +48,6 @@ Instance, Integer, List, - Set, Unicode, default, ) @@ -199,9 +198,6 @@ def _parent_header(self): # by record_ports and used by connect_request. _recorded_ports = Dict() - # set of aborted msg_ids - aborted = Set() - # Track execution count here. For IPython, we override this to use the # execution count we store in the shell. execution_count = 0 @@ -217,14 +213,12 @@ def _parent_header(self): "shutdown_request", "is_complete_request", "interrupt_request", - # deprecated: - "apply_request", ] - # add deprecated ipyparallel control messages + + # control channel accepts all shell messages + # and some of its own control_msg_types = [ *msg_types, - "clear_request", - "abort_request", "debug_request", "usage_request", "create_subshell_request", @@ -308,17 +302,15 @@ async def process_control_message(self, msg=None): sys.stderr.flush() self._publish_status("idle", "control") - async def should_handle(self, stream, msg, idents): + def should_handle(self, stream, msg, idents): """Check whether a shell-channel message should be handled Allows subclasses to prevent handling of certain messages (e.g. aborted requests). + + .. versionchanged:: 7 + Subclass should_handle _may_ be async. + Base class implementation is not async. """ - msg_id = msg["header"]["msg_id"] - if msg_id in self.aborted: - # is it safe to assume a msg_id will not be resubmitted? - self.aborted.remove(msg_id) - await self._send_abort_reply(stream, msg, idents) - return False return True async def enter_eventloop(self): @@ -483,7 +475,11 @@ async def process_shell_message(self, msg=None, socket=None): self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) self.log.debug(" Content: %s\n --->\n ", msg["content"]) - if not await self.should_handle(socket, msg, idents): + should_handle: bool | t.Awaitable[bool] = self.should_handle(socket, msg, idents) + if inspect.isawaitable(should_handle): + should_handle = await should_handle + if not should_handle: + self.log.debug("Not handling %s:%s", msg_type, msg["header"].get("msg_id")) return handler = self.shell_handlers.get(msg_type) @@ -1126,84 +1122,6 @@ async def list_subshell_request(self, socket, ident, parent) -> None: self.session.send(socket, "list_subshell_reply", reply, parent, ident) - # --------------------------------------------------------------------------- - # Engine methods (DEPRECATED) - # --------------------------------------------------------------------------- - - async def apply_request(self, socket, ident, parent): # pragma: no cover - """Handle an apply request.""" - self.log.warning("apply_request is deprecated in kernel_base, moving to ipyparallel.") - try: - content = parent["content"] - bufs = parent["buffers"] - msg_id = parent["header"]["msg_id"] - except Exception: - self.log.error("Got bad msg: %s", parent, exc_info=True) # noqa: G201 - return - - md = self.init_metadata(parent) - - reply_content, result_buf = self.do_apply(content, bufs, msg_id, md) - - # flush i/o - if sys.stdout is not None: - sys.stdout.flush() - if sys.stderr is not None: - sys.stderr.flush() - - md = self.finish_metadata(parent, md, reply_content) - if not self.session: - return - self.session.send( - socket, - "apply_reply", - reply_content, - parent=parent, - ident=ident, - buffers=result_buf, - metadata=md, - ) - - def do_apply(self, content, bufs, msg_id, reply_metadata): - """DEPRECATED""" - raise NotImplementedError - - # --------------------------------------------------------------------------- - # Control messages (DEPRECATED) - # --------------------------------------------------------------------------- - - async def abort_request(self, socket, ident, parent): # pragma: no cover - """abort a specific msg by id""" - self.log.warning( - "abort_request is deprecated in kernel_base. It is only part of IPython parallel" - ) - msg_ids = parent["content"].get("msg_ids", None) - if isinstance(msg_ids, str): - msg_ids = [msg_ids] - for mid in msg_ids: - self.aborted.add(str(mid)) - - content = dict(status="ok") - if not self.session: - return - reply_msg = self.session.send( - socket, "abort_reply", content=content, parent=parent, ident=ident - ) - self.log.debug("%s", reply_msg) - - async def clear_request(self, socket, idents, parent): # pragma: no cover - """Clear our namespace.""" - self.log.warning( - "clear_request is deprecated in kernel_base. It is only part of IPython parallel" - ) - content = self.do_clear() - if self.session: - self.session.send(socket, "clear_reply", ident=idents, parent=parent, content=content) - - def do_clear(self): - """DEPRECATED since 4.0.3""" - raise NotImplementedError - # --------------------------------------------------------------------------- # Protected interface # --------------------------------------------------------------------------- diff --git a/tests/test_kernel_direct.py b/tests/test_kernel_direct.py index 50801b03..ab62404b 100644 --- a/tests/test_kernel_direct.py +++ b/tests/test_kernel_direct.py @@ -129,10 +129,9 @@ async def test_process_control(kernel): await kernel.process_control_message(msg) -async def test_should_handle(kernel): +def test_should_handle(kernel): msg = kernel.session.msg("debug_request", {}) - kernel.aborted.add(msg["header"]["msg_id"]) - assert not await kernel.should_handle(kernel.control_socket, msg, []) + assert kernel.should_handle(kernel.control_socket, msg, []) is True async def test_dispatch_shell(kernel):