成都网站建设设计

将想法与焦点和您一起共享

rabbitmq学习笔记-创新互联

一、消息确认机制

创新互联公司专注于平顺企业网站建设,自适应网站建设,商城网站建设。平顺网站建设公司,为平顺等地区提供建站服务。全流程按需策划,专业设计,全程项目跟踪,创新互联公司专业和态度为您提供的服务

rabbitmq在发送消息后立即从内存中删除消息,因此如果消费者处理消息耗时较长,在处理过程中消费者被kill,则处理中的消息、以及其他发往该消费者的消息都将丢失。

为了保证消息不丢失,rabbitmq支持消息确认机制,消费者可以发送ack告诉rabbitm指定消息已经收到并处理,因此rabbitmq可以删除该消息。

如果消费者死掉(channel关闭、connection关闭、或者TCP connection丢失),导致rabbitmq没有收到ack,rabbitmq将把消息重入队列。

不存在消息超时,这意味着处理一个消息非常长的时间也是ok的。

消息确认机制默认是开启的,通过在channel.basic_consume中设置no_ack=True关闭。

注意消费者在处理消息后,不要忘记调用channel.basic_ack进行消息确认,否则rabbitmq将不断消耗内存把消息重入队列。

二、队列/消息持久化

为了防止rabbitmq服务终止导致队列和消息丢失,需要将队列和消息标记为持久化的:

  1. 确保rabbitmq永远不丢失队列,需要将队列 声明为持久化的:

  2. channel.queue_declare(queue='task_queue', durable=True)
  3. 将消息声明为持久化的:

  4. channel.basic_publish(exchange='',                       routing_key="task_queue",                       body=message,                       properties=pika.BasicProperties(                          delivery_mode = 2, # make message persistent                       ))

注意:尽管已经很健壮了,但是仍然无法完全保证消息不会丢失,例如rabbitmq接收消息但是还没有保存到硬盘的情况。

三、exchange

简单的说,exchange的一端接收消息,另一端把消息放进队列。

在rabbitmq中生产者不会将请求直接发送给消费者,生产值只会把消息发给exchange,exchange收到消息后需要知道怎么做:添加到特定队列、添加到多个队列、还是丢弃。

exchange的类型包括direct,topic,headers,fanout

四、绑定

exchange和queue之间的联系被称为绑定(binding),可以简单的看:队列对于特定exchange上的消息感兴趣

channel.queue_bind(exchange='logs',                    queue=result.method.queue)

此时'logs' exchange将添加消息到指定queue

绑定可以使用一个额外的routing_key参数,例如:

channel.queue_bind(exchange=exchange_name,                    queue=queue_name,                    routing_key='black')

对于fanout类型的exchange来说,routing_key参数是被忽略的

五、topic exchange

发往topic exchange的消息不能携带任意的routing_key,必须是以点隔开的一串字符,大255个字节

binding key也必须是相同的形式,注意binding key有两个重要的特殊情况:

* 可以替代一个单词

#可以替代零个或多个单词

例如,如果binding key是*.orange.*,则可以匹配所有.orange.的key,但是如果key不是*.*.*的形式,例如orange,或者quick.orange.male.rabbit,则消息将被丢弃。

如果binding key是lazy.#,则类似于带有lazy.orange.male.rabbit的key的消息可以匹配。

topic exchange非常强大,通过匹配routing_key可以表现的像存在多个exchange

六、RPC

为了接收响应,客户端需要在发送请求时附加发送回调队列地址:

result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='',                       routing_key='rpc_queue',                       properties=pika.BasicProperties(                             reply_to = callback_queue,                             ),                       body=request)                        # ... and some code to read a response message from the callback_queue ...

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网站名称:rabbitmq学习笔记-创新互联
地址分享:http://chengdu.cdxwcx.cn/article/jisce.html