RabbitMQ Tutorial на Python. Часть 4. (Перевод)
Маршрутизация.
(используется библиотека pika для Python)
Требования.
В этом руководстве предполагается, что RabbitMQ установлен и запущен на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки соединения желательно откорректировать.
Где получить помощь?
Если у вас возникли проблемы при прочтении данного руководства, вы можете связаться с разработчиками через список рассылки.
В предыдущей части руководства мы построили простое приложение для логирования. Мы смогли передать лог-сообщения нескольким получателям.
В этой части руководства мы собираемся добавить к нему функцию - сделать возможным подписку только на подмножество сообщений. Например, мы сможем отправлять только критические сообщения в файл журнала (чтобы сэкономить место на диске), При этом все еще можно будет печатать все лог-сообщения в терминале.
Связывание.
В предыдущем примере мы уже делали связывание.
Вы можете вспомнить как это делалось:
Связывание - это процесс формирования связи между обменником и очередью. Это можно просто прочитать как: очередь интересуется сообщениями из этого обменника.
Связывание может принимать дополнительный параметр routing_key. Чтобы избежать путаницы с параметром basic_publish, мы назовем его binding key (ключом связывания). Вот как мы могли бы сделать связывание с ключом:
Значение ключа связывания зависит от типа обмена. Обменник типа fanout, с которым мы работали в предыдущей части, просто игнорирует это значение.
Прямой обмен.
Наша система логирования из предыдущей части рассылала все сообщения всем потребителям. Мы хотим расширить ее, чтобы разрешить фильтрацию сообщений в зависимости от их важности. Например, нам может понадобиться сценарий, который запишет только те сообщения на диск, которые содержат критические ошибки, и не будет тратить дисковое пространство на предупреждающие и информационные сообщения.
Мы использовали тип обмена fanout, который не дает нам необходимой гибкости - он способен только на бессмысленную рассылку.
Вместо этого мы будем использовать прямой обмен. Алгоритм маршрутизации для прямого обмена прост - сообщение отправляется в очередь чей binding key (ключ связывания) точно соответствует routing key (ключу маршрутизации) сообщения.
Чтобы проиллюстрировать это, рассмотрим следующую систему:
В этой системе мы можем наблюдать обменник X, типа dicrect, с двумя связанными с ним очередями. Первая очередь с ключом связывания orange, и вторая с двумя связями, одна из которых с ключом связывания black, а вторая с ключом связывания green.
В этой настройке сообщение опубликованное обменником с ключом маршрутизации orange будет направлено в очередь Q1. Сообщения с ключами маршрутизации black или green пойдут в Q2. Все остальные сообщения будут отброшены.
Множественное связывание.
Совершенно нормально привязать несколько очередей по одному ключу связывания. В нашем примере мы можем добавить связь между X и Q1 с ключом связывания black. В этом случаем обмен типа direct будет выглядеть также, как обмен типа fanout и рассылать сообщения во все соответствующие очереди. Сообщения с ключом маршрутизации black будут доставлены как в Q1, так и в Q2.
Рассылающий журнал.
Мы используем эту модель для нашей системы логирования. Вместо обменника типа fanout мы будем отправлять сообщения из обменника типа direct. Мы будем указывать важность сообщения в качестве routing_key. Таким образом, получающий скрипт сможет отбирать сообщения по степени важности.
Как всегда, сначала нам нужно создать обменник:
И подготовиться к отправке сообщения:
Для упрощения мы будем полагать, что важность сообщений может быть одной из следующих: 'info', 'warning', 'error'.
Объединение.
Получение сообщений будет работать также, как и в предыдущей части руководства, за одним исключением: мы создаем новое связывание для каждой степени важности сообщения, которые нас интересуют.
Соединяем все вместе.
Код скрипта emit_log_direct.py:
Код скрипта receive_logs_direct.py:
Если вы хотите сохранять только 'warning' и 'error' (и не сохранять 'info') лог-сообщения в файле, просто откройте консоль и наберите:
Если вы хотите увидеть лог-сообщения на вашем экране, откройте новый терминал и выполните:
И, например, отправьте лог-сообщение типа 'error' просто введя:
Исходный код emit_log_direct.py и receive_logs_direct.py.
Перейдем к 5 части руководства, чтобы узнать, как слушать сообщения основанные на шаблоне.
В предыдущей части руководства мы построили простое приложение для логирования. Мы смогли передать лог-сообщения нескольким получателям.
В этой части руководства мы собираемся добавить к нему функцию - сделать возможным подписку только на подмножество сообщений. Например, мы сможем отправлять только критические сообщения в файл журнала (чтобы сэкономить место на диске), При этом все еще можно будет печатать все лог-сообщения в терминале.
Связывание.
В предыдущем примере мы уже делали связывание.
Вы можете вспомнить как это делалось:
channel.queue_bind(exchange=exchange_name, queue=queue_name)
Связывание - это процесс формирования связи между обменником и очередью. Это можно просто прочитать как: очередь интересуется сообщениями из этого обменника.
Связывание может принимать дополнительный параметр routing_key. Чтобы избежать путаницы с параметром basic_publish, мы назовем его binding key (ключом связывания). Вот как мы могли бы сделать связывание с ключом:
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
Значение ключа связывания зависит от типа обмена. Обменник типа fanout, с которым мы работали в предыдущей части, просто игнорирует это значение.
Прямой обмен.
Наша система логирования из предыдущей части рассылала все сообщения всем потребителям. Мы хотим расширить ее, чтобы разрешить фильтрацию сообщений в зависимости от их важности. Например, нам может понадобиться сценарий, который запишет только те сообщения на диск, которые содержат критические ошибки, и не будет тратить дисковое пространство на предупреждающие и информационные сообщения.
Мы использовали тип обмена fanout, который не дает нам необходимой гибкости - он способен только на бессмысленную рассылку.
Вместо этого мы будем использовать прямой обмен. Алгоритм маршрутизации для прямого обмена прост - сообщение отправляется в очередь чей binding key (ключ связывания) точно соответствует routing key (ключу маршрутизации) сообщения.
Чтобы проиллюстрировать это, рассмотрим следующую систему:
В этой системе мы можем наблюдать обменник X, типа dicrect, с двумя связанными с ним очередями. Первая очередь с ключом связывания orange, и вторая с двумя связями, одна из которых с ключом связывания black, а вторая с ключом связывания green.
В этой настройке сообщение опубликованное обменником с ключом маршрутизации orange будет направлено в очередь Q1. Сообщения с ключами маршрутизации black или green пойдут в Q2. Все остальные сообщения будут отброшены.
Множественное связывание.
Совершенно нормально привязать несколько очередей по одному ключу связывания. В нашем примере мы можем добавить связь между X и Q1 с ключом связывания black. В этом случаем обмен типа direct будет выглядеть также, как обмен типа fanout и рассылать сообщения во все соответствующие очереди. Сообщения с ключом маршрутизации black будут доставлены как в Q1, так и в Q2.
Рассылающий журнал.
Мы используем эту модель для нашей системы логирования. Вместо обменника типа fanout мы будем отправлять сообщения из обменника типа direct. Мы будем указывать важность сообщения в качестве routing_key. Таким образом, получающий скрипт сможет отбирать сообщения по степени важности.
Как всегда, сначала нам нужно создать обменник:
channel.exchange_declare(exchange='direct_logs',
type='direct')
И подготовиться к отправке сообщения:
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
Для упрощения мы будем полагать, что важность сообщений может быть одной из следующих: 'info', 'warning', 'error'.
Объединение.
Получение сообщений будет работать также, как и в предыдущей части руководства, за одним исключением: мы создаем новое связывание для каждой степени важности сообщения, которые нас интересуют.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
Соединяем все вместе.
Код скрипта emit_log_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
Код скрипта receive_logs_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
Если вы хотите сохранять только 'warning' и 'error' (и не сохранять 'info') лог-сообщения в файле, просто откройте консоль и наберите:
python receive_logs_direct.py warning error > logs_from_rabbit.log
Если вы хотите увидеть лог-сообщения на вашем экране, откройте новый терминал и выполните:
python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C
И, например, отправьте лог-сообщение типа 'error' просто введя:
python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
Исходный код emit_log_direct.py и receive_logs_direct.py.
Перейдем к 5 части руководства, чтобы узнать, как слушать сообщения основанные на шаблоне.
Комментарии
Отправить комментарий