189 8069 5689

MQTT大消息失败原因排查的过程

这篇文章将为大家详细讲解有关MQTT大消息失败原因排查的过程,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

成都网站建设公司更懂你!创新互联只做搜索引擎喜欢的网站!成都网站制作前台采用搜索引擎认可的DIV+CSS架构,全站HTML静态,HTML5+CSS3网站,提供:网站建设,微信开发,成都小程序开发商城网站建设app软件开发域名注册,服务器租售,网站代托管运营,微信公众号代托管运营。

Background

小组内使用 MQTT 协议搭建了一个聊天服务器,前天在测大消息(超过5000汉字)时,连接直接变得不可用,后续发送的消息全部都收不到回复。

服务器环境:
Netty :4.1.32.Final
使用的是 Netty 包中自带的 MqttDecoder

客户端: Android

排查过程

  1. 由于所有的消息都打印了日志,因此先搜了一下服务器日志,发现日志中并没有发送的消息内容。

  2. 难道是客户端在超长消息时没有发送?使用 tcpdump 抓了包,发现客户端正常发送,并且所有的包服务端都已经 ack,但是后续服务端没有发回响应,猜测是服务端在大消息的情况下处理失败了。

    1. tcpdump 使用 -nn 打印出ip和端口,-X 打印网络包的内容,也可以使用-w 选项保存到文件里,然后使用 tcpdumpwireshark 来分析

  3. 于是查了一下 MQTT 支持的最大 payload,MQTT 官方文档 中说明是 256M,这个大小肯定不会超过。

  4. 在服务端抓了下包,确认消息已经收到,但是无确认消息返回

  5. 开启线上debug,发现收到了一个 PUBLISH 类型的消息,但是消息的 class 不为 MqttPublishMessage, 且 payload 中无数据,但在 Message 中有一个报错消息 too large message: 56234 bytes

  6. Google 一下,有网友遇到了同样的问题, 虽然这个问题里 MQTT 是 C 语言的。

  7. 查看 MqttDecoder, 发现 decoder 有最长 payload 限制(以下为部分代码),启动代码里调用的是默认构造函数,因此默认最长数据为 8092 字节。

public final class MqttDecoder extends ReplayingDecoder {
    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER: try {
                mqttFixedHeader = decodeFixedHeader(buffer);
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_VARIABLE_HEADER:  try {
                final Result decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                variableHeader = decodedVariableHeader.value;
                if (bytesRemainingInVariablePart > maxBytesInMessage) {
                    throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                }
                bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_PAYLOAD: try {
                final Result decodedPayload =
                        decodePayload(
                                buffer,
                                mqttFixedHeader.messageType(),
                                bytesRemainingInVariablePart,
                                variableHeader);
                bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                if (bytesRemainingInVariablePart != 0) {
                    throw new DecoderException(
                            "non-zero remaining payload bytes: " +
                                    bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                }
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload.value);
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }

    private MqttMessage invalidMessage(Throwable cause) {
      checkpoint(DecoderState.BAD_MESSAGE);
      return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
    }
}
  1. 长消息的原因找到了,还剩一个问题,为什么后续的消息包括 ping 消息就再也发不出去了?经过查看代码,这与 MqttDecoder 的父类 ReplayingDecoder 有关系,查看源码有详尽的类说明, 在读取可变长度头部时,如果payload 超过了最大限制,那么直接抛出异常。摘出代码如下:

case READ_VARIABLE_HEADER:  try {
    final Result decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
    variableHeader = decodedVariableHeader.value;
    if (bytesRemainingInVariablePart > maxBytesInMessage) {
        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
    }
    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
    checkpoint(DecoderState.READ_PAYLOAD);
    // fall through
} catch (Exception cause) {
    out.add(invalidMessage(cause));
    return;
}

在异常处理中,调用了 invalidMessage 方法,这个方法将 状态设为 DecoderState.BAD_MESSAGE, 在这个状态下,所有的字节都直接被丢弃。

case BAD_MESSAGE:
    // Keep discarding until disconnection.
    buffer.skipBytes(actualReadableBytes());
    break;

也就是说此后的消息都不会进入到业务处理逻辑,这条长连接废掉了。

解决方案

  1. 客户端对长消息做字数限制和拆分,保证单条消息不超过最大限制

  2. 服务端增大最大载荷长度,MqttDecoder 提供了构造函数(不建议使用,这样会增大服务器处理时间和内存负担)

关于MQTT大消息失败原因排查的过程就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


分享名称:MQTT大消息失败原因排查的过程
地址分享:http://cdxtjz.cn/article/ghhicd.html

联系我们

您好HELLO!
感谢您来到成都网站建设公司,若您有合作意向,请您为我们留言或使用以下方式联系我们, 我们将尽快给你回复,并为您提供真诚的设计服务,谢谢。
  • 电话:028- 86922220 18980695689
  • 商务合作邮箱:631063699@qq.com
  • 合作QQ: 532337155
  • 成都网站设计地址:成都市青羊区锣锅巷31号五金站写字楼6楼

小谭建站工作室

成都小谭网站建设公司拥有多年以上互联网从业经验的团队,始终保持务实的风格,以"帮助客户成功"为已任,专注于提供对客户有价值的服务。 我们已为众企业及上市公司提供专业的网站建设服务。我们不只是一家网站建设的网络公司;我们对营销、技术、管理都有自己独特见解,小谭建站采取“创意+综合+营销”一体化的方式为您提供更专业的服务!

小谭观点

相对传统的成都网站建设公司而言,小谭是互联网中的网站品牌策划,我们精于企业品牌与互联网相结合的整体战略服务。
我们始终认为,网站必须注入企业基因,真正使网站成为企业vi的一部分,让整个网站品牌策划体系变的深入而持久。