Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement libboostasio #534

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC
$<INSTALL_INTERFACE:include/>
)

if(AMQP-CPP_LINUX_TCP)
if(AMQP-CPP_LINUX_TCP OR AMQP-CPP_BUILD_EXAMPLES)
target_link_libraries(${PROJECT_NAME} ${CMAKE_DL_LIBS})
# Find OpenSSL and provide include dirs
find_package(OpenSSL REQUIRED)
Expand Down
19 changes: 18 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,24 @@ add_executable(amqpcpp_boost_example libboostasio.cpp)

add_dependencies(amqpcpp_boost_example amqpcpp)

target_link_libraries(amqpcpp_boost_example amqpcpp boost_system pthread dl ssl)
target_link_libraries(amqpcpp_boost_example amqpcpp boost_system pthread dl)


add_executable(amqpcpp_boost_ssl_example libboostasio_ssl.cpp)

add_dependencies(amqpcpp_boost_ssl_example amqpcpp)

target_link_libraries(amqpcpp_boost_ssl_example amqpcpp boost_system pthread dl ssl crypto)


add_executable(amqpcpp_boost_multithreading_example libboostasio_multithreading.cpp)

add_dependencies(amqpcpp_boost_multithreading_example amqpcpp)

target_link_libraries(amqpcpp_boost_multithreading_example amqpcpp boost_system pthread dl)


if(AMQP-CPP_LINUX_TCP)
###################################
# Libev
###################################
Expand All @@ -30,3 +46,4 @@ add_executable(amqpcpp_libuv_example libuv.cpp)
add_dependencies(amqpcpp_libuv_example amqpcpp)

target_link_libraries(amqpcpp_libuv_example amqpcpp uv pthread dl ssl)
endif(AMQP-CPP_LINUX_TCP)
51 changes: 30 additions & 21 deletions examples/libboostasio.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/**
* LibBoostAsio.cpp
*
*
* Test program to check AMQP functionality based on Boost's asio io_service.
*
*
* @author Gavin Smith <[email protected]>
*
* Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
Expand All @@ -11,10 +11,10 @@
/**
* Dependencies
*/
#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/deadline_timer.hpp>

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/signal_set.hpp>

#include <amqpcpp.h>
#include <amqpcpp/libboostasio.h>
Expand All @@ -25,32 +25,41 @@
*/
int main()
{
boost::asio::io_context io_context;

boost::asio::signal_set signal_set{io_context, SIGINT, SIGTERM};

signal_set.async_wait([&io_context] (const boost::system::error_code& error, int signal_number) {
std::cerr << "Got signal " << signal_number << ", terminating..." << std::endl;

io_context.stop();
});

// access to the boost asio handler
// note: we suggest use of 2 threads - normally one is fin (we are simply demonstrating thread safety).
boost::asio::io_service service(4);
const AMQP::Address address("amqp://guest:guest@localhost/");

boost::asio::ip::tcp::resolver resolver(io_context);
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(address.hostname(), address.secure() ? "amqps" : "amqp");

boost::asio::ip::tcp::socket socket(io_context);
boost::asio::connect(socket, endpoints);

// handler for libev
AMQP::LibBoostAsioHandler handler(service);

// make a connection
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/"));
AMQP::LibBoostAsioConnection connection(std::move(socket), address.login(), address.vhost());

// we need a channel too
AMQP::TcpChannel channel(&connection);
AMQP::LibBoostAsioChannel channel(&connection);

// create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {

// report the name of the temporary queue
std::cout << "declared queue " << name << std::endl;

// now we can close the connection
connection.close();
});

// run the handler
// a t the moment, one will need SIGINT to stop. In time, should add signal handling through boost API.
return service.run();
return io_context.run();
}

111 changes: 111 additions & 0 deletions examples/libboostasio_multithreading.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* LibBoostAsio.cpp
*
* Test program to check AMQP functionality based on Boost's asio io_service.
*
* @author Gavin Smith <[email protected]>
*
* Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
*/

/**
* Dependencies
*/

#include <array>
#include <chrono>
#include <thread>

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/signal_set.hpp>

#include <amqpcpp.h>
#include <amqpcpp/libboostasio.h>


/**
* Main program
* @return int
*/
int main()
{
using namespace std::chrono_literals;

boost::asio::io_context io_context;

boost::asio::signal_set signal_set{io_context, SIGINT, SIGTERM};

signal_set.async_wait([&io_context] (const boost::system::error_code& error, int signal_number) {
std::cerr << "Got signal " << signal_number << ", terminating..." << std::endl;

io_context.stop();
});

boost::asio::strand<boost::asio::io_context::executor_type> strand(io_context.get_executor());

const AMQP::Address address("amqp://guest:guest@localhost/");

boost::asio::ip::tcp::resolver resolver(io_context);
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(address.hostname(), address.secure() ? "amqps" : "amqp");

boost::asio::ip::tcp::socket socket(strand);
boost::asio::connect(socket, endpoints);

// make a connection
AMQP::LibBoostAsioConnection connection(std::move(socket), address.login(), address.vhost());

std::array<AMQP::LibBoostAsioChannel, 4> channels {{
AMQP::LibBoostAsioChannel{&connection},
AMQP::LibBoostAsioChannel{&connection},
AMQP::LibBoostAsioChannel{&connection},
AMQP::LibBoostAsioChannel{&connection}
}};

// create a temporary queue
channels[0].declareQueue(AMQP::exclusive).onSuccess([&io_context, &strand, &connection, &channels](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
// report the name of the temporary queue
std::cout << "declared queue " << name << std::endl;

// fill the queue
for (std::size_t i = 0; i < 100; ++i) {
channels[0].publish("", name, std::to_string(i).c_str());
}

for (auto& channel: channels) {
channel.setQos(1).onSuccess([&io_context, &strand, &channel, name] () {
channel.consume(name).onReceived([&io_context, &strand, &channel] (const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "delivery tag " << deliveryTag << " by " << channel.id() << " body " << std::string(message.body(), message.bodySize()) << std::endl;

boost::asio::post(io_context, [&channel, &strand, deliveryTag] () {
std::this_thread::sleep_for(1s);

boost::asio::post(strand, [&channel, deliveryTag] () {
channel.ack(deliveryTag);

std::cout << "ack " << deliveryTag << " by " << channel.id() << std::endl;
});
});
});
});
}
});

const auto tf = [&io_context] () {
io_context.run();
};
std::array<std::thread, 4> pool {{
std::thread{tf},
std::thread{tf},
std::thread{tf},
std::thread{tf}
}};

for (auto& thread: pool) {
thread.join();
}

return 0;
}

71 changes: 71 additions & 0 deletions examples/libboostasio_ssl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* LibBoostAsio.cpp
*
* Test program to check AMQP functionality based on Boost's asio io_service.
*
* @author Gavin Smith <[email protected]>
*
* Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
*/

/**
* Dependencies
*/
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/ssl.hpp>

#include <amqpcpp.h>
#include <amqpcpp/libboostasio.h>

/**
* Main program
* @return int
*/
int main()
{
boost::asio::io_context io_context;

boost::asio::signal_set signal_set{io_context, SIGINT, SIGTERM};

signal_set.async_wait([&io_context] (const boost::system::error_code& error, int signal_number) {
std::cerr << "Got signal " << signal_number << ", terminating..." << std::endl;

io_context.stop();
});

const AMQP::Address address("amqps://guest:guest@localhost/");

boost::asio::ip::tcp::resolver resolver(io_context);
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(address.hostname(), address.secure() ? "amqps" : "amqp");

boost::asio::ssl::context ssl_context(boost::asio::ssl::context::sslv23);
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> ssl_socket(io_context, ssl_context);
boost::asio::connect(ssl_socket.lowest_layer(), endpoints);

ssl_socket.set_verify_mode(boost::asio::ssl::verify_none);
ssl_socket.handshake(ssl_socket.client);

// make a connection
AMQP::LibBoostAsioConnection connection(std::move(ssl_socket), address.login(), address.vhost());

// we need a channel too
AMQP::LibBoostAsioChannel channel(&connection);

// create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {

// report the name of the temporary queue
std::cout << "declared queue " << name << std::endl;

// now we can close the connection
connection.close();
});

// run the handler
// at the moment, one will need SIGINT to stop. In time, should add signal handling through boost API.
return io_context.run();
}

Loading