RabbitMQ Tutorial на Python. Часть 3. (Перевод)

Публикация/подписка.
(используется библиотека pika для Python)

Требования.

В этом руководстве предполагается, что RabbitMQ установлен и запущен на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки соединения желательно откорректировать.

Где получить помощь?


Если у вас возникли проблемы при прочтении данного руководства, вы можете связаться с разработчиками через список рассылки.

На предыдущем уроке мы создали обработчик очереди. Предполагается, что обработка очереди такова, что каждая задача доставляется только одному обработчику. В этой части мы будем делать совершенно по-другому - мы отправим сообщение нескольким потребителям. Этот шаблон известен как "публикация/подписка". 

Чтобы проиллюстрировать этот шаблон мы создадим простую систему логирования. Она будет состоять из двух программ: первая будет отправлять лог-сообщения, а вторая будет принимать и выводить их.

В нашей системе логирования каждая запущенная копия принимающей программы будет получать сообщения. Таким образом мы сможем запустить одного получателя и записать логи на диск; и в то же время запустить другого получателя и увидеть логи на экране.

На самом деле, опубликованный лог сообщений будет рассылаться всем получателям.

Обменник.

В предыдущих частях руководства мы отправляли сообщения в очередь и принимали их из очереди. Настало время ввести полную модель сообщений в RabbitMQ.

Давайте быстро рассмотрим то, что мы изучили в предыдущих уроках:

Поставщик использует приложение для отправки сообщений.
Очередь является буфером, хранящим сообщения.
Потребитель использует приложение для получения сообщений.

Основная идея модели сообщений в RabbitMQ заключается в том, что поставщик никогда не отправляет сообщения непосредственно в очередь. Довольно часто поставщик даже не знает, будет ли вообще сообщение отправлено в какую-либо очередь.

Вместо этого поставщик может отправлять сообщения только в обменник. Обменник - очень простая вещь. С одной стороны он получает сообщения от поставщиков, а с другой - толкает их в очереди. Обменник должен точно знать, что делать с полученным им сообщением. Должен ли он добавить его в определенную очередь? Должен ли он добавить его в несколько очередей? Или его нужно отбросить. Правила определяются типом обмена.


Существует несколько типов обмена: direct, topic, headers и fanout. Мы сосредоточимся на последнем -- fanout. Давайте создадим обменник с этим типом и вызовем его logs:


channel.exchange_declare(exchange='logs',
                         type='fanout')

Тип обмена fanout очень простой. Как вы, наверное, догадались из названия, он просто рассылает все сообщения всем получателям о которых он знает. И это то что нужно для нашей системы логирования.

Список обменников.

Чтобы получить список обменников на сервере, вы можете использовать rabbitmqctl:

sudo rabbitmqctl list_exchanges

В этом списке будет несколько amq.* обменников, и обменник без имени по умолчанию. Они создаются по умолчанию, но вам вряд ли нужно будет их использовать в данный момент.

Обменник по умолчанию.

В предыдущих частях руководства мы ничего не знали об обменниках, но все же могли отправлять сообщения в очереди. Это было возможно потому что мы использовали обменник по умолчанию, который мы определили пустой строкой("").

Вспомните, как мы опубликовали сообщение до этого:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

Параметр exchange - это имя обменника. Пустая строка обозначает обмен по умолчанию или безымянный: сообщения отправляются в очередь с именем, указанным в параметре routing_key, если таковая существует.

Теперь мы можем делать публикации в обменнике с нашим именем:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

Временные очереди.

Как вы помните, мы использовали очереди со специфическими именами (вспоминаете, hello и task_queue?) Возможность дать имя очереди была крайне важна для нас, нам нужно было указать обработчиков для определенной очереди. Возможность дать имя очереди крайне важна, когда вы хотите расшарить очередь между поставщиком и потребителем.

Но это не подходит для нашей системы логирования. Мы хотим услышать обо всех сообщениях в логе, а не только их подмножестве. Нас также интересуют только текущие сообщения, которые не устарели. Чтобы решить эту проблему нам понадобятся две вещи.

Во-первых, каждый раз когда мы подключаемся к RabbitMQ нам нужна свежая, пустая очередь. Сделать это можно, создав очередь со случайным именем или, что еще лучше, позволить серверу выбрать имя за нас. Мы можем сделать это, не передавая параметр queue в queue_declare:

result = channel.queue_declare()

В этой точке result.method.queue содержит случайное имя очереди. Например, оно может быть похоже на это: amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Во-вторых, как только потребитель отключится, очередь должна быть удалена. Для этого используется флаг exclusive:

result = channel.queue_declare(exclusive=True)

Привязки.



Мы уже создали обменник типа fanout и очередь. Сейчас нам нужно сказать обменнику, чтобы он отправлял сообщения в нашу очередь. Взаимосвязь между обменником и очередью называется привязкой.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

Теперь обменник logs будет добавлять сообщения в нашу очередь. 

Список привязок.

Можно перечислить существующие привязки, как вы уже наверное догадались:

rabbitmqctl list_bindings


Собираем все вместе.


Программа-поставщик, которая создает лог-сообщения, не сильно отличается от примера из предыдущего урока. Самое важное изменение заключается в том, что теперь мы хотим отправлять сообщения в наш обменник - log, вместо безымянного. Нам нужно указать параметр routing_key при отправке, но его значение игнорируется для обменников типа fanout. Вот код скрипта emit_log.py:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

emit_log.py source

Как вы видите, после того как соединение установлено мы объявляем обменник. Этот шаг необходим, так как публикация сообщения в несуществующий обменник запрещена.

Сообщения могу т быть потеряны, если очередь не привязана к обменнику, но это нас устраивает; если потребитель еще не слушает, мы можем смело отказаться от сообщения.

Код receive_logs.py:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

receive_logs.py source

Мы закончили. Если хотите сохранять логи в файл, просто откройте терминал и наберите:

python3 receive_logs.py > logs_from_rabbit.log

Если вы хотите увидеть логи на экране, откройте новый терминал и запустите:

python3 receive_logs.py

И конечно, emit_log.py:

python3 emit_log.py

Используя rabbitmqctl list_bindings вы можете проверить, что код действительно создал соединение и очередь, как мы и хотели. Запустив два скрипта receive_logs.py, вы должны увидеть что-то вроде этого:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

Понять результат довольно просто: данные из обменника logs пришли в две очереди со присвоенными сервером именами. А это именно то, что мы и хотели.

Для того чтобы узнать, как получить подмножество сообщений, давайте перейдем к 4 части.

Комментарии

Популярные сообщения из этого блога

RabbitMQ Tutorial на Python. Часть 1. (Перевод)

RabbitMQ Tutorial на Python. Часть 2. (Перевод)