RabbitMQ 4.3
Карта курсу
Шість розділів, від примітивів AMQP до моніторингу і production checklist. Час орієнтовний.
Основи та оточення
Розуміти, навіщо RabbitMQ, його архітектурну модель і primitives. Підняти sandbox у Docker і зайти у management UI.
- 1.1 Що таке RabbitMQ і де його використовують
- 1.2 Архітектура AMQP 0-9-1: producer / exchange / queue / binding
- 1.3 Підняти sandbox у Docker
- 1.4 Management UI: огляд
Що таке RabbitMQ
Цитата з офіційної головної сторінки:
Дві ролі в одному продукті
- Messaging broker - класичні черги повідомлень (AMQP 0-9-1, AMQP 1.0, MQTT, STOMP).
- Streaming broker - append-only логи з offset-based consume (Streams), для event sourcing і replay.
Позиціонування з тієї ж сторінки: "ideal for distributed microservices, real-time data, and IoT".
AMQP - розшифровка
AMQP = Advanced Message Queuing Protocol.
Wire-level vs API-level
AMQP - це wire-level протокол (бінарний, по TCP), а не API. Wire-level означає, що специфікація фіксує точний формат байтів, які летять по мережі ("over the wire"): фрейми, opcodes (коди операцій), кодування типів.
Контраст - API-level (як JDBC, PDO): описує функції бібліотеки, а як саме воно йде по мережі - справа драйвера.
Дві несумісні версії в одному broker'і
- AMQP 0-9-1 - історично основний у RabbitMQ, на ньому всі PHP-клієнти (php-amqplib, ext-amqp). Курс йде по 0-9-1.
- AMQP 1.0 - окремий протокол OASIS-стандарту (інший дизайн, не "оновлення" 0-9-1). У RabbitMQ 4.0+ став core. У курсі - тільки згадка.
Use cases
Чотири канонічні сценарії з офіційної головної (rabbitmq.com).
| Сценарій | Що означає термін | Приклад з сайту |
|---|---|---|
| Decoupling services | Decoupling = розчеплення: producer (відправник) не знає про consumer (отримувач), broker буферизує. Consumer лежить - producer все одно пише. | один event → email + push |
| RPC | RPC = Remote Procedure Call (віддалений виклик процедури). Producer шле запит у чергу, чекає відповідь у reply-черзі. Синхронний сценарій поверх асинхронного брокера. | обробка замовлень білетів з кількох каналів продажу |
| Streaming | Append-only лог з offset (зміщенням), повідомлення НЕ видаляється після прочитання, новий consumer може зробити replay (перечитати з минулого). | відеоплатформа: post-upload analysis, transcode, notify |
| IoT | IoT = Internet of Things. Пристрої з нестабільним зв'язком. Брокер буферизує телеметрію поки пристрій offline. | status reports з розподілених пристроїв |
Місце в стеку: producer → broker → consumer
Що дає брокер посередині
- Producer і consumer не мусять бути одночасно онлайн.
- Один producer → багато consumer'ів (і навпаки), без взаємного знання.
- На брокері - persistence (зберігання на диск), retry-логіка, маршрутизація.
Архітектура AMQP 0-9-1
Сутності протоколу та як вони складаються у систему доставки повідомлень. Producer → exchange → binding → queue → consumer.
- 1.2.1 Primitives: exchange, queue, binding
- 1.2.2 Routing key
- 1.2.3 Direct exchange (унікаст)
- 1.2.4 Fanout exchange (broadcast)
- 1.2.5 Topic exchange (pattern matching)
- 1.2.6 Headers exchange
- 1.2.7 Connection vs Channel
Primitives: exchange, queue, binding
| Сутність | Цитата з документації |
|---|---|
| Exchange | "AMQP 0-9-1 entities where messages are sent to." Producer публікує завжди в exchange (не в queue). Exchange "take a message and route it into zero or more queues". |
| Queue | "store messages that are consumed by applications". Місце, де повідомлення лежить до прочитання. |
| Binding | "rules that exchanges use to route messages to queues". Щоб exchange E доставив у queue Q, треба зробити Q bound to E. Binding може мати routing key - рядок, за яким exchange вирішує куди класти. |
Queue - properties (прапори при declare)
Queue - не "просто черга". При queue_declare задається 4 незалежних прапори, які визначають lifecycle і поведінку.
| Прапор | Цитата з документації | Практичне значення |
|---|---|---|
| durable | "the queue will survive a broker restart" | Метадата queue зберігається на диск. Без durable - queue зникає після рестарту broker'а. Окремо від цього - повідомлення мають мати delivery_mode=2 (persistent), див. 2.2.4. |
| exclusive | "used by only one connection and the queue will be deleted when that connection closes" | Прив'язана до конкретного connection-а. Інший connection не може ні читати, ні declare ту саму queue. Closed connection → queue знесено. Use case: temporary subscribers (Pub/Sub - 2.3), RPC reply queues. |
| auto-delete | "queue that has had at least one consumer is deleted when last consumer unsubscribes" | Тригер видалення - останній consumer відписався. Якщо ніколи не було consumer'а - не видалиться. Не плутати з exclusive (там тригер - close connection). |
| passive | (не в публ. summary, але стандарт AMQP 0-9-1) | "Тільки перевір, чи queue існує". Не створює, не змінює. Якщо queue нема - помилка NOT_FOUND. Корисно для validate-перед-publish без побічних ефектів. |
Як вони комбінуються
| Use case | durable | exclusive | auto-delete |
|---|---|---|---|
| Production work queue (orders) | ✓ | ✗ | ✗ |
| Pub/Sub temporary subscriber | ✗ | ✓ | (implicit при exclusive) |
| RPC reply queue | ✗ | ✓ | (implicit) |
| Дешбоард, що сам зникає коли всі дашбоарди закрилися | ✗ | ✗ | ✓ |
deprecated_features.permit.transient_nonexcl_queues = true у rabbitmq.conf. Стимулює перехід на durable + quorum.Queue - три типи у 4.x
Окрім прапорів, у 4.x queue має ще тип - спосіб реалізації. Задається при declare через x-queue-type argument. Default історично - classic, але production-стандарт у 4.x - quorum.
| Тип | Що це | Коли |
|---|---|---|
| classic | Single-node queue. Зберігається на одній ноді кластера. Найшвидша, найдешевша. | Transient subscribers (exclusive auto-delete), RPC reply queues, dev/sandbox. НЕ для production-критичних work queues - падіння ноди = втрата queue. |
| quorum | "replicated, data safety and consistency-oriented queue type". Реплікація на 3+ ноди через Raft консенсус. Завжди durable. | Production work queues (orders, payments, ETL). Default-вибір у 4.x для будь-чого важливого. |
| stream | "immutable append-only log" з offset-based consumption. Повідомлення НЕ видаляється після прочитання. | Event sourcing, audit log, fan-out з replay для нових consumer'ів. Великі backlogs. Розділ 4.2. |
Як задавати тип
$channel->queue_declare(
'orders', false, true, false, false, false,
new AMQPTable(['x-queue-type' => 'quorum']) // ← обов'язково для production
);
Базові queue arguments (огляд, deep-dive у пізніших розділах)
| Argument | Що робить | Деталі |
|---|---|---|
x-queue-type | Тип (classic / quorum / stream). | 1.2.1b (тут), 4.1, 4.2 |
x-message-ttl | TTL для всіх повідомлень у queue, ms. | 3.3.2 (TTL + DLX retry) |
x-max-length | Ліміт по кількості повідомлень. | з x-overflow - reject-publish, drop-head |
x-dead-letter-exchange | Куди слати rejected/expired. | 3.3.1, 3.3.2 (DLX) |
x-delivery-limit | Скільки разів re-queue до відмови (тільки quorum). | 3.3.1, 4.1.3 |
Routing key
Routing key (ключ маршрутизації) - рядок, який producer додає до кожного publish. Адресна "етикетка" повідомлення.
Хто його ставить
- Producer - вказує routing key у виклику
basic_publish(body, exchange, routing_key). - Binding - має свій binding key (інший рядок). Exchange матчить routing key повідомлення проти binding key за правилом, специфічним для типу exchange.
Як трактується різними типами exchange
| Exchange | Що робить з routing key |
|---|---|
| direct | Точна рівність: доставка тільки в queues, де binding_key == routing_key. |
| fanout | "the routing key is ignored" - повністю ігнорується, доставка в усі bound queues. |
| topic | Pattern matching: routing key матчиться проти binding patterns з wildcards * (одне слово) і # (нуль і більше слів). Приклад: order.paid.eu матчить order.*.eu і order.#. |
| headers | "Headers exchanges ignore the routing key attribute". Маршрутизація через message headers. |
<entity>.<action>.<region> → user.created.eu. Це не вимога специфікації - це прийнята практика. Довжина routing key на wire-рівні - до 255 байт (shortstr).Direct exchange
Unicast (одноадресна доставка) - "одне повідомлення йде в одну конкретну точку". Контраст: broadcast (всім - радіо) - це fanout; multicast (кільком конкретним) - підмножина.
Алгоритм
З документації: "When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R", де K - binding key. Якщо в кількох queues однаковий K - всі отримають копію.
Default exchange (нюанс)
publish("", "my_queue", body) виглядає як пряма публікація в queue, але технічно йде через default direct exchange. Цей трюк - в "Hello World" tutorial.Fanout exchange
Алгоритм: fanout "routes messages to all of the queues that are bound to it and the routing key is ignored". При publish "a copy of the message is delivered to all N queues".
Use cases (з документації)
- MMO-ігри: оновлення лідерборду і глобальні події всім гравцям.
- Спортивні сайти: realtime-оновлення рахунку мобільним клієнтам.
- Розподілені системи: broadcast стану і конфігурацій усім нодам.
- Групові чати: розсилка повідомлень учасникам.
Topic exchange
Жорстка вимога до routing key (з tutorial-five-php): "it must be a list of words, delimited by dots". Довжина: до 255 байт.
Wildcards (підстановочні символи у binding key)
*"can substitute for exactly one word".#"can substitute for zero or more words".
Приклади matching
| Binding pattern | Збігається | Не збігається |
|---|---|---|
*.orange.* | quick.orange.rabbit, lazy.orange.elephant | orange, quick.orange.new.rabbit |
*.*.rabbit | quick.orange.rabbit, lazy.brown.rabbit | quick.brown.fox |
lazy.# | lazy, lazy.brown.fox, lazy.pink.rabbit.male | quick.brown.fox |
# поводиться як fanout (матчить усе); binding без * і # поводиться як direct (точна рівність). Topic - надмножина direct і fanout за виразністю.Use cases
- Геолокація:
point_of_sale.eu.de.berlin. - Workers за типом задачі:
task.image.resize,task.video.transcode. - Stocks/News по категоріях. Build-системи:
build.linux.x86_64.
Headers exchange
Як працює
- Ігнорує
routing_key. - Маршрутизує за message headers (заголовки повідомлення - пари ключ-значення, метадані поверх body).
- "A message matches when the value of the header equals the value specified upon binding".
Аргумент binding'а x-match
x-match: any- "just one matching header value is sufficient".x-match: all- "all the values must match" (default).any-with-x/all-with-x- те саме, але враховуються headers що починаються зx-. За замовчуваннямx-headers ігноруються.
Connection vs Channel
Connection (з'єднання)
- "AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery".
- "AMQP 0-9-1 connections are typically long-lived" - відкриваєш один раз при старті worker'а.
Channel (канал)
Multiplexing (мультиплексування) - "кілька логічних потоків живуть в одному фізичному каналі зв'язку, кожен зі своїм id". Один TCP - багато паралельних "логічних з'єднань".
Чому через channels, а не багато connections
TCP - дорогий ресурс (file descriptors, kernel-стан, TLS-handshake, NAT/firewall). Channels - дешеві логічні id, тисячі на одне з'єднання.
Підняти sandbox у Docker
Compose-файл, запуск, перевірка readiness. Сюди відкриваєш management UI у наступному 1.4.
- 1.3.1 Розбір
sandbox/compose.yaml - 1.3.2 Запуск
docker compose up -d - 1.3.3 Healthcheck:
rabbitmq-diagnostics ping - 1.3.4 Порти і їх призначення
Розбір sandbox/compose.yaml
services:
rabbitmq:
image: rabbitmq:4.3-management
container_name: rabbitmq-study
hostname: rabbitmq-study
ports:
- "5672:5672" # AMQP 0-9-1 / 1.0
- "15672:15672" # management UI + HTTP API
- "15692:15692" # Prometheus metrics
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq-data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 10s
timeout: 5s
retries: 10
start_period: 30s
restart: unless-stopped
volumes:
rabbitmq-data:
image: rabbitmq:4.3-management- офіц. документація: "second set of tags provided with the management plugin installed and enabled by default". Без-management- broker без UI на 15672.hostname: rabbitmq-study- критично. Erlang-нода прив'язана до nodenamerabbit@<hostname>. Зміна hostname → broker не знайде свою data в томі.volumes: rabbitmq-data:/var/lib/rabbitmq- named volume (постійне сховище Docker, переживає рестарти і recreate). Без volume -docker compose downстирає все.healthcheck- Docker сам викликаєrabbitmq-diagnostics -q pingкожні 10 с.start_period: 30s- перші 30 секунд невдачі не рахуються (broker довго стартує). Важливо дляdepends_on: condition: service_healthyу multi-container setup.restart: unless-stopped- перезапуск після crash або reboot хоста; не перезапускає, якщо ми зупинили вручну.
RABBITMQ_DEFAULT_*) спрацьовують тільки при першому запуску, коли data-volume порожній. Подальша зміна env нічого не дасть - credentials уже в Khepri-стані.Запуск sandbox
Команди
cd /home/serhii/study/courses/rabbitmq/sandbox
# Pull image (~250MB) і запуск у detached mode
docker compose up -d
# Перевірити стан контейнера
docker compose ps
# Лог-стрім (Ctrl+C для виходу, контейнер працює далі)
docker compose logs -f rabbitmq
Очікуваний вивід docker compose ps
NAME IMAGE COMMAND STATUS
rabbitmq-study rabbitmq:4.3-management "docker-entrypoint.s…" Up 35s (healthy)
Що відбувається при першому запуску
- Docker pull
rabbitmq:4.3-managementз Docker Hub. - Створюється named volume
rabbitmq-data(порожній). - Контейнер стартує, entrypoint застосовує env-змінні (створює user
guest, vhost/). - Erlang VM піднімає broker, ініціалізує Khepri-store на диску.
- Healthcheck після 30 с (start_period) починає викликати
rabbitmq-diagnostics ping. - Коли ping проходить - status стає
healthy.
Як зупинити / перезапустити
docker compose stop # стоп без видалення контейнера
docker compose start # старт існуючого контейнера
docker compose restart # alias
docker compose down # стоп + видалення контейнера (volume лишається!)
docker compose down -v # ⚠ + видалення volume - стирає УСІ дані
Перевірка readiness: rabbitmq-diagnostics ping
rabbitmq-diagnostics - офіційний CLI broker'а. Команда ping перевіряє, що нода відповідає на Erlang-distribution rpc-виклик.
docker compose exec rabbitmq rabbitmq-diagnostics -q ping
# Pong
Прапор -q (quiet) виводить тільки результат без banner'а. Exit code 0 = ping OK.
Інші діагностичні команди (часто стають у нагоді)
# статус ноди (uptime, memory, queues, consumers)
docker compose exec rabbitmq rabbitmq-diagnostics status
# coverage check всіх listeners (порти 5672, 15672, тощо)
docker compose exec rabbitmq rabbitmq-diagnostics check_port_listener 5672
# health check (комплексний - listeners + ack-tracking + alarms)
docker compose exec rabbitmq rabbitmq-diagnostics -q check_running
# overview через CLI rabbitmqctl
docker compose exec rabbitmq rabbitmqctl list_queues
docker compose exec rabbitmq rabbitmqctl list_exchanges
docker compose exec rabbitmq rabbitmqctl list_bindings
rabbitmq-diagnostics -q ping. Docker викликає його сам - тому docker compose ps показує status healthy або unhealthy.Порти і їх призначення
RabbitMQ - мульти-протокольний broker. Кожен протокол слухає на окремому порту. У нашому sandbox експонуємо три з них.
| Порт | Протокол | Що там |
|---|---|---|
5672 | AMQP 0-9-1 / 1.0 | Основний канал для PHP-клієнтів (php-amqplib, ext-amqp). Producer і consumer connect'яться сюди. |
15672 | HTTP (Management UI + API) | Web-інтерфейс і REST API: /api/queues, /api/overview, /api/exchanges. Включений тільки в *-management tag. |
15692 | HTTP (Prometheus) | Endpoint /metrics у форматі Prometheus exposition. Включений у плагіні rabbitmq_prometheus (built-in у дистрибутиві з 3.8+). |
5552 | RabbitMQ Streams (native) | Stream protocol для high-throughput append-only. У sandbox НЕ експонуємо - використаємо AMQP 0-9-1 access до streams. |
1883 / 8883 | MQTT (з/без TLS) | Поза скоупом курсу. |
61613 / 61614 | STOMP | Поза скоупом курсу. |
4369 | epmd (Erlang Port Mapper) | Discovery для cluster-формування. У single-node sandbox не експонуємо. |
25672 | Inter-node Erlang distribution | Між нодами кластера. У single-node sandbox не експонуємо. |
docker compose exec rabbitmq rabbitmq-diagnostics listeners
Management UI: огляд
Web-інтерфейс RabbitMQ - читай queues, оголошуй exchanges, дивись live-метрики. Доступний на http://localhost:15672.
- 1.4.1 Доступ і обмеження guest user'а
- 1.4.2 Що можна робити з UI
- 1.4.3 Створити exchange + queue + binding вручну
Доступ до management UI
URL і credentials
- URL:
http://localhost:15672(порт експоновано в compose.yaml). - Дефолтний user:
guest/guestз повним доступом до vhost/.
Обмеження guest user'а
access-control: "By default, the guest user is prohibited from connecting from remote hosts; it can only connect over a loopback interface (localhost)".З не-localhost log на broker'і покаже:
PLAIN login refused: user 'guest' can only connect via localhost
Чому це важливо. Якщо ви розгортаєте sandbox на віддаленому сервері і не можете залогінитись через UI - це не баг, це security default. Рішення:
- Створити нового користувача з паролем (production-style):
docker compose exec rabbitmq rabbitmqctl add_user me <strong-password>
docker compose exec rabbitmq rabbitmqctl set_user_tags me administrator
docker compose exec rabbitmq rabbitmqctl set_permissions -p / me ".*" ".*" ".*"
- Або (тільки для dev) - дозволити guest з remote: env-змінна
RABBITMQ_LOOPBACK_USERS=""у compose.yaml. Не для production.
guest/guest працює без ніяких змін.Можливості management UI
Цитати з офіційної сторінки docs/management. Згруповано за функцією.
Адміністрування
- "Declare, list and delete exchanges, queues, bindings, users, virtual hosts and user permissions"
- "Manage users", "Manage policies and runtime parameters"
- "Force close client connections, purge queues"
Моніторинг
- "Monitor queue length, message rates (globally and per queue, exchange or channel)"
- "Monitor node resource use: sockets and file descriptors, memory usage breakdown, available disk space"
- "View other users's connections and channels"
Дані
- "Export schema (vhosts, users, permissions, queues, exchanges, bindings, parameters, policies)" - експорт топології як JSON, можна імпортувати в інший broker.
- "Send and receive messages (useful in development environments and for troubleshooting)" - тестова публікація і Get-message прямо з UI.
Стандартні tabs у UI (станом на 4.x)
- Overview - графіки rates, totals, ноди кластера.
- Connections - відкриті TCP-з'єднання, peer host:port, idle/active, connection close.
- Channels - logical channels всередині кожного connection, prefetch, unacked.
- Exchanges - список exchanges, тип, durable/auto-delete; кнопка Add new exchange.
- Queues and Streams - queues, type (classic/quorum/stream), depth, consumers, rates.
- Admin - users, vhosts, policies, federation/shovel.
Hands-on: створити exchange + queue + binding
Мета: побачити primitives з 1.2 у живому UI і отримати feel за них. Без коду.
Крок 1: declare exchange
- Exchanges tab → Add a new exchange.
- Name:
demo.events. Type:fanout. Durable: ✓. Auto delete: ✗. - Add exchange. Має з'явитись у списку.
Крок 2: declare queues
- Queues tab → Add a new queue.
- Type:
Classic(для цього прикладу - але production-default ужеQuorum; туди прийдемо в розділі 4). - Name:
demo.email. Durable. Add queue. - Повторити для
demo.analytics.
Крок 3: bind queues to exchange
- Відкрити exchange
demo.events→ секція Bindings → Add binding from this exchange. - To queue:
demo.email. Routing key порожній (fanout ігнорує). Bind. - Повторити для
demo.analytics.
Крок 4: publish тестове повідомлення
- На сторінці exchange
demo.events→ секція Publish message. - Routing key: будь-який (ігнорується для fanout). Payload:
{"event": "user_signed_up"}. Publish. - UI покаже "Message published".
Крок 5: подивитися де воно
- Queues tab → видно що
demo.emailіdemo.analyticsмають Ready: 1 (по копії в кожній - типовий fanout). - Відкрити queue → Get messages → Get Message(s). Побачити payload.
basic_get або basic_consume. UI - просто HTTP-обгортка над тим самим API.queue_declare, exchange_declare, queue_bind) - тоді деплой нового сервісу автоматично створює свої queues і bindings.Hands-on: побачити queue properties у дії
Закріплення 1.2.1a (properties) і 1.2.1b (types) на живому broker'і. Усі дії - через management UI на http://localhost:15672.
Крок 1: оголосити три queues з різними прапорами
На Queues and Streams tab → Add a new queue. Створи по черзі:
| Name | Type | Durable | Auto-delete | Arguments |
|---|---|---|---|---|
demo.production | Quorum | (implicit ✓) | ✗ | (empty) |
demo.transient | Classic | ✗ | ✓ | (empty) |
demo.bounded | Quorum | (implicit ✓) | ✗ | x-max-length=5, x-overflow=reject-publish |
Важливо: для demo.transient (classic + non-durable + auto-delete без exclusive) можеш отримати помилку "queue declaration not allowed" у 4.3 - це той самий deprecated_features з 1.2.1a. Якщо так - просто пропусти або змінь на classic + durable.
Крок 2: подивись що видно у UI
Відкрий кожну queue по черзі. У секції Details:
- Type, Features (D = durable, AD = auto-delete, тощо).
- State (running / idle).
- Consumers - 0 на старті.
- Messages - 0.
- Arguments - те, що додав (x-max-length, x-queue-type).
Крок 3: тест auto-delete
- Створи fanout-exchange
demo.testі binding'и до всіх трьох queues. - Publish тестове повідомлення з UI у
demo.testexchange. Усі три queues отримують копію (depth = 1). - Відкрий
demo.transient→ Get messages → Get Message(s) (натисни кілька разів). - Це не симулює "consumer subscribed/unsubscribed" - get-mode не лічиться. Auto-delete не спрацює.
- Щоб реально побачити auto-delete - запусти PHP
basic_consumeна цю queue (з 2.1), потім Ctrl+C. Queue зникне з UI.
Крок 4: тест x-max-length + reject-publish
- На сторінці
demo.boundedexchange → Publish message. Опублікуй 6 повідомлень підряд. - Перші 5 - депт стає 5.
- 6-те (без publisher confirms) - тихо дропнеться. З publisher confirms - повернеться
basic.nack. - Перевір у
demo.bounded→ Messages = 5, не більше. Це і є back-pressure про який питав вище.
Cleanup
docker compose exec rabbitmq rabbitmqctl delete_queue demo.production
docker compose exec rabbitmq rabbitmqctl delete_queue demo.bounded
# demo.transient або зникне сама, або:
docker compose exec rabbitmq rabbitmqctl delete_queue demo.transient
docker compose exec rabbitmq rabbitmqctl delete_exchange demo.test
Базові патерни обміну
П'ять офіційних RabbitMQ tutorials на PHP. Клієнт - php-amqplib v3.7.4. Кожен tutorial демонструє один тип exchange або одну concurrency-властивість.
- 2.1 "Hello World" - найпростіший producer / consumer
- 2.2 Work Queues - розподіл задач, ack, prefetch, durability
- 2.3 Publish / Subscribe - fanout
- 2.4 Routing - direct exchange з routing keys
- 2.5 Topics - pattern matching
"Hello World" - producer (send.php)
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare(
'hello', // name
false, // passive
true, // durable
false, // exclusive
false, // auto_delete
false, // nowait
new AMQPTable(['x-queue-type' => 'quorum']) // arguments
);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello'); // exchange="", routing_key="hello"
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
basic_publish($msg, '', 'hello')- порожній exchange = default direct exchange. Routing key = ім'я queue. Це єдиний випадок, коли publish "виглядає" як прямий запис у queue (див. 1.2.3).x-queue-type: quorum- офіц. tutorial 2026 використовує quorum-черги за дефолтом замість class. (Розділ 4 - чому).
"Hello World" - consumer (receive.php)
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false, false,
new AMQPTable(['x-queue-type' => 'quorum']));
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function (AMQPMessage $msg) {
echo ' [x] Received ', $msg->getBody(), "\n";
};
$channel->basic_consume(
'hello', // queue
'', // consumer_tag (auto)
false, // no_local
true, // no_ack ⚠ auto-ack (продакшн: false)
false, // exclusive
false, // nowait
$callback // callback
);
try {
$channel->consume();
} catch (\Throwable $e) {
echo $e->getMessage();
}
no_ack=true для простоти. У production - завжди manual ack (no_ack=false), інакше повідомлення вважається доставленим у ту саму секунду, як broker його відправив - і губиться, якщо consumer впав під час обробки. Тема 2.2.Work Queues
Розподіл важких задач між кількома worker'ами. Round-robin dispatch + ack + prefetch + durability.
- 2.2.1 Round-robin dispatch
- 2.2.2 Manual acknowledgement
- 2.2.3 Fair dispatch (prefetch_count)
- 2.2.4 Durability: durable queue + persistent message
Round-robin dispatch
Кілька consumer'ів підписуються на ту саму queue → broker роздає повідомлення по черзі.
Code не потребує спецналаштувань - просто запусти кілька receive.php з тим самим queue.
Manual acknowledgement
Ack (acknowledgement, підтвердження) - сигнал від consumer'а до broker'а: "повідомлення оброблено успішно, можна стерти". Broker тримає повідомлення in-flight (на льоту) до отримання ack.
$callback = function (AMQPMessage $msg) {
echo ' [x] Received ', $msg->getBody(), "\n";
sleep(substr_count($msg->getBody(), '.')); // імітація роботи
echo " [x] Done\n";
$msg->ack(); // ← підтвердження після успішної обробки
};
$channel->basic_consume(
'task_queue',
'',
false,
false, // no_ack=false → manual ack обов'язковий
false,
false,
$callback
);
Що буває без ack
- Worker отримав m1, почав обробляти, впав → broker через connection-close re-queue m1 → інший worker отримає m1.
- Worker завершив, але не послав ack → broker думає що повідомлення in-flight, тримає його у пам'яті, не видаляє. Утечка memory.
Три способи відмови від обробки
| Метод | Що робить | Коли вживати |
|---|---|---|
$msg->ack() | Підтвердити, broker стирає. | Успішна обробка. |
$msg->nack(requeue: true) | Відкинути + повернути в queue. | Тимчасова помилка (DB недоступна, retry може допомогти). |
$msg->reject(requeue: false) | Відкинути назавжди (→ DLX, якщо налаштовано). | Принципово невалідне повідомлення (poison message). |
delivery acknowledgement timeout. Розділ 3.2 - як налаштувати.Fair dispatch (basic_qos + prefetch_count)
Round-robin сліпий. Рішення: сказати broker'у "не давай worker'у наступного, поки попереднє не ack'ed".
$channel->basic_qos(
null, // prefetch_size (bytes; null = unlimited)
1, // prefetch_count = скільки in-flight на consumer'а
false // global (per-channel vs per-consumer)
);
Як це змінює поведінку
prefetch_count=1- найсуворіший: worker тримає максимум 1 unacked. Сповільнює, бо більше round-trip'ів broker↔worker.prefetch_count=10-50- типове production-значення для CPU-bound worker'а: дозволяє pipeline без надлишкового memory.prefetch_count=1000+- для дуже легких задач (millions/sec), щоб не упертись у round-trip latency.
Залежність від worker speed
| Час обробки | Рекомендоване prefetch |
|---|---|
| > 1 сек | 1 - 5 |
| 10-100 мс | 20 - 50 |
| < 1 мс | 200 - 1000 |
Durability: durable queue + persistent message
Дві НЕЗАЛЕЖНІ опції, які мусять бути ОБИДВІ ввімкнені для гарантії "повідомлення переживе рестарт broker'а".
Durable queue
$channel->queue_declare(
'task_queue',
false, // passive
true, // durable ← переживає рестарт broker'а
false, // exclusive
false, // auto_delete
false,
new AMQPTable(['x-queue-type' => 'quorum'])
);
Без durable=true queue видаляється при рестарті broker'а - разом з усіма повідомленнями.
Persistent message
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // = 2
]);
$channel->basic_publish($msg, '', 'task_queue');
Без delivery_mode=2 повідомлення сидить тільки у RAM. Broker впав → повідомлення зникло, навіть якщо queue durable.
Матриця гарантій
| Queue durable | Msg persistent | Що буде з повідомленням після рестарту? |
|---|---|---|
| ✗ | будь-який | Queue зникне, повідомлення з нею. |
| ✓ | ✗ | Queue відновиться порожньою. |
| ✓ | ✓ | Queue відновиться, повідомлення на місці. |
x-queue-type: quorum) durability у дещо іншому форматі: дані пишуться через Raft на 3+ ноди, повідомлення переживають падіння будь-якої меншості нод. Розділ 4 - детально.Publish / Subscribe з fanout
Сценарій: один producer кидає event - кожен consumer отримує копію (broadcast). Реалізація - fanout exchange + temporary exclusive queues.
Producer
$channel->exchange_declare(
'logs', // name
'fanout', // type
false, // passive
false, // durable
false // auto_delete
);
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
// routing_key не вказаний → fanout все одно ігнорує
Consumer (логіка одноразової підписки)
$channel->exchange_declare('logs', 'fanout', false, false, false);
// queue з порожнім ім'ям → broker згенерує унікальне (amq.gen-XYZ)
// exclusive=true → видалиться при disconnect
list($queue_name, ,) = $channel->queue_declare(
"", // name (auto-generate)
false, // passive
false, // durable
true, // exclusive ← тільки для цього connection
false // auto_delete
);
$channel->queue_bind($queue_name, 'logs');
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
Чому exclusive queue, а не named
- Fresh empty: новий subscriber не повинен отримати повідомлення, які були до його старту.
- Auto-cleanup: consumer закрив connection → queue зникає, broker не зберігає мертві подвиги.
Routing з direct exchange
Класичний приклад tutorial - логи з рівнями info, warning, error. Consumer обирає, які рівні слухати.
Exchange + публікація
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
// $severity ∈ {'info', 'warning', 'error'}
Consumer з кількома bindings
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Кілька bindings на ту саму queue з РІЗНИМИ routing keys
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
Use case з консолі
# Логер що пише ВСЕ у файл
php receive_logs_direct.php info warning error > logs/all.log
# Окрема панель для критичних
php receive_logs_direct.php error | mail -s "ERROR" oncall@example.com
Topics - routing з pattern matching
Direct - точна рівність. Topic - patterns з wildcards. Routing key стає ієрархічним: <facility>.<severity> або <entity>.<action>.<region>.
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
// Producer публікує з ієрархічним routing key
$channel->basic_publish($msg, 'topic_logs', 'kern.critical');
$channel->basic_publish($msg, 'topic_logs', 'auth.warning');
$channel->basic_publish($msg, 'topic_logs', 'kern.info');
// Consumer 1: всі kern-events
$channel->queue_bind($queueA, 'topic_logs', 'kern.*');
// Consumer 2: всі critical, незалежно від facility
$channel->queue_bind($queueB, 'topic_logs', '*.critical');
// Consumer 3: ВСЕ
$channel->queue_bind($queueC, 'topic_logs', '#');
Wildcards (нагадування з 1.2.5)
*- рівно одне слово.#- нуль або більше слів.- Слова відокремлюються крапкою. Routing key до 255 байт.
Сценарій з курсу: e-commerce events
// Producer - кожна доменна подія публікується з ієрархічним key
publish($msg, 'events', 'order.created.eu');
publish($msg, 'events', 'order.paid.us');
publish($msg, 'events', 'user.signed_up.eu');
// Consumer'и за зоною відповідальності:
queue_bind($euAnalyticsQ, 'events', '*.*.eu'); // тільки EU events
queue_bind($paymentsQ, 'events', 'order.paid.*'); // тільки payments
queue_bind($auditQ, 'events', '#'); // ВСЕ для аудиту
# = fanout. У production часто стартують з topic, бо легко додавати нових consumer'ів з новими patterns без зміни producer'а.Reliability
Як гарантувати доставку без втрат і без дублікатів-привидів. Найбільш практично-важливий розділ для backend-команди.
- 3.1 Publisher Confirms - гарантії від broker'а до producer'а
- 3.2 Consumer Acknowledgements - manual ack, timeout у 4.3
- 3.3 Dead Letter Exchange (DLX) і retry-патерни
- 3.4 Idempotency на consumer-side
Publisher Confirms vs AMQP Transactions
Питання, що вирішується: як producer'у дізнатись, що broker реально прийняв повідомлення? Без додаткових механізмів basic_publish - "fire-and-forget": producer запхав у TCP-сокет, broker міг впасти між publish і flush на диск.
Два механізми
| Підхід | Як працює | Throughput |
|---|---|---|
AMQP Transactions (tx.select / tx.commit) | Стандарт AMQP. Synchronous round-trip після кожного publish. | "unnecessarily heavyweight and decrease throughput by a factor of 250". Не використовується. |
Publisher Confirms (confirm.select) | Async. Broker шле basic.ack/basic.nack для кожного publish (за seq #). Producer пише далі без блокування. | ~production throughput, без транзакційного оверхеду. De facto стандарт. |
Як вмикати в php-amqplib
$channel->confirm_select(); // confirm.select - переводить channel у confirm mode
// Реєструємо handlers до ack/nack від broker'а
$channel->set_ack_handler(function (AMQPMessage $msg) {
// повідомлення з sequence number $msg->getDeliveryTag() прийнято
});
$channel->set_nack_handler(function (AMQPMessage $msg) {
// broker відмовився - log, retry, alert
});
for ($i = 0; $i < 1000; $i++) {
$channel->basic_publish(new AMQPMessage("msg $i"), '', 'tasks');
}
// Чекати, поки прийдуть всі ack'и (або timeout 5 сек)
$channel->wait_for_pending_acks_returns(5);
Mandatory flag + return listener
Що буде, якщо publish у exchange без bindings (або routing key не матчить жодну binding)? За замовчуванням - тиха втрата ("zero or more queues" з 1.2.1).
Прапор mandatory
$msg = new AMQPMessage('payload');
$channel->basic_publish(
$msg,
'orders',
'unmatched.key',
true // mandatory = true ← вимагати, щоб повідомлення потрапило хоча б в одну queue
);
$channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $msg) {
// повідомлення повернуто broker'ом, бо нікуди не маршрутизувалось
error_log("Returned: $replyText for $exchange/$routingKey");
});
Чому це важливо
- У production легко "забути" створити binding після додавання нового consumer'а - повідомлення тихо губляться.
- Mandatory + return listener перетворюють "тиху втрату" на гучну (log, alert).
- Альтернатива: alternate exchange (
x-alternate-exchangeу exchange-args) - перенаправляє unrouted повідомлення в окрему "catch-all" чергу. Менший overhead, ніж listener у кожному producer'і.
basic.return (повернення невмаршрутизованого), потім basic.ack (підтвердження що broker його обробив). Не плутати - ack не означає "доставлено в queue", означає "broker його прийняв (і повернув якщо unroutable)".Consumer ack: timeout у 4.3 (важлива зміна!)
Базовий manual ack-flow вже знайомий з 2.2.2. У RabbitMQ 4.3 - значуща зміна щодо acknowledgement timeout.
Тобто classic queues у 4.3 НЕ enforces ack timeout. Це означає, що
x-queue-type: quorum стає необхідним для production-надійності.Що таке consumer ack timeout
- Default value:
30 minutes("The default timeout value for RabbitMQ is 30 minutes"). - Якщо consumer мовчить (не
ack/nack) довше за timeout - broker закриває channel помилкою:
Consumer 'consumer-tag-998754663370' on channel 1 and queue 'qq.1' in vhost '/'
has timed out waiting for a consumer acknowledgement of a delivery with delivery tag = 10.
Timeout used: 1800000 ms.
Як налаштувати
# /etc/rabbitmq/rabbitmq.conf (мілісекунди)
consumer_timeout = 1800000 # 30 хв (default)
# або 1 година для довгих ETL-задач:
consumer_timeout = 3600000
Practical impact для backend-команди
- Worker, що робить тривалу задачу (відеоконвертація, великий ETL) має або: (а) ack одразу і записати state у DB як "in progress" (idempotent recovery), або (б) збільшити
consumer_timeout, або (в) розбити задачу на чанки з ack після кожного. - Без ack у відведений час - broker re-queue'ить повідомлення, інший worker його обробить → можливі дублікати (тема 3.4).
Dead Letter Exchange - коли повідомлення стає "мертвим"
Чотири причини dead-letter
| # | Причина | Хто викликає |
|---|---|---|
| 1 | "basic.reject or basic.nack with requeue=false" | Consumer явно: "відмовляюсь, не вертайте назад". |
| 2 | "The message expires due to per-message TTL" | Час повідомлення (expiration property) вийшов. |
| 3 | "The message is dropped because its queue exceeded a length limit" | Queue має x-max-length або x-max-length-bytes, новіші витискають старіші. |
| 4 | "The message is returned more times to a quorum queue than the delivery-limit" | Quorum queue з x-delivery-limit - повідомлення re-queue'ed забагато разів. |
Що таке "стало мертвим"
Broker re-publish'є його у Dead Letter Exchange (DLX) з:
- оригінальним body;
- headers
x-death- історія, чому повідомлення стало dead (queue, reason, count, time); - routing key = original або
x-dead-letter-routing-key, якщо налаштовано.
DLX: налаштування і retry-патерни
Базове налаштування queue
$channel->exchange_declare('dlx', 'direct', false, true, false);
$channel->queue_declare('dead_letters', false, true, false, false);
$channel->queue_bind('dead_letters', 'dlx', 'failed');
$channel->queue_declare('tasks', false, true, false, false, false, new AMQPTable([
'x-queue-type' => 'quorum',
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'failed',
]));
Retry з exponential backoff (через TTL + DLX)
Класичний патерн: повідомлення впало → іде в "park"-queue з TTL → expire → DLX повертає назад у головну queue. Між спробами - exponential delay.
// Дві queues:
// 1) main "tasks" - звичайна
// 2) "tasks.retry.5s" з x-message-ttl=5000 і DLX = main exchange
$channel->queue_declare('tasks.retry.5s', false, true, false, false, false, new AMQPTable([
'x-message-ttl' => 5000, // 5 секунд
'x-dead-letter-exchange' => '', // default exchange
'x-dead-letter-routing-key' => 'tasks', // повертає в main queue
]));
// Consumer при тимчасовій помилці:
function onTransientError(AMQPMessage $msg, $channel) {
// Re-publish у retry queue замість nack
$channel->basic_publish($msg, '', 'tasks.retry.5s');
$msg->ack(); // ack оригіналу
}
Patterns на DLX
- Parking lot: один dead_letters queue для аудиту. Inспектор переглядає, можливо ручне re-publish.
- Exponential backoff: кілька retry queues з різним TTL:
retry.1s,retry.5s,retry.30s,retry.5m. Лічильник - у header'і повідомлення. - Native delayed retry (4.3): quorum queues підтримують
x-delivery-limit+x-dead-letter-strategy: at-least-once. Простіше, ніж параметр.
Idempotency на consumer-side
Idempotent (ідемпотентна) операція
Операція, повторне виконання якої не змінює результат після першого. Формально: f(f(x)) == f(x).
| Операція | Idempotent? | Чому |
|---|---|---|
SET balance = 100 | ✓ | Скільки разів не повторюй - balance залишається 100. |
UPDATE balance = balance + 10 | ✗ | Повтор два рази → +20 замість +10. Фінансова катастрофа. |
DELETE FROM users WHERE id = 5 | ✓ | Друге виконання нічого не робить. |
INSERT INTO users (...) | ✗ | Дублікат рядка або UNIQUE-помилка. |
Дві стратегії на consumer-side
1. Idempotent by design
Спроектувати операцію так, щоб повтор був безпечний. Useful patterns:
UPSERT(PostgreSQLON CONFLICT DO NOTHING/UPDATE) замістьINSERT.SET state = 'paid'замістьUPDATE state = state | PAID_FLAG.- Перевірка стану перед дією: "чи це вже виконано?".
2. Deduplication через зовнішній storage
$messageId = $msg->get('message_id'); // або власний idempotency key у payload
// Redis SETNX (set if not exists) з TTL
$inserted = $redis->set("processed:$messageId", "1", ['EX' => 86400, 'NX' => true]);
if ($inserted) {
// Перша спроба - обробити
handleMessage($msg);
$msg->ack();
} else {
// Дублікат - просто ack без обробки
$msg->ack();
}
message_id або doc-specific idempotency_key.Quorum Queues і Streams
Сучасні replicated primitives RabbitMQ 4.x. Classic Mirrored Queues свідомо пропускаємо - "removed starting with RabbitMQ 4.0".
- 4.1.1 Quorum Queues - Raft + replicated FIFO
- 4.1.2 Коли НЕ використовувати quorum queues
- 4.1.3 Quorum queues: новинки 4.3
- 4.2.1 Streams - append-only лог
- 4.2.2 Native stream protocol vs AMQP
Quorum Queues - replicated FIFO на Raft
Як працює реплікація
"All operations (state changes) on a quorum queue are sent to a primary member called a leader which in turn replicates the operations to the remaining members, called followers."
- Raft - алгоритм консенсусу (як Paxos, але простіший в реалізації). Гарантує: якщо більшість (quorum) нод погодилась записати, дані не загубляться.
- Leader / Follower - на кожну queue є один leader, кілька followers. Producer пише leader, leader реплікує followers, дочекавшись ACK від quorum (більшість) - відповідає producer'у.
- Auto-failover: якщо leader ноду губиться, followers вибирають нового leader'а через Raft-голосування.
Декларування
$channel->queue_declare('orders', false, true, false, false, false, new AMQPTable([
'x-queue-type' => 'quorum',
// 'x-quorum-initial-group-size' => 3, // default 3
// 'x-delivery-limit' => 10, // після 10 redelivers → DLX
]));
"To declare a quorum queue set the x-queue-type queue argument to quorum (the default is classic)."
Розмір кластера
- 3 ноди - офіц. рекомендований default. Витримує падіння 1 ноди.
- 5 нод - для критичних navrh. Витримує падіння 2 нод.
- "performance tails off quite a bit for quorum queue node sizes larger than 5. We do not recommend running quorum queues on more than 7 RabbitMQ nodes".
Коли НЕ варто використовувати quorum queues
З документації - чотири категорії, де quorum queues неефективні або непридатні:
| Категорія | Чому | Що замість |
|---|---|---|
| "Temporary queues: transient or exclusive queues, high queue churn" | Raft має overhead на ініціалізацію. Створювати/видаляти 1000 queues/хв через Raft - марно. | Classic queue (для exclusive use, transient subscribers - як у 2.3 Pub/Sub). |
| "Lowest possible latency" | Raft round-trip між нодами додає 1-3 мс. Для прямих RPC-чатів недопустимо. | Classic queue (одна нода, без реплікації). |
| "Very long queue backlogs (5M+ messages)" | Quorum queue зберігає весь стан у пам'яті лідера. Великі backlogs з'їдають RAM. | Streams (диск-first, мільярди повідомлень). |
| "Large fanouts" | Реплікація через Raft не оптимізована для broadcast. | Streams (один write, багато consumers читають з offset). |
Як вирішити: classic, quorum чи stream
| Сценарій | Тип |
|---|---|
| Production-критична work queue (платежі, замовлення) | Quorum |
| Transient subscribers (live notifications, dashboards) | Classic (exclusive auto-delete) |
| Event sourcing, audit log, високий throughput | Stream |
| RPC reply queues (короткі, exclusive) | Classic (exclusive) |
| Великий backlog (DAG retries, ETL з годинами) | Stream |
deprecated_features.permit.transient_nonexcl_queues = true у rabbitmq.conf (тільки тимчасово).Quorum queues - новинки 4.3
Версія 4.3.0 принесла кілька значущих покращень саме для quorum queues. Цитати з release notes.
1. Strict priority queues
- Раніше priority queues були тільки classic. У 4.3 - quorum.
x-max-priority: Nпри declare. Producer вказуєpriority: Kу message properties.- Strict = вищий priority обробляється спочатку. Не "вища ймовірність", саме black-and-white.
2. Delayed retry з configurable backoff
- Раніше для retry-with-backoff потрібно було будувати ланцюг TTL+DLX (як у 3.3.2). У 4.3 quorum queue робить це нативно.
- Налаштовується через
x-overflow: reject-publish+x-delivery-limit+ новийx-dead-letter-strategy.
3. Configurable consumer timeout per queue
- Раніше - тільки global config. У 4.3 -
x-consumer-timeout: 1800000per-queue. - Корисно: довгі ETL queues - 1 година, швидкі notifications - 10 секунд.
4. Memory optimization
Для systems з мільйонами queued messages - до 50% memory зменшення на broker'і.
Streams - append-only лог
Ключова відмінність від queues
| Queue (classic / quorum) | Stream | |
|---|---|---|
| Семантика consume | Destructive: ack → broker стирає. | "non-destructive consumer semantics": повідомлення лишається. |
| Replay | Неможливий (повідомлення вже стерто). | Так. Будь-який offset → читаєш з минулого. |
| Кілька consumers | Round-robin (work queue) або копії в окремі queues. | Кожен consumer читає той самий лог зі свого offset, незалежно. |
| Зберігання | Memory + Disk (durable). | Disk-first, append-only сегменти. |
| Cleanup | Через ack або TTL. | За часом (x-max-age) або розміром (x-stream-max-segment-size-bytes). |
Декларування
$channel->queue_declare('events', false, true, false, false, false, new AMQPTable([
'x-queue-type' => 'stream',
'x-max-age' => '7D', // зберігати 7 днів
'x-stream-max-segment-size-bytes' => 100_000_000, // 100MB segments
]));
"set the x-queue-type queue argument to stream ... must be provided by a client at declaration time; it cannot be set or changed using a policy."
Use cases
- Event sourcing: кожен domain event у stream, materialized views читають з offset 0 при старті.
- Audit log: compliance вимагає replay - stream не дає видалити випадково.
- Fan-out з replay: новий consumer (наприклад analytics-сервіс) робить read-from-beginning без впливу на існуючих.
- Високий throughput: sequential append на диск - набагато швидший за random write.
Stream native protocol vs AMQP 0-9-1 access
Stream доступний по двох протоколах одночасно:
| Native protocol (port 5552) | AMQP 0-9-1 (port 5672) | |
|---|---|---|
| Throughput | Сотні тисяч/сек на consumer'а. | Десятки тисяч/сек. |
| Specific stream features | Server-side filter, super-streams (sharding), single active consumer. | Базова robocon: read-from-offset, basic_consume. |
| PHP client | Немає офіційного. Тільки Java, .NET, Go, Python, Rust. | php-amqplib працює як з queue. |
| Рекомендується для | Production stream-heavy load. | Прості use cases, де PHP - не bottleneck. |
PHP сценарій (через AMQP 0-9-1)
// Декларуємо stream
$channel->queue_declare('events', false, true, false, false, false, new AMQPTable([
'x-queue-type' => 'stream',
]));
// Consumer з offset = "first" (з самого початку)
$channel->basic_qos(null, 100, false); // prefetch обов'язковий для streams
$channel->basic_consume(
'events',
'',
false,
false, // manual ack
false,
false,
$callback,
null,
new AMQPTable([
'x-stream-offset' => 'first', // або: 'last', 'next', timestamp, або int offset
])
);
Offset стратегії
'first'- з самого початку (replay all).'last'- з останнього chunk'а (catch-up).'next'- тільки нові повідомлення.- Timestamp - з конкретного моменту.
- Numeric offset - з конкретного зміщення.
Інтеграція з PHP
Три рівні абстракції для роботи з RabbitMQ у PHP-проєктах. Від low-level wire-protocol клієнта до drop-in queue driver'а.
- 5.1 php-amqplib - native AMQP 0-9-1 client
- 5.2 Symfony Messenger з AMQP transport
- 5.3 Laravel Queue з RabbitMQ driver
- 5.4 Коли який інструмент
php-amqplib - native AMQP 0-9-1 клієнт
Чому "pure PHP"
- Не вимагає PECL-extension. Інша бібліотека (php-amqp) вимагає
ext-amqpз C-кодом - складно ставити у Docker, конфлікти з PHP-версіями. - php-amqplib працює на будь-якому PHP-хості.
- Trade-off: повільніше за C-extension у high-throughput сценаріях (~10-30%).
Інсталяція
composer require php-amqplib/php-amqplib
Базовий flow
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
// 1. Connection (TCP) - один раз на процес
$connection = new AMQPStreamConnection(
'rabbitmq-host', 5672, 'user', 'password',
'/', // vhost
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
3.0, // connection_timeout
3.0, // read_write_timeout
null, // context (TLS)
false, // keepalive
60, // heartbeat (секунди!)
0.0, // channel_rpc_timeout
);
// 2. Channel - дешевий, можна багато на 1 connection
$channel = $connection->channel();
// 3. Декларуй topology idempotently
$channel->exchange_declare('orders', 'topic', false, true, false);
$channel->queue_declare('order_processing', false, true, false, false, false,
new AMQPTable(['x-queue-type' => 'quorum']));
$channel->queue_bind('order_processing', 'orders', 'order.*.processing');
// 4. Publish (з headers і properties)
$msg = new AMQPMessage(json_encode($payload), [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => Uuid::v4()->toRfc4122(),
'timestamp' => time(),
'application_headers' => new AMQPTable(['source' => 'orders-service']),
]);
$channel->basic_publish($msg, 'orders', 'order.created.eu');
// 5. Завжди закривати в кінці процесу
$channel->close();
$connection->close();
Дві варіанти Connection класу
AMQPStreamConnection- PHP streams API. Default. Підтримує TLS через context.AMQPSocketConnection- low-levelsocket_*функції. Швидше, але потребуєext-sockets. Без TLS support.AMQPSSLConnection- спеціалізований для TLS поверх stream connection.
heartbeat (рекомендоване 60 сек) - інакше "тихий" TCP не виявить розрив до first IO. На AWS NLB/firewall'и часто закривають idle conn через 5-10 хв без RST.Symfony Messenger з AMQP transport
Інсталяція (зверни увагу: PECL-extension!)
composer require symfony/amqp-messenger
# І в Docker - PECL extension:
# docker-php-ext-install ... && pecl install amqp && docker-php-ext-enable amqp
ext-amqp (PECL), не php-amqplib. Це принципова відмінність - бінарне залежить від C-extension. Якщо хочеш pure-PHP transport - є community-пакет php-amqplib/symfony-bundle або власний transport через RabbitMqBundle.DSN format
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# scheme user pass host port vhost exchange
# amqps:// для TLS
%2f - це URL-encoded / (default vhost).
Конфігурація з retry strategy
# config/packages/messenger.yaml
framework:
messenger:
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
auto_setup: true # автоматично створює exchange/queue/bindings
exchange:
name: 'orders'
type: 'topic'
queues:
order_processing:
arguments:
x-queue-type: 'quorum'
binding_keys: ['order.*.processing']
retry_strategy:
max_retries: 3
delay: 1000 # ms перед першим retry
multiplier: 2 # exponential: 1s, 2s, 4s
max_delay: 10000
jitter: 0.1 # ±10% randomness
routing:
App\Message\OrderCreatedEvent: async
auto_setup: "By default, the transport will automatically create any exchanges, queues and binding keys that are needed. That can be disabled." У production часто auto_setup: false + topology через migration.
Worker
# Запуск consumer'а (виконується infinitely до Ctrl+C / SIGTERM)
php bin/console messenger:consume async
# З verbose logging
php bin/console messenger:consume async -vv
# З time limit (для systemd / supervisord auto-restart)
php bin/console messenger:consume async --time-limit=3600
# Зупинити gracefully
php bin/console messenger:stop-workers
retry queue, потім назад. Якщо хочеш RabbitMQ-native DLX - треба configure manually через transport options.Laravel Queue з RabbitMQ driver
Community-стандарт: vladimir-yuldashev/laravel-queue-rabbitmq. Drop-in заміна Redis/SQS - вся Queue-фасадна логіка Laravel працює без змін.
Інсталяція
composer require vladimir-yuldashev/laravel-queue-rabbitmq
# Service provider реєструється автоматично через Laravel Package Discovery
Конфігурація
// config/queue.php
'default' => env('QUEUE_CONNECTION', 'rabbitmq'),
'connections' => [
'rabbitmq' => [
'driver' => 'rabbitmq',
'queue' => env('RABBITMQ_QUEUE', 'default'),
'hosts' => [
[
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
],
],
'options' => [
'queue' => [
'job' => \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
'prefetch' => 10,
'declare' => true, // auto-declare queue
'arguments' => [
'x-queue-type' => 'quorum',
],
],
],
],
],
Використання (стандартне Laravel API)
// Job клас не змінюється - той самий що для Redis/SQS
class ProcessOrder implements ShouldQueue {
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(public Order $order) {}
public function handle(): void {
// ... обробка
}
}
// Dispatch як зазвичай
ProcessOrder::dispatch($order)->onQueue('orders.processing');
Дві команди для consumer'а
| Команда | Як працює | Швидкість |
|---|---|---|
php artisan queue:work rabbitmq | Стандартна Laravel-команда. Polling через basic_get. Підтримує кілька queues одночасно. | Базова. |
php artisan rabbitmq:consume | Native через basic_consume (push-based, без polling). | ~2x швидше. Single queue only. |
config/horizon.php працює як для Redis: connection: 'rabbitmq'.options.queue.arguments.x-queue-type: 'quorum' (як вище). Перевірити для production - issues на GitHub містять обговорення edge cases.Коли який інструмент
| Інструмент | Коли вибирати | Anti-cases |
|---|---|---|
| php-amqplib |
|
Простий job dispatch у Laravel/Symfony - надмірний boilerplate. |
| Symfony Messenger |
|
Stream-native features (server-side filter, super-streams). Низько-рівневі patterns. |
| Laravel Queue + RabbitMQ |
|
Складна topology (multiple exchanges, custom bindings). Stream-features. RPC. |
Гібрид у production
У реальних проєктах часто комбінують:
- Symfony Messenger для domain events (decoupling сервісів) -
UserSignedUp,OrderPaid. - php-amqplib у окремому worker'і для streams з replay (analytics, audit).
- Laravel Queue якщо легасі частина в Laravel - щоб не писати їх своїм Messenger.
DevOps базовий
Як підняти RabbitMQ для production-команди. Без deep SRE - acceptable knowledge для backend-розробника, що деплоїть та підтримує сервіс.
- 6.1.1 Кластер на 3 ноди - peer discovery
- 6.1.2 Sandbox: 3-node compose
- 6.2.1 Моніторинг через management HTTP API
- 6.2.2 Prometheus + key metrics
- 6.3 Production checklist
Кластер на 3 ноди - peer discovery
П'ять discovery механізмів
| Механізм | Як працює | Коли |
|---|---|---|
| Classic config | "read a list of nodes from the config file". Hardcoded у cluster_formation.classic_config.nodes. | Бare metal, статичні IP, наш sandbox. |
| Kubernetes | Plugin використовує K8s API. "node with the lowest ordinal index (almost always the pod with the -0 suffix) to form a new cluster". | K8s deployments, StatefulSet. |
| Consul | "Nodes register with Consul on boot and unregister when they leave". | HashiCorp stack. |
| etcd | Реєструються через ключі в etcd directory. | Kubernetes-aligned. |
| DNS | Query A-records seed hostname → reverse DNS lookup. | AWS Cloud Map / kube-dns. |
Чому 3 ноди (а не 2 чи 1)
Базується на правилі quorum = majority для Raft (Quorum Queues) і Khepri (metadata store).
| Розмір кластера | Quorum | Витримує падіння |
|---|---|---|
| 1 | 1 | 0 нод (no HA) |
| 2 | 2 | 0 нод (split-brain ризик) |
| 3 | 2 | 1 ноду |
| 5 | 3 | 2 ноди |
| 7 | 4 | 3 ноди |
Sandbox: 3-нодний кластер у docker-compose
Створи окремий compose.cluster.yaml у sandbox/ для практики. Базовий каркас:
x-rabbit-base: &rabbit-base
image: rabbitmq:4.3-management
environment: &rabbit-env
RABBITMQ_ERLANG_COOKIE: 'study-cluster-cookie'
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 15s
timeout: 10s
retries: 5
services:
rabbit1:
<<: *rabbit-base
hostname: rabbit1
container_name: rabbit1
ports:
- "5672:5672"
- "15672:15672"
volumes:
- rabbit1-data:/var/lib/rabbitmq
- ./cluster-config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
rabbit2:
<<: *rabbit-base
hostname: rabbit2
container_name: rabbit2
volumes:
- rabbit2-data:/var/lib/rabbitmq
- ./cluster-config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
rabbit3:
<<: *rabbit-base
hostname: rabbit3
container_name: rabbit3
volumes:
- rabbit3-data:/var/lib/rabbitmq
- ./cluster-config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
volumes:
rabbit1-data:
rabbit2-data:
rabbit3-data:
cluster-config/rabbitmq.conf
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbit1
cluster_formation.classic_config.nodes.2 = rabbit@rabbit2
cluster_formation.classic_config.nodes.3 = rabbit@rabbit3
# Khepri за замовчуванням у 4.3
# log і management plugin вже включені у -management образі
# Disk alarm
disk_free_limit.absolute = 2GB
# Memory alarm
vm_memory_high_watermark.relative = 0.6
Перевірка кластера
docker compose -f compose.cluster.yaml up -d
# Дочекатися healthy всіх 3 нод (60-90 с)
docker compose -f compose.cluster.yaml ps
# Перевірити cluster status з будь-якої ноди
docker exec rabbit1 rabbitmqctl cluster_status
# Має показати 3 nodes: [rabbit@rabbit1, rabbit@rabbit2, rabbit@rabbit3]
# Створити quorum queue з 3-way replication
docker exec rabbit1 rabbitmqadmin declare queue name=test \
arguments='{"x-queue-type":"quorum"}'
RABBITMQ_ERLANG_COOKIE або файл /var/lib/rabbitmq/.erlang.cookie.Моніторинг: Management HTTP API
Окрім UI, management plugin експонує REST API на тому ж порту 15672. Все що видно в UI, можна скачати JSON'ом.
Ключові endpoint'и
| Endpoint | Що повертає |
|---|---|
GET /api/overview | Cluster-wide: total connections, channels, queues, message rates. |
GET /api/nodes | Список нод з memory, disk, FD usage, Erlang procs. |
GET /api/nodes/{node} | Детально по конкретній ноді. |
GET /api/nodes/{node}/memory | Memory breakdown (queues, connections, plugins, ETS). |
GET /api/queues | Усі queues - depth, consumers, message rates. |
GET /api/queues/{vhost}/{name} | Детально по конкретній queue. |
GET /api/exchanges / /api/bindings | Топологія. |
Приклад: швидкий health-check скрипт
#!/bin/bash
# health.sh
HOST="http://guest:guest@localhost:15672"
# Перевіряємо: всі ноди running?
unhealthy=$(curl -s "$HOST/api/nodes" | jq '[.[] | select(.running == false)] | length')
[[ "$unhealthy" -gt 0 ]] && echo "FAIL: $unhealthy node(s) not running" && exit 1
# Перевіряємо: queues з depth > 10000?
backed_up=$(curl -s "$HOST/api/queues" \
| jq '[.[] | select(.messages > 10000)] | length')
[[ "$backed_up" -gt 0 ]] && echo "WARN: $backed_up queue(s) backed up"
# Memory alarm?
mem_alarm=$(curl -s "$HOST/api/overview" | jq '.message_stats // {}')
echo "OK"
Ключові метрики для backend-команди
- Queue depth (
messages+messages_unacknowledged) - чи consumers встигають. - Publish/deliver rate - throughput; різке падіння → щось зламалось.
- Connection churn (відкриття/закриття на хвилину) - якщо worker reconnect'ться - неправильний lifecycle.
- Memory used vs limit - якщо >80% від
vm_memory_high_watermark, broker почне throttle producers. - FD usage vs limit - кожне з'єднання + queue + open file - 1 FD.
Prometheus + Grafana
Включений за замовчуванням у -management tag
На наших sandbox-нодах він уже працює - це той самий порт 15692, який ми експонуємо у compose.yaml.
# Перевірити
curl -s localhost:15692/metrics | head -n 20
# rabbitmq_build_info{erlang_version="26.2.5",rabbitmq_version="4.3.0"} 1
# rabbitmq_identity_info{...} 1
# rabbitmq_alarms_memory_used_watermark 0
# rabbitmq_alarms_disk_limit_exceeded 0
# rabbitmq_queue_messages 0
# ...
Якщо вимкнено - як включити
rabbitmq-plugins enable rabbitmq_prometheus
Два режими metrics
- Aggregated (default): "metrics are aggregated by name. This mode has lower performance overhead with the output size constant, even as the number of objects grows." Один rabbitmq_queue_messages показує total по всіх queues.
- Per-object: "individual metric for each object-metric pair. With a large number of stats-emitting entities, this can result in very large payloads." Окремий rabbitmq_queue_messages{queue="orders"} на кожну queue. Корисно для drill-down, але дорого при тисячах queues.
# Per-object endpoint
curl -s localhost:15692/metrics/per-object
Топ-7 алертів (PromQL)
# 1. Нода не відповідає 2 хв
up{job="rabbitmq"} == 0 for 2m
# 2. Memory alarm активний
rabbitmq_alarms_memory_used_watermark == 1
# 3. Disk alarm активний
rabbitmq_alarms_disk_limit_exceeded == 1
# 4. Queue depth перевищив поріг
rabbitmq_queue_messages > 10000
# 5. Queue без consumers
rabbitmq_queue_consumers == 0 and rabbitmq_queue_messages > 100
# 6. Connection churn (rate of new connections / sec)
rate(rabbitmq_connections_opened_total[5m]) > 10
# 7. FD usage > 80%
rabbitmq_process_open_fds / rabbitmq_process_max_fds > 0.8
Production checklist
Скорочена вибірка з офіційного production-checklist, з акцентом на пункти, які напряму зачіпають backend-команду.
Operating system
- File descriptors: "at least 50K open file descriptors for the effective RabbitMQ user". Кожен connection + queue + open file = FD.
- Перевірка:
cat /proc/$(pidof beam.smp)/limits | grep "open files". - Storage: "Modern RabbitMQ features, such as Khepri, quorum queues and streams, are designed for durable storage only." NFS / Ceph / GlusterFS - заборонені. Local SSD / EBS gp3.
Resource alarms
- Memory: default
vm_memory_high_watermark.relative = 0.6(60%). При перевищенні broker блокує publishers (back-pressure). "keep the relative watermark between 0.4-0.7". - Disk:
disk_free_limit.absolute = 2GB(мінімум). "all disk writes will fail" при перевищенні - дані в небезпеці.
Security
- Видалити default user: "For production environments, delete the default user (guest)". Створити іменованих з мінімальними permissions.
- TLS: "We recommend using TLS connections when possible, at least to encrypt traffic". Сертифікати на 5671 (AMQPS), 15671 (Management HTTPS), 15691 (Prometheus HTTPS).
- Vhosts: один vhost на кожен tenant / середовище. Permissions per-vhost.
- Inter-node ports: 4369 (epmd), 25672 (Erlang dist) - закрити від external traffic, дозволити тільки між нодами.
Erlang OTP
- Вимога RabbitMQ 4.3: Erlang/OTP 26.2+, рекомендовано 27.x.
- Однакова версія на всіх нодах кластера. Mismatched OTP - ризик невидимих compatibility-проблем.
Cluster topology
- "an odd number of nodes (3, 5, 7, etc)".
- Partition handling:
cluster_partition_handling = pause_minority- меншина при split-brain паузить себе, чекає на возз'єднання. - Backups: топологію
rabbitmqctl export_definitions /tmp/defs.json- users, vhosts, exchanges, bindings, policies. Зберігати у git або S3.
Курс пройдено ✓
- Архітектура AMQP 0-9-1: producer / exchange / binding / queue / consumer.
- 4 типи exchange (direct, fanout, topic, headers) і коли який вибирати.
- Connection vs Channel - multiplexing і чому це матеріально для production.
- Reliability patterns: publisher confirms, consumer ack timeout у 4.3, DLX і retry.
- Quorum queues (Raft) та Streams (append-only) - сучасні replicated primitives.
- Інтеграція з PHP: php-amqplib, Symfony Messenger, Laravel Queue.
- DevOps: 3-нодний кластер, Prometheus monitoring, production checklist.
- Federation, Shovel - cross-cluster replication (advanced).
- STOMP, MQTT - інші протоколи (поза PHP-екосистемою).
- AMQP 1.0 deep dive - тільки згадка про core у 4.0+.
- Classic Mirrored Queues - deprecated у 4.x, видалено у 4.0.
- Quorum queues internals - детально про Raft реалізацію.
- Streams advanced - super-streams (sharding), single active consumer.
- Troubleshooting - performance, network, alarms.
- RabbitMQ Operator для Kubernetes (готова manifests-base).