最近学习架构设计的时候经常看见基于消息的架构。这里就来总结一下一个常见的高性能消息队列中间件——RabbitMQ,它支持各种各样的消息队列协议(如JMS和AMQP),并且消息传递比较可靠,并发效率也比较高(RabbitMQ的实现语言是大名鼎鼎的Erlang,天生支持高并发)。这里就来总结一下RabbitMQ的基本使用,以及消息队列和消息架构的一些注意事项。
生产者-消费者模型
Intro
最简单的消息队列模型应该就是 生产者-消费者模型 了。简单的一对一模型如下图:
其中P代表生产者(Producer),C代表消费者(Consumer),中间部分代表消息队列(Message Queue)。
下面我们用RabbitMQ来实现,首先是生产者一端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static final String QUEUE_NAME = "test_q1"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "[Message Producer] 66666"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("[MessageQueue] Message sent => message=" + message); channel.close(); connection.close(); } }
|
消费者一端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.rabbitmq.client.*; import java.io.IOException; public class Recv { private static final String QUEUE_NAME = "test_q1"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("[Message Recv] => " + msg); } }; channel.basicConsume(QUEUE_NAME, false, consumer); } }
|
可以看到,无论是消费者端还是生产者端,首先要建立通信连接(ConnectionFactory -> Connection -> Channel
)。
这只是最简单的一种情况,后边将介绍更普遍、更复杂的情况。
定义消息队列
定义消息队列可以用Channel
接口的queueDeclare
方法,它有两个重载版本。含参的queueDeclare
方法定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
|
可以看到,它接受五个参数。第一个参数对应消息队列名称,第二个参数对应消息队列是否可以持久化,第三个参数对应消息队列是否仅限于当前连接,第四个参数对应消息队列是否在不用的情况下自动删除,第五个参数对应消息队列其他的一些设置。
另外还有一个无参的queueDeclare
方法,实现如下:
1 2 3 4
| public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException { return queueDeclare("", false, true, true, null); }
|
注意:Producer端和Consumer端的消息队列定义需要保持一致。
Envelope对象
Envelope对象封装了AMQP通信需要的数据,如deliveryTag, redeliver flag, exchange, routingKey等。其构造函数注释中有这几个变量的解释:
1 2 3 4 5 6 7 8 9 10 11 12 13
| * Construct an {@link Envelope} with the specified construction parameters * @param deliveryTag the delivery tag * @param redeliver true if this is a redelivery following a failed ack * @param exchange the exchange used for the current operation * @param routingKey the associated routing key */ public Envelope(long deliveryTag, boolean redeliver, String exchange, String routingKey) { this._deliveryTag = deliveryTag; this._redeliver = redeliver; this._exchange = exchange; this._routingKey = routingKey; }
|
消息分发机制(Dispatch)
RabbitMQ采用 轮询分发机制(Round-robin dispatching),每个消费者将接收到数量相近的消息。
扩展:如果想控制每次给消费者端传递的消息数量(流量控制),可以通过Channel
的basicQos
方法,它有三个重载版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| * Request specific "quality of service" settings. * * These settings impose limits on the amount of data the server * will deliver to consumers before requiring acknowledgements. * Thus they provide a means of consumer-initiated flow control. */ public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException { exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global)); } public void basicQos(int prefetchCount, boolean global) throws IOException { basicQos(0, prefetchCount, global); } public void basicQos(int prefetchCount) throws IOException { basicQos(0, prefetchCount, false); }
|
其中:
prefetchSize
表示服务器给消费者端传递数据大小的上限,0为不限
prefetchCount
表示服务器给消费者端传递数据数量的上限,0为不限
global
标志位表示此设置是否应用于整个Channel
而不是每个Consumer
消息确认机制(Acknowledge)
设想一下这种场景:消费者端处理一个耗时任务时被强制结束任务,此时任务还没有完成,但是消息却已从消息队列中发送了出去。如果没有一定的消息确认机制,那么我们将丢失掉此消息及后面一堆发送至此消费者但还未经处理的消息。
RabbitMQ支持消息确认机制。如果消费者已处理完任务,那么它将向Broker发送ACK消息,告知某条消息已被成功处理,可以从队列中移除。如果消费者端没有发送回ACK消息,那么Broker会认为消息处理失败,会将此消息及后续消息分发给其他消费者端进行处理(redeliver flag置为true)。
这种确认机制和TCP/IP协议确立连接类似。不同的是,TCP/IP确立连接需要经过三次握手,而RabbitMQ只需要一次ACK。
注意:RabbitMQ当且仅当检测到ACK消息未发出且消费者端的连接终止时才会将消息重新分发给其他消费者端,因此不需要担心消息处理时间过长而被重新分发的情况。
我们可以通过设置basicConsume
方法的autoAck
标志位来设置其消息确认机制。basicConsume
方法的定义如下:
1
| String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
|
当autoAck
为true时,不启用显式消息确认机制,消息分发出去即为确认完毕。autoAck
为false时,启用上述的消息确认机制。
消费者端发送ACK消息可以通过在回调函数中调用Channel的basicAck
方法实现:
1
| channel.basicAck(envelope.getDeliveryTag(), false);
|
其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| * Acknowledge one or several received * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being acknowledged. * @see com.rabbitmq.client.AMQP.Basic.Ack * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param multiple true to acknowledge all messages up to and * including the supplied delivery tag; false to acknowledge just * the supplied delivery tag. * @throws java.io.IOException if an error is encountered */ void basicAck(long deliveryTag, boolean multiple) throws IOException;
|
WARNING:使用的时候千万不要忘了调用basicAck
方法!必要的时候可以对Consumer类再做一层封装。
简单的消息持久化(Message durability)
RabbitMQ的消息确认机制保证了即使消费者端挂了,我们的消息也可以被顺利处理。然而,如果碰上意外情况,Broker(RabbitMQ Server)挂了(比如意外重启),这种情况下不做任何设置的话,我们的信息仍然会丢失。好在,RabbitMQ同样提供了消息持久化的功能。
首先,我们需要保证我们的消息队列可以持久化,方法上面已经提到过,就是将queueDeclare
方法的durable
标志位设为true。
然后,我们需要在发布消息的时候通过设置basicPublish
方法的props
参数为PERSISTENT_TEXT_PLAIN
来实现消息可持久化,比如:
1 2
| channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
|
注意:尽管设置了消息可持久化,但这并不能完全保证消息就一定可以存储到磁盘里。RabbitMQ存在一段时间,接受了一个消息但还没来得及存储。并且,RabbitMQ不一定对所有消息都做fsync(即同步内存中的消息到硬盘),有些消息可能只是被缓存而不会被持久化。因此,这种消息持久化机制是不可靠的,特别是对大型项目。如果需要可靠的消息持久化,可以使用 publisher confirm。
Exchange
在RabbitMQ中,producer并不是直接将消息发送到message queue中,而是将它传递给一种叫exchage的结构。顾名思义,exchange用于交换消息,它接受publisher发送的消息,并按照一定的策略传递给下层的消息队列,最后传递到对应的subscriber中。Exchange有好几种:direct
, topic
, headers
, fanout
等,分别对应不同的消息分发策略。
在消费者一端,我们需要通过queueBind
函数将消息队列绑定到对应的exchange上;如果没有绑定,则RabbitMQ会给其指定一个匿名exchage(即上面生产者-消费者模型中的exchange)。
发布/订阅模型
发布-订阅模型也是一种典型的消息模型,它相当于一个“一对多”模型,也就是说publisher可以向多个subscriber发送消息。我们可以用下面的图来表示这种模型:
在RabbitMQ中,我们可以通过exchange来实现发布-订阅模型。我们需要在订阅者一端将期望接收消息的队列绑定到我们定义的exchange上(最简单的fanout
广播类型即可),然后在发布者端将消息发布至定义的exchange上即可。
路由与模式匹配
RabbitMQ还支持路由模式,即像路由那样根据path来分派消息,只要在queueBind
函数中指定感兴趣的routingKey
即可。路由模式下我们一般使用direct
类型的exchange,它会将与队列binding key匹配的消息分发到指定的队列中。
当然如果我们需要根据path的模式来匹配的话,我们可以使用topic
类型的exchange,它可以用于匹配多种pattern下的path并且分发消息。
RPC
消息队列可以用来实现RPC。我们可以利用消息队列的API封装一个RPC代理类,调用端在通过RPC Proxy调用方法时,调用端会将调用方法的元数据(名称,参数等)包装成一条消息,然后通过相应的消息队列发送至集群的另一节点中,被调用者接收到此消息,将其解码后在本地进行方法调用(LPC)。如果需要返回结果,那么被调用端在方法调用完毕后将结果包装成消息,通过消息队列再发送回调用端即可。这就是用消息队列实现RPC的一般思路。
在RabbitMQ中,实现RPC的思路比较简单:使用两个队列,分别处理调用请求及调用回复即可:
注意RPC的调用时间可能会比较长(受到网络、本地调用执行时间等因素影响),因此可能会阻塞执行线程,这也是RPC为人诟病的一点。我们可以对其稍加改造,改成异步模式,这样会使整个系统更加灵活。
与其他消息队列的对比
TODO: 待分析;后面有时间研究研究ZeroMQ和Kafka的设计及实现。
参考资料