RocketMQ 消费者Rebalance算法 解析——图解、源码级解析
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年10月15日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
- 平均分配算法
- 环形平均分配算法
- 一致性哈希算法
- 指定机房算法
- 就进机房算法
- 手动配置负载均衡参数
平均分配算法
这也是消息消费时候的默认算法,所谓平均,就是同一个Topic
主题下的所有队列被同一个消费者组中的所有Consumer平均消费掉。
例如有5个队列和2和Consumer,就会根据下面的步骤进行分配:
- 5除以2不能整除,所以队列无法均分
- 每个消费者先分到2个队列
- 多出来的1个队列按照顺序分配给了第一个Consumer
具体的源码如下:
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 当前分配到的Consumer的索引
int index = cidAll.indexOf(currentCID);
// 余数
int mod = mqAll.size() % cidAll.size();
// 队列总数小于Consumer总数时,给当前Consumer分配一个队列消费
// 不能均分且当前编号小于余数时,需要给当前Consumer分配x + 1个队列,否则分配x个队列
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 取min的原因是,如果Consumer多,队列少,多出来的Consumer分配不到队列
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
@Override
public String getName() {
return "AVG";
}
}
环形平均分配算法
使用方法:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
也可以自定义消费策略:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
// 自定义负载策略
return null;
}
@Override
public String getName() {
return null;
}
});
所谓环形分配算法,就是把消息队列按照环形进行排列,然后同一个组下的所有Consumer按照顺序进行匹配即可,如下图所示:
上图中Topic
下共有10个消息队列,假设消费者组里有4个Consumer,分配过程如下:
- 对所有的消息队列和Consumer分别排序
- 按照顺序让Consumer和消息队列进行匹配
第一轮分配Queue1到Queue4,第二轮分配Queue5到Queue8,第三轮分配Queue9和Queue10。经过3轮分配完毕
具体源码如下所示:
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
return result;
}
@Override
public String getName() {
return "AVG_BY_CIRCLE";
}
}
一致性哈希算法
首先先介绍一下一致性哈希算法:
hash算法带来的问题:
假设后台有多个服务器,我们就可以做负载均衡将前端来的请求“平均”分配到各个服务器上来处理,如果是按照用户id对服务器个数N取模来计算hash的话,如果有一台服务器宕机,之前所有的求模计算都要重来,开销较大。
一致性哈希的思路就是:用户按照顺时针方向做排列,离哪个节点近,就去访问哪个节点。
用户按照顺时针方向,离哪个节点近,就去访问哪个节点。
此时如果有服务器宕机,直接顺着找下一个服务器节点就可以了。
如果要增加节点:
RocketMQ中对于一致性哈希的源码级实现:
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
public AllocateMessageQueueConsistentHash() {
this(10);
}
// 设计虚拟节点数量
public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
this(virtualNodeCnt, null);
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
if (virtualNodeCnt < 0) {
throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
}
this.virtualNodeCnt = virtualNodeCnt;
this.customHashFunction = customHashFunction;
}
// 负载均衡算法主要实现
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 把所有消费者放到一个List里
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
// 创建hash环形结构
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
// 根据一致性hash算法,基于客户端节点,把分配到当前消费者组的MQ添加到集合里并返回
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
}
@Override
public String getName() {
return "CONSISTENT_HASH";
}
private static class ClientNode implements Node {
private final String clientID;
public ClientNode(String clientID) {
this.clientID = clientID;
}
@Override
public String getKey() {
return clientID;
}
}
}
上面代码在ConsistentHashRouter
中创建了hash环,算法的主要流程是在这个类中实现的,主要是基于TreeMap
,感兴趣的小伙伴可以深入研究一下它的源码~
指定机房算法
假设有两个机房,则对应的消费关系如下图:
指定机房分配算法先根据MQ所述的Broker找出有效的机房里的所有MQ,然后再平分给所有的Consumer
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
private Set<String> consumeridcs;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 计算出当前消费者ID在消费者集合中的具体位置
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
return result;
}
// 拿出BrokerName下的所有MQ
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {
premqAll.add(mq);
}
}
// 队列长度除以客户端长度
int mod = premqAll.size() / cidAll.size();
// 队列长度mod客户端长度
int rem = premqAll.size() % cidAll.size();
// 给Consumer分配MQ
int startIndex = mod * currentIndex;
int endIndex = startIndex + mod;
for (int i = startIndex; i < endIndex; i++) {
result.add(premqAll.get(i));
}
if (rem > currentIndex) {
result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}
return result;
}
@Override
public String getName() {
return "MACHINE_ROOM";
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
就进机房算法
顾名思义,就近机房分配策略是一种基于Consumer
和机房距离来分配的策略。部署在同一个机房的MQ会被先分配给同一个机房里的Consumer
。
具体步骤是先统计Consumer
与Broker
所在的机房,之后再将Broker
中的MQ分配给同机房的Consumer
消费,如果本机房里没有Consumer
,则再尝试分配给其他机房的Consumer
public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;
public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MachineRoomResolver machineRoomResolver) throws NullPointerException {
if (allocateMessageQueueStrategy == null) {
throw new NullPointerException("allocateMessageQueueStrategy is null");
}
if (machineRoomResolver == null) {
throw new NullPointerException("machineRoomResolver is null");
}
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
this.machineRoomResolver = machineRoomResolver;
}
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 将MQ按照不同的机房归纳
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
if (mr2Mq.get(brokerMachineRoom) == null) {
mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
}
mr2Mq.get(brokerMachineRoom).add(mq);
} else {
throw new IllegalArgumentException("Machine room is null for mq " + mq);
}
}
// 将consumer按照不同的机房归纳
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
if (mr2c.get(consumerMachineRoom) == null) {
mr2c.put(consumerMachineRoom, new ArrayList<String>());
}
mr2c.get(consumerMachineRoom).add(cid);
} else {
throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
}
}
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
// 1. 分配与当前消费者部署在同一机房的MQ
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}
//2.如果机房没有活着的消费者,则将其MQ分配给每个其他的机房
for (String machineRoom : mr2Mq.keySet()) {
if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
}
}
return allocateResults;
}
@Override
public String getName() {
return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
}
/**
* 一个解析器对象,用于确定消息队列或客户端部署在哪个机房。
*
* AllocateMachineRoomNearby将使用该结果按机房对消息队列和客户端进行分组。
*
* 返回值不能为null
*/
public interface MachineRoomResolver {
String brokerDeployIn(MessageQueue messageQueue);
String consumerDeployIn(String clientID);
}
}
手动配置负载均衡参数
除了使用内置的负载均衡算法以外,还可以手动配置相关的参数,例如设置消费的队列、消费的Topic
、消费的机器等,在消费端直接设置消费队列即可:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByConfig(){
{
this.setMessageQueueList(Collections.<MessageQueue>singletonList(new MessageQueue(){{
this.setQueueId(0);
this.setTopic("Topic name");
this.setBrokerName("Broker name");
}}));
}
});
上面的代码里,手动指定了消费队列的索引,Topic和Broker服务器的名称,之后Consumer就会在指定的服务器中进行消费,源码如下:
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
private List<MessageQueue> messageQueueList;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
return this.messageQueueList;
}
@Override
public String getName() {
return "CONFIG";
}
public List<MessageQueue> getMessageQueueList() {
return messageQueueList;
}
public void setMessageQueueList(List<MessageQueue> messageQueueList) {
this.messageQueueList = messageQueueList;
}
}
可以看到源码里只提供了一个消息队列集合,就是我们上面传入的自定义配置的MQ列表,配置完成之后就可以进行负载均衡及消费。