30
Авг
2017

Дублирование сообщений при падении RabbitMQ или воркера

Имеется схема RPC в которой приложение на Java (будем называть его TaskManager'ом) отправляет задания в очередь RabbitMQ (очередь A) к приложениям на PHP (назовем их Worker'ами). После выполнения задания воркер отправляет сообщение с результатом работы обратно к таскменеджеру (очередь B).

Callback Worker'а примерно следующий:

$this->callback = function ($req) {
            $n = $req->body;
            $this->correlation_id = $req->get('correlation_id');

            $msg = new AMQPMessage(
                (string)$this->task($n),
                array('delivery_mode' => 2, 'correlation_id' => $this->correlation_id)
            );

            $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
            $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
        };

У меня возникает следующая проблема:

Запускаем таскменеджер, запускаем около тысячи воркеров. Затем перезагружаем сервер RabbitMQ и видим что некоторые ответы на посланные задачи таскменеджером приходят по два раза.

Есть подозрение что это происходит из за того, что в момент перезагрузки RabbitMQ воркер успевает выполнить функцию basic_publish() в очередь B, а basic_ack() происходит уже при оборванном соединении.

Так как очереди durable, то при поднятии RabbitMQ видит ответ от воркера в очереди к таскменеджеру (в очереди B) и так же видит что воркер не ответил на одно сообщение в очереди A: восстанавливает его и опять отдает воркеру. В итоге таскменеджер получает два одинаковых ответа на одно свое задание.

Пока что это единственное объяснение.

Как решить данную проблему? Есть ли еще варианты её появления?


TEMP:

$this->callback = function ($req) {
            $n = $req->body;
            $this->correlation_id = $req->get('correlation_id');

            $msg = new AMQPMessage(
                (string)$this->task($n),
                array('delivery_mode' => 2, 'correlation_id' => $this->correlation_id)
            );
            $req->delivery_info['channel']->tx_select();    
            $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
            $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
            $req->delivery_info['channel']->tx_commit();
        };

Несколько вопросов:

Если в момент падения RabbitMQ вызвать любую из функций(tx_select, publish, ack или commit) то выпадет исключение о разрыве коннекции?

Как определить что транзакция прошла успешно? (Не будет исключений? tx_commit() возвращает NULL)

В каких случаях вызывать tx_rollback()? В момент получении ошибки в бизнес логике, то есть ошибки не связанной с подключением к rabbit?

Источник: https://ru.stackoverflow.com/questions/712167/%D0%94%D1%83%D0%B1%D0%BB%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5-%D1%81%D0%BE%D0%BE%D0%B1%D1%89%D0%B5%D0%BD%D0%B8%D0%B9-%D0%BF%D1%80%D0%B8-%D0%BF%D0%B0%D0%B4%D0%B5%D0%BD%D0%B8%D0%B8-rabbitmq-%D0%B8%D0%BB%D0%B8-%D0%B2%D0%BE%D1%80%D0%BA%D0%B5%D1%80%D0%B0

Share

Тебе может это понравится...