Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segmentation fault happens frequently when server send requests to client #113

Open
WWaynee opened this issue May 8, 2024 · 3 comments
Open

Comments

@WWaynee
Copy link

WWaynee commented May 8, 2024

I am currently writing a test, client sends requests to server and after some asynchronous processing the server sends the result back to the client, which is not directly enqueuing a response in server thread. So I need to make server connect to client's rpc, but something strange happens when server sends requests to client that causes segmentation fault.
The error is occurred in the enqueue_request function, on this line:

if (unlikely(session->client_info_.sslot_free_vec_.size() == 0)) {

I print the variable that the size() uses. The access of size_t type variable free_index_ caused segmentation fault.
https://github.com/erpc-io/eRPC/blob/82b31fc4df53dd7a2aff1fc6ad1a3888c5c7b6ac/src/util/fixed_vector.h#L36C3-L36C10
I don't know the real cause of the problem. Does the startup sequence of client and server matter? I start the server first and then client, and I use a event_loop in a while to check the count of connected session to until they are fully and successfully connected.

@anujkaliaiitd
Copy link
Collaborator

Interesting. There is a test that stresses sending requests from within request handlers: https://github.com/erpc-io/eRPC/blob/master/tests/client_tests/req_in_req_func_test.cc. If you are able to create a small example program that reproduces the issue, I can take a look.

@WWaynee
Copy link
Author

WWaynee commented May 11, 2024

Here is a small exmaple(C++20) that has the posibility of segfault, sometimes it can run successfully. And the cause is still the variable free_index_. It's quite different from the example of sending requests from request handlers you provided. In my test, the server send requests to the client's rpcs that don't create sessions with server's rpcs.
Sometimes the segfault happens in the client, sometimes in the server.

Here is my test code.
test.h

#include <string>
#include <vector>
#include "rpc.h"

enum RPCType : int {
    kTest1 = 1,
    kTest2 = 2,
};

std::string client_uri = "localhost:31850";
std::string server_uri = "localhost:31851";

erpc::FastRand fastrand_;

// For choosing a random session number
inline auto FastRandSessionNum(const std::vector<int> &sn_vec) -> int {
    uint32_t x = fastrand_.next_u32();
    size_t rand_index = (static_cast<size_t>(x) * sn_vec.size()) >> 32;
    return sn_vec[rand_index];
}

struct ContContextT {
  int finished_;
  char *buf_;
};

void ContFunc(void *context, void *tag) {
  auto *ret = reinterpret_cast<ContContextT *>(tag);
  ret->finished_ = 1;
}

int rpc_count = 4;

test_client.cc

#include "msg_buffer.h"
#include "transport_impl/infiniband/ib_transport.h"
#include "test.h"
#include "nexus.h"
#include <cstdint>
#include <thread>
#include <iostream>
#include <memory>
#include <rpc.h>
#include <sys/sdt.h>
#include <sys/types.h>
#include <atomic>

std::shared_ptr<erpc::Nexus> nexus;

std::vector<std::thread> jobs;
std::atomic<int> server_connected_count = 0;
std::vector<int> session_nums;

std::atomic<bool> get_result = false;
std::vector<std::thread> client_threads;

struct ClientContext {
  erpc::Rpc<erpc::CTransport> *rpc_ = nullptr;
  size_t rpc_id_;
  uint64_t nr_reqs_ = 0;
};

void SessionHandler(int /*unused*/, erpc::SmEventType /*unused*/,
                    erpc::SmErrType /*unused*/, void * /*unused*/) {
  server_connected_count.fetch_add(1);
}

void SessionHandler2(int /*unused*/, erpc::SmEventType /*unused*/,
                    erpc::SmErrType /*unused*/, void * /*unused*/) {
}

void Test2handler(erpc::ReqHandle *req_handle, void *_ctx) {
  auto *c = static_cast<ClientContext*>(_ctx);
  auto data = req_handle->get_req_msgbuf()->buf_;
  auto &resp = req_handle->pre_resp_msgbuf_;
  c->rpc_->enqueue_response(req_handle, &resp);
  std::cout<<"Receive result from server"<<std::endl;
  get_result.store(true);
  get_result.notify_all();
}

void EventLoopThread(erpc::Nexus *nexus, size_t rpc_id) {
  ClientContext client_context = {.rpc_id_ = rpc_id};
  client_context.rpc_ =
      new erpc::Rpc<erpc::CTransport>(nexus, &client_context, rpc_id, SessionHandler2);
  while (!get_result.load()) {
    client_context.rpc_->run_event_loop(1000);
  }
  delete client_context.rpc_;
}

int main() {
  nexus = std::make_shared<erpc::Nexus>(client_uri);
  nexus->register_req_func(RPCType::kTest2, Test2handler);

  for (int i = 0; i < rpc_count; i++) {
    client_threads.emplace_back(EventLoopThread, nexus.get(), i);
  }

  std::unique_ptr<erpc::Rpc<erpc::CTransport>> rpc_;

  // Connect Server
  rpc_ = std::make_unique<erpc::Rpc<erpc::CTransport>>(nexus.get(), nullptr, rpc_count, SessionHandler);
  for (int i = 0; i < rpc_count; i++) {
    int session_num = rpc_->create_session(server_uri, i);
    if (session_num >= 0) {
      session_nums.push_back(session_num);
    }
  }
  while (server_connected_count.load() != rpc_count) {
    rpc_->run_event_loop_once();
  }

  // Send request to server
  auto req_buf = rpc_->alloc_msg_buffer_or_die(16);
  auto resp_buf = rpc_->alloc_msg_buffer_or_die(16);
  ContContextT cc = {0, reinterpret_cast<char *>(resp_buf.buf_)};
  rpc_->enqueue_request(FastRandSessionNum(session_nums), RPCType::kTest1, &req_buf, &resp_buf, ContFunc, &cc);
  while (cc.finished_ == 0) {
    rpc_->run_event_loop_once();
  }
  rpc_->free_msg_buffer(req_buf);
  rpc_->free_msg_buffer(resp_buf);

  // Block until receive result from server
  get_result.wait(false);

  for (int i = 0; i < rpc_count; i++) {
    client_threads[i].join();
  }
  return 0;
}

test_server.cc

#include "msg_buffer.h"
#include "transport_impl/infiniband/ib_transport.h"
#include "test.h"
#include <cstdint>
#include <thread>
#include <memory>
#include "rpc.h"
#include <sys/sdt.h>
#include <sys/types.h>
#include <util/numautils.h>
#include <iostream>

std::shared_ptr<erpc::Nexus> nexus;

std::vector<std::thread> jobs;
std::atomic<int> gConnectedSessionNum(0);
std::atomic<bool> serverStop = false;

thread_local std::vector<int> session_nums;
thread_local std::atomic<int> client_connected_count = 0;

struct ServerContext {
  erpc::Rpc<erpc::CTransport> *rpc_ = nullptr;
  size_t rpc_id_;
  uint64_t nr_reqs_ = 0;
};

void ServerHandler(int /*unused*/, erpc::SmEventType /*unused*/,
                    erpc::SmErrType /*unused*/, void * /*unused*/) {
  client_connected_count.fetch_add(1);
}

void EventLoopThread(size_t rpc_id) {
  ServerContext server_context = {.rpc_id_ = rpc_id};
  server_context.rpc_ =
      new erpc::Rpc<erpc::CTransport>(nexus.get(), &server_context, rpc_id, ServerHandler);
  // Connect to client
  for (int i = 0; i < rpc_count; i++) {
    int session_num = server_context.rpc_->create_session(client_uri, i);
    if (session_num >= 0) {
      session_nums.push_back(i);
    }
  }
  while (client_connected_count.load() != rpc_count) {
    server_context.rpc_->run_event_loop_once();
  }
  
  while (true) {
    server_context.rpc_->run_event_loop(1000);
    if (serverStop.load()) {
      break;
    }
  }
  delete server_context.rpc_;
}

void Test1handler(erpc::ReqHandle *req_handle, void *_ctx) {
  auto *c = static_cast<ServerContext*>(_ctx);
  auto data = req_handle->get_req_msgbuf()->buf_;
  auto &resp = req_handle->pre_resp_msgbuf_;
  
  // Send request to Client
  auto req_buf = c->rpc_->alloc_msg_buffer_or_die(16);
  auto resp_buf = c->rpc_->alloc_msg_buffer_or_die(16);
  ContContextT cc = {0, reinterpret_cast<char *>(resp_buf.buf_)};
  c->rpc_->enqueue_request(FastRandSessionNum(session_nums), RPCType::kTest2, &req_buf, &resp_buf, ContFunc, &cc);
  while (cc.finished_ == 0) {
    c->rpc_->run_event_loop_once();
  }
  // Enqueue a response to the original request from client
  c->rpc_->enqueue_response(req_handle, &resp);
  serverStop.store(true);
}

int main() {
  nexus = std::make_shared<erpc::Nexus>(server_uri);
  nexus->register_req_func(RPCType::kTest1, Test1handler);
  for (int i = 0; i < rpc_count; i++) {
    jobs.emplace_back(EventLoopThread, i);
  }
  for (int i = 0; i < rpc_count; i++) {
    jobs[i].join();
  }
  return 0;
}

@WWaynee
Copy link
Author

WWaynee commented May 14, 2024

It seems like something wrong happens when enqueuing requests on the server side, but the session is already connected. That's strange.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants