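In RocketMQ, scheduled (delayed) messages and message retry are two important features: the first delivers a message only after a specified delay, and the second redelivers a message after consumption fails. Understanding how both are implemented helps when tuning the performance and reliability of a messaging system.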
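This article covers delay-level configuration, how the Broker stores and delivers scheduled messages, how consumer retry reuses the same path, and a few tuning considerations.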
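A scheduled message is one that the Consumer cannot consume immediately after the Broker receives it; it becomes consumable only at a specific time or after a specific delay. RocketMQ calls these "delayed messages", and their defining feature is automatic delivery once the specified delay has elapsed.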
RocketMQ currently supports only fixed-precision scheduling. Delay levels are configured in MessageStoreConfig.java; the supported levels and their delays are:
| Delay level | Delay |
|---|---|
| 1 | 1 second |
| 2 | 5 seconds |
| 3 | 10 seconds |
| 4 | 30 seconds |
| 5 | 1 minute |
| 6 | 2 minutes |
| 7 | 3 minutes |
| 8 | 4 minutes |
| 9 | 5 minutes |
| 10 | 6 minutes |
| 11 | 7 minutes |
| 12 | 8 minutes |
| 13 | 9 minutes |
| 14 | 10 minutes |
| 15 | 20 minutes |
| 16 | 30 minutes |
| 17 | 1 hour |
| 18 | 2 hours |
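Because only these fixed levels exist, a producer that wants an arbitrary delay has to round it to a level. The following is a minimal sketch of that rounding, not RocketMQ code: `DelayLevelPicker`, `parse`, and `pick` are hypothetical names, and the level string is assumed to be the default one that yields the table above, listed in ascending order.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration; not part of the RocketMQ API. */
final class DelayLevelPicker {
    // Assumed default level string; it corresponds to the table above.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses "1s 5s ..." into level -> delay in milliseconds (levels start at 1). */
    static TreeMap<Integer, Long> parse(String levels) {
        Map<Character, Long> unitMillis =
            Map.of('s', 1000L, 'm', 60_000L, 'h', 3_600_000L, 'd', 86_400_000L);
        TreeMap<Integer, Long> table = new TreeMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            Long unit = unitMillis.get(part.charAt(part.length() - 1));
            if (unit == null) {
                throw new IllegalArgumentException("unknown time unit in: " + part);
            }
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            table.put(i + 1, unit * amount);
        }
        return table;
    }

    /**
     * Returns the smallest level whose delay is at least wantedMillis,
     * or the highest level if no level is long enough.
     * Assumes the delays grow with the level number.
     */
    static int pick(TreeMap<Integer, Long> table, long wantedMillis) {
        for (Map.Entry<Integer, Long> e : table.entrySet()) {
            if (e.getValue() >= wantedMillis) {
                return e.getKey();
            }
        }
        return table.lastKey();
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println(pick(table, 45_000L)); // 45 s rounds up to level 5 (1 minute)
        System.out.println(pick(table, 10_000L)); // exactly 10 s maps to level 3
    }
}
```

Rounding up means a message is never delivered earlier than requested, at the cost of waiting longer than necessary; a caller who prefers the nearest level would need a different rule.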
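In ScheduleMessageService.java, parseDelayLevel parses the delay-level configuration string. It splits the string on spaces, parses each entry, converts it to milliseconds according to its unit suffix (s, m, h, or d), and stores the result in delayLevelTable, keyed by level.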
```java
public boolean parseDelayLevel() {
    // Unit suffix -> milliseconds per unit
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // The configured delay-level string
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            // The last character is the unit; the rest is the amount.
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);
            if (tu != null) {
                int level = i + 1; // levels are 1-based
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }
    return true;
}
```

A Producer turns a message into a scheduled message by setting its delay level. When the message reaches the Broker, the Broker stores it in the delay queue for that level:
```java
Message msg = new Message(...);
msg.setDelayTimeLevel(level);
```
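What "the delay queue for that level" buys the Broker can be seen in a toy model. Every message in one level has the same delay, so arrival order equals delivery order and a plain FIFO queue per level is enough; a poller can stop at the first entry that is not yet due. The sketch below is an in-memory illustration under that assumption. `DelayQueueModel`, `Entry`, `store`, and `pollDue` are made-up names, not RocketMQ classes, and the clamping of oversized levels mirrors what the Broker does when it stores a message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of level-based delay queues; illustrative only. */
final class DelayQueueModel {
    /** One stored message: its real destination and the time it becomes visible. */
    static final class Entry {
        final String realTopic;
        final String body;
        final long deliverAtMillis;

        Entry(String realTopic, String body, long deliverAtMillis) {
            this.realTopic = realTopic;
            this.body = body;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    private final TreeMap<Integer, Long> delayMillisByLevel;
    private final Map<Integer, Deque<Entry>> queueByLevel = new HashMap<>();

    /** delayMillisByLevel must be non-empty; keys are 1-based levels. */
    DelayQueueModel(Map<Integer, Long> delayMillisByLevel) {
        this.delayMillisByLevel = new TreeMap<>(delayMillisByLevel);
    }

    /** Stores a delayed message; a level above the maximum is clamped to the maximum. */
    void store(String realTopic, String body, int level, long nowMillis) {
        if (level < 1) {
            throw new IllegalArgumentException("level must be >= 1 for a delayed message");
        }
        int effective = Math.min(level, delayMillisByLevel.lastKey());
        long deliverAt = nowMillis + delayMillisByLevel.get(effective);
        queueByLevel.computeIfAbsent(effective, k -> new ArrayDeque<>())
                    .addLast(new Entry(realTopic, body, deliverAt));
    }

    /** Removes and returns every due entry of one level, stopping at the first that is not due. */
    List<Entry> pollDue(int level, long nowMillis) {
        List<Entry> due = new ArrayList<>();
        Deque<Entry> queue = queueByLevel.get(level);
        if (queue == null) {
            return due;
        }
        while (!queue.isEmpty() && queue.peekFirst().deliverAtMillis <= nowMillis) {
            due.add(queue.pollFirst());
        }
        return due;
    }

    public static void main(String[] args) {
        DelayQueueModel model = new DelayQueueModel(Map.of(1, 1000L, 2, 5000L));
        model.store("orders", "a", 2, 0L);     // due at t = 5000
        model.store("orders", "b", 2, 3000L);  // due at t = 8000
        System.out.println(model.pollDue(2, 5000L).size()); // only "a" is due
        System.out.println(model.pollDue(2, 8000L).size()); // now "b" is due
    }
}
```

The model ignores persistence, offsets, and failure handling, which the real Broker code has to deal with.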
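The Broker stores scheduled messages under a dedicated topic, SCHEDULE_TOPIC_XXXX, and maps each delay level to a fixed queue ID within it. The original topic and queue ID are saved as message properties so that they can be restored on delivery: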
```java
// topic and queueId are assigned here but declared outside this excerpt.
int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    if (msg.getDelayTimeLevel() > 0) {
        // Clamp to the highest configured level.
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        // Delayed messages are stored under the topic SCHEDULE_TOPIC_XXXX.
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Back up the real topic and queue ID as message properties.
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
```

On the Broker, each consume queue of the schedule topic has its own timer task. The task polls the queue and, once a message's planned delivery time (deliverTimestamp) has arrived, writes the message back to its original topic:
```java
class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            this.executeOnTimeup();
        } catch (Exception e) {
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            // Retry from the same offset later.
            ScheduleMessageService.this.timer.schedule(
                new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    private void executeOnTimeup() {
        ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore
            .findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
        long failScheduleOffset = offset;
        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        // One consume-queue entry: commit-log offset, size, tags code.
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        // In the schedule topic the tags code carries the delivery timestamp.
                        long tagsCode = bufferCQ.getByteBuffer().getLong();

                        long now = System.currentTimeMillis();
                        // correctDeliverTimestamp is defined elsewhere in this class.
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        long countdown = deliverTimestamp - now;
                        if (countdown <= 0) {
                            // Due: load the message and write it back to its real topic.
                            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore
                                .lookMessageByOffset(offsetPy, sizePy);
                            if (msgExt != null) {
                                try {
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    PutMessageResult putMessageResult =
                                        ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
                                    if (putMessageResult != null
                                        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                        continue;
                                    } else {
                                        log.error("a message time up, but it failed, topic: {}, msgId: {}",
                                            msgExt.getTopic(), msgExt.getMsgId());
                                        ScheduleMessageService.this.timer.schedule(
                                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                            DELAY_FOR_A_WHILE);
                                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                        return;
                                    }
                                } catch (Exception e) {
                                    log.error("messageTimeup execute error, drop it. msgExt=" + msgExt
                                        + ", nextOffset=" + nextOffset + ", offsetPy=" + offsetPy
                                        + ", sizePy=" + sizePy, e);
                                }
                            }
                        } else {
                            // Not due yet: come back exactly when it is.
                            ScheduleMessageService.this.timer.schedule(
                                new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        }
                    }

                    // Every entry in the buffer was delivered: poll again shortly from the next offset.
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    ScheduleMessageService.this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                } finally {
                    bufferCQ.release();
                }
            } else {
                // No data at this offset; skip ahead if the offset has fallen behind the queue's minimum.
                long cqMinOffset = cq.getMinOffsetInQueue();
                if (offset < cqMinOffset) {
                    failScheduleOffset = cqMinOffset;
                    log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset
                        + ", queueId=" + cq.getQueueId());
                }
                ScheduleMessageService.this.timer.schedule(
                    new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, failScheduleOffset);
            }
        }
    }

    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);

        // Drop the delay level so the message is not scheduled a second time.
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        // Restore the original topic and queue ID.
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }
}
```

Message retry handles the case where a Consumer fails to consume a message: the message is sent back to the Broker, enters the delay queue, and waits to be consumed again. The Broker-side handler is:
```java
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    try {
        // ... (code omitted)
        int delayLevel = requestHeader.getDelayLevel();
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes) {
            // ... (code omitted)
        } else {
            // No explicit level requested: back off further with each retry.
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }
        // ... (code omitted)
        return response;
    } catch (RemotingCommandException e) {
        throw e;
    }
}
```

Delay-level flexibility:
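If arbitrary time precision is required, one option is to sort messages by delivery time on the Broker, but sorting may add performance overhead, so flexibility has to be weighed against performance.

Scheduled-task execution: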
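The Broker should keep the timer-task logic lean, reducing contention between tasks and resource usage, so that scheduled messages go out on time.

Message storage and retrieval: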
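When storing scheduled messages, make sure the data structures and indexes are efficient enough to support fast lookup and processing.

Retry optimization: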
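For message retry, diagnose why the Consumer failed and choose the retry strategy accordingly, so that excessive retries do not overload the system.

Logging and monitoring: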
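When handling scheduled messages and retries, add detailed logging and monitoring metrics so that potential problems are found and handled promptly.

With the analysis and suggestions above, the scheduled-message and retry mechanisms of RocketMQ should be easier to understand and to tune in real deployments.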
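As a worked example of the retry rule `delayLevel = 3 + reconsumeTimes` from consumerSendMsgBack, the sketch below computes the back-off schedule it produces. It is an illustration, not Broker code: `RetryBackoffSketch` and its methods are hypothetical, the delays are copied from the delay-level table, the retry limit is a parameter rather than a known default, and the branch that the excerpt omits (retries exhausted) is represented only by a return value of -1.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of the retry back-off rule; not a RocketMQ class. */
final class RetryBackoffSketch {
    // Delays for levels 1..18 in seconds, copied from the delay-level table.
    static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /**
     * Returns the delay level for the next retry, or -1 once reconsumeTimes
     * has reached maxReconsumeTimes (the branch the excerpt leaves out).
     */
    static int nextDelayLevel(int requestedLevel, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            return -1;
        }
        int level = (requestedLevel == 0) ? 3 + reconsumeTimes : requestedLevel;
        // Levels above the maximum are clamped, as the Broker does when storing.
        return Math.min(level, LEVEL_SECONDS.length);
    }

    /** Delays in seconds before each retry when no explicit level is requested. */
    static List<Long> schedule(int maxReconsumeTimes) {
        List<Long> delays = new ArrayList<>();
        for (int times = 0; ; times++) {
            int level = nextDelayLevel(0, times, maxReconsumeTimes);
            if (level < 0) {
                break;
            }
            delays.add(LEVEL_SECONDS[level - 1]);
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Long> delays = schedule(16); // 16 is an assumed retry limit for this demo
        long total = 0;
        for (long d : delays) {
            total += d;
        }
        System.out.println(delays);
        System.out.println("total seconds: " + total);
    }
}
```

With a limit of 16 the first retry waits 10 seconds, the last waits 2 hours, and the whole sequence spans 17140 seconds, a little under 4 hours 46 minutes, which is worth keeping in mind when deciding how many retries a consumer group should allow.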
Reposted from: http://eiqfk.baihongyu.com/