RabbitMQ Tutorial на Python. Часть 4. (Перевод)

Маршрутизация.
(используется библиотека pika для Python)

Требования.

В этом руководстве предполагается, что RabbitMQ установлен и запущен на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки соединения желательно откорректировать.

Где получить помощь?

Если у вас возникли проблемы при прочтении данного руководства, вы можете связаться с разработчиками через список рассылки.


В предыдущей части руководства мы построили простое приложение для логирования. Мы смогли передать лог-сообщения нескольким получателям.

В этой части руководства мы собираемся добавить к нему функцию - сделать возможным подписку только на подмножество сообщений. Например, мы сможем отправлять только критические сообщения в файл журнала (чтобы сэкономить место на диске), При этом все еще можно будет печатать все лог-сообщения в терминале.

Связывание.

В предыдущем примере мы уже делали связывание.

Вы можете вспомнить как это делалось:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

Связывание - это процесс формирования связи между обменником и очередью. Это можно просто прочитать как: очередь интересуется сообщениями из этого обменника.

Связывание может принимать дополнительный параметр routing_key. Чтобы избежать путаницы с параметром basic_publish, мы назовем его binding key (ключом связывания). Вот как мы могли бы сделать связывание с ключом:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

Значение ключа связывания зависит от типа обмена. Обменник типа fanout, с которым мы работали в предыдущей части, просто игнорирует это значение.

Прямой обмен.

Наша система логирования из предыдущей части рассылала все сообщения всем потребителям. Мы хотим расширить ее, чтобы разрешить фильтрацию сообщений в зависимости от их важности. Например, нам может понадобиться сценарий, который запишет только те сообщения на диск, которые содержат критические ошибки, и не будет тратить дисковое пространство на предупреждающие и информационные сообщения.

Мы использовали тип обмена fanout, который не дает нам необходимой гибкости - он способен только на бессмысленную рассылку.

Вместо этого мы будем использовать прямой обмен. Алгоритм маршрутизации для прямого обмена прост - сообщение отправляется в очередь чей binding key (ключ связывания) точно соответствует routing key (ключу маршрутизации) сообщения.

Чтобы проиллюстрировать это, рассмотрим следующую систему:


В этой системе мы можем наблюдать обменник X, типа dicrect, с двумя связанными с ним очередями. Первая очередь с ключом связывания orange, и вторая с двумя связями, одна из которых с ключом связывания black, а вторая с ключом связывания green.

В этой настройке сообщение опубликованное обменником с ключом маршрутизации orange будет направлено в очередь Q1. Сообщения с ключами маршрутизации black или green пойдут в Q2. Все остальные сообщения будут отброшены.

Множественное связывание.

Совершенно нормально привязать несколько очередей по одному ключу связывания. В нашем примере мы можем добавить связь между X и Q1 с ключом связывания black. В этом случаем обмен типа direct будет выглядеть также, как обмен типа fanout и рассылать сообщения во все соответствующие очереди. Сообщения с ключом маршрутизации black будут доставлены как в Q1, так и в Q2.

Рассылающий журнал.

Мы используем эту модель для нашей системы логирования. Вместо обменника типа fanout мы будем отправлять сообщения из обменника типа direct. Мы будем указывать важность сообщения в качестве routing_key. Таким образом, получающий скрипт сможет отбирать сообщения по степени важности.

Как всегда, сначала нам нужно создать обменник:

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

И подготовиться к отправке сообщения:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

Для упрощения мы будем полагать, что важность сообщений может быть одной из следующих: 'info', 'warning', 'error'.

Объединение.

Получение сообщений будет работать также, как и в предыдущей части руководства, за одним исключением: мы создаем новое связывание для каждой степени важности сообщения, которые нас интересуют.

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

Соединяем все вместе.

Код скрипта emit_log_direct.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

Код скрипта receive_logs_direct.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Если вы хотите сохранять только 'warning' и 'error' (и не сохранять 'info') лог-сообщения в файле, просто откройте консоль и наберите:

python receive_logs_direct.py warning error > logs_from_rabbit.log

Если вы хотите увидеть лог-сообщения на вашем экране, откройте новый терминал и выполните:

python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C

И, например, отправьте лог-сообщение типа 'error' просто введя:

python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

Исходный код emit_log_direct.py и receive_logs_direct.py.

Перейдем к 5 части руководства, чтобы узнать, как слушать сообщения основанные на шаблоне.

Комментарии

Популярные сообщения из этого блога

RabbitMQ Tutorial на Python. Часть 1. (Перевод)

RabbitMQ Tutorial на Python. Часть 2. (Перевод)

RabbitMQ Tutorial на Python. Часть 3. (Перевод)