Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Global events #677

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ui/src/components/workflows/WorkflowsWorkflow.vue
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ import { useDragDropComponent } from "@/builder/useDragDropComponent";
import injectionKeys from "@/injectionKeys";

const renderProxiedComponent = inject(injectionKeys.renderProxiedComponent);
const instancePath = inject(injectionKeys.instancePath);
const workflowComponentId = inject(injectionKeys.componentId);

const rootEl: Ref<HTMLElement | null> = ref(null);
Expand Down Expand Up @@ -224,9 +223,10 @@ async function handleRun() {
callback: () => {
isRunning.value = false;
},
handler: `$runWorkflowById_${workflowComponentId}`,
},
}),
instancePath,
null,
false,
);
}
Expand Down
3 changes: 3 additions & 0 deletions src/ui/src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,17 @@ export function generateCore() {
? getPayloadFromEvent(event)
: null;
let callback: Function;
let handler: string;

if (event instanceof CustomEvent) {
callback = event.detail?.callback;
handler = event.detail?.handler;
}

const messagePayload = async () => ({
type: event.type,
instancePath,
handler,
payload: await eventPayload,
});

Expand Down
130 changes: 75 additions & 55 deletions src/writer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,8 @@ def transform(self, ev: WriterEvent) -> None:
def _transform_tag_click(self, ev: WriterEvent) -> Optional[str]:
payload = ev.payload
instance_path = ev.instancePath
if not instance_path:
raise ValueError("This event cannot be run as a global event.")
options = self.evaluator.evaluate_field(
instance_path, "tags", True, "{ }")
if not isinstance(options, dict):
Expand All @@ -1266,6 +1268,8 @@ def _transform_tag_click(self, ev: WriterEvent) -> Optional[str]:
def _transform_option_change(self, ev: WriterEvent) -> Optional[str]:
payload = ev.payload
instance_path = ev.instancePath
if not instance_path:
raise ValueError("This event cannot be run as a global event.")
options = self.evaluator.evaluate_field(
instance_path, "options", True, """{ "a": "Option A", "b": "Option B" }""")
if not isinstance(options, dict):
Expand All @@ -1277,6 +1281,8 @@ def _transform_option_change(self, ev: WriterEvent) -> Optional[str]:
def _transform_options_change(self, ev: WriterEvent) -> Optional[List[str]]:
payload = ev.payload
instance_path = ev.instancePath
if not instance_path:
raise ValueError("This event cannot be run as a global event.")
options = self.evaluator.evaluate_field(
instance_path, "options", True, """{ "a": "Option A", "b": "Option B" }""")
if not isinstance(options, dict):
Expand Down Expand Up @@ -1591,7 +1597,7 @@ def _handle_binding(self, event_type, target_component, instance_path, payload)
return
self.evaluator.set_state(binding["stateRef"], instance_path, payload)

def _get_workflow_callable(self, workflow_key: Optional[str], workflow_id: Optional[str]):
def _get_workflow_callable(self, workflow_key: Optional[str] = None, workflow_id: Optional[str] = None):
def fn(payload, context, session):
execution_environment = {
"payload": payload,
Expand All @@ -1604,96 +1610,110 @@ def fn(payload, context, session):
self.workflow_runner.run_workflow(workflow_id, execution_environment, "Workflow execution triggered on demand")
return fn

def _get_handler_callable(self, target_component: Component, event_type: str) -> Optional[Callable]:
if event_type == "wf-builtin-run" and Config.mode == "edit":
return self._get_workflow_callable(None, target_component.id)

if not target_component.handlers:
return None
handler = target_component.handlers.get(event_type)
if not handler:
return None

def _get_handler_callable(self, handler: str) -> Optional[Callable]:
if handler.startswith("$runWorkflow_"):
workflow_key = handler[13:]
return self._get_workflow_callable(workflow_key, None)
return self._get_workflow_callable(workflow_key=workflow_key)

if handler.startswith("$runWorkflowById_"):
workflow_id = handler[17:]
return self._get_workflow_callable(workflow_id=workflow_id)

current_app_process = get_app_process()
handler_registry = current_app_process.handler_registry
callable_handler = handler_registry.find_handler_callable(handler)
return callable_handler

def _get_calling_arguments(self, ev: WriterEvent, instance_path: Optional[InstancePath] = None):
context_data = self.evaluator.get_context_data(instance_path) if instance_path else {}
context_data["event"] = ev.type
return {
"state": self.session_state,
"payload": ev.payload,
"context": context_data,
"session":_event_handler_session_info(),
"ui": _event_handler_ui_manager()
}

def _call_handler_callable(
self,
event_type: str,
target_component: Component,
instance_path: InstancePath,
payload: Any
) -> Any:

handler_callable = self._get_handler_callable(target_component, event_type)
if not handler_callable:
return

# Preparation of arguments
context_data = self.evaluator.get_context_data(instance_path)
context_data['event'] = event_type
writer_args = {
'state': self.session_state,
'payload': payload,
'context': context_data,
'session':_event_handler_session_info(),
'ui': _event_handler_ui_manager()
}

# Invocation of handler
handler_callable: Callable,
calling_arguments: Dict
) -> Any:
current_app_process = get_app_process()
result = None
captured_stdout = None
with core_ui.use_component_tree(self.session.session_component_tree), \
contextlib.redirect_stdout(io.StringIO()) as f:
middlewares_executors = current_app_process.middleware_registry.executors()
result = EventHandlerExecutor.invoke_with_middlewares(middlewares_executors, handler_callable, writer_args)
result = EventHandlerExecutor.invoke_with_middlewares(middlewares_executors, handler_callable, calling_arguments)
captured_stdout = f.getvalue()

if captured_stdout:
self.session_state.add_log_entry(
"info",
"Stdout message",
captured_stdout
)
self.session_state.add_log_entry("info", "Stdout message", captured_stdout)

return result

def handle(self, ev: WriterEvent) -> WriterEventResult:
ok = True

def _deserialize(self, ev: WriterEvent):
try:
self.deser.transform(ev)
except BaseException:
ok = False
except BaseException as e:
self.session_state.add_notification(
"error", "Error", f"A deserialisation error occurred when handling event '{ ev.type }'.")
self.session_state.add_log_entry("error", "Deserialisation Failed",
f"The data sent might be corrupt. A runtime exception was raised when deserialising event '{ ev.type }'.", traceback.format_exc())
"error", "Error", f"A deserialization error occurred when handling event '{ ev.type }'.")
self.session_state.add_log_entry("error", "Deserialization Failed",
f"The data sent might be corrupt. A runtime exception was raised when deserializing event '{ ev.type }'.", traceback.format_exc())
raise e

def _handle_global_event(self, ev: WriterEvent):
if not ev.isSafe:
error_message = "Attempted executing a global event in an unsafe context."
self.session_state.add_log_entry("error", "Forbidden operation", error_message, traceback.format_exc())
raise PermissionError(error_message)
if not ev.handler:
raise ValueError("Handler not specified when attempting to execute global event.")
handler_callable = self._get_handler_callable(ev.handler)
if not handler_callable:
return
calling_arguments = self._get_calling_arguments(ev, instance_path=None)
return self._call_handler_callable(handler_callable, calling_arguments)

result = None
def _handle_component_event(self, ev: WriterEvent):
instance_path = ev.instancePath
try:
instance_path = ev.instancePath
if not instance_path:
raise ValueError("Component event must specify an instance path.")
target_id = instance_path[-1]["componentId"]
target_component = cast(Component, self.session_component_tree.get_component(target_id))

self._handle_binding(ev.type, target_component, instance_path, ev.payload)
result = self._call_handler_callable(ev.type, target_component, instance_path, ev.payload)
except BaseException:
ok = False
if not target_component.handlers:
return None
handler = target_component.handlers.get(ev.type)
if not handler:
return None
handler_callable = self._get_handler_callable(handler)
if not handler_callable:
return
calling_arguments = self._get_calling_arguments(ev, instance_path)
return self._call_handler_callable(handler_callable, calling_arguments)
except BaseException as e:
self.session_state.add_notification("error", "Runtime Error", f"An error occurred when processing event '{ ev.type }'.",
)
self.session_state.add_log_entry("error", "Runtime Exception",
f"A runtime exception was raised when processing event '{ ev.type }'.", traceback.format_exc())
raise e

def handle(self, ev: WriterEvent) -> WriterEventResult:
try:
if not ev.isSafe and ev.handler is not None:
raise PermissionError("Unexpected handler set on event.")
self._deserialize(ev)
if not ev.instancePath:
return {"ok": True, "result": self._handle_global_event(ev)}
else:
return {"ok": True, "result": self._handle_component_event(ev)}
except BaseException as e:
return {"ok": False, "result": str(e)}

return {"ok": ok, "result": result}

class EventHandlerExecutor:

Expand Down
30 changes: 27 additions & 3 deletions src/writer/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,26 @@
logging.getLogger().setLevel(logging.INFO)


class JobVault:

def __init__(self):
self.counter = 0
self.vault = {}

def generate_job_id(self):
self.counter += 1
return self.counter

def set(self, job_id: str, value: Any):
self.vault[job_id] = value

def get(self, job_id: str):
return self.vault.get(job_id)

class WriterState(typing.Protocol):
app_runner: AppRunner
writer_app: bool
job_vault: JobVault
is_server_static_mounted: bool
meta: Union[Dict[str, Any], Callable[[], Dict[str, Any]]] # meta tags for SEO
opengraph_tags: Union[Dict[str, Any], Callable[[], Dict[str, Any]]] # opengraph tags for social networks integration (facebook, discord)
Expand Down Expand Up @@ -122,6 +139,7 @@ async def lifespan(asgi_app: FastAPI):
"""
app.state.writer_app = True
app.state.app_runner = app_runner
app.state.job_vault = JobVault()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Job Vault will be used to store results of async jobs.

A basic dict-based structure for single-node deployments. There'll be an option to use Redis for bigger deployments (we can then use this same Redis to do other interesting stuff).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one concern. I don't know how to save the result of the job in the job vault.

If I understand correctly the goal is to create an API endpoint for triggering a job decoupled from a user session. WF retrieves it and executes it on an Event handler of the AppProcess

In this case, the job vault will serve as memory. When the job is finished, it will allow the user to retrieve the result. It is only used for "synchronous" jobs for which we are interested in the result.

My concern is the job vault is in the primary process, the thread pool that run a job in async in WF is in the secondary process. You can write something into the job vault from the secondary process. I have tried to explain what I have understand in the schema below.

image

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that a job is always a single event. We dispatch to app_runner, then we get the result and, instead of sending via WebSockets, we save it in the JobVault. The vault stays completely independent from the internals (AppProcess, AppRunner, etc) - it all happens at serve level.

I wrote some working code that I didn't include as part of the PR so that this PR can be focused on foundations.

    @app.post("/api/job/workflow/{workflow_key}")
    async def create_workflow_job(workflow_key: str, request: Request, response: Response):
        def save_job_result(task, job_id: str):
            try:
                apsr: Optional[AppProcessServerResponse] = None
                apsr = task.result()
                app.state.job_vault.set(job_id, apsr)
            except Exception as e:
                app.state.job_vault.set(job_id, e)

        app_response = await app_runner.init_session(InitSessionRequestPayload(
            cookies=dict(request.cookies),
            headers=dict(request.headers),
            proposedSessionId=None
        ))

        session_id = app_response.payload.sessionId
        is_session_ok = await app_runner.check_session(session_id)
        if not is_session_ok:
            return

        loop = asyncio.get_running_loop()
        task = loop.create_task(app_runner.handle_event(
            session_id, WriterEvent(
                type="wf-builtin-run",
                isSafe=True,
                handler=f"$runWorkflow_{workflow_key}"
            )))

        job_id = app.state.job_vault.generate_job_id()
        task.add_done_callback(lambda t: save_job_result(t, job_id))

    @app.get("/api/job/{job_id}")
    async def get_workflow_job(job_id: str, request: Request, response: Response):
        return app.state.job_vault.get(job_id)
        ```

Copy link
Collaborator

@FabienArcellier FabienArcellier Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, you rely on loop.create_task. That was the part I wasn't sure. Looks ok for me then.

image

loop = asyncio.get_running_loop()
task = loop.create_task(app_runner.handle_event(
    session_id, WriterEvent(
        type="wf-builtin-run",
        isSafe=True,
        handler=f"$runWorkflow_{workflow_key}"
    )))


def _get_extension_paths() -> List[str]:
extensions_path = pathlib.Path(user_app_path) / "extensions"
Expand Down Expand Up @@ -310,13 +328,19 @@ async def _handle_incoming_event(websocket: WebSocket, session_id: str, req_mess
trackingId=req_message.trackingId,
payload=None
)

# Allows for global events if in edit mode (such as "Run workflow" for previewing a workflow)

is_safe = serve_mode == "edit"
res_payload: Optional[Dict[str, Any]] = None
apsr: Optional[AppProcessServerResponse] = None
apsr = await app_runner.handle_event(
session_id, WriterEvent(
type=req_message.payload["type"],
instancePath=req_message.payload["instancePath"],
payload=req_message.payload["payload"]
type=req_message.payload.get("type"),
handler=req_message.payload.get("handler"),
isSafe=is_safe,
instancePath=req_message.payload.get("instancePath"),
payload=req_message.payload.get("payload")
))
if apsr is not None and apsr.payload is not None:
res_payload = typing.cast(
Expand Down
4 changes: 3 additions & 1 deletion src/writer/ss_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ class ComponentUpdateRequest(AppProcessServerRequest):

class WriterEvent(BaseModel):
type: str
instancePath: InstancePath
isSafe: Optional[bool] = False
handler: Optional[str] = None
instancePath: Optional[InstancePath] = None
payload: Optional[Any] = None


Expand Down
31 changes: 31 additions & 0 deletions tests/backend/test_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,37 @@ async def test_bad_event_handler(self, setup_app_runner) -> None:
assert ev_res.status == "ok"
assert not ev_res.payload.result.get("ok")

@pytest.mark.asyncio
@pytest.mark.usefixtures("setup_app_runner")
async def test_unsafe_event(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run", load = True) as ar:
await init_app_session(ar, session_id=self.proposed_session_id)
ev_req = EventRequest(type="event", payload=WriterEvent(
type="wf-built-run",
handler="nineninenine",
instancePath=None,
payload=None
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert not ev_res.payload.result.get("ok")

@pytest.mark.asyncio
@pytest.mark.usefixtures("setup_app_runner")
async def test_safe_global_event(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run", load = True) as ar:
await init_app_session(ar, session_id=self.proposed_session_id)
ev_req = EventRequest(type="event", payload=WriterEvent(
type="wf-built-run",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the moment, type in WriterEvent isn't being used when specifying a global event (event with no instancePath).

isSafe=True,
handler="nineninenine",
instancePath=None,
payload=None
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert ev_res.payload.result.get("result") == 999

@pytest.mark.usefixtures("setup_app_runner")
def test_run_code_edit(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
Expand Down
5 changes: 5 additions & 0 deletions tests/backend/testapp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def update_cities(state, payload):
"br": "Bristol"
}


def nineninenine():
return 999


def create_text_widget(ui: WriterUIManager):
with ui.find('bb4d0e86-619e-4367-a180-be28ab6059f4'):
ui.Text({"text": "Hello world"})
Expand Down
Loading