RabbitMQ Tutorial на Python. Часть 2. (Перевод)

 Обработка очередей
(используется библиотека pika для Python)

Требования.

В этом руководстве предполагается, что RabbitMQ установлен и запущен на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки соединения желательно откорректировать.

Где получить помощь?
Если у вас возникли проблемы при прочтении данного руководства, вы можете связаться с разработчиками через список рассылки.

В первом уроке мы написали программу для отправки и приема сообщений из именованной очереди. В этом уроке мы создадим обработчик очереди, которая будет использоваться для распределения трудоемких задач среди нескольких сотрудников.

Основная идея обработчика очередей (обработчика задач) заключается в том, чтобы не выполнять ресурсоемкую задачу(task) немедленно и не ожидать завершения ее выполнения. Вместо этого мы запланируем выполнить эту задачу позже. Мы инкапсулируем задачу в сообщение и отправим его в очередь. Процесс обработчика, выполняющийся в фоновом режиме, выполнит задачи (task) и в конечном итоге выполнит всё задание(job). Когда вы запустите несколько обработчиков, задачи будут распределены между ними.

Эта концепция особенно полезна в Web-приложениях, где невозможно обработать сложную задачу за короткое время окна HTTP-запроса.

Подготовка.

В первой части нашего руководства мы отправили сообщение, содержащее "Hello World!". Теперь мы будем отправлять строки, на которых основываются сложные задачи(complex task). У нас нет реальных задач, таких как изменение размера(resize) изображений или рендеринга pdf-файлов, поэтому сымитируем нашу занятость с помощью функции time.sleep(). Мы будем считать количество точек в строке, как нашу сложность; каждая точка будет занимать одну секунду "на обработку". Например, сымитированная задача, состоящая из строки "Hello..." займет 3 секунды.

Мы немного изменим код send.py из предыдущего примера, чтобы разрешить отправку произвольных сообщений из командной строки. Эта программа будет отправлять задачи в наш обработчик очереди, назовем ее new_task.py:
import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)

Полный исходный код модуля здесь.

Наш старый скрипт receive.py также необходимо немного изменить: ему нужно имитировать секунду обработки для каждой точки в теле сообщения. Он будет брать сообщение из очереди и выполнять задачу, поэтому назовем его worker.py:
import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

Полный исходный код модуля здесь. (Внимание! Полная версия, со всеми изменениями внесенными в этой части урока).

Round-robin диспетчерезация.

Одним из преимуществ очередей задач является простое распараллеливание процесса обработки. Если нам необходимо сократить время обработки, мы можем просто добавить больше обработчиков и тем самым сделать масштабирование проще.

Во-первых, давайте попробуем запустить два скрипта worker.py в одно и то же время. Они оба получат сообщения из очереди, но как именно? Давайте посмотрим.

Вам нужно открыть три терминала. В двух будет запущен скрипт worker.py. Эти терминалы будут нашими двумя потребителями - C1 и C2.
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

В третьем мы будем запускать новые задачи. После того, как вы запустили потребителей, вы можете опубликовать несколько сообщений:
# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
Давайте посмотрим, что доставлено нашим обработчикам:
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
По умолчанию, RabbitMQ будет отправлять сообщение каждому из последующих потребителей последовательно. В среднем каждый потребитель получит одинаковое количество сообщений. Этот способ распространения сообщений называется round-robin. Попробуйте запустить 3 или более обработчика и повторить опыт.

Подтверждение получения сообщения.
Выполнение задачи может занять несколько секунд. У вас может возникнуть вопрос, что произойдет, если потребитель начнет выполнять долгую задачу и "умрет", лишь частично ее выполнив. В том коде, который мы написали, RabbitMQ доставляет сообщение потребителю один раз, после чего немедленно удаляет его из памяти. В этом случае, если вы убьете обработчик, мы потеряем сообщение которое он обрабатывал. Мы также потеряем все сообщения, которые были отправлены этому обработчику, но еще не были обработаны.

Но мы не хотим терять никаких задач. Если обработчик умер, мы хотим чтобы задание было отправлено другому обработчику.

Чтобы удостовериться, что сообщения никогда не потеряются, RabbitMQ поддерживает подтверждение принятия сообщений. Подтверждение отправляется обратно от потребителя, чтобы сообщить RabbitMQ, что определенное сообщение было принято, обработано и RabbitMQ имеет право удалить его.

Если умрет потребитель (например упадет канал, соединение с RabbitMQ или TCP-соединение) без отправки подтверждения, RabbitMQ поймет, что сообщение не было полностью обработано и поставит его в очередь повторно. Если какой-то из других потребителей будет online в это время, RabbitMQ быстро перенаправит сообщения этому потребителю. Таким образом вы можете быть уверены, что сообщения не теряются, даже если обработчики время от времени умирают.

Нет никаких таймаутов сообщений; RabbitMQ повторно отправит сообщение когда потребитель умрет. Это прекрасно, даже если обработка сообщения занимает очень много времени.

Подтверждение о получении сообщение включено по умолчанию. В предыдущих примерах мы явно выключили их с помощью флага no_ack=True. Пришло время удалить этот флаг и отправить подтверждение от обработчика, после того как мы выполним задачу.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

Исходный код.

Используя этот исходный код мы можем быть уверены, что даже если вы убьете обработчик используя Ctrl+C пока сообщение обрабатывается, ничего не будет потеряно. Вскоре после того как обработчик умрет, все непринятые сообщения будут повторно отправлены.

Забытое подтверждение.

Распространенная ошибка заключается в том, что пользователи забывают про basic_ack. Это простая ошибка, но она приводит к серьезным последствиям. Сообщение будет повторно отправлено, когда ваш клиент выйдет(что может показаться случайной повторной отправкой), но RabbitMQ будет потреблять все больше и больше памяти, так как не сможет отправить другие неподтвержденные сообщения.

Для отладки вы можете использовать rabbitmqctl и вывести поле messages_unacknowledges:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

На Windows просто удалите sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Долговременные сообщения.

Мы узнали, как убедиться, что задача не потеряется, даже если потребитель умрет. Но наши задачи могут потеряться, если остановится сервер RabbitMQ.

Когда сервер RabbitMQ отключается или ломается, все наши очереди и сообщения теряются, если вы не объявили иное. Чтобы убедиться, что сообщения не потеряны, нам необходимо выполнить две вещи: нужно пометить очередь и сообщения как долговременные.

Сначала мы должны сделать так, чтобы RabbitMQ никогда не потеряла нашу очередь. Чтобы сделать это, нам необходимо объявить ее долговременной:

channel.queue_declare(queue='hello', durable=True)

Хотя эта команда верна сама по себе, но с нашими настройками она не сработает. Это потому, что мы уже объявили очередь с именем hello как недолговременную. RabbitMQ не позволяет переопределять существующую очередь с другими параметрами и возвращает ошибку любой программе, которая пытается это сделать. Но есть быстрый обходной путь, объявить очередь с другим именем, например task_queue_d:

channel.queue_declare(queue='task_queue_d', durable=True)

Пояснение: в оригинальной статье очередь переименовали в task_queue, но очередь с таким именем уже была создана и она не durable, вследствие чего возникает ошибка исполнение, поэтому я поправил код на очередь с другим именем.

Эти изменения нужно внести в код поставщика и потребителя.

Исходник worker_d.py
Исходник new_task_d.py

Теперь мы уверены, что очередь task_queue_d не будет потеряна, даже если перезапустится сервер RabbitMQ.  Теперь нам нужно пометить наши сообщения как постоянные, объявив свойство delivery_mode со значением 2:

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

Примечание о постоянных сообщениях.

Маркировка сообщений как постоянных не дает полной гарантии того, что сообщение не будет потеряно. Несмотря на то, что это указывает RabbitMQ на то, что сообщение необходимо сохранить на диск, все еще остается короткое окно, когда RabbitMQ уже принял сообщение, но все еще не сохранил его. Также, RabbitMQ не делает fsync(2) для каждого сообщения - оно может быть просто сохранено в кэше и на самом деле не записано на диск. Гарантированная устойчивость не высока, но этого более чем достаточно для нашей простой очереди задач. Если вам нужны более надежные гарантии, вы можете использовать подтверждение поставщика.

Справедливая рассылка.

Как вы могли заметить, рассылка по-прежнему работает не так, как нам нужно. Например, в ситуации с двумя обработчиками, когда все нечетные сообщения тяжелые, а все четные - легкие, один из обработчиков будет постоянно занят, а другой будет простаивать. Ну, RabbitMQ ничего не знает об этом и будет равномерно распределять сообщения.

Это происходит потому что RabbitMQ просто отсылает сообщение, когда оно попадает в очередь. Он не смотрит на количество неподтвержденных сообщений отправленных потребителю. Он слепо посылает каждое n-ое сообщение n-ому потребителю:


Чтобы исправить это, мы можем использовать метод basic_qos с параметром prefetch_count=1. Это сообщает RabbitMQ, чтобы он не давал обработчику более одного сообщения в единицу времени. Или, другими словами, не отправлять новое сообщение пока не завершится процесс обработки и не будет получено подтверждение на предыдущее сообщение. Вместо этого он отправит его следующему обработчику, который не занят.

channel.basic_qos(prefetch_count=1)

Примечание о размере очереди.

Если все обработчики заняты, ваша очередь может заполниться. Вам необходимо следить за этим и, возможно, добавить несколько обработчиков или использовать TTL сообщений.

Сложим все вместе.

Окончательный код для нашего сценария new_task.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

new_task.py

И нашего обработчика:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

worker.py

Используя подтверждение сообщений и prefetch_count вы можете настроить обработку очереди. Настройка долговечности позволяет задачам выживать даже при перезапуске RabbitMQ.

Теперь мы можем перейти к третьему уроку и научиться доставлять одно и то же сообщение нескольким потребителям.

Оригинал статьи на rabbitmq.com

Комментарии

Популярные сообщения из этого блога

RabbitMQ Tutorial на Python. Часть 1. (Перевод)

RabbitMQ Tutorial на Python. Часть 3. (Перевод)