RabbitMQ with PHP
Using RabbitMQ with PHP: creating a message queue, publishing messages, and consuming them.
Every operation that keeps a user waiting for an HTTP request to finish — bulk email delivery, image conversion, external API calls — is a candidate for being offloaded to a queue. RabbitMQ is an open-source message broker built on the AMQP protocol that decouples the producers and consumers of that workload. It is written in Erlang, and that is no coincidence: Erlang’s concurrency model is a natural fit for managing thousands of connections with low resource consumption.
In this post we will wire up the core PHP flow — a producer and a consumer — by hand. My goal is to make the mechanics behind the abstraction visible; even when it ends up hidden behind Laravel Queue or Symfony Messenger in production, having the fundamentals internalized matters.
Spinning Up the Environment
The fastest way to get RabbitMQ running for local development is Docker:
docker run -d --name test_rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Port 5672 handles AMQP traffic; 15672 serves the management UI, which is why the command uses the management image tag (the plain rabbitmq image ships without the management plugin enabled). Open http://localhost:15672 and log in with guest/guest to monitor queue state, message flow, and connected consumers in real time. That panel will save you a lot of debugging time later.
The PHP Side: php-amqplib
The de facto standard library for PHP to talk to RabbitMQ is php-amqplib/php-amqplib:
composer require php-amqplib/php-amqplib
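The package needs a few PHP extensions: recent releases require mbstring and sockets, while older 2.x releases also required bcmath. These are common but not guaranteed to be enabled in every PHP build, so check with php -m if Composer reports a missing extension.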
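If you would rather check the extensions from PHP itself, a minimal sketch could look like the one below. The function name missingExtensions is hypothetical, and the list of extension names is an assumption on my part; the "require" section of the installed php-amqplib version's composer.json is the authoritative source.

```php
<?php
// Hypothetical helper: returns the names from $required that are not loaded.
function missingExtensions(array $required): array
{
    return array_values(array_filter(
        $required,
        static fn (string $name): bool => !extension_loaded($name)
    ));
}

// Assumed requirement list; verify it against the installed package version.
$missing = missingExtensions(['mbstring', 'sockets', 'bcmath']);

echo $missing === []
    ? 'All listed extensions are loaded.' . PHP_EOL
    : 'Missing extensions: ' . implode(', ', $missing) . PHP_EOL;
```

Running this with the same PHP binary that will run the producer and consumer matters, because the CLI and web server builds can load different extensions.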
Producer: Publishing a Message to the Queue
The producer is the side that defines the work. The example below pushes a new job message to the queue every second:
<?php
require_once(__DIR__ . '/vendor/autoload.php');
const RABBITMQ_HOST = "localhost";
const RABBITMQ_PORT = 5672;
const RABBITMQ_USERNAME = "guest";
const RABBITMQ_PASSWORD = "guest";
const RABBITMQ_QUEUE = "taskQueue";
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);
$channel = $connection->channel();
// Create the queue if it doesn't exist!
$channel->queue_declare(RABBITMQ_QUEUE, false, true, false, false, false, null, null);
$jobId = 0;
while (true) {
    $jobData = [
        "id" => ++$jobId,
        "task" => "sleep",
        "period" => rand(0, 3),
    ];
    $job = json_encode($jobData, JSON_UNESCAPED_SLASHES);
    $message = new PhpAmqpLib\Message\AMQPMessage($job, [
        'delivery_mode' => 2, // Make the message persistent
    ]);
    $channel->basic_publish($message, '', RABBITMQ_QUEUE);
    echo '[' . date("Y-m-d H:i:s") . '] Job created!' . PHP_EOL;
    sleep(1);
}
The third argument (true) in the queue_declare call marks the queue as durable: the queue definition survives a RabbitMQ restart. delivery_mode: 2 marks the message as persistent, instructing RabbitMQ to write it to disk. You need both: a durable queue without persistent messages loses its messages on restart, and persistent messages in a non-durable queue disappear along with the queue.
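Because the positional booleans are easy to mix up, it can help to wrap the two settings in small helpers. The sketch below is one way to do that; declareDurableQueue, persistentJsonProperties, and publishJob are hypothetical names, and the block assumes Composer's autoloader is already loaded as in producer.php.

```php
<?php
// Assumes Composer's autoloader is already loaded, as in producer.php.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

// Hypothetical helper: declares a queue that survives a broker restart.
function declareDurableQueue(AMQPChannel $channel, string $queue): void
{
    $channel->queue_declare(
        $queue,
        false, // passive: create the queue if it is missing
        true,  // durable: the queue definition survives a restart
        false, // exclusive: other connections may use the queue
        false  // auto_delete: keep the queue when consumers disconnect
    );
}

// Hypothetical helper: message properties for a persistent JSON payload.
function persistentJsonProperties(): array
{
    return [
        'content_type'  => 'application/json',
        'delivery_mode' => 2, // 2 = persistent, as in the producer above
    ];
}

// Hypothetical helper: publishes one job through the default exchange.
function publishJob(AMQPChannel $channel, string $queue, array $job): void
{
    $body = json_encode($job, JSON_UNESCAPED_SLASHES | JSON_THROW_ON_ERROR);
    $channel->basic_publish(new AMQPMessage($body, persistentJsonProperties()), '', $queue);
}
```

In the producer, the loop body would then shrink to a call such as publishJob($channel, RABBITMQ_QUEUE, $jobData) after a single declareDurableQueue($channel, RABBITMQ_QUEUE).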
To start the producer:
php producer.php
Consumer: Listening to and Processing the Queue
The consumer side subscribes to the queue and invokes a callback for every incoming message:
<?php
require_once(__DIR__ . '/vendor/autoload.php');
const RABBITMQ_HOST = "localhost";
const RABBITMQ_PORT = 5672;
const RABBITMQ_USERNAME = "guest";
const RABBITMQ_PASSWORD = "guest";
const RABBITMQ_QUEUE = "taskQueue";
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);
$channel = $connection->channel();
// Create the queue if it doesn't exist!
$channel->queue_declare(RABBITMQ_QUEUE, false, true, false, false, false, null, null);
echo 'Waiting for messages. Press CTRL+C to exit.' . PHP_EOL;
$worker = function (object $message) {
    $job = null;
    try {
        // Throw on malformed JSON so a bad payload lands in the catch block.
        $job = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
        echo '[-] Processing... #' . $job['id'] . PHP_EOL;
        sleep($job['period']);
        echo '[+] Done #' . $job['id'] . PHP_EOL;
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } catch (\Throwable $e) {
        echo '[x] Failed #' . ($job['id'] ?? 'unknown') . PHP_EOL;
        // Arguments: delivery tag, multiple, requeue. requeue = false discards the message.
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, false);
    }
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume(RABBITMQ_QUEUE, '', false, false, false, false, $worker);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
The basic_qos(null, 1, null) line is critical: it sets the prefetch count to 1, telling RabbitMQ not to deliver a new message to this consumer until it has acknowledged the previous one. Without it, RabbitMQ pushes messages to consumers as fast as it can, no matter how many are still unacknowledged; with multiple consumers, a slow one accumulates a backlog while the others sit idle.
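To get a feel for why the prefetch limit matters, here is a toy model in plain PHP. It does not talk to RabbitMQ at all; it only compares handing jobs out strictly in turn with handing each job to whichever worker frees up first, which is roughly what a prefetch count of 1 achieves. The function names and the job durations are made up for illustration.

```php
<?php
// Toy model, no broker involved. Both functions return the time at which
// the last worker finishes, given job durations in queue order.

// Jobs are dealt out in turn, whether or not the worker is still busy.
function roundRobinMakespan(array $durations, int $workers): int
{
    $busy = array_fill(0, $workers, 0);
    foreach (array_values($durations) as $i => $duration) {
        $busy[$i % $workers] += $duration;
    }
    return max($busy);
}

// Each job goes to the worker that becomes free first (prefetch = 1).
function prefetchOneMakespan(array $durations, int $workers): int
{
    $freeAt = array_fill(0, $workers, 0);
    foreach ($durations as $duration) {
        $next = array_search(min($freeAt), $freeAt, true);
        $freeAt[$next] += $duration;
    }
    return max($freeAt);
}

$durations = [3, 1, 3, 1, 3, 1]; // seconds per job, illustrative values

echo roundRobinMakespan($durations, 2) . PHP_EOL;  // 9
echo prefetchOneMakespan($durations, 2) . PHP_EOL; // 7
```

With two workers, dealing the jobs out in turn leaves one worker with all the slow jobs (9 seconds in total), while dispatching on acknowledgement finishes in 7.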
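basic_ack and basic_nack determine a message’s fate. basic_ack removes the message from the queue. Without an ack, the message stays unacknowledged and is redelivered once the consumer’s channel or connection closes. basic_nack rejects it; whether it gets re-queued is controlled by the third parameter (requeue), which defaults to false. In the example above requeue is false, so a failed message is discarded rather than returned to the queue.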
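One way to make the failure path explicit is to validate the payload in a separate function and spell out all three basic_nack arguments. The following is a sketch under my own assumptions rather than a drop-in replacement: decodeJob is a hypothetical helper, the required keys mirror the producer's payload, and Composer's autoloader is assumed to be loaded as in consumer.php.

```php
<?php
// Assumes Composer's autoloader is already loaded, as in consumer.php.
use PhpAmqpLib\Message\AMQPMessage;

// Hypothetical helper: decodes a job body and rejects malformed payloads.
function decodeJob(string $body): array
{
    // JSON_THROW_ON_ERROR makes json_decode() throw a JsonException on
    // invalid JSON instead of returning null.
    $job = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($job) || !isset($job['id'], $job['period'])) {
        throw new InvalidArgumentException('Job must contain "id" and "period".');
    }
    return $job;
}

$worker = function (AMQPMessage $message): void {
    $channel = $message->delivery_info['channel'];
    $tag = $message->delivery_info['delivery_tag'];
    try {
        $job = decodeJob($message->body);
        sleep((int) $job['period']);
        $channel->basic_ack($tag);
    } catch (\Throwable $e) {
        // Arguments: delivery tag, multiple, requeue. With requeue = false the
        // broker drops the message, or dead-letters it if a DLX is configured.
        $channel->basic_nack($tag, false, false);
    }
};
```

Passing true as the third argument instead would put the message back on the queue, which only makes sense for failures that are likely to be temporary.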
To start the consumer:
php consumer.php
Things to Watch Out For
A few points I intentionally kept simple in this example become important in production:
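Connection management. AMQPStreamConnection holds an open TCP connection at all times. For long-running consumer processes you should enable heartbeats through the heartbeat argument of the AMQPStreamConnection constructor (it comes well after the host, port, username, and password arguments, and defaults to 0, meaning disabled); otherwise an idle connection can be dropped by the network or the broker without the consumer noticing.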
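A sketch of what enabling heartbeats might look like follows. It relies on the positional constructor order of AMQPStreamConnection as I understand it (heartbeat as the fourteenth argument), so verify it against the version you have installed. The helper names are hypothetical, the 30-second interval is an arbitrary choice, and the read/write timeout is set slightly above twice the heartbeat because the library expects it to be at least that large.

```php
<?php
// Assumes Composer's autoloader is already loaded, as in consumer.php.
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Hypothetical helper: builds the positional constructor arguments for a
// connection with heartbeats enabled. Argument order is an assumption to
// verify against the installed php-amqplib version.
function heartbeatConnectionArgs(string $host, int $port, string $user, string $password, int $heartbeat): array
{
    return [
        $host,
        $port,
        $user,
        $password,
        '/',                           // vhost
        false,                         // insist
        'AMQPLAIN',                    // login_method
        null,                          // login_response
        'en_US',                       // locale
        3.0,                           // connection_timeout, in seconds
        (float) ($heartbeat * 2 + 1),  // read_write_timeout, above 2x heartbeat
        null,                          // context
        false,                         // keepalive
        $heartbeat,                    // heartbeat interval, in seconds
    ];
}

// Hypothetical helper: opens a connection to the local test broker.
function connectWithHeartbeat(int $heartbeat = 30): AMQPStreamConnection
{
    return new AMQPStreamConnection(
        ...heartbeatConnectionArgs('localhost', 5672, 'guest', 'guest', $heartbeat)
    );
}
```

In the consumer, $connection = connectWithHeartbeat(); would replace the direct constructor call.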
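Errors and retries. Returning a message with basic_nack and requeue set to true can create an infinite loop: an unprocessable message keeps cycling back into the queue. Setting up a dead letter exchange (DLX) is a much more robust approach: messages rejected with requeue set to false are routed to a separate queue instead of being lost. Limiting the number of attempts before that happens takes extra work, such as tracking a retry count in the message headers.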
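A dead letter setup could be sketched roughly as follows. All names here (the helper functions and whatever exchange and queue names you pass in) are hypothetical, and the block assumes Composer's autoloader is loaded as in consumer.php. One caveat: redeclaring an existing queue with different arguments fails with a PRECONDITION_FAILED error, so the work queue needs a new name or has to be deleted first.

```php
<?php
// Assumes Composer's autoloader is already loaded, as in consumer.php.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Wire\AMQPTable;

// Hypothetical helper: queue arguments that send rejected messages to $dlx.
function deadLetterArguments(string $dlx): array
{
    return ['x-dead-letter-exchange' => $dlx];
}

// Hypothetical helper: declares a work queue plus its dead letter plumbing.
function declareQueueWithDeadLetter(
    AMQPChannel $channel,
    string $queue,
    string $dlx,
    string $deadQueue
): void {
    // Durable fanout exchange: every dead-lettered message reaches $deadQueue.
    $channel->exchange_declare($dlx, 'fanout', false, true, false);
    $channel->queue_declare($deadQueue, false, true, false, false);
    $channel->queue_bind($deadQueue, $dlx);

    // Durable work queue whose rejected messages are routed to $dlx.
    $channel->queue_declare(
        $queue,
        false,
        true,
        false,
        false,
        false,
        new AMQPTable(deadLetterArguments($dlx))
    );
}
```

A call such as declareQueueWithDeadLetter($channel, 'taskQueueWithDlx', 'taskDlx', 'taskDead') would then let a basic_nack with requeue set to false move the message into taskDead, where it can be inspected from the management UI.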
Multiple consumers. If you launch the same consumer.php in multiple terminal windows, RabbitMQ balances the workload across them. With basic_qos in place, each consumer holds at most one unacknowledged message at a time; horizontal scaling is that straightforward.
That covers the core flow on the PHP side. In real projects you would reach for Laravel Queue or Symfony Messenger’s RabbitMQ driver rather than writing this code directly — but the mechanics underneath are exactly the same.