Merge pull request #8 from unkcpz/rmq-out
Rmq out
unkcpz authored Jan 17, 2025
2 parents c88fc05 + caba4a2 commit 236d291
Showing 10 changed files with 68 additions and 66 deletions.
8 changes: 6 additions & 2 deletions pyproject.toml
@@ -34,7 +34,7 @@ dependencies = [
'importlib-metadata~=6.0',
'numpy~=1.21',
'paramiko~=3.0',
'plumpy~=0.24.0',
"plumpy~=0.24.0",
'pgsu~=0.3.0',
'psutil~=5.6',
'psycopg[binary]~=3.0',
@@ -47,7 +47,7 @@ dependencies = [
'tqdm~=4.45',
'typing-extensions~=4.0;python_version<"3.10"',
'upf_to_json~=0.9.2',
'wrapt~=1.11'
'wrapt~=1.11',
]
description = 'AiiDA is a workflow manager for computational science with a strong focus on provenance, performance and extensibility.'
dynamic = ['version'] # read from aiida/__init__.py
@@ -513,3 +513,7 @@ commands = molecule {posargs:test}

[tool.uv]
required-version = ">=0.5.20"

[tool.uv.sources]
# plumpy = { path = "../plumpy", editable = true }
plumpy = { git = "https://github.com/unkcpz/plumpy", branch = "master" }
23 changes: 12 additions & 11 deletions src/aiida/engine/processes/calcjobs/tasks.py
@@ -14,6 +14,7 @@
import functools
import logging
import tempfile
import concurrent.futures
from typing import TYPE_CHECKING, Any, Callable, Optional

import plumpy
@@ -101,13 +102,13 @@ async def do_upload():

try:
logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
ignore_exceptions = (concurrent.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
skip_submit = await exponential_backoff_retry(
do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except PreSubmitException:
raise
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
except (concurrent.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception as exception:
logger.warning(f'uploading CalcJob<{node.pk}> failed')
@@ -149,11 +150,11 @@ async def do_submit():

try:
logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
except (concurrent.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception as exception:
logger.warning(f'submitting CalcJob<{node.pk}> failed')
@@ -207,11 +208,11 @@ async def do_update():

try:
logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption)
job_done = await exponential_backoff_retry(
do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
except (concurrent.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception as exception:
logger.warning(f'updating CalcJob<{node.pk}> failed')
@@ -257,11 +258,11 @@ async def do_monitor():

try:
logger.info(f'scheduled request to monitor CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption)
monitor_result = await exponential_backoff_retry(
do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
except (concurrent.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception as exception:
logger.warning(f'monitoring CalcJob<{node.pk}> failed')
@@ -325,11 +326,11 @@ async def do_retrieve():

try:
logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
except (concurrent.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception as exception:
logger.warning(f'retrieving CalcJob<{node.pk}> failed')
@@ -561,7 +562,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
await self._kill_job(node, transport_queue)
node.set_process_status(str(exception))
return self.retrieve(monitor_result=self._monitor_result)
except (plumpy.futures.CancelledError, asyncio.CancelledError):
except (concurrent.futures.CancelledError, asyncio.CancelledError):
node.set_process_status(f'Transport task {self._command} was cancelled')
raise
except plumpy.process_states.Interruption:
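Every transport task in `tasks.py` uses the same retry guard, so the hunks above are mechanical: the cancellation type excluded from retries is now `concurrent.futures.CancelledError` instead of `plumpy.futures.CancelledError`. A minimal sketch of that guard, where the coroutine name `do_work` and the interval values are illustrative only and the exception tuple mirrors the actual change:

```python
# Sketch of the retry guard shared by the upload/submit/update/monitor/retrieve
# tasks above. `run_with_retry`, `do_work`, `initial_interval` and
# `max_attempts` are illustrative names, not part of the real module.
import concurrent.futures

import plumpy

from aiida.engine.utils import exponential_backoff_retry


async def run_with_retry(node, do_work, initial_interval=10.0, max_attempts=5):
    ignore_exceptions = (concurrent.futures.CancelledError, plumpy.process_states.Interruption)
    try:
        return await exponential_backoff_retry(
            do_work, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except ignore_exceptions:
        # Cancellations and interruptions are re-raised, never retried.
        raise
    except Exception:
        node.logger.warning(f'transport task for CalcJob<{node.pk}> failed')
        raise
```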
2 changes: 1 addition & 1 deletion src/aiida/engine/processes/control.py
@@ -9,7 +9,7 @@

import kiwipy
from kiwipy import communications
from plumpy.futures import unwrap_kiwi_future
from plumpy.rmq.futures import unwrap_kiwi_future

from aiida.brokers import Broker
from aiida.common.exceptions import AiidaException
5 changes: 2 additions & 3 deletions src/aiida/engine/processes/launcher.py
@@ -38,14 +38,13 @@ def handle_continue_exception(node, exception, message):
node.set_process_state(ProcessState.EXCEPTED)
node.seal()

async def _continue(self, communicator, pid, nowait, tag=None):
async def _continue(self, pid, nowait, tag=None):
"""Continue the task.
Note that the task may already have been completed, as indicated by the corresponding node, in which
case it is not continued; instead, the corresponding future is reconstructed and returned. This scenario may
occur when the Process was already completed by another worker that, however, failed to send the acknowledgment.
:param communicator: the communicator that called this method
:param pid: the pid of the process to continue
:param nowait: if True don't wait for the process to finish, just return the pid, otherwise wait and
return the results
@@ -84,7 +83,7 @@ async def _continue(self, communicator, pid, nowait, tag=None):
return future.result()

try:
result = await super()._continue(communicator, pid, nowait, tag)
result = await super()._continue(pid, nowait, tag)
except ImportError as exception:
message = 'the class of the process could not be imported.'
self.handle_continue_exception(node, exception, message)
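The launcher hunk drops the `communicator` argument from `_continue`, matching the new plumpy callback signature. A sketch of what an override looks like after this change; the subclass name and its body are hypothetical:

```python
# Hypothetical subclass illustrating the new `_continue(pid, nowait, tag)`
# signature now that the `communicator` argument has been removed.
from aiida.engine.processes.launcher import ProcessLauncher


class LoggingProcessLauncher(ProcessLauncher):
    async def _continue(self, pid, nowait, tag=None):
        # The parent no longer receives the communicator explicitly.
        print(f'continuing process<{pid}>')
        return await super()._continue(pid, nowait, tag)
```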
11 changes: 7 additions & 4 deletions src/aiida/engine/processes/process.py
@@ -39,9 +39,11 @@
import plumpy.futures
import plumpy.persistence
import plumpy.processes
# XXX: remove me
from kiwipy.communications import UnroutableError
from plumpy.process_states import Finished, ProcessState
from plumpy.processes import ConnectionClosed # type: ignore[attr-defined]
# XXX: remove me
from aio_pika.exceptions import ConnectionClosed
from plumpy.processes import Process as PlumpyProcess
from plumpy.utils import AttributesFrozendict

@@ -178,7 +180,7 @@ def __init__(
inputs=self.spec().inputs.serialize(inputs),
logger=logger,
loop=self._runner.loop,
communicator=self._runner.communicator,
coordinator=self._runner.communicator,
)

self._node: Optional[orm.ProcessNode] = None
@@ -318,7 +320,8 @@ def load_instance_state(
else:
self._runner = manager.get_manager().get_runner()

load_context = load_context.copyextend(loop=self._runner.loop, communicator=self._runner.communicator)
# XXX: worth checking and improving the debugging experience here: if the coordinator argument name is incorrect, the process becomes unreachable but no error message is emitted
load_context = load_context.copyextend(loop=self._runner.loop, coordinator=self._runner.communicator)
super().load_instance_state(saved_state, load_context)

if self.SaveKeys.CALC_ID.value in saved_state:
@@ -363,7 +366,7 @@ def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future

if killing:
# We are waiting for things to be killed, so return the 'gathered' future
kill_future = plumpy.futures.gather(*killing)
kill_future = asyncio.gather(*killing)
result = self.loop.create_future()

def done(done_future: plumpy.futures.Future):
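In the `kill` hunk, `plumpy.futures.gather` is replaced by the standard-library `asyncio.gather`, and the combined outcome is still copied onto a fresh future through a done callback. A self-contained sketch of that pattern, with toy child coroutines standing in for the real kill futures:

```python
# Toy illustration of the gather-and-chain pattern used by `kill` above:
# asyncio.gather collects the child futures, and a done callback transfers
# the combined outcome onto a future created on the running loop.
import asyncio


async def main():
    loop = asyncio.get_running_loop()

    async def kill_child(pk):
        return True  # stand-in for a successful kill of child process <pk>

    killing = [asyncio.ensure_future(kill_child(pk)) for pk in (1, 2, 3)]
    kill_future = asyncio.gather(*killing)
    result = loop.create_future()

    def done(done_future):
        if done_future.cancelled():
            result.cancel()
        elif done_future.exception() is not None:
            result.set_exception(done_future.exception())
        else:
            # The overall kill succeeded only if every child kill did.
            result.set_result(all(done_future.result()))

    kill_future.add_done_callback(done)
    print(await result)  # True


asyncio.run(main())
```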
3 changes: 1 addition & 2 deletions src/aiida/engine/runners.py
@@ -19,10 +19,9 @@
from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union

import kiwipy
from plumpy.communications import wrap_communicator
from plumpy.rmq import wrap_communicator, RemoteProcessThreadController
from plumpy.events import reset_event_loop_policy, set_event_loop_policy
from plumpy.persistence import Persister
from plumpy.process_comms import RemoteProcessThreadController

from aiida.common import exceptions
from aiida.orm import ProcessNode, load_node
7 changes: 3 additions & 4 deletions src/aiida/manage/manager.py
@@ -10,13 +10,12 @@

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:
import asyncio

from kiwipy.rmq import RmqThreadCommunicator
from plumpy.process_comms import RemoteProcessThreadController
from plumpy.rmq import RemoteProcessThreadController

from aiida.brokers.broker import Broker
from aiida.engine.daemon.client import DaemonClient
@@ -369,7 +368,7 @@ def get_process_controller(self) -> 'RemoteProcessThreadController':
:return: the process controller instance
"""
from plumpy.process_comms import RemoteProcessThreadController
from plumpy.rmq import RemoteProcessThreadController

if self._process_controller is None:
self._process_controller = RemoteProcessThreadController(self.get_communicator())
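The hunks in `runners.py`, `manager.py`, and the test files below are pure import-path moves: `RemoteProcessThreadController` (and `wrap_communicator`) now come from `plumpy.rmq` rather than `plumpy.process_comms`. A brief sketch of obtaining a controller through the manager with the new path; this assumes a loaded AiiDA profile and a reachable RabbitMQ broker:

```python
# Import-path migration in a nutshell: the controller now lives in plumpy.rmq.
# Assumes a configured profile and a running RabbitMQ broker.
from plumpy.rmq import RemoteProcessThreadController

from aiida.manage import get_manager

communicator = get_manager().get_communicator()
controller = RemoteProcessThreadController(communicator)
print(type(controller).__name__)  # RemoteProcessThreadController
```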
2 changes: 1 addition & 1 deletion tests/cmdline/commands/test_rabbitmq.py
@@ -9,7 +9,7 @@
"""Tests for ``verdi devel rabbitmq``."""

import pytest
from plumpy.process_comms import RemoteProcessThreadController
from plumpy.rmq import RemoteProcessThreadController

from aiida.cmdline.commands import cmd_rabbitmq
from aiida.engine import ProcessState, submit
2 changes: 1 addition & 1 deletion tests/engine/processes/test_control.py
@@ -1,7 +1,7 @@
"""Tests for the :mod:`aiida.engine.processes.control` module."""

import pytest
from plumpy.process_comms import RemoteProcessThreadController
from plumpy.rmq import RemoteProcessThreadController

from aiida.engine import ProcessState
from aiida.engine.launch import submit