1. 优化ActiveMQ性能
1.1. 一般技术1.1.1. Persistent vs Non-Persistent Message持久化和非持久化传递1.PERSISTENT(持久性消息)
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
2.NON_PERSISTENT(非持久性消息)
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式;
2.使用send 方法为每一条消息设置传送模式;
方法一:void send(Destination destination, Message message, int deliveryMode, int priority,long timeToLive);
方法二:void send(Message message, int deliveryMode, int priority, longtimeToLive);
其中 deliveryMode 为传送模式,priority 为消息优先级,timeToLive 为消息过期
时间。
方法三:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
JMS 规范1.1允许消息传递包括Persistent和Non-Persistent。
Non-persistent传递消息比Persistents传递消息速度更快,原因如下:
1) Non-persistent发送消息是异步的,Producer不需要等待Consumer的receipt消息。如下图:
2) Persisting 传递消息是需要把消息存储起来。然后在传递,这样很慢 。
1.1.2. Transactions
事务以下列子说明了Transaction比Non-transaction的性能高。
Transaction和Non-transaction代码如下:
1.1.3. 超快回应消息
内嵌 broker;如下图:
下面以Co-lcate (合作定位)with a broker为例。
其运行原理如下图:
Java代码如下:
创建一个queue服务:
创建一个queueRequestor:
注意:
设置发送的消息不需要copy。
1.1.4. Tuning the OpenWire protocol
跨语言协议//TODO
1.1.5. Tuning the TCP Transport
TCP协议是ActiveMQ使用最常见的协议。有以下两点影响TCP协议性能:
1) socketBufferSize=缓存,默认是65536。
2) tcpNoDelay=默认是false,
示例如下:
1.2. 优化消息发送
1.2.1. Asynchronous Send在ActiveMQ4.0以上,所有的异步或同步对于Consumer来说是变得可配置了。默认是在ConnectionFactory、Connection、Connection URI等方面配置对于一个基于Destination 的Consumer来说。
众所周之,如果你想传递给Slow Consumer那么你可能使用异步的消息传递,但是对于Fast Consumer你可能使用同步发送消息。(这样可以避免同步和上下文切换额外的增加Queue堵塞花费。如果对于一个Slow Consumer,你使用同步发送消息可能出现Producer堵塞等显现。
ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Slow Consumer则使用dispatcheAsync=true,反之,那你使用的是Fast Consumer则使用dispatcheAsync=false。
用Connection URI来配置Async如下:
ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");用ConnectionFactory配置Async如下:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);用Connection配置Async如下:
((ActiveMQConnection)connection).setUseAsyncSend(true);
1.2.2. Producer Flow Control
这种适合于慢的消费者,大量的消息暂时存储到内存中,然后慢慢的dispatche。
运行原理如图下:
Java代码如下:
Xml配置的策略如下:
Disabled Producer Flow Control运行原理:
1.3. 优化消息消费者
消息消费的内部流程结构如下:
1.3.1. Prefetch Limit
ActiveMQ默认的prefetch大小不同的:Queue Consumer 默认大小=1000
Queue Browser Consumer默认大小=500
Persistent Topic Consumer默认大小=100
Non-persistent Topic Consumer默认大小=32766
Prefecth policy设置如下:
设置prefetch policy在 Destinations上:
1.3.2. Delivery and Acknowledgement of messages
传递和回执消息。建议使用Session.DUPS_ACKNOWLEDGE。
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Sessiion.TRANSACTION。用session.commit()回执确认。 Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。 Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。当消息到达一定数量后,才开始消费该消息。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。 优化回执:
1.3.3. Slow Consumer Handling
慢消费者绑定策略Slow Consumer将一起一个问题,对于非持久主题上,强迫Broker发送的消息堆积起来,使得Broker对于Producer发送慢了下来,同时Fast Consumer也慢了下来。
目前,我们有一个策略来配置在原有Consumer的预存的大小的基础上增加了一定的缓存大小。因此,这个大小最终一旦满了,则旧消息将会丢弃,新消息则会进入。这将保持了一定的缓存大小。
Pending Message Limit Strategy
等待消息限制策略
对于Slow Consumer来说,你将配置PendingMessageLimitStrategy策略来处理不同的策略。
以下有两种实现的策略:
ConstantPendingMessageLimitStrategy
Limit可以设置0、>0、-1三种方式:
0表示:不额外的增加其预存大小。
>0表示:在额外的增加其预存大小。
-1表示:不增加预存也不丢弃旧的消息。
<constantPendingMessageLimitStrategy limit="50"/>PrefetchRatePendingMessageLimitStrategy
这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>Configuring the Eviction Policy
配置去除策略
ActiveMQ有两种方式,默认配置方式如下:
<oldestMessageEvictionStrategy/>
方式二:
去除旧消息根据它的优先级来判断,如果你有一个较高的优先级的旧消息,则去除低优先级的消息。
<oldestMessageWithLowestPriorityEvictionStrategy/> 1.3.4. Destination Options
Destination Options 这种方式提供了扩展了JMS Consumer但并不是扩展了JMS API。以URL的形式来编码的。
Consumer Options
示例如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);