1 安装
1.在ubuntu上直接使用
apt-get install
安装1
sudo apt-get install rabbitmq-server
2.启用RabbitMQ管理插件(可选):
启用后,可以通过浏览器访问http://<服务器IP>:15672来管理RabbitMQ(默认用户名和密码都是guest)。服务器IP>1
sudo rabbitmq-plugins enable rabbitmq_management
3.安装
C++
客户端库,为了使用C++
与RabbitMQ进行交互,需要安装一个第三方库,这里用SimpleAmqpClient
作为例子(如果你要C++
程序要连接RabbitServer必须做的步骤)在cmake过程中如果报缺失什么依赖,直接安装就好,需要1
2
3
4git clone https://github.com/alanxz/SimpleAmqpClient.git
mkdir simpleamqpclient-build
cd simpleamqpclient-build
cmake ..Boost
库,librabbitmq-dev
等等,cmake..
成功后执行一下命令安装:1
sudo make install
4.启动Rabbit服务器
之后,我们可以从1
sudo service rabbitmq-server start
localhost:5672
访问,用户名密码都默认是guest
2. SimpleAmqpClient的接口
2.1 创建连接
使用了SimpleAmqpClient
,其中主要的接口如下: - 创建连接:create 1
2
3
4AmqpClient::Channel::Create(const std::string &host = "127.0.0.1", int port = 5672,
const std::string &username = "guest",
const std::string &password = "guest",
const std::string &vhost = "/", int frame_max = 131072);
2.2 创建交换机
- 声明一个交换机Exchange
1
2
3
4DeclareExchange(
const std::string &exchange_name,
const std::string &exchange_type = Channel::EXCHANGE_TYPE_DIRECT,
bool passive = false, bool durable = false, bool auto_delete = false); - passive:指示是否以被动方式声明交换机,
passive=false
,则会创建(存在直接返回,不存在创建);如果passive=true
当交换机不存在则会返回一个错误 - exchange_type:交换机类型
- fanout(广播)
- topic(主题)
- headers(头部)
- durable(可选):指示交换机是否是持久化的。默认为 false,表示交换机不是持久化的,即在 RabbitMQ 服务器重启时会被删除。如果设置为 true,则交换机是持久化的,会被保存到磁盘上,即使服务器重启也不会丢失。
- auto_delete(可选):指示是否在不再使用时自动删除交换机。默认为 false,表示交换机不会自动删除。如果设置为 true,则交换机会在最后一个绑定到它的队列被移除后自动删除。
2.3 发送消息
1 | BasicPublish(const std::string &exchange_name, |
- exchange_name:要发布消息的交换机的名称。
- routing_key:消息的路由键。根据路由键的值,消息将被路由到匹配的队列中。对于直连交换机(direct exchange),通常与队列的名称完全匹配;对于其他类型的交换机,根据交换机的规则进行匹配。
- message:要发布的消息对象,通常是 BasicMessage::ptr_t 类型的指针。
- mandatory(可选):指示是否要求消息必须被路由到至少一个队列中。默认为 false,表示不要求。如果设置为 true,但消息未能被路由到任何队列,会触发一个 Basic.Return 命令给发送者。
- immediate(可选):指示是否要求消息不能立即被投递到队列中,只有在至少有一个消费者可以接收到消息时才投递。默认为 false,表示不要求。如果设置为 true,但没有消费者可以接收到消息,会触发一个 Basic.Return 命令给发送者。
2.4 声明一个队列
1 | std::string DeclareQueue(const std::string &queue_name, bool passive = false, |
- queue_name:要声明的队列的名称。
- passive(可选):指示是否以被动方式声明队列。默认为 false,表示如果队列不存在,则创建它;如果存在,则返回成功。如果设置为 true,则只会检查队列是否存在,如果不存在则返回失败,不会创建队列。
- durable(可选):指示队列是否是持久化的。默认为 false,表示队列不是持久化的,即在 RabbitMQ 服务器重启时会被删除。如果设置为 true,则队列是持久化的,会被保存到磁盘上,即使服务器重启也不会丢失。
- exclusive(可选):指示是否将队列设置为独占。默认为 true,表示队列只能被当前连接中的这个消费者订阅。如果设置为 false,则允许其他连接中的消费者也订阅该队列。
- auto_delete(可选):指示是否在不再使用时自动删除队列。默认为 true,表示队列会在最后一个消费者断开连接时自动删除。
2.5 绑定队列到交换机上
1 | BindQueue(const std::string &queue_name, |
- queue_name:要绑定的队列的名称。
- exchange_name:要绑定到的交换机的名称。
- routing_key(可选):指定绑定的路由键。对于直连交换机(direct exchange),通常与队列的名称完全匹配;对于其他类型的交换机,根据交换机的规则进行匹配。默认为空字符串,表示使用默认的路由键规则。
2.6 创建一个消费者,并订阅指定队列的消息
1 | User |
- queue:要消费的队列的名称。
- consumer_tag(可选):消费者标签,用于标识这个消费者。如果未提供,则由 RabbitMQ 自动生成。
- no_local(可选):指示服务器不应该将消息发送给与发布者相同的连接。默认为 true,表示不接收本地(同一连接)发送的消息。
- no_ack(可选):指示是否需要手动确认消息。默认为 true,表示不需要手动确认消息,即消息一旦发送给消费者就会立即被视为已经被消费。
- exclusive(可选):指示是否将队列设置为独占。默认为 true,表示队列只能被当前连接中的这个消费者订阅。如果设置为 false,则允许其他连接中的消费者也订阅该队列。
- message_prefetch_count(可选):指示每次预取(prefetch)的消息数量。默认为 1,表示每次只预取一条消息。预取的消息将在消费者处理完之前存储在客户端,以提高效率和性能。
2.7 手动确认
当我们将no_ack
设置为false时,需要手动确认,SimpleAmqpClient有三个重载函数来进行手动确认 1
2
3
4//message为调用玩BasicConsumeMessage()函数后的指针Envelope::ptr_t
void BasicAck(const Envelope::ptr_t &message);
BasicAck(const Envelope::DeliveryInfo &info);
BasicAck(const Envelope::DeliveryInfo &info, bool multiple);const Envelope::DeliveryInfo
:Envelope::ptr_t的成员 - multiple
:参数设置为 true,则确认该通道上所有交付标签小于或等于当前交付标签的消息。参数设置为 false,则只确认当前交付标签对应的消息。
2.8 消费者接收消息
1 | //下面的接收消息都只能接收单个消息 |
- consumer_tag:消费者标签,用于标识要接收消息的消费者。
3 在C++中使用RabbitMQ
3.1 简单队列模型如图:
简单队列模型如图: - P:消息的生产者 - C:消息的消费者 - 红色:队列
生产者将消息发送到队列,消费者从队列中获取信息,在这一过着中没有涉及到交换机(需要将exclusive设置为false) 根据以上的模型,可以抽取出 3 个对象:生产者(用户发送消息)、队列(中间件):类似于容器(存储消息)、消费者(获取队列中的消息) 。
缺点: - 耦合性高,生产消费一一对应(如果有多个消费者都想消费这个消息就不行了),队列名称变更时需要同时修改代码。
3.2 工作队列
对于简单队列,应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),而消费者接收完消息后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用。那么怎么让消费者同时处理多个消息呢? 在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了。 使用任务队列的优点之一就是可以轻易的并行工作。如果积压了好多工作,可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。 >也是需将exclusive设置为false)
公平分发(Fait dispatch)
上面发生轮询分发这种情况发生是因为RabbitMQ在消息进入队列时才调度消息。它不会查看使用者的未确认消息数。它只是盲目地将每第n条消息发送给第n个使用者。 为了解决这个问题,可以将创建消费者中的参数message_prefetch_count = 1
设置为1。这告诉RabbitMQ一次不要给消费者一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给消费者。而是将其分派给不忙的下一个工作程序。 1
2
3
4
5BasicConsume(const std::string &queue,
const std::string &consumer_tag = "",
bool no_local = true, bool no_ack = true,
bool exclusive = true,
boost::uint16_t message_prefetch_count = 1);
消息确认
对no_ack
进行设置:
**bool no_Ack = true
(自动确认模式)**:一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。- bool no_ack = false(手动确认模式):如果不想丢失任何任务,如果有一个消费者挂掉了,那么应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消息应答。消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ才可以删除它,保证可靠性。
默认情况下消息应答是打开的
消息持久化
通过消息确认能够保证即使消费者死亡,任务也不会丢失。但是如果RabbitMQ 服务器停止,任务仍将失去。当 RabbitMQ 退出或者崩溃,将会丢失队列和消息。除非不要队列和消息。两件事儿必须保证消息不被丢失,所以必须把“队列”和“消息”设为持久化。
在对交换机和队列的声明中都有durable
字段
durable=true
:开启消息持久化durable=false
:不开启
3.3发布订阅模式(即exchange为fanout模式)示例
如果希望发送一个消息后能够被多个消费者消费,这时候就需要用到消息中的发布订阅模型。此时交换机采用fanout
模式 这种模式需要在生产者和队列之间多加一个交换机,当消费者获取消息时,需要队列绑定到交换机上,交换机把消息发送到队列,消费者就能够获得队列消息。
- 一个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
- 每个队列都要绑定到交换机
- 生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
发布者
1 |
|
订阅者
我们可以创建多个订阅者对其进行订阅,接收消息 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int main() {
const std::string exchange_name = "test_exchange";
const std::string queue_name = "test_queue";
AmqpClient::Channel::ptr_t channel = AmqpClient::Channel::Create("localhost");
// 声明一个交换机,类型为fanout,表示广播模式
channel->DeclareExchange(exchange_name, AmqpClient::Channel::EXCHANGE_TYPE_FANOUT);
// 声明一个队列,并将其绑定到交换机
channel->DeclareQueue(queue_name);
channel->BindQueue(queue_name, exchange_name, "");
// 创建一个消费者,并订阅消息,no_ack设置false,因此需要手动确认,调用BasicAck进行确认
std::string consumer_tag = channel->BasicConsume("test_queue", "",true,false,true);
while(true){
AmqpClient::Envelope::ptr_t envelope=nullptr;
bool success=false;
success=channel->BasicConsumeMessage(consumer_tag, envelope, 0); // 10 ms timeout
if(envelope!=nullptr&&success){
AmqpClient::Envelope::DeliveryInfo delivery_info = envelope->GetDeliveryInfo();
channel->BasicAck(delivery_info, false);
std::cout << envelope->Message()->Body()<< std::endl;
// 显式释放 envelope 对象
envelope.reset();
}
}
return 0;
}
3.4 路由模式
如果希望根据特定场景将消息发送给特定队列,而不是直接发送到全部队列中,可以使用路由模式。
路由模式可以使用routing_key参数创建绑定键,生产者在发送消息时可以通过制定路由键将消息放入到指定队列,同样消费者也需要通过路由键绑定到指定的队列上获取消息。
此时交换机需要采用direct模式。
3.5 Topic模式
能够对路由键进行匹配。此时消费者能够根据匹配情况使用队列。
- 符号#匹配一个或者多个词
- 符号*匹配一个词
路由键设置为quick.orange.rabbit 的消息将传递到两个队列。消息lazy.orange.elephant 也将发送给他们两个。另一方面,quick.orange.fox只会进入第一个队列,而lazy.brown.fox只会进入第二个队列。lazy.pink.rabbit将仅被传递到第二队列一次,即使两个绑定匹配。quick.brown.fox与任何绑定都不匹配,因此将被丢弃。
4 Exchange交换机
4.1 Nameless exchange(匿名转发)
默认的转发器是空字符串””,也可以理解为未使用转发器: 1
channel->BasicPublish(exchange_name="", routing_key="", message);
4.2 Fanout Exchange
不处理路由键。只需要将队列绑定到交换机上,发送消息到交换机就会被转发到与该交换机绑定的所有队列上。
4.3 Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键是“dog”,则只有被标记为“dog”的消息才会被转发,而“dog.a”不会被转发。
4.4 Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定到一个模式上,符号#匹配一个或者多个词。符号匹配一个词。因此audit.#能够匹配到audit.irs.corporate,但是audit.只会匹配到audit.irs。
5.事务机制+Confirm
在Rabbitmq中可以通过持久化来解决因为服务器异常而导致的数据丢失问题。
但是还存在另外一个问题:生产者将消息发送出去之后,消息是否真正到达Rabbitmq服务器是无法确定的(即Rabbitmq不会反馈任何消息给生产者),默认情况下是不知道消息有没有正确到达的。
导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本没有到达Rabbitmq服务器。
Rabbitmq提供了两种解决方式:
- 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案。
- 通过将channel设置成confirm模式来实现。
5.1 事务机制
而是通过 AMQP 协议本身来支持事务。在 AMQP 中,可以使用 tx
方法来实现事务。这包括tx.select
用于开启事务,tx.commit
用于提交事务,以及 tx.rollback
用于回滚事务。
在 SimpleAmqpClient
中,你可以使用 Channel::TxSelect
方法来开启事务,Channel::TxCommit
方法来提交事务,以及 Channel::TxRollback
方法来回滚事务
缺点:采用这种模式比较耗时,降低了Rabbitmq的吞吐量
5.2 producer端的Confirm
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于它是异步的,一旦发布了一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
Confirm模式是相对比较高效的解决方案,分为三种情况:普通Confirm模式,批量Confirm模式以及异步Confirm模式。:
- 普通confirm模式:每发送一条消息后,调用waitForConfirms方法,等待服务器端confirm,实际上是一种串行confirm了。
- 批量confirm模式:每发送一批消息后,调用waitForConfirms方法,等待服务器端confirm
- 异步confirm模式:提供了一个回调方法,服务端confirm了一条或者多条消息之后Client端会回调这个方法。