Skip to content

Commit

Permalink
chore: Execution env for running workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
ramedina86 committed Sep 30, 2024
1 parent 44f3829 commit 2118c61
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
17 changes: 14 additions & 3 deletions src/writer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1695,11 +1695,20 @@ def _handle_binding(self, event_type, target_component, instance_path, payload)
return
self.evaluator.set_state(binding["stateRef"], instance_path, payload)

def _get_workflow_callable(self, workflow_key):
def fn(payload, context, session):
execution_env = {
"payload": payload,
"context": context,
"session": session
}
writer.workflows.run_workflow_by_key(self.session, workflow_key, execution_env)
return fn

def _get_handler_callable(self, handler: str) -> Callable:
if handler.startswith("$runWorkflow_"):
workflow_key = handler[13:]
fn = lambda session, state: writer.workflows.run_workflow_by_key(session, state, workflow_key)
return fn
workflow_key = handler[13:]
return self._get_workflow_callable(workflow_key)

current_app_process = get_app_process()
handler_registry = current_app_process.handler_registry
Expand All @@ -1724,6 +1733,8 @@ def _call_handler_callable(
if not handler_callable:
raise ValueError(f"""Invalid handler. Couldn't find the handler "{ handler }".""")

print("paylo is " + repr(payload))

# Preparation of arguments
context_data = self.evaluator.get_context_data(instance_path)
context_data['event'] = event_type
Expand Down
20 changes: 10 additions & 10 deletions src/writer/workflows.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
from typing import TYPE_CHECKING
from typing import Dict
import writer.workflows_blocks
from writer.core_ui import Component
import writer.core

def _get_workflow_nodes(component_id):
return writer.core.base_component_tree.get_descendents(component_id)

def run_workflow_by_key(session, state: "writer.core.WriterState", workflow_key: str):
def run_workflow_by_key(session, workflow_key: str, execution_env: Dict):
state = session.session_state
state.add_log_entry("info", "Workflow", f"""Running workflow "{workflow_key}".""")

# workflows = .get_descendents("workflows_root")
all_components = writer.core.base_component_tree.components.values()
workflows = list(filter(lambda c: c.type == "workflows_workflow" and c.content.get("key") == workflow_key, all_components))
if len(workflows) == 0:
return
workflow = workflows[0]

run_workflow(session, workflow.id)
run_workflow(session, workflow.id, execution_env)
state.add_log_entry("info", "Workflow", f"""Finished executing workflow "{workflow_key}".""")


def run_workflow(session_info, component_id):
def run_workflow(session, component_id: "Component", execution_env: Dict):
final_nodes = _get_final_nodes(component_id)
execution = {}
session = writer.core.session_manager.get_session(session_info.get("id"))
for node in final_nodes:
_run_node(node, execution, session)
_run_node(node, execution, session, execution_env)


def _get_final_nodes(component_id):
Expand Down Expand Up @@ -53,10 +52,11 @@ def _get_dependencies(target_node: "Component"):
return dependencies


def _run_node(target_node: "Component", execution, session):
def _run_node(target_node: "Component", execution, session, execution_env: Dict):
tool_class = writer.workflows_blocks.blocks.block_map.get(target_node.type)
if not tool_class:
raise RuntimeError(f"Couldn't find tool for {target_node.type}.")
session.session_state.add_log_entry("info", "Workflow", f"""Running node "{target_node.id}".""")
dependencies = _get_dependencies(target_node)

tool = execution.get(target_node.id)
Expand All @@ -66,7 +66,7 @@ def _run_node(target_node: "Component", execution, session):
result = None
matched_dependencies = 0
for node, out_id in dependencies:
tool = _run_node(node, execution, session)
tool = _run_node(node, execution, session, execution_env)
if not tool:
continue
if tool.outcome == out_id:
Expand All @@ -76,7 +76,7 @@ def _run_node(target_node: "Component", execution, session):
if len(dependencies) > 0 and matched_dependencies == 0:
return

tool = tool_class(target_node, execution, session, result)
tool = tool_class(target_node, execution, session, (execution_env | {"result": result}))
tool.run()
execution[target_node.id] = tool
return tool

0 comments on commit 2118c61

Please sign in to comment.