Простой пример взаимодействия с rabbitmq, состоящий из нескольких сервисов:
- api - отправка сообщений в очередь rabbitmq;
- consumer - чтение очереди rabbitmq с обработкой входящих сообщений и последующей отправкой на websocket канал;
- ws - websocket сервер, на котором подключенный клиент пассивно получает результаты обработанных сообщений.
В сервисе api присутствует минималистичное решение с опциональным celery воркером, служащим лишь для наглядной демонстрации отказоустойчивости сервиса в случае падения rabbitmq, в рамках работы стенда. Само собой можно использовать альтернативы вроде dramatiq, arq, saq, taskiq и т.п., исходя уже от потребностей на реальных проектах. Так же ничего не мешает сделать сервис api напрямую зависимым от rabbitmq.
Клонировать репозиторий, перейти в mq-listener/
и копировать переименованный в .env образец файла переменного окружения
git clone https://github.com/AleksandrUsolcev/mq-listener.git
cd mq-listener/
cp example.env .env
Для ознакомления, в .env уже доступны базовые рабочие значения. При необходимости меняем значения на свои
vi .env
Разворачиваем докер контейнеры
docker compose up -d
Далее по тексту хосты и порты указаны с учетом того, что при запуске проекта переменные окружения остались по умолчанию.
После успешного запуска проекта подключаемся к websocket эндпоинту ws://localhost:8001/listen_results
, на который в дальнейшем будут приходить результаты обработанных сообщений из очереди rabbitmq.
Для отправки сообщений необходимо отправить POST запрос на api эндпоинт http://localhost:8000/queue_reverse_text
со следующим форматом содержимого:
{
"text": "my text"
}
Так же сообщение можно отправить из формы со swagger'а http://localhost:8000/docs
При отправке сообщения можно воспользоваться параметром use_celery для отправки текста через активный celery воркер, предотвращая потери и сохраняя очередность отправки, в случае если сервис rabbitmq недоступен. Посмотреть состояние задач в воркере можно во flower http://localhost:5555/
Для подключения к ws и api эндпоинту так же можно воспользоваться приложенными скриптами.
После отправки сообщение переходит в персистентную очередь rabbitmq, откуда в дальнейшем переходит в обработку при получении consumer'ом. После успешного получения и обработки* отправляется сообщение на наш ws канал со следующим содержимым (с учетом примера выше):
{
"reversed_text": "txet ym"
}
В случае если по каким-либо причинам наш consumer не запущен, сообщения остаются в очереди со статусом "Unacked" в нашей очереди rabbitmq, до тех пор пока consumer не будет снова поднят. Ознакомиться со статусом очереди и не только, можно панели администратора http://localhost:15672/
*В данном примере реверс текста служит лишь для простой наглядности работы обработчика, само собой можно написать свой.