博客
关于我
RocketMQ 源码分析 —— 定时消息与消息重试
阅读量:798 次
发布时间:2023-03-22

本文共 10488 字,大约阅读时间需要 34 分钟。

RocketMQ 定时消息与消息重试深度分析

概述

在 RocketMQ 中,定时消息和消息重试是两大重要的功能模块,分别用于支持消息的定时投递和处理,以及在消费失败时提供重试机制。理解这两部分的实现原理,对于优化消息系统的性能和可靠性非常有帮助。

本文将从以下几个方面展开分析:

  • 定时消息的处理逻辑
  • 消息重试机制的实现
  • 核心代码解析
  • 性能优化与调优建议
  • 定时消息的处理逻辑

    定时消息的定义

    定时消息是指在 Broker 收到消息后,无法立即被 Consumer 消费,而是需要等待特定时间点或达到特定时间后才能被消费的消息。这种消息在 RocketMQ 中被称为“延迟消息”,其核心特点是支持在指定时间后自动投递。

    延迟等级的配置

    RocketMQ 目前只支持固定精度的定时消息。延迟等级的配置在 MessageStoreConfig.java 文件中进行,具体支持的延迟等级及其对应的时间如下:

    延迟等级 时间
    1 1 秒
    2 5 秒
    3 10 秒
    4 30 秒
    5 1 分钟
    6 2 分钟
    7 3 分钟
    8 4 分钟
    9 5 分钟
    10 6 分钟
    11 7 分钟
    12 8 分钟
    13 9 分钟
    14 10 分钟
    15 20 分钟
    16 30 分钟
    17 1 小时
    18 2 小时

    延迟等级的解析

    ScheduleMessageService.java 中,parseDelayLevel 方法负责解析延迟等级字符串配置。该方法通过将等级字符串按空格分割,逐个解析每个延迟等级,映射到对应的时间单位(如秒、分钟、小时等),并存储在 delayLevelTable 中。

    public boolean parseDelayLevel() {
    // 初始化时间单位映射
    HashMap
    timeUnitTable = new HashMap<>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    // 获取延迟等级配置字符串
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
    String[] levelArray = levelString.split(" ");
    for (int i = 0; i < levelArray.length; i++) {
    String value = levelArray[i];
    String ch = value.substring(value.length() - 1);
    Long tu = timeUnitTable.get(ch);
    if (tu != null) {
    int level = i + 1;
    if (level > this.maxDelayLevel) {
    this.maxDelayLevel = level;
    }
    long num = Long.parseLong(value.substring(0, value.length() - 1));
    long delayTimeMillis = tu * num;
    this.delayLevelTable.put(level, delayTimeMillis);
    }
    }
    } catch (Exception e) {
    log.error("parseDelayLevel exception", e);
    log.info("levelString String = {}", levelString);
    return false;
    }
    return true;
    }

    Producer 发送定时消息

    在 RocketMQ 中,Producer 可以通过设置消息的延迟等级来实现定时消息的功能。当消息被发送到 Broker 时,Broker 会根据延迟等级将消息存储到对应的延迟队列中。具体实现如下:

    Message msg = new Message(...);
    msg.setDelayTimeLevel(level);

    Broker 存储定时消息

    Broker 将定时消息存储到特定的主题中(如 SCHEDULE_TOPIC_XXXX),并根据延迟等级与消息队列编号建立固定映射关系。具体实现如下:

    int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    }
    // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX`。
    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    msg.setTopic(topic);
    msg.setQueueId(queueId);
    }

    Broker 发送定时消息

    Broker 每条消费队列对应单独一个定时任务,根据消息计划消费时间(deliverTimestamp)轮询发送消息到对应的主题中。具体实现如下:

    class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;
    DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
    this.delayLevel = delayLevel;
    this.offset = offset;
    }
    @Override
    public void run() {
    try {
    this.executeOnTimeup();
    } catch (Exception e) {
    log.error("ScheduleMessageService, executeOnTimeup exception", e);
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
    }
    private void executeOnTimeup() {
    ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
    long failScheduleOffset = offset;
    if (cq != null) {
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ != null) {
    try {
    long nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    for (int i = 0; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();
    long now = System.currentTimeMillis();
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    long countdown = deliverTimestamp - now;
    if (countdown <= 0) {
    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
    if (msgExt != null) {
    try {
    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
    PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
    if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
    continue;
    } else {
    log.error("a message time up, but it failed, topic: {}, msgId: {}", msgExt.getTopic(), msgExt.getMsgId());
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
    return;
    }
    } catch (Exception e) {
    log.error("messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ", offsetPy=" + offsetPy + ", sizePy=" + sizePy + ", e)", e);
    }
    }
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
    return;
    }
    }
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
    } finally {
    bufferCQ.release();
    }
    } else {
    long cqMinOffset = cq.getMinOffsetInQueue();
    if (offset < cqMinOffset) {
    failScheduleOffset = cqMinOffset;
    log.error("schedule CQ offset invalid. offset=" + offset + ", " + cqMinOffset + ", queueId=" + cq.getQueueId());
    }
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    ScheduleMessageService.this.updateOffset(this.delayLevel, failScheduleOffset);
    }
    }
    }
    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
    long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodeValue);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
    msgInner.setWaitStoreMsgOK(false);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
    String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
    int queueId = Integer.parseInt(queueIdStr);
    msgInner.setQueueId(queueId);
    return msgInner;
    }

    消息重试机制

    在 RocketMQ 中,消息重试机制旨在处理 Consumer 消费消息失败的情况,将消息发回 Broker 进入延迟消息队列,等待重新消费。具体实现如下:

    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException {
    try {
    // ... (省略代码)
    int delayLevel = requestHeader.getDelayLevel();
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes) {
    // ... (省略代码)
    } else {
    if (0 == delayLevel) {
    delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
    }
    // ... (省略代码)
    return response;
    } catch (RemotingCommandException e) {
    throw e;
    }
    }

    性能优化与调优建议

  • 延迟等级的灵活性

    如果需要支持任意时间精度,可以考虑在 Broker 层面引入消息排序技术,但可能会带来性能开销。因此,灵活性与性能之间需要权衡。

  • 定时任务的执行

    Broker 应尽量优化定时任务的执行逻辑,减少任务间的竞争和资源占用,确保定时消息能够按时发送。

  • 消息存储与检索

    在存储定时消息时,确保数据结构和索引的设计高效,支持快速检索和处理。

  • 重试机制的优化

    在消息重试时,需要对 Consumer 端的失败原因进行全面诊断,并采取相应的重试策略,避免重试过多导致系统负载过大。

  • 日志与监控

    在处理定时消息和消息重试时,增加详细的日志记录和监控指标,及时发现和处理潜在问题。

  • 通过以上分析和优化,可以更好地理解 RocketMQ 的定时消息与消息重试机制,并在实际应用中进行性能调优和系统优化。

    转载地址:http://eiqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现double hash双哈希算法(附完整源码)
    查看>>
    Objective-C实现double linear search recursion双线性搜索递归算法(附完整源码)
    查看>>
    Objective-C实现DoublyLinkedList双链表的算法(附完整源码)
    查看>>
    Objective-C实现DPLL(davisb putnamb logemannb loveland)算法(附完整源码)
    查看>>
    Objective-C实现Edmonds-Karp算法(附完整源码)
    查看>>
    Objective-C实现EEMD算法(附完整源码)
    查看>>
    Objective-C实现EM算法(附完整源码)
    查看>>
    Objective-C实现EM算法(附完整源码)
    查看>>
    Objective-C实现entropy熵算法(附完整源码)
    查看>>
    Objective-C实现euclidean distance欧式距离算法(附完整源码)
    查看>>
    Objective-C实现Euclidean GCD欧几里得最大公约数算法(附完整源码)
    查看>>
    Objective-C实现euclideanDistance欧氏距离算法(附完整源码)
    查看>>
    Objective-C实现euler method欧拉法算法(附完整源码)
    查看>>
    Objective-C实现eulerianPath欧拉路径算法(附完整源码)
    查看>>
    Objective-C实现eval函数功能(附完整源码)
    查看>>
    Objective-C实现Exceeding words超词(差距是ascii码的距离) 算法(附完整源码)
    查看>>
    Objective-C实现extended euclidean algorithm扩展欧几里得算法(附完整源码)
    查看>>
    Objective-C实现Factorial digit sum阶乘数字和算法(附完整源码)
    查看>>
    Objective-C实现factorial iterative阶乘迭代算法(附完整源码)
    查看>>
    Objective-C实现FigurateNumber垛积数算法(附完整源码)
    查看>>