MQ

03 28,2019

消息队列RabbitMQ(三):PHP的使用示例

之前的文章已经介绍RabbitMQ以及安装,本文就介绍PHP使用RabbitMQ的示例。

安装PHP的RabbitMQ扩展

本文介绍AMQP 0-9-1,这是一个开放的、通用的协议消息,这里我们使用php-amqplib这个PHP扩展。更多PHPAMQP扩展见官网

通过composer安装

composer require php-amqplib/php-amqplib -vvv

如果你还不知道composer,那就out了。:)

Publisher(生产者,消息发送方)

创建publisher.php文件,并引入composer自动加载文件

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

Connection(创建连接)

创建RabbitMQ连接以及channel通道,AQMP的命令都是通过通道发出去的。

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

Send(发送消息)

发消息前,我们必须声明一个队列为我们发送做准备;然后我们可以向队列发布消息:

/**
 * 创建队列(Queue)
 * name: hello         // 队列名称
 * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
 * durable: true       // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
 * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
 *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
 */
$channel->queue_declare('hello', false, false, false, false);

/**
 * 创建交换机(Exchange)
 * name: vckai_exchange// 交换机名称
 * type: direct        // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。
 * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
 * durable: false      // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
 * auto_delete: false  // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
 */
$channel->exchange_declare('vckai_exchange', AMQPExchangeType::DIRECT, false, false, false);

// 绑定消息交换机和队列
$channel->queue_bind('hello', 'vckai_exchange');

/**
 * 创建AMQP消息类型
 * delivery_mode 消息是否持久化
 * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化
 * AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化 
 */
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage:: DELIVERY_MODE_NON_PERSISTENT]);

/**
 * 发送消息
 * msg: $msg                // AMQP消息内容
 * exchange: vckai_exchange // 交换机名称
 * queue: hello             // 队列名称
 */
$channel->basic_publish($msg, 'vckai_exchange', 'hello');

echo " [x] Sent 'Hello World!'\n";

这里需要特别说明的是关于Exchange、Queue和Message持久化的问题

如过将Queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue。

队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个Queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。

上面阐述了队列的持久化和消息的持久化,如果不设置Exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果Exchange不设置持久化,那么当Broker服务重启之后,Exchange将不复存在,那么既而发送方RabbitMQ Producer就无法正常发送消息。

Close(关闭链接)

$channel->close();
$connection->close();
03 15,2019

消息队列RabbitMQ(二):安装与配置

RabbitMQ安装

系统以及软件版本

CentOS 7
Erlang 2.1
RabbitMQ 3.7.13

由于RabbitMQ是基于Erlang语言开发的,所以必须先安装Erlang运行环境。请注意RabbitMQ和Erlang需要对应相应的版本,否则可能会安装失败。

安装Erlang

这里可以使用RabbitMQ官方提供的脚本下载官方yum源

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

sudo yum install -y erlang

安装完成之后可以查看下Erlang的版本

[vckai@centos-7 ~]$ erl -v
Erlang/OTP 21 [erts-10.3] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]

Eshell V10.3  (abort with ^G)
1>
05 29,2018

消息队列RabbitMQ(一):介绍

什么是RabbitMQ?

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

基本特性

1)可靠性
消息持久化、消息发送和投递确认机制、集群高可用方案

2)灵活路由
消息通过exchange的方式路由到不同的queue中,提供了包括fanout, direct, topic等多种exchange实现,并且支持通过编写exchange插件的方式自实现路由方案

3)支持集群
同网段下的rabbitmq节点可以通过集群的方式,组成一个逻辑上的单一broker

4)Federation
通过Federation可以在跨网段节点间组件集群

5)高可用消息队列
通过设置镜像队列的方式,消息可以在镜像队列间进行复制,使节点down机或硬件损坏的情况下保证队列服务的高可用

6)多协议支持
包括AMQP, STOMP, MQTT, HTTP等多种消息交换协议

7)多客户端支持
JAVA, .NET, Ruby, Python, PHP, Node, Go……

8)可视化管理界面
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

9)丰富的插件支持
tracing, managment-plugin, and you can also write your own.

RabbitMQ中的重要概念

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker

经纪人。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。

Producer

消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。

Consumer

消息消费者。消息的接收者,一般是独立的程序。

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。

对于操作系统来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。

Exchange

消息交换机。指定消息按照什么规则路由到哪个队列Queue。

Producer将并不能直接将消息投递到Queue中。需要将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者没有绑定Queue的情况下将消息丢弃)。

Exchange是按照什么逻辑将消息路由到Queue的?这个将在下面Binding中介绍。 RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在下面Exchange类型中介绍。

Queue

消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。

RabbitMQ中的消息都只能存储在Queue中,Producer生产消息并通过Exchange投递到Queue中,Consumer可以通过Exchange从Queue中获取消息并消费。

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

Binding

绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。