03 28

消息队列RabbitMQ(三):PHP的使用示例

之前的文章已经介绍RabbitMQ以及安装,本文就介绍PHP使用RabbitMQ的示例。

安装PHP的RabbitMQ扩展

本文介绍AMQP 0-9-1,这是一个开放的、通用的协议消息,这里我们使用php-amqplib这个PHP扩展。更多PHPAMQP扩展见官网

通过composer安装

composer require php-amqplib/php-amqplib -vvv

如果你还不知道composer,那就out了。:)

Publisher(生产者,消息发送方)

创建publisher.php文件,并引入composer自动加载文件

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

Connection(创建连接)

创建RabbitMQ连接以及channel通道,AQMP的命令都是通过通道发出去的。

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

Send(发送消息)

发消息前,我们必须声明一个队列为我们发送做准备;然后我们可以向队列发布消息:

/**
 * 创建队列(Queue)
 * name: hello         // 队列名称
 * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
 * durable: true       // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
 * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
 *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
 */
$channel->queue_declare('hello', false, false, false, false);

/**
 * 创建交换机(Exchange)
 * name: vckai_exchange// 交换机名称
 * type: direct        // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。
 * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
 * durable: false      // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
 * auto_delete: false  // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
 */
$channel->exchange_declare('vckai_exchange', AMQPExchangeType::DIRECT, false, false, false);

// 绑定消息交换机和队列
$channel->queue_bind('hello', 'vckai_exchange');

/**
 * 创建AMQP消息类型
 * delivery_mode 消息是否持久化
 * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化
 * AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化 
 */
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage:: DELIVERY_MODE_NON_PERSISTENT]);

/**
 * 发送消息
 * msg: $msg                // AMQP消息内容
 * exchange: vckai_exchange // 交换机名称
 * queue: hello             // 队列名称
 */
$channel->basic_publish($msg, 'vckai_exchange', 'hello');

echo " [x] Sent 'Hello World!'\n";

这里需要特别说明的是关于Exchange、Queue和Message持久化的问题

如过将Queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue。

队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个Queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。

上面阐述了队列的持久化和消息的持久化,如果不设置Exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果Exchange不设置持久化,那么当Broker服务重启之后,Exchange将不复存在,那么既而发送方RabbitMQ Producer就无法正常发送消息。

Close(关闭链接)

$channel->close();
$connection->close();

Consumer(消费者,消息接收方)

创建consumer.php文件,并引入composer自动加载文件

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

// 设置消费者(Consumer)客户端同时只处理一条队列
// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。
$channel->basic_qos(0, 1, false);

// 同样是创建路由和队列,以及绑定路由队列,注意要跟publisher的一致
// 这里其实可以不用,但是为了防止队列没有被创建所以做的容错处理
$channel->queue_declare('hello', false, false, false, false);
$channel->exchange_declare('vckai_exchange', AMQPExchangeType::DIRECT, false, false, false);
$channel->queue_bind('hello', 'vckai_exchange');

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

Receive(接收消息)

// 消息处理的逻辑回调函数
$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";

  // 手动确认ack,确保消息已经处理
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  if ($msg->body === 'quit') {
     $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
  }
};

/**
 * queue: hello               // 被消费的队列名称
 * consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端
 * no_local: false            // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
 * no_ack: true               // 收到消息后,是否不需要回复确认即被认为被消费
 * exclusive: false           // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
 * nowait: false              // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
 * callback: $callback        // 回调逻辑处理函数
 */
$channel->basic_consume('hello', 'consumer_tag', false, true, false, false, $callback);

// 程序运行完成后关闭链接
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);

// 阻塞队列监听事件
while(count($channel->callbacks)) {
    $channel->wait();
}

关于ACK确认

做任务可能需要几秒钟的时间。你可能想知道如果其中一个消费者(Consumer)开始一项长期任务并且只是部分完成而死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。在这种情况下,如果你杀了一个工人,我们将失去刚刚处理的信息。 我们也会失去所有派发给这个特定工作人员但尚未处理的消息。

但我们不想失去任何任务。 如果一名消费者(Consumer)死亡,我们希望将任务交付给另一名消费者(Consumer)。

为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发回询问(acknowledgement),告诉 RabbitMQ收到,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将对其重新排队。如果有其他消费者(Consumer)同时在线,它会迅速将其重新发送给另一位消费者(Consumer)。这样,即使消费者(Consumer)偶尔死亡,也可以确保没有任何信息丢失。

没有任何消息超时;当消费者(Consumer)死亡时,RabbitMQ将传递消息。即使处理消息需要很长时间也没关系。

消息确认默认关闭。通过将第四个参数设置为basic_consume的参数no_ack为false(true表示自动确认),并在完成任务后向RabbitMQ发送确认,现在是时候打开它们了。

一个很容易犯的错误就是忘记basic_ack,或者程序异常退出没有ack,消息一直不会被释放,导致重复执行以及RabbitMQ内存越来越高。

源码

publisher.php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$channel->exchange_declare('vckai_exchange', AMQPExchangeType::DIRECT, false, false, false);

// 绑定消息交换机和队列
$channel->queue_bind('hello', 'vckai_exchange');

$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage:: DELIVERY_MODE_NON_PERSISTENT]);

$channel->basic_publish($msg, 'vckai_exchange', 'hello');

echo " [x] Sent 'Hello World!'\n";

consumer.php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

// 设置消费者(Consumer)客户端同时只处理一条队列
$channel->basic_qos(0, 1, false);

// 同样是创建路由和队列,以及绑定路由队列,注意要跟publisher的一致
$channel->queue_declare('hello', false, false, false, false);
$channel->exchange_declare('vckai_exchange', AMQPExchangeType::DIRECT, false, false, false);
$channel->queue_bind('hello', 'vckai_exchange');

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

// 消息处理的逻辑回调函数
$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";

  // 手动确认ack,确保消息已经处理
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  if ($msg->body === 'quit') {
     $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
  }
};

$channel->basic_consume('hello', 'consumer_tag', false, true, false, false, $callback);

// 程序运行完成后关闭链接
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);

// 阻塞队列监听事件
while(count($channel->callbacks)) {
    $channel->wait();
}

总结

以上就是PHP使用RabbitMQ的基础使用范例了,更多的范例说明请参考这里

参考资料

https://www.rabbitmq.com/tutorials/tutorial-one-php.html

https://github.com/php-amqplib/php-amqplib

https://learnku.com/articles/9117/rabbitmq-entry-work-queue

https://my.oschina.net/u/1186749/blog/786459