From 9049a84bbe53f3482c84ab7ff8b063b812ac1a48 Mon Sep 17 00:00:00 2001 From: "Dr. K. D. Murray" <1560490+kdm9@users.noreply.github.com> Date: Tue, 11 Apr 2023 14:08:51 +0200 Subject: [PATCH] Support conda envs without changeps1 (#125) Most people have conda add the current env name to PS1. This breaks the parsing of PS1, and in particular prevents bash_kernel doing `echo $?` to find if the previous command succeeded. This change makes the PS1 expect string a regex that allows for the `(envname) ` which conda adds. --- bash_kernel/kernel.py | 55 +++++++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/bash_kernel/kernel.py b/bash_kernel/kernel.py index ba8dcf4..0496f1e 100644 --- a/bash_kernel/kernel.py +++ b/bash_kernel/kernel.py @@ -5,6 +5,8 @@ from subprocess import check_output import os.path import uuid +import random +import string import re import signal @@ -25,23 +27,39 @@ class IREPLWrapper(replwrap.REPLWrapper): :param line_output_callback: a callback method to receive each batch of incremental output. It takes one string parameter. """ - def __init__(self, cmd_or_spawn, orig_prompt, prompt_change, + def __init__(self, cmd_or_spawn, orig_prompt, prompt_change, unique_prompt, extra_init_cmd=None, line_output_callback=None): + self.unique_prompt = unique_prompt self.line_output_callback = line_output_callback + # The extra regex at the start of PS1 below is designed to catch the + # `(envname) ` which conda/mamba add to the start of PS1 by default. + # Obviously anything else that looks like this, including user output, + # will be eaten. + # FIXME: work out if there is a way to update these by reading PS1 + # after each command and checking that it has changed. The answer is + # probably no, as we never see individual commands but rather cells + # with possibly many commands, and would need to update this half-way + # through a cell. + self.ps1_re = r"(\(\w+\) )?" + re.escape(self.unique_prompt + ">") + self.ps2_re = re.escape(self.unique_prompt + "+") replwrap.REPLWrapper.__init__(self, cmd_or_spawn, orig_prompt, - prompt_change, extra_init_cmd=extra_init_cmd) + prompt_change, new_prompt=self.ps1_re, + continuation_prompt=self.ps2_re, extra_init_cmd=extra_init_cmd) def _expect_prompt(self, timeout=-1): + prompts = [self.ps1_re, self.ps2_re] + if timeout == None: # "None" means we are executing code from a Jupyter cell by way of the run_command - # in the do_execute() code below, so do incremental output. + # in the do_execute() code below, so do incremental output, i.e. + # also look for end of line or carridge return + prompts.extend(['\r?\n', '\r']) while True: - pos = self.child.expect_exact([self.prompt, self.continuation_prompt, u'\r\n', u'\n', u'\r'], - timeout=None) - if pos == 2 or pos == 3: + pos = self.child.expect_list([re.compile(x) for x in prompts], timeout=None) + if pos == 2: # End of line received. self.line_output_callback(self.child.before + '\n') - elif pos == 4: + elif pos == 3: # Carriage return ('\r') received. self.line_output_callback(self.child.before + '\r') else: @@ -50,8 +68,8 @@ def _expect_prompt(self, timeout=-1): self.line_output_callback(self.child.before) break else: - # Otherwise, use existing non-incremental code - pos = replwrap.REPLWrapper._expect_prompt(self, timeout=timeout) + # Otherwise, wait (with timeout) until the next prompt + pos = self.child.expect_list([re.compile(x) for x in prompts], timeout=timeout) # Prompt received, so return normally return pos @@ -79,6 +97,9 @@ def banner(self): 'file_extension': '.sh'} def __init__(self, **kwargs): + # Make a random prompt, further reducing chances of accidental matches. + rand = ''.join(random.choices(string.ascii_uppercase, k=12)) + self.unique_prompt = "PROMPT_" + rand Kernel.__init__(self, **kwargs) self._start_bash() self._known_display_ids = set() @@ -97,12 +118,16 @@ def _start_bash(self): bashrc = os.path.join(os.path.dirname(pexpect.__file__), 'bashrc.sh') child = pexpect.spawn("bash", ['--rcfile', bashrc], echo=False, encoding='utf-8', codec_errors='replace') - ps1 = replwrap.PEXPECT_PROMPT[:5] + u'\[\]' + replwrap.PEXPECT_PROMPT[5:] - ps2 = replwrap.PEXPECT_CONTINUATION_PROMPT[:5] + u'\[\]' + replwrap.PEXPECT_CONTINUATION_PROMPT[5:] + # Following comment stolen from upstream's REPLWrap: + # If the user runs 'env', the value of PS1 will be in the output. To avoid + # replwrap seeing that as the next prompt, we'll embed the marker characters + # for invisible characters in the prompt; these show up when inspecting the + # environment variable, but not when bash displays the prompt. + ps1 = self.unique_prompt + u'\[\]' + ">" + ps2 = self.unique_prompt + u'\[\]' + "+" prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2) - # Using IREPLWrapper to get incremental output - self.bashwrapper = IREPLWrapper(child, u'\$', prompt_change, + self.bashwrapper = IREPLWrapper(child, u'\$', prompt_change, self.unique_prompt, extra_init_cmd="export PAGER=cat", line_output_callback=self.process_output) finally: @@ -182,8 +207,8 @@ def do_execute(self, code, silent, store_history=True, return {'status': 'abort', 'execution_count': self.execution_count} try: - exitcode = int(self.bashwrapper.run_command('echo $?').rstrip()) - except Exception: + exitcode = int(self.bashwrapper.run_command('echo $?').rstrip().split("\r\n")[0]) + except Exception as exc: exitcode = 1 if exitcode: