Support for streaming multi-part messages #6

Open
peterjoel opened this issue Apr 7, 2020 · 8 comments

Comments

@peterjoel
Contributor

peterjoel commented Apr 7, 2020

A problem with multipart messages in the current implementation is that senders need to allocate all of the message parts to a VecDeque before sending. Likewise, receivers must allocate message parts before they can be processed. Applications such as an XPUB/XSUB proxy would be very inefficient with this API.

Ideally, a sender should be able to stream parts from another source, without an intermediate allocation. Receivers should be able to start processing the first part, before the rest arrive.

The API that I am envisioning could look something like this:

  • socket.send(msg) can send only single messages
  • Use socket.send_multipart(iter) to send messages from an IntoIterator
  • Use socket.send_multipart_stream(stream) to stream messages
  • socket.recv() returns an error if the message is multipart
  • socket.recv_multipart() returns a Stream. It works for either multipart or single message.
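To make the proposed shape concrete, here is a rough synchronous sketch using only the standard library. Everything in it (`MockSocket`, `RecvError`, `Parts`) is a hypothetical stand-in rather than this crate's API, and a plain `Iterator` stands in for the `Stream` that `recv_multipart()` would return in the async version. `send_multipart_stream` is left out because `Stream` is not part of std.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum RecvError {
    /// `recv` was called, but the next message has more than one part.
    Multipart,
    /// Nothing is queued.
    Empty,
}

/// In-memory stand-in for a socket: a queue of frames, each tagged with a
/// "more parts follow" flag (similar in spirit to ZMQ's SNDMORE/RCVMORE).
#[derive(Default)]
pub struct MockSocket {
    frames: VecDeque<(Vec<u8>, bool)>,
}

impl MockSocket {
    /// Sends a single-part message.
    pub fn send(&mut self, msg: &[u8]) {
        self.frames.push_back((msg.to_vec(), false));
    }

    /// Sends the parts one at a time as the iterator yields them, so the
    /// caller never has to collect them into a container first.
    pub fn send_multipart<I, P>(&mut self, parts: I)
    where
        I: IntoIterator<Item = P>,
        P: AsRef<[u8]>,
    {
        let mut iter = parts.into_iter().peekable();
        while let Some(part) = iter.next() {
            let more = iter.peek().is_some();
            self.frames.push_back((part.as_ref().to_vec(), more));
        }
    }

    /// Receives a single-part message; fails without consuming anything
    /// if the next message is multipart.
    pub fn recv(&mut self) -> Result<Vec<u8>, RecvError> {
        let more = self.frames.front().map(|(_, more)| *more);
        match more {
            None => Err(RecvError::Empty),
            Some(true) => Err(RecvError::Multipart),
            Some(false) => Ok(self.frames.pop_front().expect("front was Some").0),
        }
    }

    /// Yields the parts of the next message lazily; works for single-part
    /// and multipart messages alike.
    pub fn recv_multipart(&mut self) -> Parts<'_> {
        Parts { socket: self, done: false }
    }
}

/// Lazy view over the parts of one message.
pub struct Parts<'a> {
    socket: &'a mut MockSocket,
    done: bool,
}

impl<'a> Iterator for Parts<'a> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        if self.done {
            return None;
        }
        let (part, more) = self.socket.frames.pop_front()?;
        self.done = !more;
        Some(part)
    }
}
```

One open question this sketch does not answer: if `Parts` is dropped before the last part is read, the remaining parts stay queued. A real design would have to decide whether to drain or to error in that case.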

This is quite a big change, which will impact the public API and internal structures. It will need some careful design, and my intention with opening this issue is to trigger some conversation first.

@wusyong
Collaborator

wusyong commented Apr 7, 2020

Back when we needed to stream multipart messages, I thought a VecDeque would help. But over time it has come to feel more like overhead.

The proposed methods look reasonable. I also have an idea: maybe we could make the socket types generic, so that they can send/receive any message type that satisfies some trait bounds. This should be easier to refactor, and the interface would probably stay the same (or similar).

Btw, I do agree we should add more methods to the socket types. For instance, there could be a method on xpub to set the welcome message directly, instead of having to get the original zmq socket.

@peterjoel
Contributor Author

I agree about adding a type parameter for the message type.

One slightly awkward thing is how ZMQ treats topics. It is reasonable for a topic to be of a different type than the rest of the message parts, but it's also possible for the topic to be the leading bytes of a message. For example, you could serialize structs so that the id field is guaranteed to be serialized first, so it can act as the topic without duplication.

So if I create a Subscribe<T, M>, I would still need to be able to express whether the topic should be discarded or kept as part of M.
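As a rough illustration of the two cases, the choice could be expressed as a small enum. This is only a sketch: `TopicMode` and `split_topic` are hypothetical names, and the parts are plain byte vectors rather than zmq messages.

```rust
/// How the topic relates to the payload (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TopicMode {
    /// The topic is its own leading message part and is dropped from the payload.
    SeparatePart,
    /// The topic is the first `n` bytes of the first part and stays in the payload.
    Prefix(usize),
}

/// Splits a received message into `(topic, payload parts)`.
/// Returns `None` if the message is empty or shorter than the requested prefix.
pub fn split_topic(parts: &[Vec<u8>], mode: TopicMode) -> Option<(&[u8], &[Vec<u8>])> {
    let (first, rest) = parts.split_first()?;
    match mode {
        // Discard the topic part: the payload is everything after it.
        TopicMode::SeparatePart => Some((first.as_slice(), rest)),
        // Keep the topic: the payload is the whole message, topic bytes included.
        TopicMode::Prefix(n) => Some((first.get(..n)?, parts)),
    }
}
```

With `SeparatePart` the topic is removed before the payload reaches `M`; with `Prefix(n)` the same bytes are visible both as the topic and as the start of the payload.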

@wusyong
Collaborator

wusyong commented Apr 10, 2020

Sorry for the late reply. It's been a busy week for me. Anyway, it is indeed unfortunate that we would have to do something like this, but I guess that is the trade-off. I would prefer to stick with a single generic parameter to keep it simple, IMHO.

This is also why this crate provides a method to reach the inner zmq socket. If someone wants flexibility, they can still call the original socket methods. I'll run some experiments over the next couple of days and see how it turns out.

@peterjoel
Contributor Author

It definitely needs some thought to get the right balance between simplicity, safety and versatility.

One possibility is to have a sensible default for the topic, e.g. Subscribe<M, T = [u8]>. This seems to work well for things like the HashMap hasher in std, which mostly you don't change but you can if you want to.
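A minimal sketch of that idea, assuming the topic type is only tracked at the type level. `Subscribe` here is a hypothetical stand-in and `with_topic_type` and `topic_is` are illustrative names, not part of this crate. As with `HashMap::new`, the plain `new` constructor is only defined for the default parameter.

```rust
use std::any::TypeId;
use std::marker::PhantomData;

/// Hypothetical subscriber type: `M` is the message type, `T` the topic
/// type. `T` defaults to `[u8]`, which is unsized, hence the `?Sized` bound.
pub struct Subscribe<M, T: ?Sized = [u8]> {
    _message: PhantomData<M>,
    _topic: PhantomData<T>,
}

/// `new` exists only for the default topic type, so most callers never
/// have to name `T` at all.
impl<M> Subscribe<M> {
    pub fn new() -> Self {
        Self::with_topic_type()
    }
}

/// Callers who want a different topic type name it explicitly.
impl<M, T: ?Sized> Subscribe<M, T> {
    pub fn with_topic_type() -> Self {
        Subscribe {
            _message: PhantomData,
            _topic: PhantomData,
        }
    }
}

impl<M, T: ?Sized + 'static> Subscribe<M, T> {
    /// Reports whether the topic type is `U` (only here so the sketch
    /// has something observable).
    pub fn topic_is<U: ?Sized + 'static>(&self) -> bool {
        TypeId::of::<T>() == TypeId::of::<U>()
    }
}
```

For example, `let s: Subscribe<Vec<u8>> = Subscribe::new();` gets `[u8]` topics without mentioning them, while `Subscribe::<Vec<u8>, str>::with_topic_type()` opts into a different topic type.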

@wusyong
Collaborator

wusyong commented Apr 16, 2020

Yeah, that sounds pretty great! I'm first refactoring with a generic <M> bounded by IntoIterator. Then let's see which methods should be added once <T> is introduced.

@peterjoel
Contributor Author

We could also just make the decision to only support topics which are sent as a separate message part. I have no idea how common the other patterns are in the wild, but this would meet my use case. If users want more versatility, we can think about it then.

@wusyong
Collaborator

wusyong commented Apr 17, 2020

Right, let's do this for now, since this is also what I need most of the time.
Btw, I have an update on the IntoIterator generic. It seems the send method on the original zmq socket consumes the data, but Pin<&mut Self> is a reference, so the data can't be moved out of it. I have tried some workarounds, but it seems another instance will have to be created in the end no matter what.

One of the trait bounds the original zmq crate uses for messages is Into<Message> + Clone. If users send a u8 slice, it can still avoid allocating memory. I'll probably accept a slice with this bound on the sending side and return an owned vector as the multipart on the receiving side.

@wusyong
Collaborator

wusyong commented Apr 18, 2020

Release v0.3.0 introduces two new types, Multipart and MultipartIter, to replace MessageBuf. Sending a multipart message no longer allocates additional memory.

However, I haven't yet found a way to turn the multipart message queue into an iterator under the hood. Users will have to explicitly call .into() to turn their type into an iterator when calling send on a type that implements Sink.

I would prefer to leave this issue open in the hope of further enhancements in the future.
