RabbitMQ消息队列

一. RabbitMQ 简介

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

二. RabbitMQ 使用场景

1. 解耦(为面向服务的架构(SOA)提供基本的最终一致性实现)

img

传统模式的缺点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
  • 订单系统与库存系统耦合

引入消息队列

img

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
  • 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。

2. 异步提升效率

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

img

(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

img

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

img

3. 流量削峰

流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛

应用场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。

之前架构:大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的请求打到MySQL上,每秒钟预计执行3000条SQL。但是一般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。

引入MQ:100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。

关于流量削峰:秒杀系统流量削峰这事儿应该怎么做?

这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。

img

三. 引入消息队列的优缺点

优点

优点就是以上的那些场景应用,就是在特殊场景下有其对应的好处,解耦、异步、削峰。

缺点

  • 系统的可用性降低

系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。

  • 系统的复杂性提高

引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?

  • 一致性问题

A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。

解决

  • 系统的可用性降低

    做MQ集群

  • 对于复杂性问题

    • 1.如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
      1. 如果你拿到这个消息做redis的set的操作,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
    • 3.如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
    • 对于可靠性:可以做持久化,采用了手动确认消息模式
  • 如何解决消息队列的延时以及过期失效问题?有几百万消息持续积压几小时,怎么解决?

    消息队列的延迟和过期失效是消息队列的自我保护机制,目的是为了防止本身被挤爆,当然是可以关闭保护,比如当某个消息消费失败5次后,就把这个消息丢弃等,尽量不要关掉保护机制

    那些被丢弃的消息,我们可以针对该业务,查询出来将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新放入mq中,把丢的数据找回。

四. 消息队列代码实例

1. 简单模式

send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2. 声明一个队列
channel.queue_declare(queue='myqueue1', durable=True)

# 3. 向指定队列插入数据
channel.basic_publish(exchange='',
routing_key='myqueue1',
body='hello world',
properties=pika.BasicProperties(
delivery_mode=2, # 设置持久化
))

print("消息发送完成!")
# 4. 关闭连接
connection.close()

receive.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2. 声明一个队列
channel.queue_declare(queue='myqueue1', durable=True)

# 轮询模式改为公平分发模式
channel.basic_qos(prefetch_count=1)

# 3. 定义回调函数
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"收到消息:{body}")


# 4. 绑定回调函数
channel.basic_consume(queue='myqueue1',
auto_ack=False, # 默认应答
on_message_callback=callback)

# 5. 实时监听
channel.start_consuming()

2.交换机模式

exchange_send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2. 声明一个交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout') # 发布订阅模式

# 3. 向指定队列插入数据
channel.basic_publish(exchange='logs',
routing_key='',
body='hello logs',
)
print("消息发送完成!")
# 4. 关闭连接
connection.close()

exchange_recv.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2. 声明一个交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout')

# 3. 声明一个队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue # 随机队列的名字


# 4. 定义回调函数
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"收到消息:{body}")


# 5. 队列绑定交换机
channel.queue_bind(exchange='logs',
queue=queue_name)

# 6. 绑定回调函数
channel.basic_consume(queue=queue_name,
auto_ack=False, # 默认应答
on_message_callback=callback)

# 7. 实时监听
channel.start_consuming()

3.关键字模式

direct_send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2. 声明一个交换机
channel.exchange_declare(exchange='logs2',
exchange_type='direct') # 关键字模式

# 3. 向指定队列插入数据
channel.basic_publish(exchange='logs2',
routing_key='info',
body='hello logs',
)
print("消息发送完成!")
# 4. 关闭连接
connection.close()

direct_recv.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# Author : zhan
# Time : 2020/6/11 21:04
# File : simple_recv.py

import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2. 声明一个交换机
channel.exchange_declare(exchange='logs2',
exchange_type='direct')

# 3. 声明一个队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue # 随机队列的名字


# 4. 定义回调函数
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"收到消息:{body}")


# 5. 队列绑定交换机,配合关键字
#channel.queue_bind(exchange='logs2',
# queue=queue_name,
# routing_key='warning')
channel.queue_bind(exchange='logs2',
queue=queue_name,
routing_key='info')
#channel.queue_bind(exchange='logs2',
# queue=queue_name,
# routing_key='error')

# 6. 绑定回调函数
channel.basic_consume(queue=queue_name,
auto_ack=False, # 默认应答
on_message_callback=callback)

# 7. 实时监听
channel.start_consuming()

4.通配符模式

re_send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2. 声明一个交换机
channel.exchange_declare(exchange='logs3',
exchange_type='topic') # 通配符模式

# 3. 向指定队列插入数据
channel.basic_publish(exchange='logs3',
routing_key='info.123',
body='[info]: info.123',
)
print("消息发送完成!")
# 4. 关闭连接
connection.close()

re.recv.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import pika

# 1. 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 2. 声明一个交换机
channel.exchange_declare(exchange='logs3',
exchange_type='topic')

# 3. 声明一个队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue # 随机队列的名字


# 4. 定义回调函数
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"收到消息:{body}")


# 5. 队列绑定交换机,配合关键字
channel.queue_bind(exchange='logs3',
queue=queue_name,
routing_key='#.123')

# 6. 绑定回调函数
channel.basic_consume(queue=queue_name,
auto_ack=False, # 默认应答
on_message_callback=callback)

# 7. 实时监听
channel.start_consuming()

都看到这里了,不赏点银子吗^v^