Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

After two messages, unhandled exemption #18

Closed
medcelerate opened this issue Feb 14, 2023 · 8 comments
Closed

After two messages, unhandled exemption #18

medcelerate opened this issue Feb 14, 2023 · 8 comments

Comments

@medcelerate
Copy link

When sending messages, the server is sending a reply, however after the second message from the client the following error occurs. I can confirm I receive a response, but this error occurs. This seems to be locking the library in a state where it can no longer send messages nor receive them. Current implementation is equivalent to example code in this repo for listening on a stream and sending using _socket.sendString()

[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: ZeroMQException(156384763): Operation cannot be accomplished in current state
#0      _checkErrorCode (package:dartzmq/src/exception.dart:56:5)
#1      _checkReturnCode (package:dartzmq/src/exception.dart:49:5)
#2      ZContext._poll (package:dartzmq/src/zeromq.dart:109:9)
#3      ZContext._startPolling.<anonymous closure> (package:dartzmq/src/zeromq.dart:68:70)
#4      _Timer._runTimers (dart:isolate-patch/timer_impl.dart:398:19)
#5      _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:5)
#6      _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:192:26)
@enwi
Copy link
Owner

enwi commented Feb 18, 2023

I am not quite sure where this error comes from, but I have also experienced it with the example and a python test server. However in a production app using other sockets it just works fine. So how can we find the bug?

@medcelerate
Copy link
Author

I'm unsure, I've gotten it using the basic example setup, with a C++ server. According to the documentation for zmq this happens when a request is not met with a reply. Is it possible this library is not appropriately handling replies on a stream and blocking the socket.

@enwi
Copy link
Owner

enwi commented Feb 25, 2023

Is it possible this library is not appropriately handling replies on a stream and blocking the socket?

I think not, since this library is using a compiled version of libzmq under the hood.

@cvanvlack
Copy link

Any more information on this? I set up the python server according to the example docs. It seems to be running with no issues.

But I am also getting the ZeroMQException(156384763): Operation cannot be accomplished in current state error. I'm fairly new to ZeroMQ, so I am unsure what you mean by "in a production app using other sockets" it works just fine.

Have you simply set up the flutter app to point at something other than localhost? Did you have to then change IP address in your python server?

I'm doing this testing on Windows in case it's relevant.

@enwi
Copy link
Owner

enwi commented Mar 15, 2023

@cvanvlack We are using it for a production app that connects to a backend made with Java, which uses JeroMQ. Maybe the python implementation is not working right. But this could be tested by writing a c++ program connecting to the python server. If the same exception occurs there as well then there is either a bug in libzmq or the python implementation. If not then there must be an issue with dartzmq.

@enwi
Copy link
Owner

enwi commented Mar 15, 2023

I did some testing and can confirm that a simple c++ test works fine

#include <atomic>
#include <chrono>
#include <iostream>

#include <signal.h>
#include <zmq.h>

std::atomic_bool shutdown = false;

void signalHandler(const int signal)
{
    std::cout << "Received signal: " << signal << std::endl;
    if (signal == SIGTERM)
    {
        shutdown = true;
    }
}

void loop()
{
    int ret;
    void* ctx = nullptr;

    try
    {
        // Create context
        ctx = zmq_ctx_new();

        // Create socket
        void* socket = zmq_socket(ctx, ZMQ_REQ);

        // Connect socket
        ret = zmq_connect(socket, "tcp://127.0.0.1:5566");
        if (ret)
        {
            std::cout << "zmq_connect: " << ret << std::endl;
        }

        // Loop
        auto last = std::chrono::system_clock::now();
        while (!shutdown)
        {
            const auto now = std::chrono::system_clock::now();
            if (now - last >= std::chrono::seconds(1))
            {
                last = now;

                // Send test message
                {
                    const char msg[] = "Test";
                    std::cout << "Sending: " << msg << std::endl;
                    zmq_send(socket, msg, 4, 0);
                }

                // Poll reply
                {
                    zmq_pollitem_t pollitem;
                    pollitem.socket = socket;
                    pollitem.events = ZMQ_POLLIN;
                    ret = zmq_poll(&pollitem, 1, ZMQ_DONTWAIT);
                    if (ret)
                    {
                        zmq_msg_t message;
                        ret = zmq_msg_init(&message);
                        if (ret)
                        {
                            std::cout << "zmq_msg_init: " << ret << std::endl;
                        }

                        // Read all messages
                        while ((ret = zmq_msg_recv(&message, socket, ZMQ_DONTWAIT)) >= 0)
                        {
                            const auto data = static_cast<uint8_t*>(zmq_msg_data(&message));
                            // auto size = zmq_msg_size(&message);
                            std::cout << "Received: ";
                            for (size_t i = 0; i < ret; ++i)
                            {
                                std::cout << data[i];
                            }
                            std::cout << std::endl;
                        }
                    }
                }
            }
        }
    }
    catch (const std::exception e)
    {
        std::cout << "Exception: " << e.what() << std::endl;
    }
    catch (...)
    {
        std::cout << "Unknown Exception" << std::endl;
    }

    zmq_ctx_destroy(ctx);

    std::cout << "Done!" << std::endl;

    std::flush(std::cout);
}

int main(const int argc, const char* argv[])
{
    std::cout << "Hello World!" << std::endl;
    signal(SIGTERM, signalHandler);
    loop();
}

Output:

Hello World!
Sending: Test
Received: ECHO
Received: Test
Sending: Test
Received: ECHO
Received: Test
Sending: Test
Received: ECHO
Received: Test

@enwi
Copy link
Owner

enwi commented Mar 15, 2023

Looking a little deeper into this, it seems that the error occurs whenever zmq_send is called while _poll is running. If you wait a second before pressing the button again the exception does not occur.

@enwi
Copy link
Owner

enwi commented Mar 15, 2023

Alright I think the issue is with the REQ and REP sockets, because you cannot send another message on a REQ socket unless you have received the reply from the REP socket already. Otherwise the internal socket state is expecting a reply and sees that another message is being sent, which is forbidden for the synchronous REQ and REP. Since the _poll function only gets called every second the error is noticeable, because you have 1 second to screw up. So the workaround would be to use the asynchronous versions of REQ and REP aka DEALER and ROUTER. The long time solution is fixing #4, though this issue might still occur when sending more than one message at a time like:

_socket.send([_presses]);
_socket.send([_presses]);

@enwi enwi closed this as completed in 63319e6 Mar 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants