Message race (#2165)
* repro message race condition

* some more debugging w/ burnettk

* safe exception handling

* more debugging w/ burnettk

* add failure log

* load script

* current state, db.session.begin did not actually help w/ burnettk

* do not save items on message start event w/ burnettk

* fixed tests w/ burnettk

* some cleanup w/ burnettk

* avoid test load script w/ burnettk

---------

Co-authored-by: burnettk <[email protected]>
Co-authored-by: jasquat <[email protected]>
3 people authored Dec 2, 2024
1 parent ee3d545 commit def33b2
Showing 10 changed files with 239 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -35,7 +35,7 @@ repos:
        require_serial: true
        # this is also specified in spiffworkflow-backend/pyproject.toml but we run pre-commit
        # with all-files which ignores that
-        exclude: "/migrations/"
+        exclude: "/migrations/|bin/load_test_message_start_event.py"
      - id: ruff-format
        args: [format]
        files: ^spiffworkflow-backend/
112 changes: 112 additions & 0 deletions spiffworkflow-backend/bin/load_test_message_start_event.py
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
import concurrent.futures
import json
import os
import subprocess
import sys


def get_access_token(script_dir, username="admin", password="admin", realm_name="spiffworkflow"):
    """
    Get access token once
    """
    get_token_cmd = f"{script_dir}/get_token {username} {password} {realm_name}"
    return subprocess.check_output(get_token_cmd, shell=True, text=True).strip()


def run_curl_command(message_identifier, access_token, backend_base_url):
    """
    Execute the curl command for load testing
    :return: Tuple of (success, result)
    """
    try:
        # Login command
        login_cmd = f"curl --silent -X POST '{backend_base_url}/v1.0/login_with_access_token?access_token={access_token}' -H 'Authorization: Bearer {access_token}' >/dev/null"
        subprocess.run(login_cmd, shell=True, check=True)

        # Message sending command
        message_cmd = f"curl --silent -X POST '{backend_base_url}/v1.0/messages/{message_identifier}?execution_mode=asynchronous' -H 'Authorization: Bearer {access_token}' -d '{{\"payload\": {{\"email\": \"[email protected]\"}}}}' -H 'Content-type: application/json'"
        result = subprocess.check_output(message_cmd, shell=True, text=True)

        # Check for errors
        try:
            error_code = json.loads(result).get("error_code")
            if error_code is not None and error_code != "null":
                return False, result
        except json.JSONDecodeError:
            pass

        return True, result

    except subprocess.CalledProcessError as e:
        return False, str(e)
    except Exception as e:
        return False, str(e)


def load_test(message_identifier, num_requests=10, max_workers=5, username="admin", password="admin", realm_name="spiffworkflow"):
    """
    Perform load testing with concurrent requests and failure logging
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    backend_base_url = os.environ.get("BACKEND_BASE_URL", "http://localhost:7000")

    # Get access token once
    access_token = get_access_token(script_dir, username, password, realm_name)

    successful_requests = 0
    failed_requests = 0
    failure_log = []

    # Use ThreadPoolExecutor for I/O-bound tasks like network requests
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Create futures for all requests
        futures = [
            executor.submit(run_curl_command, message_identifier, access_token, backend_base_url) for _ in range(num_requests)
        ]

        # Collect and process results
        for i, future in enumerate(concurrent.futures.as_completed(futures), 1):
            success, result = future.result()
            if success:
                successful_requests += 1
                print(f"Request {i}: Success")
            else:
                failed_requests += 1
                failure_log.append({"request_number": i, "error_message": result})
                print(f"Request {i}: Failure")

    # Log failures to a file if any exist
    if failure_log:
        filename = "failure_log.json"
        with open(filename, "w") as f:
            json.dump(failure_log, f, indent=2)
        print(f"\nFailure details logged to {filename}")

    # Print summary
    print("\nLoad Test Summary:")
    print(f"Total Requests: {num_requests}")
    print(f"Successful Requests: {successful_requests}")
    print(f"Failed Requests: {failed_requests}")
    print(f"Success Rate: {successful_requests/num_requests*100:.2f}%")


def main():
    # Parse command-line arguments
    if len(sys.argv) < 2:
        print("Usage: python load_test_message_start_event.py <message_identifier> [num_requests] [max_workers] [username] [password] [realm_name]")
        sys.exit(1)

    message_identifier = sys.argv[1]
    num_requests = int(sys.argv[2]) if len(sys.argv) > 2 else 10
    max_workers = int(sys.argv[3]) if len(sys.argv) > 3 else 5
    username = sys.argv[4] if len(sys.argv) > 4 else "admin"
    password = sys.argv[5] if len(sys.argv) > 5 else "admin"
    realm_name = sys.argv[6] if len(sys.argv) > 6 else "spiffworkflow"

    load_test(message_identifier, num_requests, max_workers, username, password, realm_name)


if __name__ == "__main__":
    main()
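
For reference, a typical invocation of the new load script looks like this (the bounty_start_event identifier is purely illustrative; it must match a message start event declared in a deployed process model):

# 100 total requests across 10 concurrent workers
BACKEND_BASE_URL=http://localhost:7000 ./bin/load_test_message_start_event.py bounty_start_event 100 10 admin admin spiffworkflow
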
7 changes: 5 additions & 2 deletions spiffworkflow-backend/bin/login_with_user
@@ -18,7 +18,10 @@ if [[ -z "${1:-}" ]]; then
  exit 1
fi

-script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
+script_dir="$(
+  cd -- "$(dirname "$0")" >/dev/null 2>&1
+  pwd -P
+)"

if [[ -z "${REALM_NAME:-}" ]]; then
  REALM_NAME=spiffworkflow
@@ -30,5 +33,5 @@ if [[ -z "$access_token" || "$access_token" == "null" ]]; then
else

  echo "access_token: ${access_token}"
-  curl -v -X POST "${BACKEND_BASE_URL}/v1.0/login_with_access_token?access_token=${access_token}&authentication_identifier=default" -H "Authorization: Bearer $access_token"
+  curl -v -X POST "${BACKEND_BASE_URL:-http://localhost:7000}/v1.0/login_with_access_token?access_token=${access_token}&authentication_identifier=default" -H "Authorization: Bearer $access_token"
fi
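
The functional change above is the ${BACKEND_BASE_URL:-http://localhost:7000} expansion, which falls back to the local backend when the variable is unset or empty; a quick illustration (the example.com value is only a placeholder):

unset BACKEND_BASE_URL
echo "${BACKEND_BASE_URL:-http://localhost:7000}"   # -> http://localhost:7000
BACKEND_BASE_URL=https://backend.example.com
echo "${BACKEND_BASE_URL:-http://localhost:7000}"   # -> https://backend.example.com
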
3 changes: 2 additions & 1 deletion spiffworkflow-backend/pyproject.toml
@@ -227,7 +227,8 @@ line-length = 130
target-version = "py310"

exclude = [
-    "migrations"
+    "migrations",
+    "bin/load_test_message_start_event.py"
]

[tool.ruff.lint.per-file-ignores]
spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py
@@ -1,6 +1,7 @@
from dataclasses import dataclass

from sqlalchemy import ForeignKey
+from sqlalchemy.orm import relationship

from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db
@@ -18,6 +19,8 @@ class ProcessInstanceQueueModel(SpiffworkflowBaseDBModel):
    locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True)
    status: str = db.Column(db.String(50), index=True)

+    process_instance = relationship(ProcessInstanceModel)
+
    # for timers. right now the apscheduler jobs without celery check for waiting process instances.
    # if the instance's run_at_in_seconds is now or earlier, the instance will run.
    # so we can save some effort if we detect that it is scheduled to run later.
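
The new relationship gives a queue row a direct handle on its process instance, saving callers a second query by id. A minimal sketch of the access pattern this enables (the filter value is illustrative):

# hypothetical lookup: hop from a queue row straight to its process instance
queue_entry = ProcessInstanceQueueModel.query.filter_by(status="waiting").first()
if queue_entry is not None:
    process_instance = queue_entry.process_instance  # loaded via the new relationship
    print(process_instance.id, process_instance.status)
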
spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
@@ -59,10 +59,13 @@ def correlate_send_message(
            message_type=MessageTypes.receive.value,
        ).all()
        message_instance_receive: MessageInstanceModel | None = None
+        processor_receive = None
        try:
            for message_instance in available_receive_messages:
                if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()):
                    message_instance_receive = message_instance
+            message_triggerable_process_model = None
+            receiving_process_instance = None
            if message_instance_receive is None:
                # Check for a message triggerable process and start that to create a new message_instance_receive
                message_triggerable_process_model = MessageTriggerableProcessModel.query.filter_by(
@@ -72,7 +75,7 @@
                    user: UserModel | None = message_instance_send.user
                    if user is None:
                        user = UserService.find_or_create_system_user()
-                    receiving_process_instance = MessageService.start_process_with_message(
+                    receiving_process_instance, processor_receive = MessageService.start_process_with_message(
                        message_triggerable_process_model,
                        user,
                        message_instance_send=message_instance_send,
@@ -86,36 +89,62 @@
            else:
                receiving_process_instance = MessageService.get_process_instance_for_message_instance(message_instance_receive)

-            # Assure we can send the message, otherwise keep going.
-            if message_instance_receive is None or not receiving_process_instance.can_receive_message():
+            if processor_receive is None and message_instance_receive is not None:
+                # Set the receiving message to running, so it is not altered elsewhere ...
+                message_instance_receive.status = "running"
+                db.session.add(message_instance_receive)
+                db.session.commit()
+
+            if (
+                message_instance_receive is None
+                or receiving_process_instance is None
+                or not receiving_process_instance.can_receive_message()
+            ):
+                # Assure we can send the message, otherwise keep going.
                message_instance_send.status = "ready"
                db.session.add(message_instance_send)
-                db.session.commit()
+                if message_instance_receive is not None:
+                    message_instance_receive.status = "ready"
+                    db.session.add(message_instance_receive)
+                if processor_receive is not None:
+                    processor_receive.save()
+                else:
+                    db.session.commit()
                return None

            try:
-                with ProcessInstanceQueueService.dequeued(receiving_process_instance):
-                    # Set the receiving message to running, so it is not altered elsewhere ...
-                    message_instance_receive.status = "running"
-
+                with ProcessInstanceQueueService.dequeued(receiving_process_instance, needs_dequeue=False):
                    cls.process_message_receive(
-                        receiving_process_instance, message_instance_receive, message_instance_send, execution_mode=execution_mode
+                        receiving_process_instance,
+                        message_instance_receive,
+                        message_instance_send,
+                        execution_mode=execution_mode,
+                        processor_receive=processor_receive,
                    )
                    message_instance_receive.status = "completed"
                    message_instance_receive.counterpart_id = message_instance_send.id
                    db.session.add(message_instance_receive)
                    message_instance_send.status = "completed"
                    message_instance_send.counterpart_id = message_instance_receive.id
                    db.session.add(message_instance_send)
-                    db.session.commit()
+                    if processor_receive is not None:
+                        processor_receive.save()
+                    else:
+                        db.session.commit()
                    if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
                        queue_process_instance_if_appropriate(receiving_process_instance, execution_mode=execution_mode)
                    return message_instance_receive

            except ProcessInstanceIsAlreadyLockedError:
                message_instance_send.status = "ready"
                db.session.add(message_instance_send)
-                db.session.commit()
+                if message_instance_receive is not None:
+                    message_instance_receive.status = "ready"
+                    db.session.add(message_instance_receive)
+                if processor_receive is not None:
+                    processor_receive.save()
+                else:
+                    db.session.commit()
                return None

        except Exception as exception:
@@ -127,7 +156,10 @@ def correlate_send_message(
message_instance_receive.status = "failed"
message_instance_receive.failure_cause = str(exception)
db.session.add(message_instance_receive)
db.session.commit()
if processor_receive is not None:
processor_receive.save()
else:
db.session.commit()
raise exception

    @classmethod
@@ -152,16 +184,14 @@ def start_process_with_message(
        user: UserModel,
        message_instance_send: MessageInstanceModel | None = None,
        execution_mode: str | None = None,
-    ) -> ProcessInstanceModel:
+    ) -> tuple[ProcessInstanceModel, ProcessInstanceProcessor]:
        """Start up a process instance, so it is ready to catch the event."""
        receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
-            message_triggerable_process_model.process_model_identifier,
-            user,
+            message_triggerable_process_model.process_model_identifier, user, commit_db=False
        )
-        with ProcessInstanceQueueService.dequeued(receiving_process_instance):
+        with ProcessInstanceQueueService.dequeued(receiving_process_instance, needs_dequeue=False):
            processor_receive = ProcessInstanceProcessor(receiving_process_instance)
            cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
-            processor_receive.save()

            execution_strategy_name = None
            if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
@@ -176,16 +206,17 @@
            ):
                processor_receive.bpmn_process_instance.correlations = message_instance_send.correlation_keys

-            processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
+            processor_receive.do_engine_steps(save=False, execution_strategy_name=execution_strategy_name, needs_dequeue=False)

-        return receiving_process_instance
+        return (receiving_process_instance, processor_receive)

    @staticmethod
    def process_message_receive(
        receiving_process_instance: ProcessInstanceModel,
        message_instance_receive: MessageInstanceModel,
        message_instance_send: MessageInstanceModel,
        execution_mode: str | None = None,
+        processor_receive: ProcessInstanceProcessor | None = None,
    ) -> None:
        correlation_properties = []
        for cr in message_instance_receive.correlation_rules:
@@ -205,22 +236,31 @@ def process_message_receive(
            payload=message_instance_send.payload,
            correlations=message_instance_send.correlation_keys,
        )
-        processor_receive = ProcessInstanceProcessor(receiving_process_instance)
-        processor_receive.bpmn_process_instance.send_event(bpmn_event)
+        processor_receive_to_use = processor_receive
+        save_engine_steps = False
+        if processor_receive_to_use is None:
+            processor_receive_to_use = ProcessInstanceProcessor(receiving_process_instance)
+            save_engine_steps = True
+        processor_receive_to_use.bpmn_process_instance.send_event(bpmn_event)
        execution_strategy_name = None

        if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
            # even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
            # we need to serialize these task updates to the db. do_engine_steps with save does that.
-            processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
+            processor_receive_to_use.do_engine_steps(
+                save=save_engine_steps, execution_strategy_name="run_current_ready_tasks", needs_dequeue=save_engine_steps
+            )
        elif not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(receiving_process_instance):
            execution_strategy_name = None
            if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
                execution_strategy_name = "greedy"
-            processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
+            processor_receive_to_use.do_engine_steps(
+                save=save_engine_steps, execution_strategy_name=execution_strategy_name, needs_dequeue=save_engine_steps
+            )
        message_instance_receive.status = MessageStatuses.completed.value
        db.session.add(message_instance_receive)
-        db.session.commit()
+        if save_engine_steps:
+            db.session.commit()

    @classmethod
    def find_message_triggerable_process_model(cls, modified_message_name: str) -> MessageTriggerableProcessModel:
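
Two ideas in the service changes above carry the race fix. First, a matched receive message is claimed by committing a "running" status before any engine work starts, so a concurrent correlate_send_message cannot grab the same instance. Second, when the receive side was just started by start_process_with_message, writes are funneled through the processor's save() so task updates and message statuses land in a single transaction. A condensed sketch of the pattern (illustrative helper names, not the service code itself):

def claim_receive_message(message_instance_receive):
    # commit the claim immediately so other workers skip this instance
    message_instance_receive.status = "running"
    db.session.add(message_instance_receive)
    db.session.commit()


def persist_outcome(processor_receive):
    # a freshly created processor owns the transaction; otherwise commit directly
    if processor_receive is not None:
        processor_receive.save()
    else:
        db.session.commit()
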
spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -1575,14 +1575,15 @@ def do_engine_steps(
        execution_strategy: ExecutionStrategy | None = None,
        should_schedule_waiting_timer_events: bool = True,
        ignore_cannot_be_run_error: bool = False,
+        needs_dequeue: bool = True,
    ) -> TaskRunnability:
        if not ignore_cannot_be_run_error and not self.process_instance_model.allowed_to_run():
            raise ProcessInstanceCannotBeRunError(
                f"Process instance '{self.process_instance_model.id}' has status "
                f"'{self.process_instance_model.status}' and therefore cannot run."
            )
        if self.process_instance_model.persistence_level != "none":
-            with ProcessInstanceQueueService.dequeued(self.process_instance_model):
+            with ProcessInstanceQueueService.dequeued(self.process_instance_model, needs_dequeue=needs_dequeue):
                # TODO: ideally we just lock in the execution service, but not sure
                # about _add_bpmn_process_definitions and if that needs to happen in
                # the same lock like it does on main
@@ -1592,6 +1593,7 @@
                    execution_strategy_name,
                    execution_strategy,
                    should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
+                    needs_dequeue=needs_dequeue,
                )
        else:
            return self._do_engine_steps(
@@ -1609,6 +1611,7 @@
        execution_strategy_name: str | None = None,
        execution_strategy: ExecutionStrategy | None = None,
        should_schedule_waiting_timer_events: bool = True,
+        needs_dequeue: bool = True,
    ) -> TaskRunnability:
        self._add_bpmn_process_definitions(
            self.serialize(),
@@ -1645,6 +1648,7 @@
            save,
            should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
            # profile=True,
+            needs_dequeue=needs_dequeue,
        )
        self.task_model_mapping, self.bpmn_subprocess_mapping = task_model_delegate.get_guid_to_db_object_mappings()
        self.check_all_tasks()
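
The needs_dequeue flag exists so a caller that already holds the instance's queue lock (as start_process_with_message now does) can drive engine steps without re-acquiring it. A minimal sketch of that kind of re-entrant guard, assuming a context-manager lock in the style of ProcessInstanceQueueService.dequeued (acquire_lock/release_lock are hypothetical helpers):

from contextlib import contextmanager


@contextmanager
def dequeued(process_instance, needs_dequeue=True):
    if not needs_dequeue:
        # the caller already holds the lock; just run the body
        yield
        return
    acquire_lock(process_instance)  # hypothetical lock helpers
    try:
        yield
    finally:
        release_lock(process_instance)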