Skip to content

Commit

Permalink
Reformatted with poe format.
Browse files Browse the repository at this point in the history
  • Loading branch information
twin-drill committed Jun 3, 2024
1 parent 9579582 commit 3cfcce0
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 40 deletions.
35 changes: 20 additions & 15 deletions temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,29 +1045,34 @@ def _type_hints_from_func(
args.append(arg_hint) # type: ignore
return args, ret


@dataclass
class _SDKInfo:
name : str
version : str

name: str
version: str


@dataclass
class _FileSlice:
content : str
lineOffset : int
content: str
lineOffset: int


@dataclass
class _FileLocation:
filePath : str
line : Optional[int] = -1
column : Optional[int] = -1
functionName : Optional[str] = None

filePath: str
line: Optional[int] = -1
column: Optional[int] = -1
functionName: Optional[str] = None


@dataclass
class _StackTrace:
locations : list[_FileLocation]

locations: list[_FileLocation]


@dataclass
class _EnhancedStackTrace:
sdk : _SDKInfo
sources : dict[str, _FileSlice]
stacks : list[_StackTrace]
sdk: _SDKInfo
sources: dict[str, _FileSlice]
stacks: list[_StackTrace]
22 changes: 11 additions & 11 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import temporalio.converter
import temporalio.exceptions
import temporalio.workflow
from temporalio.service import __version__

from ._interceptor import (
ContinueAsNewInput,
Expand All @@ -69,8 +70,6 @@
WorkflowOutboundInterceptor,
)

from temporalio.service import __version__

logger = logging.getLogger(__name__)

# Set to true to log all cases where we're ignoring things during delete
Expand Down Expand Up @@ -1800,36 +1799,37 @@ def _stack_trace(self) -> str:
+ "\n".join(traceback.format_list(frames))
)
return "\n\n".join(stacks)

def _enhanced_stack_trace(self) -> temporalio.common._EnhancedStackTrace:
sdk = temporalio.common._SDKInfo("sdk-python", __version__)

sources = dict()
stacks = []

for task in list(self._tasks):
for frame in task.get_stack():
filename = frame.f_code.co_filename
line_number = frame.f_lineno
func_name = frame.f_code.co_name

try:
source = inspect.getsourcelines(frame)
code = ''.join(source[0])
code = "".join(source[0])
line_number = int(source[1])
except OSError as ose:
code = "Cannot access code.\n---\n%s" % ose.strerror
# TODO possibly include sentinel/property for success of src scrape? work out with ui
except Exception:
code = "Generic Error.\n\n%s" % traceback.format_exc()

file_slice = temporalio.common._FileSlice(code, line_number)
file_location = temporalio.common._FileLocation(filename, line = line_number, functionName = func_name)

file_location = temporalio.common._FileLocation(
filename, line=line_number, functionName=func_name
)

sources["%s %d" % (filename, line_number)] = file_slice
stacks.append(file_location)



est = temporalio.common._EnhancedStackTrace(sdk, sources, stacks)
return est

Expand Down
9 changes: 6 additions & 3 deletions tests/helpers/externalstacktrace.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import timedelta
import asyncio
from datetime import timedelta

from temporalio import activity, workflow

from temporalio import workflow, activity

@workflow.defn
class ExternalLongSleepWorkflow:
Expand All @@ -14,6 +15,7 @@ async def run(self) -> None:
def started(self) -> bool:
return self._started


@activity.defn
async def wait_cancel() -> str:
try:
Expand All @@ -27,6 +29,7 @@ async def wait_cancel() -> str:
except asyncio.CancelledError:
return "Got cancelled error, cancelled? " + str(activity.is_cancelled())


@workflow.defn
class ExternalStackTraceWorkflow:
def __init__(self) -> None:
Expand All @@ -48,7 +51,7 @@ async def run(self) -> None:
await asyncio.wait([asyncio.create_task(v) for v in awaitables])

async def never_completing_coroutine(self) -> None:
self._status = "waiting" # with external comment
self._status = "waiting" # with external comment
await workflow.wait_condition(lambda: False)

@workflow.query
Expand Down
36 changes: 25 additions & 11 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@
find_free_port,
new_worker,
)
from tests.helpers.externalstacktrace import ExternalStackTraceWorkflow, ExternalLongSleepWorkflow
from tests.helpers.externalstacktrace import (
ExternalLongSleepWorkflow,
ExternalStackTraceWorkflow,
)


@workflow.defn
class HelloWorkflow:
Expand Down Expand Up @@ -2097,6 +2101,7 @@ async def status() -> str:
# TODO(cretz): Do more specific checks once we clean up traces
assert "never_completing_coroutine" in trace


async def test_workflow_enhanced_stack_trace(client: Client):
async with new_worker(
client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel]
Expand All @@ -2115,29 +2120,38 @@ async def status() -> str:

# Send stack trace query
trace = await handle.query("__enhanced_stack_trace")

assert "never_completing_coroutine" in [ stack['functionName'] for stack in trace['stacks'] ]

assert "never_completing_coroutine" in [
stack["functionName"] for stack in trace["stacks"]
]
# first line of never_completing_coroutine
assert 'self._status = "waiting"' in str(trace['sources'])
assert trace['sdk']['version'] == __version__
assert 'self._status = "waiting"' in str(trace["sources"])
assert trace["sdk"]["version"] == __version__

async with new_worker(
client, ExternalStackTraceWorkflow, ExternalLongSleepWorkflow, activities=[wait_cancel]
client,
ExternalStackTraceWorkflow,
ExternalLongSleepWorkflow,
activities=[wait_cancel],
) as worker:
handle = await client.start_workflow(
ExternalStackTraceWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue
task_queue=worker.task_queue,
)

await assert_eq_eventually("waiting", status)

trace = await handle.query("__enhanced_stack_trace")

assert "never_completing_coroutine" in [ stack['functionName'] for stack in trace['stacks'] ]
assert 'self._status = "waiting" # with external comment' in str(trace['sources'])
assert 'externalstacktrace.py' in str(trace['sources'])
assert trace['sdk']['version'] == __version__
assert "never_completing_coroutine" in [
stack["functionName"] for stack in trace["stacks"]
]
assert 'self._status = "waiting" # with external comment' in str(
trace["sources"]
)
assert "externalstacktrace.py" in str(trace["sources"])
assert trace["sdk"]["version"] == __version__


@dataclass
Expand Down

0 comments on commit 3cfcce0

Please sign in to comment.