
DataReader Listener Thread Safety #208

Open
clunietp opened this issue Aug 7, 2023 · 2 comments

Comments

@clunietp
Contributor

clunietp commented Aug 7, 2023

The following code appears to trigger a deadlock when attempting to use a Listener on a DataReader.

I'm using CycloneDDS commit d38e63ff8ed5123650beab9bef2b294b56628696 (28 Jun 23)
and CycloneDDS Python commit 7486f5504a988efbd4838b8ecb881d20e34a7644 (25 May 23),
and installing the Python library via `CYCLONEDDS_HOME=${CYCLONEDDS_HOME} python3 -m pip install /path/to/cyclonedds-python`

```python
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from time import sleep
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.core import Listener
from cyclonedds.topic import Topic
from cyclonedds.domain import DomainParticipant
from cyclonedds.idl.annotations import key as _key
from cyclonedds.idl import IdlStruct


@dataclass
class KeyedString(IdlStruct, typename="DDS.KeyedString"):
    "Represents a key-value string pair"
    key: str
    value: str
    _key("key")


class MyListener(Listener):
    def on_data_available(self, _):
        # Does no DDS work at all: it only prints and sleeps (~1 s in total).
        print("on_data_available")
        for i in range(10):
            print(i)
            sleep(0.1)


def test_listener():

    participant = DomainParticipant()
    topic = Topic(domain_participant=participant,
                  topic_name="foo", data_type=KeyedString)
    writer = DataWriter(publisher_or_participant=participant, topic=topic)
    _ = DataReader(subscriber_or_participant=participant,
                   topic=topic, listener=MyListener())

    def do_write():
        writer.write(
            sample=KeyedString("hello", "world"))

    # Ten writes issued concurrently from a pool of two threads.
    with ThreadPoolExecutor(max_workers=2) as thread_pool:
        wait([thread_pool.submit(do_write) for _ in range(10)])


if __name__ == '__main__':
    test_listener()
```

Output:

```
on_data_available
0
```

(then it just hangs here until I kill the Python process)

Setting the DataReader listener to None resolves the deadlock. Is this an issue with the listener (or elsewhere), or am I doing something wrong here?

@eboasson
Contributor

eboasson commented Aug 8, 2023

There is a lot here that I don't know enough about (I don't know Python all that well) to give a definitive answer, but:

  1. Listeners in the Cyclone core are (still) somewhat finicky and can easily deadlock if you try to do "too much" in them. The reason is that I aimed to minimise the overhead by invoking them from fairly deep inside the stack. It is the only way to really give listeners some expressive power that waitsets don't offer, but I have become convinced that it was the wrong choice for the default behaviour. (There are several issues on cyclonedds because of this.)
  2. Listeners in Python build upon those in the core (surprise!) and so have the same limitations, but I'm sure the Python GIL makes them somewhat worse.
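To make point 1 concrete, here is a toy model in plain Python (no Cyclone involved) of why a callback invoked from deep inside a stack is fragile. `ToyReader` and its locking are hypothetical and are not how Cyclone is actually implemented: it calls its listener while still holding an internal, non-reentrant lock, so a listener that only records the notification is fine, while one that calls back into the reader would block forever. The `timeout` on the lock acquire exists only so the demo reports the would-be deadlock instead of hanging.

```python
import threading


class ToyReader:
    """Hypothetical stand-in for a reader; NOT Cyclone's actual design."""

    def __init__(self, listener):
        self._lock = threading.Lock()  # non-reentrant internal lock
        self._samples = []
        self._listener = listener

    def deliver(self, sample):
        # Mimics the receive path: the listener runs while the lock is held.
        with self._lock:
            self._samples.append(sample)
            self._listener(self)

    def take(self, timeout=0.2):
        # A real blocking take() would wait forever here; the timeout only
        # lets the demo report the would-be deadlock instead of hanging.
        if not self._lock.acquire(timeout=timeout):
            raise TimeoutError("take() would deadlock: lock is held by the caller")
        try:
            samples, self._samples = self._samples, []
            return samples
        finally:
            self._lock.release()


def demo():
    results = []

    def light_listener(reader):
        # Only records the notification; never calls back into the reader.
        results.append("notified")

    def heavy_listener(reader):
        # "Too much": re-enters the reader from inside the callback.
        try:
            results.append(reader.take())
        except TimeoutError as exc:
            results.append(str(exc))

    ToyReader(light_listener).deliver("hello")
    ToyReader(heavy_listener).deliver("hello")
    return results
```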

I would have expected your listener to have at least printed 10 lines, even if it'd then hang because of something in Cyclone: inside the listener you're not doing anything with Cyclone, so there's no reason you'd deadlock in Cyclone. That suggests to me that it is instead deadlocking on the GIL. I'd guess it has something to do with unlocking the GIL on I/O, but here I'm really in what (for me) are uncharted waters.

If I am guessing right, then the only proper solution is to not call directly from the Cyclone core into a Python listener, but instead do it on a separate thread. Supporting calling application listeners on a separate thread also happens to be the solution to 1. There's even code floating around that kinda does that, e.g., https://github.com/eclipse-cyclonedds/cyclonedds/blob/master/src/tools/ddsperf/async_listener.c, but that's just a hack to solve a problem without having to solve the general case. I suppose it could be ported to the Python binding by someone who knows Python a lot better than I do.
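The "call application listeners on a separate thread" idea can be sketched in pure Python. This is an illustration under assumptions, not a port of async_listener.c and not part of cyclonedds-python: `AsyncListenerDispatcher` and its `post`/`close` methods are hypothetical names. The callback only enqueues a job, which is cheap, and a single worker thread runs the slow application code in FIFO order.

```python
import queue
import threading
import time


class AsyncListenerDispatcher:
    """Hypothetical helper: runs listener work on one dedicated worker thread."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self):
        self._jobs = queue.Queue()
        self._worker = threading.Thread(
            target=self._run, name="async-listener", daemon=True)
        self._worker.start()

    def post(self, fn, *args):
        # Called from the DDS callback: only enqueues, never runs application code.
        self._jobs.put((fn, args))

    def _run(self):
        while True:
            job = self._jobs.get()
            if job is self._STOP:
                break
            fn, args = job
            try:
                fn(*args)
            except Exception as exc:  # keep the worker alive if a handler fails
                print(f"listener job failed: {exc!r}")

    def close(self):
        # Jobs posted before close() still run, because the queue is FIFO.
        self._jobs.put(self._STOP)
        self._worker.join()


def demo():
    dispatcher = AsyncListenerDispatcher()
    handled = []

    def slow_handler(tag):
        time.sleep(0.05)  # stands in for the print/sleep loop in the issue
        handled.append(tag)

    start = time.monotonic()
    for i in range(3):
        dispatcher.post(slow_handler, i)  # what on_data_available would do
    posting_time = time.monotonic() - start
    dispatcher.close()  # drains the queue, then stops the worker
    return handled, posting_time
```

In the reproducer, `MyListener.on_data_available` would then do nothing but post a job (assuming the callback's argument, ignored as `_` above, is the reader), and a hypothetical handler would run the print/sleep loop, or a `take()` on the reader, on the worker thread. Whether this alone avoids the hang depends on where exactly the GIL interaction goes wrong, which is not established here; the Python callback still has to acquire the GIL, just for a much shorter time.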

I would personally go for addressing 1, but I don't have the time for that right now, unfortunately.

@clunietp
Contributor Author

clunietp commented Aug 8, 2023

@eboasson Thanks for taking a look. It is unfortunate that Python+Listeners, in their current state, are unsuitable for usage in a concurrent writer scenario. I'll see what I can do with a WaitSet instead.
