TcpStream incompatibility: StubbornTcpStream is not Sync #29

Closed
bemyak opened this issue May 2, 2024 · 13 comments

Comments

@bemyak

bemyak commented May 2, 2024

Hi!

I'm trying to replace TcpStream with StubbornTcpStream in zmq.rs here, but I'm getting an error: `(dyn Iterator<Item = Duration> + std::marker::Send + 'static)` cannot be shared between threads safely

I tried going down the rabbit hole of making it Sync, and somehow stumbled upon tokio::net::ToSocketAddrs being opaque, and I'm unable to proceed further.

Any advice will be greatly appreciated 🙏

@craftytrickster
Owner

I will look into it either today or tomorrow. Maybe I just need to add a + Sync bound to the iterator on my end here.
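
For readers following along, here is a minimal sketch of why the extra bound matters. A boxed trait object is only `Sync` when the trait object type itself names `Sync`. The type aliases, `RetryPlan`, and `fixed_delays` are hypothetical names for illustration and are not stubborn-io's real types.

```rust
use std::time::Duration;

/// Compiles only when `T` is `Sync`; it does nothing at runtime.
fn assert_sync<T: Sync>() {}

// Hypothetical aliases for illustration; these are not stubborn-io's real types.
#[allow(dead_code)]
type SendOnlyDelays = Box<dyn Iterator<Item = Duration> + Send>;
type SendSyncDelays = Box<dyn Iterator<Item = Duration> + Send + Sync>;

/// Hypothetical holder for a retry-delay source.
struct RetryPlan {
    delays: SendSyncDelays,
}

impl RetryPlan {
    /// Returns the next delay, or `None` once the schedule is exhausted.
    fn next_delay(&mut self) -> Option<Duration> {
        self.delays.next()
    }
}

/// Builds a fixed 1s, 2s, 4s schedule. The concrete iterator is `Sync`,
/// so it coerces to the `+ Sync` trait object.
fn fixed_delays() -> SendSyncDelays {
    Box::new(vec![1u64, 2, 4].into_iter().map(Duration::from_secs))
}

fn check_bounds() {
    // Accepted: every field of `RetryPlan` is `Sync`.
    assert_sync::<RetryPlan>();
    // Rejected with E0277 if uncommented, like the error in the report:
    // assert_sync::<SendOnlyDelays>();
}
```

The trade-off is that callers can then only supply iterators that are themselves `Sync`.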

@craftytrickster
Owner

I created a branch and published it in the associated PR. Would you be able to point to my branch on GitHub from your Cargo.toml and see if it compiles correctly?

Additionally, can you please show me the actual code snippet where you are adding StubbornIo, complete with the options? That way I can see if there is anything else that I need to be aware of.

Thanks.

@bemyak
Author

bemyak commented May 3, 2024

Hi, thank you for working on this!

Unfortunately, it didn't fix the issue. Here's the diff of my changes: zeromq/zmq.rs@master...bemyak:zmq.rs:subborn

The error is pretty confusing:

error[E0277]: `(dyn Fn() -> Box<(dyn Iterator<Item = Duration> + std::marker::Send + 'static)> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/transport/tcp.rs:27:21
    |
27  |     Ok((make_framed(raw_socket), Endpoint::from_tcp_addr(peer_addr)))
    |         ----------- ^^^^^^^^^^ `(dyn Fn() -> Box<(dyn Iterator<Item = Duration> + std::marker::Send + 'static)> + std::marker::Send + 'static)` cannot be shared between threads safely
    |         |
    |         required by a bound introduced by this call
    |
    = help: the trait `Sync` is not implemented for `(dyn Fn() -> Box<(dyn Iterator<Item = Duration> + std::marker::Send + 'static)> + std::marker::Send + 'static)`, which is required by `StubbornIo<async_rt::tokio::net::TcpStream, (&str, u16)>: Sync`
    = note: required for `Unique<(dyn Fn() -> Box<(dyn Iterator<Item = Duration> + std::marker::Send + 'static)> + std::marker::Send + 'static)>` to implement `Sync`

@craftytrickster
Owner

I am able to reproduce it as well. I have not found an easy way to get it to work yet, so I will continue to investigate.

@craftytrickster
Owner

I posted a new change that in theory should compile and work, but I want to see if I can avoid the mutex.
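
As a rough illustration of the mutex approach mentioned here (a sketch under assumptions, not the actual change in the PR): `std::sync::Mutex<T>` is `Sync` whenever `T` is `Send`, so wrapping a `Send`-only iterator in a mutex makes the containing struct `Sync` without requiring callers to provide a `Sync` iterator. `Retry` and `DelayIter` are hypothetical names.

```rust
use std::sync::Mutex;
use std::time::Duration;

/// Compiles only when `T` is `Sync`; it does nothing at runtime.
fn assert_sync<T: Sync>() {}

// Hypothetical alias: a retry-delay source that is `Send` but not `Sync`.
type DelayIter = Box<dyn Iterator<Item = Duration> + Send>;

/// Hypothetical wrapper; the mutex makes the struct `Sync`.
struct Retry {
    delays: Mutex<DelayIter>,
}

impl Retry {
    fn new(delays: DelayIter) -> Self {
        Self { delays: Mutex::new(delays) }
    }

    /// Takes `&self`, because the mutex provides the interior mutability
    /// needed to advance the iterator.
    fn next_delay(&self) -> Option<Duration> {
        self.delays.lock().unwrap().next()
    }
}

fn check_bounds() {
    assert_sync::<Retry>();
}
```

The cost is a lock acquisition on every access, which is presumably why avoiding the mutex is attractive.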

@bemyak
Author

bemyak commented May 4, 2024

There's one other small incompatibility:

error[E0716]: temporary value dropped while borrowed
  --> src/transport/tcp.rs:20:50
   |
20 |     let raw_socket = StubbornTcpStream::connect((host.to_string().as_str(), port)).await?;
   |                      ----------------------------^^^^^^^^^^^^^^^^-----------------       - temporary value is freed at the end of this statement
   |                      |                           |
   |                      |                           creates a temporary value which is freed while still in use
   |                      argument requires that borrow lasts for `'static`

But it's easy to fix now (pass host as String by value), so it's not a big deal.

Thank you for the fix! I'll test it properly soon to see if zmq is able to recover from connection losses.
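
The E0716 error above can be reproduced without tokio or stubborn-io. In this std-only sketch, `remember_addr` is a hypothetical stand-in for a connect function that must keep the address around for later reconnects, which is why it asks for `'static`. Passing an owned `String` satisfies the bound, while borrowing from a temporary does not.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

/// Hypothetical stand-in for a reconnecting `connect`: it stores the address
/// and resolves it again on demand, so the address must be `'static`.
fn remember_addr<A>(addr: A) -> Box<dyn Fn() -> Vec<SocketAddr> + Send>
where
    A: ToSocketAddrs + Send + 'static,
{
    Box::new(move || {
        addr.to_socket_addrs()
            .map(|addrs| addrs.collect())
            .unwrap_or_default()
    })
}

fn build_resolver(host: &str, port: u16) -> Box<dyn Fn() -> Vec<SocketAddr> + Send> {
    // Rejected with E0716: the temporary `String` is dropped at the end of
    // the statement, but the borrow would need to last for `'static`.
    // remember_addr((host.to_string().as_str(), port))

    // Accepted: `(String, u16)` owns its data and implements `ToSocketAddrs`.
    remember_addr((host.to_string(), port))
}
```

Calling `build_resolver("127.0.0.1", 5555)` returns a closure that can re-resolve the address at any later point, since it owns the host string.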

@craftytrickster
Owner

craftytrickster commented May 4, 2024 via email

@bemyak
Author

bemyak commented May 7, 2024

Hmm, not sure if I'm doing something wrong, but it seems that stubborn is unable to re-establish a connection. I have a zmq client, which PUBlishes messages, and another one, which is SUBscribed to it. I restart the publisher once, and here's what I get:

06:46:58 [INFO] Initial connection succeeded.
06:47:03 [ERROR] Disconnect occurred
06:47:03 [INFO] Will perform reconnect attempt #1 in 4.137203725s.
06:47:07 [INFO] Attempting reconnect #1 now.
06:47:07 [INFO] Connection re-established
06:47:37 [ERROR] Disconnect occurred
06:47:37 [INFO] Will perform reconnect attempt #1 in 4.102957762s.
06:47:41 [INFO] Attempting reconnect #1 now.
06:47:41 [INFO] Connection re-established
06:48:12 [ERROR] Disconnect occurred
06:48:12 [INFO] Will perform reconnect attempt #1 in 3.899307349s.
06:48:15 [INFO] Attempting reconnect #1 now.
06:48:15 [INFO] Connection re-established
…

The connection seems to time out 30s after being re-established 🤔

@craftytrickster
Owner

This evening I'll clone your project and see what's going on. Thank you for the error logs.

@bemyak
Author

bemyak commented May 7, 2024

I don't have the project published, but I'll now try to write a test for stubborn-io to reproduce the problem.

@craftytrickster
Owner

I was able to reproduce a failure to reconnect using the example weather server + client in the repo.

Basically, what I am observing is that the ZmqCodec Decoder maintains internal state. When the reconnect occurs after a disconnect, the state of the codec should be reset to ZmqCodec { state: Greeting, waiting_for: 64, buffered_message: None }, but instead it retains the last state it had before the connection went down. This causes subsequent messages to fail, because the codec thinks it is in the middle of reading a previous message.

There are probably a few ways to handle this. One such way would be to subscribe to the on_disconnect_callback (https://github.com/craftytrickster/stubborn-io/blob/main/src/config.rs#L83) and, when it is triggered, make sure the state of the decoder is reset, so that when the TCP connection is brought back up, it begins cleanly.
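
A minimal sketch of the reset idea, using only the standard library. `SketchCodec`, `DecoderState`, and `make_disconnect_hook` are hypothetical: the field names mirror the ZmqCodec state quoted above, but the enum variants other than `Greeting`, the shared `Arc<Mutex<..>>` ownership, and the hook's closure type are assumptions. The real callback signature in stubborn-io and the way zmq.rs owns its codec may well differ.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical decoder phases; only `Greeting` is named in the thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DecoderState {
    Greeting,
    FrameBody,
}

/// Hypothetical stand-in for a stateful decoder.
#[derive(Debug, PartialEq, Eq)]
struct SketchCodec {
    state: DecoderState,
    waiting_for: usize,
    buffered_message: Option<Vec<u8>>,
}

impl SketchCodec {
    /// Fresh state, as expected at the start of a connection.
    fn new() -> Self {
        Self { state: DecoderState::Greeting, waiting_for: 64, buffered_message: None }
    }

    /// Discards any partially decoded message and returns to the initial state.
    fn reset(&mut self) {
        *self = Self::new();
    }
}

/// Builds a closure that resets the shared codec. The closure type is an
/// assumption about what a disconnect callback could accept.
fn make_disconnect_hook(codec: Arc<Mutex<SketchCodec>>) -> impl Fn() + Send + Sync + 'static {
    move || codec.lock().unwrap().reset()
}
```

With this shape, a codec left in the middle of a frame when the connection drops is back at the greeting step before the next connection delivers any bytes.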

@bemyak
Author

bemyak commented May 7, 2024

Thank you for investigating it, @craftytrickster!
I believe that with changes from #31, this issue is resolved.
I will look into fixing this in zmq. The stubborn callbacks will come in really handy!

@bemyak closed this as completed May 7, 2024

@craftytrickster
Owner

Thanks for the help here. I published a new crate version with the Sync change.
