diff --git a/temporalio/common.py b/temporalio/common.py index 078db968..c70427ea 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -1045,29 +1045,34 @@ def _type_hints_from_func( args.append(arg_hint) # type: ignore return args, ret + @dataclass class _SDKInfo: - name : str - version : str - + name: str + version: str + + @dataclass class _FileSlice: - content : str - lineOffset : int + content: str + lineOffset: int + @dataclass class _FileLocation: - filePath : str - line : Optional[int] = -1 - column : Optional[int] = -1 - functionName : Optional[str] = None - + filePath: str + line: Optional[int] = -1 + column: Optional[int] = -1 + functionName: Optional[str] = None + + @dataclass class _StackTrace: - locations : list[_FileLocation] - + locations: list[_FileLocation] + + @dataclass class _EnhancedStackTrace: - sdk : _SDKInfo - sources : dict[str, _FileSlice] - stacks : list[_StackTrace] \ No newline at end of file + sdk: _SDKInfo + sources: dict[str, _FileSlice] + stacks: list[_StackTrace] diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 7e5e0d40..05d81925 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -53,6 +53,7 @@ import temporalio.converter import temporalio.exceptions import temporalio.workflow +from temporalio.service import __version__ from ._interceptor import ( ContinueAsNewInput, @@ -69,8 +70,6 @@ WorkflowOutboundInterceptor, ) -from temporalio.service import __version__ - logger = logging.getLogger(__name__) # Set to true to log all cases where we're ignoring things during delete @@ -1800,36 +1799,37 @@ def _stack_trace(self) -> str: + "\n".join(traceback.format_list(frames)) ) return "\n\n".join(stacks) - + def _enhanced_stack_trace(self) -> temporalio.common._EnhancedStackTrace: sdk = temporalio.common._SDKInfo("sdk-python", __version__) sources = dict() stacks = [] - + for task in list(self._tasks): for frame in task.get_stack(): filename = frame.f_code.co_filename line_number = frame.f_lineno func_name = frame.f_code.co_name - + try: source = inspect.getsourcelines(frame) - code = ''.join(source[0]) + code = "".join(source[0]) line_number = int(source[1]) except OSError as ose: code = "Cannot access code.\n---\n%s" % ose.strerror # TODO possibly include sentinel/property for success of src scrape? work out with ui except Exception: code = "Generic Error.\n\n%s" % traceback.format_exc() - + file_slice = temporalio.common._FileSlice(code, line_number) - file_location = temporalio.common._FileLocation(filename, line = line_number, functionName = func_name) - + file_location = temporalio.common._FileLocation( + filename, line=line_number, functionName=func_name + ) + sources["%s %d" % (filename, line_number)] = file_slice stacks.append(file_location) - - + est = temporalio.common._EnhancedStackTrace(sdk, sources, stacks) return est diff --git a/tests/helpers/externalstacktrace.py b/tests/helpers/externalstacktrace.py index abc6cfa3..61cac39f 100644 --- a/tests/helpers/externalstacktrace.py +++ b/tests/helpers/externalstacktrace.py @@ -1,7 +1,8 @@ -from datetime import timedelta import asyncio +from datetime import timedelta + +from temporalio import activity, workflow -from temporalio import workflow, activity @workflow.defn class ExternalLongSleepWorkflow: @@ -14,6 +15,7 @@ async def run(self) -> None: def started(self) -> bool: return self._started + @activity.defn async def wait_cancel() -> str: try: @@ -27,6 +29,7 @@ async def wait_cancel() -> str: except asyncio.CancelledError: return "Got cancelled error, cancelled? " + str(activity.is_cancelled()) + @workflow.defn class ExternalStackTraceWorkflow: def __init__(self) -> None: @@ -48,7 +51,7 @@ async def run(self) -> None: await asyncio.wait([asyncio.create_task(v) for v in awaitables]) async def never_completing_coroutine(self) -> None: - self._status = "waiting" # with external comment + self._status = "waiting" # with external comment await workflow.wait_condition(lambda: False) @workflow.query diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 1e3facef..4a589354 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -107,7 +107,11 @@ find_free_port, new_worker, ) -from tests.helpers.externalstacktrace import ExternalStackTraceWorkflow, ExternalLongSleepWorkflow +from tests.helpers.externalstacktrace import ( + ExternalLongSleepWorkflow, + ExternalStackTraceWorkflow, +) + @workflow.defn class HelloWorkflow: @@ -2097,6 +2101,7 @@ async def status() -> str: # TODO(cretz): Do more specific checks once we clean up traces assert "never_completing_coroutine" in trace + async def test_workflow_enhanced_stack_trace(client: Client): async with new_worker( client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel] @@ -2115,29 +2120,38 @@ async def status() -> str: # Send stack trace query trace = await handle.query("__enhanced_stack_trace") - - assert "never_completing_coroutine" in [ stack['functionName'] for stack in trace['stacks'] ] + + assert "never_completing_coroutine" in [ + stack["functionName"] for stack in trace["stacks"] + ] # first line of never_completing_coroutine - assert 'self._status = "waiting"' in str(trace['sources']) - assert trace['sdk']['version'] == __version__ + assert 'self._status = "waiting"' in str(trace["sources"]) + assert trace["sdk"]["version"] == __version__ async with new_worker( - client, ExternalStackTraceWorkflow, ExternalLongSleepWorkflow, activities=[wait_cancel] + client, + ExternalStackTraceWorkflow, + ExternalLongSleepWorkflow, + activities=[wait_cancel], ) as worker: handle = await client.start_workflow( ExternalStackTraceWorkflow.run, id=f"workflow-{uuid.uuid4()}", - task_queue=worker.task_queue + task_queue=worker.task_queue, ) await assert_eq_eventually("waiting", status) trace = await handle.query("__enhanced_stack_trace") - assert "never_completing_coroutine" in [ stack['functionName'] for stack in trace['stacks'] ] - assert 'self._status = "waiting" # with external comment' in str(trace['sources']) - assert 'externalstacktrace.py' in str(trace['sources']) - assert trace['sdk']['version'] == __version__ + assert "never_completing_coroutine" in [ + stack["functionName"] for stack in trace["stacks"] + ] + assert 'self._status = "waiting" # with external comment' in str( + trace["sources"] + ) + assert "externalstacktrace.py" in str(trace["sources"]) + assert trace["sdk"]["version"] == __version__ @dataclass