标签 威胁情报 下的文章

前言

最近刚结束了一个HVV蓝队,比较头疼,甲方内部设备简直太多了,目前各个大厂都是这么玩儿的,本地MSS不好好适配不同厂家的产品,弹起来适配的问题最好的借口就是需要开发介入调整适配,不单单是在适配不同厂商的防火墙上扯皮还是在适配不同厂商探针上同样扯皮,有这扯皮功夫还不如直接写个工具一键封禁得了。其实就目前防守状态来讲,通过告警事件联动不同区域的防火墙这种技术手段太简单了,本地MSS在态感的基础上优化剧本再加上GPT介入研判已经基本上可以解决绝大部分的告警攻击了,可能目前唯一的问题可能就是出现在厂商态感底层告警逻辑上,目前探针获取的流量也只能获取非SSL流量,不做SSL卸载或者SSL证书解密的话一部分告警是无法获取的,另外常见的横向行为和隐藏流量也只能基于厂商设备底层逻辑或者剧本编排。

这次的工具功能是封禁共享情报的ip,或者是基于不同边界的不同品牌的防火墙和WAF的封禁。

下载地址:

功能介绍

  • 支持多种防火墙设备统一管理(H3C,QAX,SXF,山石,K01,DP等)
  • 网防K01提供黑名单批量查询、添加、删除功能
  • 支持 IP 规则化、过滤、清洗等功能
  • 界面简洁,操作便捷,可以选择性的根据不同的情报源头选择不同边界的设备进行封禁
  • 关于产品型号可支持大部分型号,除网防K01外其它产品封禁实现原理是基于添加地址到地址簿,封禁策略需手动处理

工具介绍和代码逻辑

配置文件

配置文件存放在config/config.json中,需要提前配置json文件,负责无法使用程序。这里关于config的文件内容务必按照格式规则设置,否则无法解析。

DP防火墙

DP防火墙的登录方式是telnet,逻辑是通过添加ip地址进入地址簿,添加ip地址

package devices

import (
	"BT_supertoolsV2/utils"
	"fmt"
	"strings"
	"time"

	"github.com/reiver/go-telnet"
)

type DPConfig struct {
	Name         string `json:"name"`
	DeviceIP     string `json:"device_ip"`
	TelnetPort   int    `json:"telnet_port"`
	Username     string `json:"username"`
	Password     string `json:"password"`
	AddressGroup string `json:"address_group"`
	Type         string `json:"type"`
}

func AddIPsToDP(cfg DPConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 DP 设备 %s ===\n", cfg.DeviceIP))

	// IP 规则化处理
	normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
	if normalizedIPs == "" {
		return "没有有效的 IP 地址!"
	}
	ipList := strings.Split(normalizedIPs, "\n")

	// 连接 Telnet
	address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
	conn, err := telnet.DialTo(address)
	if err != nil {
		return fmt.Sprintf("Telnet 连接失败: %v\n", err)
	}
	defer conn.Close()

	// 封装发送命令函数
	send := func(cmd string) {
		conn.Write([]byte(cmd + "\n"))
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
	}

	// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
	send(cfg.Username)
	send(cfg.Password)
	send("conf")
	for _, ip := range ipList {
		send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
	}
	send("exit")

	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

这里没有直接使用回显显示状态,没有监听服务器回显状态再输入命令,具体使用该方式是新增一个地址簿,再策略位置设置封禁策略,引用添加的地址簿即可。界面效果

H3c防火墙

采用ssh登录使用命令操作

package devices

import (
	"fmt"
	"strings"
	"time"

	"golang.org/x/crypto/ssh"
)

func AddIPsToH3C(cfg H3CConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 H3C 设备 %s ===\n\n", cfg.DeviceIP))

	config := &ssh.ClientConfig{
		User: cfg.Username,
		Auth: []ssh.AuthMethod{
			ssh.Password(cfg.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         10 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
	if err != nil {
		return fmt.Sprintf("SSH连接失败: %v\n", err)
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return fmt.Sprintf("创建会话失败: %v\n", err)
	}
	defer session.Close()

	stdin, err := session.StdinPipe()
	if err != nil {
		return fmt.Sprintf("获取输入管道失败: %v\n", err)
	}

	modes := ssh.TerminalModes{
		ssh.ECHO:          0,
		ssh.TTY_OP_ISPEED: 14400,
		ssh.TTY_OP_OSPEED: 14400,
	}
	if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
		return fmt.Sprintf("设置终端失败: %v\n", err)
	}
	if err := session.Shell(); err != nil {
		return fmt.Sprintf("启动 shell 失败: %v\n", err)
	}

	commands := []string{
		"system-view",
		fmt.Sprintf("object-group ip  %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network host address %s", ip))
	}
	commands = append(commands, "exit")

	for _, cmd := range commands {
		fmt.Fprintf(stdin, "%s\n", cmd)
		time.Sleep(300 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
	}
	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

这里采用了监听回显,核心代码是

commands := []string{
		"system-view",
		fmt.Sprintf("object-group ip  %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network host address %s", ip))
	}
	commands = append(commands, "exit")

当然这里可以增加优化,代码内关于ip地址的添加以及删除等操作都可以做,工具后期可以依托命令功能增加多个模块化设计,当然为了降低操作风险的话,这里黑名单封禁的操作足够使用了。

山石防火墙

山石登录方式是telnet登录使用命令操作

import (
	"BT_supertoolsV2/utils"
	"fmt"
	"strings"
	"time"

	"github.com/reiver/go-telnet"
)

type HSConfig struct {
	Name         string `json:"name"`
	DeviceIP     string `json:"device_ip"`
	TelnetPort   int    `json:"telnet_port"`
	Username     string `json:"username"`
	Password     string `json:"password"`
	AddressGroup string `json:"address_group"`
	Type         string `json:"type"`
}

func AddIPsToHillstone(cfg HSConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 Hillstone 设备 %s ===\n", cfg.DeviceIP))

	// IP 规则化处理
	normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
	if normalizedIPs == "" {
		return "没有有效的 IP 地址!"
	}
	ipList := strings.Split(normalizedIPs, "\n")

	// 连接 Telnet
	address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
	conn, err := telnet.DialTo(address)
	if err != nil {
		return fmt.Sprintf("Telnet 连接失败: %v\n", err)
	}
	defer conn.Close()

	// 封装发送命令函数
	send := func(cmd string) {
		conn.Write([]byte(cmd + "\n"))
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
	}

	// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
	send(cfg.Username)
	send(cfg.Password)
	send("conf")
	for _, ip := range ipList {
		send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
	}
	send("exit")

	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

山石防火墙的命令基本和华三防火墙的命令基本一致,目前基本上可以支持大多数版本型号的防火墙,使用前可以做测试。

网盾K01

网盾K01的话这里采用的是关于api的利用,目前关于网盾K01的黑名单添加的模式是和防火墙不一致的,可以采用接口的方式增加。目前代码中关于黑名单添加的事件设置的是3600小时。

// 批量封禁
func AddBlacklist(devices []K01Device, ips []string, output *widget.Entry) {
	for _, device := range devices {
		appendOutput(output, fmt.Sprintf("🔑 正在登录设备 [https://%s]...", device.IP))

		// 登录并获取 Token
		url := fmt.Sprintf("https://%s", device.IP)
		token := Login(url, device.Username, device.Password, output, device.Name)
		if token == "" {
			appendOutput(output, fmt.Sprintf("❌ 登录失败: %s", device.Name))
			continue
		}

		// 逐个 IP 添加到黑名单
		for _, ip := range ips {
			// 如果 IP 地址没有 CIDR 后缀,添加 "/32"
			if !strings.Contains(ip, "/") {
				ip += "/32"
			}

			// 打印调试日志,确认每个 IP 地址
			appendOutput(output, fmt.Sprintf("🔍 封禁 IP: %s", ip))

			// 准备请求 payload
			payload := map[string]interface{}{
				"color":       0,
				"device_mask": []int{224},
				"items": []map[string]interface{}{
					{
						"type":        0,
						"ip":          ip, // 这里单独封禁每个 IP 地址
						"timeout":     336,
						"time_type":   "3600",
						"device_mask": []int{224},
						"comment":     "自动封禁",
					},
				},
				"method": "add",
			}

			// 打印调试日志,确认请求 Payload
			payloadBytes, _ := json.Marshal(payload)
			// appendOutput(output, fmt.Sprintf("🔍 请求 Payload: %s", string(payloadBytes)))

			// 请求封禁
			req, err := http.NewRequest("POST", url+"/api/v1/security/iplist/save", bytes.NewBuffer(payloadBytes))
			if err != nil {
				appendOutput(output, fmt.Sprintf("❌ 创建封禁请求失败: %s", err.Error()))
				continue
			}
			req.Header.Set("Authorization", "Bearer "+token)
			req.Header.Set("Content-Type", "application/json")

			// 执行请求
			resp, err := insecureClient.Do(req)
			if err != nil {
				appendOutput(output, fmt.Sprintf("❌ 封禁请求失败: %s", err.Error()))
				continue
			}
			defer resp.Body.Close()

			// // 打印调试日志,确认响应代码
			// appendOutput(output, fmt.Sprintf("🔍 响应状态: %s", resp.Status))

			// 解析响应
			body, _ := ioutil.ReadAll(resp.Body)
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				appendOutput(output, fmt.Sprintf("❌ 解析封禁响应失败: %s", err.Error()))
				continue
			}

			// // 打印调试日志,确认响应结果
			// appendOutput(output, fmt.Sprintf("🔍 响应结果: %v", result))

			// 根据结果显示成功或失败
			if success, ok := result["success"].(bool); ok && success {
				appendOutput(output, fmt.Sprintf("✅ 添加成功: %s", ip))
			} else {
				appendOutput(output, fmt.Sprintf("❌ 添加失败: %v", result["msg"]))
			}
		}
	}
}

// DeleteBlacklist 删除 IP 从黑名单
func DeleteBlacklist(devices []K01Device, selected []int, ipList []string, outputBox *widget.Entry) {
	for _, idx := range selected {
		device := devices[idx] // 获取当前选择的设备
		url := fmt.Sprintf("https://%s", device.IP)

		// 输出正在登录设备信息
		appendOutput(outputBox, fmt.Sprintf("🔄 正在登录设备【%s】...", device.Name))

		// 登录设备,获取 token
		token := Login(url, device.Username, device.Password, outputBox, device.Name)
		if token == "" {
			appendOutput(outputBox, fmt.Sprintf("❌ 设备【%s】登录失败,无法删除黑名单", device.Name))
			continue
		}

		// 遍历 IP 列表,直接进行删除操作
		for _, ip := range ipList {
			apiUrl := fmt.Sprintf("%s/api/v1/security/iplist/save", url)
			payload := fmt.Sprintf(`{
				"color": 0,
				"items": [{"id": "%s/32;0;0", "type": 0, "ip": "%s/32", "device_mask": [224]}],
				"method": "delete"
			}`, ip, ip)

			// 创建请求
			req, err := http.NewRequest("POST", apiUrl, bytes.NewBuffer([]byte(payload)))
			if err != nil {
				appendOutput(outputBox, "❌ 创建删除请求失败: "+err.Error())
				continue
			}
			req.Header.Set("Authorization", "Bearer "+token)
			req.Header.Set("Content-Type", "application/json")

			// 发送请求
			resp, err := insecureClient.Do(req)
			if err != nil {
				appendOutput(outputBox, "❌ 删除请求失败: "+err.Error())
				continue
			}
			defer resp.Body.Close()

			// 读取响应
			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				appendOutput(outputBox, "❌ 读取删除响应失败: "+err.Error())
				continue
			}

			// 解析响应
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				appendOutput(outputBox, "❌ 解析删除响应失败: "+err.Error())
				continue
			}

			// 检查删除是否成功
			if success, ok := result["success"].(bool); ok && success {
				appendOutput(outputBox, fmt.Sprintf("✅ 删除成功,IP: %s", ip))
			} else {
				// 如果失败,输出错误信息
				if msg, ok := result["msg"].(string); ok {
					appendOutput(outputBox, fmt.Sprintf("❌ 删除失败: %s", msg))
				} else {
					appendOutput(outputBox, "❌ 删除失败,未知错误")
				}
			}
		}
	}

关于api的调用的话是有其自己的参数规则的,因为认证方式是https,所以关于身份认证的话需要忽略ssl证书。因为考虑到删除函数的逻辑需先查询要封禁的黑名单是否在黑名单列表,所以这里就执行的是直接删除,否则就删除失败,但是对于功能性的查询模块的功能是有的,由于网盾k01的产品特性,一点发现全网阻断,所以这里基本上用不到删除黑名单的功能。

QAX网神防火墙

网神联动利用的是ssh登录执行命令,所以这里权限也相对来说比较大,对于蓝队来讲不建议增加其它代码功能,原理也是利用策略引用地址簿进行封禁

func AddIPsToQAX(cfg QAXConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置奇安信设备 %s ===\n\n", cfg.DeviceIP))

	config := &ssh.ClientConfig{
		User: cfg.Username,
		Auth: []ssh.AuthMethod{
			ssh.Password(cfg.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         15 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
	if err != nil {
		return fmt.Sprintf("SSH连接失败: %v\n", err)
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return fmt.Sprintf("创建会话失败: %v\n", err)
	}
	defer session.Close()

	stdin, err := session.StdinPipe()
	if err != nil {
		return fmt.Sprintf("获取输入管道失败: %v\n", err)
	}

	modes := ssh.TerminalModes{
		ssh.ECHO:          0,
		ssh.TTY_OP_ISPEED: 14400,
		ssh.TTY_OP_OSPEED: 14400,
	}
	if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
		return fmt.Sprintf("设置终端失败: %v\n", err)
	}
	if err := session.Shell(); err != nil {
		return fmt.Sprintf("启动shell失败: %v\n", err)
	}

	commands := []string{
		"config terminal",
		fmt.Sprintf("object address %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network %s 32", ip))
	}
	commands = append(commands, "exit")

	for _, cmd := range commands {
		fmt.Fprintf(stdin, "%s\n", cmd)
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
	}
	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

向地址簿中添加ip地址核心代码

commands := []string{
	"config terminal",
	fmt.Sprintf("object address %s", cfg.AddressGroup),
}
for _, ip := range ips {
	commands = append(commands, fmt.Sprintf("network %s 32", ip))
}
commands = append(commands, "exit")

目前我也是查询了多款网神防火墙的手册,关于地址簿添加这块儿的命令版本是没有变化的,基本上支持大多数版本型号。

SXF_AF和WAF

这里SXF的设备是有两种模式可供选择的,但是目前SXF的ssh登录一般是由两段密码组成还有可能经常性的修改,所以这里使用的API的方式向地址簿中添加要封禁的IP地址,但是这里的话,策略务必要配置正确。

// 新增的结构体,用于表示包含 devices 字段的 JSON 数据结构
type DeviceList struct {
	Devices []SXFDevice `json:"devices"`
}

// 登录响应结构体
type loginResponse struct {
	Code int `json:"code"`
	Data struct {
		LoginResult struct {
			Token string `json:"token"`
		} `json:"loginResult"`
	} `json:"data"`
	Message string `json:"message"`
}

// 添加 IP 请求结构体
type ipRange struct {
	Start string `json:"start"`
}

type addIPRequest struct {
	IPRanges []ipRange `json:"ipRanges"`
}

// ✅ 防冲突:明确为 SXF 的登录函数
// 登录获取 token
func SXFLogin(device SXFDevice) (string, error) {
	// 打印设备信息,确保 IP、用户名和密码正确
	fmt.Printf("设备信息: IP=%s, Username=%s, Password=%s\n", device.IP, device.Username, device.Password)
	url := fmt.Sprintf("https://%s/api/v1/namespaces/public/login", device.IP)

	payload := map[string]string{
		"name":     device.Username,
		"password": device.Password,
	}
	jsonData, _ := json.Marshal(payload)

	client := &http.Client{
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
	}
	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)

	// 打印响应内容进行调试
	fmt.Println("响应体内容:", string(body))

	var result struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
		Data    struct {
			LoginResult struct {
				Token string `json:"token"`
			} `json:"loginResult"`
		} `json:"data"`
	}

	if err := json.Unmarshal(body, &result); err != nil {
		return "", fmt.Errorf("解析登录响应失败: %v", err)
	}

	// 检查返回的 code
	if result.Code != 0 {
		return "", fmt.Errorf("登录失败: %s", result.Message)
	}

	// 返回 token
	return result.Data.LoginResult.Token, nil
}

// 加载 JSON 文件并解析为设备切片
// 加载 JSON 文件并解析为设备切片
func loadSXFDevices(filePath string) ([]SXFDevice, error) {
	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("设备配置文件加载失败: %v", err)
	}

	// 输出加载的 JSON 数据(用于调试)
	fmt.Printf("加载的 JSON 数据: %s\n", string(data))

	var devices []SXFDevice
	err = json.Unmarshal(data, &devices)
	if err != nil {
		return nil, fmt.Errorf("设备解析失败: %v", err)
	}

	// 输出解析后的设备信息(调试)
	fmt.Printf("加载的设备数量: %d\n", len(devices))
	for _, device := range devices {
		// 输出每台设备的详细信息,特别是 AddressGroup 字段
		fmt.Printf("设备信息:Name=%s, IP=%s, AddressGroup=%s\n", device.Name, device.IP, device.AddressGroup)
	}

	return devices, nil
}

// ✅ 添加多个 IP 到 SXF 地址组(支持日志回调)
// 在 AddToSXFBlacklist 函数内部,确保请求头正确设置
// 设备添加到黑名单的函数
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
// 添加多个 IP 到 SXF 地址组(支持日志回调)
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
func AddToSXFBlacklist(devices []SXFDevice, ips []string, logFn func(string)) error {

	for _, device := range devices {
		// 1. 登录
		token, err := SXFLogin(device)
		if err != nil {
			logFn(fmt.Sprintf("【%s】❌ 登录失败: %v", device.Name, err))
			continue
		}
		logFn(fmt.Sprintf("【%s】✅ 登录成功", device.Name))

		// 2. 添加每个IP
		for _, ip := range ips {
			ipRanges := []map[string]string{{"start": ip, "end": ip}}
			requestData := map[string]interface{}{"ipRanges": ipRanges}
			requestDataJSON, _ := json.Marshal(requestData)

			// 3. 发送请求
			url := fmt.Sprintf(
				"https://%s/api/v1/namespaces/public/ipgroups/%s?_arrayop=add",
				device.IP,
				device.AddressGroup,
			)
			req, _ := http.NewRequest("PATCH", url, bytes.NewBuffer(requestDataJSON))
			req.Header = http.Header{
				"token":        []string{token},
				"Content-Type": []string{"application/json"},
			}

			client := &http.Client{
				Transport: &http.Transport{
					TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
				},
			}
			resp, err := client.Do(req)
			if err != nil {
				logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %v", device.Name, ip, err))
				continue
			}
			defer resp.Body.Close()

			// 4. 处理响应
			body, _ := io.ReadAll(resp.Body)
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				logFn(fmt.Sprintf("【%s】❌ IP %s 响应解析失败: %v", device.Name, ip, err))
				continue
			}

			if code, ok := result["code"].(float64); ok && code == 0 {
				logFn(fmt.Sprintf("【%s】✅ IP %s 已添加到地址组[%s]",
					device.Name, ip, device.AddressGroup))
			} else {
				msg, _ := result["message"].(string)
				logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %s", device.Name, ip, msg))
			}
		}
	}
	return nil
}

如果想要扩展功能的话不建议使用API去扩展,毕竟太麻烦了,各种参数比较麻烦,没有ssh的方式登录使用命令操作简便。

整体界面效果

总结

其它厂商的防火墙确实没找到手册,有的是找到手册没有测试的设备,目前上述的设备和代码是完全没有问题的,如果想添加其它厂商的设备,可以给我操作手册,我这边更新新版本发布。每次更新后的授权时间是3个月

最近项目组做了一次安全项目,在联动讨论中,我们团队提出攻克一个一直被“模糊处理”的问题:如何在不引入复杂流量解密、不严重影响性能的前提下,更可靠地识别潜在的 C2通信行为。

其实在我看来这个问题并不新,在往常的项目中异常端口、可疑域名、频繁外联、策略命中日志等检测点都是这个问题。而这些信号单独存在时,误报率高,且很难形成可执行的处置结论。而本次讨论的关键转折点,正是在“出站 IP 本身是否具备明确恶意属性”这一角度上。

一、从“流量行为”转向“通信对象本身”的判断

在复盘既往处置案例时,阿孙提到一个共性:多数被确认为C2 的样本不是因为流量形态极其复杂,而是其通信目标本身就具备明显的基础设施风险特征,比如位于高风险国家或地区、IP 段频繁出现在僵尸网络、钓鱼、木马回连情报中、长期驻留在 VPS/云主机/异常 ASN、多项目/多情报源重复命中等

但这些信息在现有体系中是割裂的,地理信息在 IP 归属系统中,恶意标签在威胁情报平台中,而F火墙/EDR却只能看到“一个外联IP”,由此,我门讨论是否可以多采用IP离线库植入威胁情报,手动拓宽我们的“威胁情报”,提出这正是我们提出使用IP离线库和威胁情报进行融合。

二、 为什么必须引入 IP 离线库,而不是完全依赖情报 API

最初也有人提出直接调用在线威胁情报 API 即可,但在技术评估阶段,很快暴露出几个不可回避的问题:

1. 出站连接频率高,实时 API 成本与延迟不可控
在核心业务网段,单节点每天的外联 IP 数量级在百万级,实时查询并不可行。

2. 部分安全系统处于内网或半隔离环境
核心日志分析、审计系统无法直接访问外部情报接口。

3. IP 基础属性缺失会削弱判断上下文
单一“是否恶意”的结论,无法解释风险来源,例如:

1. 这是一个海外 IDC 正常业务 IP?

2. 还是位于高风险区域的小型自治系统?

3. 是否属于动态拨号或代理出口?

因此,我们的思路是先通过本地 IP 离线库快速完成“背景定性”,再用威胁情报完成“恶意定量”  。

三、 IP离线库在 C2 识别中的实际定位

在方案中,IP 离线库并不直接承担“是否 C2”的判断,而是用于回答以下关键问题:

1. 该出站 IP 的国家/地区/城市是否与业务场景匹配

2. 是否命中云厂商、数据中心、匿名网络、异常 ASN

3. 是否存在明显的跨国、跨区域跳变特征

4. 是否属于历史上极少访问、但突然频繁出现的地理位置

在我们实际部署中,使用的是IP数据云离线库,它覆盖IPv4/IPv6的本地IP 离线库,字段不仅包括国家、省市,还包含 ASN、运营商类型、IDC/住宅网络标识,方便后续识别C2通信,评估出站IP是否为已知恶意地址。
【场景:识别C2通信】评估出站IP是否为已知恶意地址,方法:IP离线库+威胁情报融合2.png

四、 威胁情报融合的方式,而非简单“命中即拦截”

第二层才是威胁情报,但这里我们刻意避免了“黑名单式”的粗暴使用方式。

具体做法是:

第一步,多源情报聚合:商业情报 + 社区情报 + 历史处置数据

第二步,情报置信度分级:区分活跃 C2、历史恶意、关联基础设施

第三步,时间衰减机制:避免因陈旧情报导致长期误判

当某个出站 IP在 IP 离线库中表现为、海外小众地区、云主机 / VPS、非业务白名单 ASN,该IP同时在威胁情报中命中已知 C2 或僵尸网络关联,被多个情报源低频标注,我们才将其提升为  “高置信度可疑 C2 通信”  ,进入阻断或人工复核流程。

五、 实际落地的处理流程拆解

在最终方案中,整体流程被拆解为清晰的四个阶段:

1. 出站连接采集
从F火墙、NDR、EDR 中统一采集目的 IP、端口、协议、频率。

2.IP 离线库快速画像

1. 地理位置

2. ASN / 网络类型

3. 是否云主机 / IDC

4. 是否偏离正常业务访问分布

3.威胁情报关联评分

1. 是否命中恶意标签

2. 情报来源数量

3. 最近活跃时间

4. 策略与响应联动

1. 自动阻断(高置信度)

2. 降权监控(中置信度)

3. 留痕审计(低置信度)

这种方式的一个明显变化是我们不再“猜测流量是不是 C2”,而是在判断“这个通信对象是否值得被当作 C2 对待”  。

项目讨论中最被认可的是①不依赖深度包检测;②不影响现有网络性能;③可解释性强,适合审计与复盘;④能在离线、内网环境稳定运行。【场景:识别C2通信】评估出站IP是否为已知恶意地址,方法:IP离线库+威胁情报融合1.png

最近项目组做了一次安全项目,在联动讨论中,我们团队提出攻克一个一直被“模糊处理”的问题:如何在不引入复杂流量解密、不严重影响性能的前提下,更可靠地识别潜在的 C2通信行为。

其实在我看来这个问题并不新,在往常的项目中异常端口、可疑域名、频繁外联、策略命中日志等检测点都是这个问题。而这些信号单独存在时,误报率高,且很难形成可执行的处置结论。而本次讨论的关键转折点,正是在“出站 IP 本身是否具备明确恶意属性”这一角度上。

一、从“流量行为”转向“通信对象本身”的判断

在复盘既往处置案例时,阿孙提到一个共性:多数被确认为C2 的样本不是因为流量形态极其复杂,而是其通信目标本身就具备明显的基础设施风险特征,比如位于高风险国家或地区、IP 段频繁出现在僵尸网络、钓鱼、木马回连情报中、长期驻留在 VPS/云主机/异常 ASN、多项目/多情报源重复命中等

但这些信息在现有体系中是割裂的,地理信息在 IP 归属系统中,恶意标签在威胁情报平台中,而F火墙/EDR却只能看到“一个外联IP”,由此,我门讨论是否可以多采用IP离线库植入威胁情报,手动拓宽我们的“威胁情报”,提出这正是我们提出使用IP离线库和威胁情报进行融合。

二、 为什么必须引入 IP 离线库,而不是完全依赖情报 API

最初也有人提出直接调用在线威胁情报 API 即可,但在技术评估阶段,很快暴露出几个不可回避的问题:

1. 出站连接频率高,实时 API 成本与延迟不可控
在核心业务网段,单节点每天的外联 IP 数量级在百万级,实时查询并不可行。

2. 部分安全系统处于内网或半隔离环境
核心日志分析、审计系统无法直接访问外部情报接口。

3. IP 基础属性缺失会削弱判断上下文
单一“是否恶意”的结论,无法解释风险来源,例如:

1. 这是一个海外 IDC 正常业务 IP?

2. 还是位于高风险区域的小型自治系统?

3. 是否属于动态拨号或代理出口?

因此,我们的思路是先通过本地 IP 离线库快速完成“背景定性”,再用威胁情报完成“恶意定量”  。

三、 IP离线库在 C2 识别中的实际定位

在方案中,IP 离线库并不直接承担“是否 C2”的判断,而是用于回答以下关键问题:

1. 该出站 IP 的国家/地区/城市是否与业务场景匹配

2. 是否命中云厂商、数据中心、匿名网络、异常 ASN

3. 是否存在明显的跨国、跨区域跳变特征

4. 是否属于历史上极少访问、但突然频繁出现的地理位置

在我们实际部署中,使用的是IP数据云离线库,它覆盖IPv4/IPv6的本地IP 离线库,字段不仅包括国家、省市,还包含 ASN、运营商类型、IDC/住宅网络标识,方便后续识别C2通信,评估出站IP是否为已知恶意地址。
【场景:识别C2通信】评估出站IP是否为已知恶意地址,方法:IP离线库+威胁情报融合2.png

四、 威胁情报融合的方式,而非简单“命中即拦截”

第二层才是威胁情报,但这里我们刻意避免了“黑名单式”的粗暴使用方式。

具体做法是:

第一步,多源情报聚合:商业情报 + 社区情报 + 历史处置数据

第二步,情报置信度分级:区分活跃 C2、历史恶意、关联基础设施

第三步,时间衰减机制:避免因陈旧情报导致长期误判

当某个出站 IP在 IP 离线库中表现为、海外小众地区、云主机 / VPS、非业务白名单 ASN,该IP同时在威胁情报中命中已知 C2 或僵尸网络关联,被多个情报源低频标注,我们才将其提升为  “高置信度可疑 C2 通信”  ,进入阻断或人工复核流程。

五、 实际落地的处理流程拆解

在最终方案中,整体流程被拆解为清晰的四个阶段:

1. 出站连接采集
从F火墙、NDR、EDR 中统一采集目的 IP、端口、协议、频率。

2.IP 离线库快速画像

1. 地理位置

2. ASN / 网络类型

3. 是否云主机 / IDC

4. 是否偏离正常业务访问分布

3.威胁情报关联评分

1. 是否命中恶意标签

2. 情报来源数量

3. 最近活跃时间

4. 策略与响应联动

1. 自动阻断(高置信度)

2. 降权监控(中置信度)

3. 留痕审计(低置信度)

这种方式的一个明显变化是我们不再“猜测流量是不是 C2”,而是在判断“这个通信对象是否值得被当作 C2 对待”  。

项目讨论中最被认可的是①不依赖深度包检测;②不影响现有网络性能;③可解释性强,适合审计与复盘;④能在离线、内网环境稳定运行。【场景:识别C2通信】评估出站IP是否为已知恶意地址,方法:IP离线库+威胁情报融合1.png

前言

最近刚结束了一个HVV蓝队,比较头疼,甲方内部设备简直太多了,目前各个大厂都是这么玩儿的,本地MSS不好好适配不同厂家的产品,弹起来适配的问题最好的借口就是需要开发介入调整适配,不单单是在适配不同厂商的防火墙上扯皮还是在适配不同厂商探针上同样扯皮,有这扯皮功夫还不如直接写个工具一键封禁得了。其实就目前防守状态来讲,通过告警事件联动不同区域的防火墙这种技术手段太简单了,本地MSS在态感的基础上优化剧本再加上GPT介入研判已经基本上可以解决绝大部分的告警攻击了,可能目前唯一的问题可能就是出现在厂商态感底层告警逻辑上,目前探针获取的流量也只能获取非SSL流量,不做SSL卸载或者SSL证书解密的话一部分告警是无法获取的,另外常见的横向行为和隐藏流量也只能基于厂商设备底层逻辑或者剧本编排。

这次的工具功能是封禁共享情报的ip,或者是基于不同边界的不同品牌的防火墙和WAF的封禁。

下载地址:

功能介绍

  • 支持多种防火墙设备统一管理(H3C,QAX,SXF,山石,K01,DP等)
  • 网防K01提供黑名单批量查询、添加、删除功能
  • 支持 IP 规则化、过滤、清洗等功能
  • 界面简洁,操作便捷,可以选择性的根据不同的情报源头选择不同边界的设备进行封禁
  • 关于产品型号可支持大部分型号,除网防K01外其它产品封禁实现原理是基于添加地址到地址簿,封禁策略需手动处理

工具介绍和代码逻辑

配置文件

配置文件存放在config/config.json中,需要提前配置json文件,负责无法使用程序。这里关于config的文件内容务必按照格式规则设置,否则无法解析。

DP防火墙

DP防火墙的登录方式是telnet,逻辑是通过添加ip地址进入地址簿,添加ip地址

package devices

import (
	"BT_supertoolsV2/utils"
	"fmt"
	"strings"
	"time"

	"github.com/reiver/go-telnet"
)

type DPConfig struct {
	Name         string `json:"name"`
	DeviceIP     string `json:"device_ip"`
	TelnetPort   int    `json:"telnet_port"`
	Username     string `json:"username"`
	Password     string `json:"password"`
	AddressGroup string `json:"address_group"`
	Type         string `json:"type"`
}

func AddIPsToDP(cfg DPConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 DP 设备 %s ===\n", cfg.DeviceIP))

	// IP 规则化处理
	normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
	if normalizedIPs == "" {
		return "没有有效的 IP 地址!"
	}
	ipList := strings.Split(normalizedIPs, "\n")

	// 连接 Telnet
	address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
	conn, err := telnet.DialTo(address)
	if err != nil {
		return fmt.Sprintf("Telnet 连接失败: %v\n", err)
	}
	defer conn.Close()

	// 封装发送命令函数
	send := func(cmd string) {
		conn.Write([]byte(cmd + "\n"))
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
	}

	// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
	send(cfg.Username)
	send(cfg.Password)
	send("conf")
	for _, ip := range ipList {
		send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
	}
	send("exit")

	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

这里没有直接使用回显显示状态,没有监听服务器回显状态再输入命令,具体使用该方式是新增一个地址簿,再策略位置设置封禁策略,引用添加的地址簿即可。界面效果

H3c防火墙

采用ssh登录使用命令操作

package devices

import (
	"fmt"
	"strings"
	"time"

	"golang.org/x/crypto/ssh"
)

func AddIPsToH3C(cfg H3CConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 H3C 设备 %s ===\n\n", cfg.DeviceIP))

	config := &ssh.ClientConfig{
		User: cfg.Username,
		Auth: []ssh.AuthMethod{
			ssh.Password(cfg.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         10 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
	if err != nil {
		return fmt.Sprintf("SSH连接失败: %v\n", err)
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return fmt.Sprintf("创建会话失败: %v\n", err)
	}
	defer session.Close()

	stdin, err := session.StdinPipe()
	if err != nil {
		return fmt.Sprintf("获取输入管道失败: %v\n", err)
	}

	modes := ssh.TerminalModes{
		ssh.ECHO:          0,
		ssh.TTY_OP_ISPEED: 14400,
		ssh.TTY_OP_OSPEED: 14400,
	}
	if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
		return fmt.Sprintf("设置终端失败: %v\n", err)
	}
	if err := session.Shell(); err != nil {
		return fmt.Sprintf("启动 shell 失败: %v\n", err)
	}

	commands := []string{
		"system-view",
		fmt.Sprintf("object-group ip  %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network host address %s", ip))
	}
	commands = append(commands, "exit")

	for _, cmd := range commands {
		fmt.Fprintf(stdin, "%s\n", cmd)
		time.Sleep(300 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
	}
	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

这里采用了监听回显,核心代码是

commands := []string{
		"system-view",
		fmt.Sprintf("object-group ip  %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network host address %s", ip))
	}
	commands = append(commands, "exit")

当然这里可以增加优化,代码内关于ip地址的添加以及删除等操作都可以做,工具后期可以依托命令功能增加多个模块化设计,当然为了降低操作风险的话,这里黑名单封禁的操作足够使用了。

山石防火墙

山石登录方式是telnet登录使用命令操作

import (
	"BT_supertoolsV2/utils"
	"fmt"
	"strings"
	"time"

	"github.com/reiver/go-telnet"
)

type HSConfig struct {
	Name         string `json:"name"`
	DeviceIP     string `json:"device_ip"`
	TelnetPort   int    `json:"telnet_port"`
	Username     string `json:"username"`
	Password     string `json:"password"`
	AddressGroup string `json:"address_group"`
	Type         string `json:"type"`
}

func AddIPsToHillstone(cfg HSConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 Hillstone 设备 %s ===\n", cfg.DeviceIP))

	// IP 规则化处理
	normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
	if normalizedIPs == "" {
		return "没有有效的 IP 地址!"
	}
	ipList := strings.Split(normalizedIPs, "\n")

	// 连接 Telnet
	address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
	conn, err := telnet.DialTo(address)
	if err != nil {
		return fmt.Sprintf("Telnet 连接失败: %v\n", err)
	}
	defer conn.Close()

	// 封装发送命令函数
	send := func(cmd string) {
		conn.Write([]byte(cmd + "\n"))
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
	}

	// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
	send(cfg.Username)
	send(cfg.Password)
	send("conf")
	for _, ip := range ipList {
		send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
	}
	send("exit")

	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

山石防火墙的命令基本和华三防火墙的命令基本一致,目前基本上可以支持大多数版本型号的防火墙,使用前可以做测试。

网盾K01

网盾K01的话这里采用的是关于api的利用,目前关于网盾K01的黑名单添加的模式是和防火墙不一致的,可以采用接口的方式增加。目前代码中关于黑名单添加的事件设置的是3600小时。

// 批量封禁
func AddBlacklist(devices []K01Device, ips []string, output *widget.Entry) {
	for _, device := range devices {
		appendOutput(output, fmt.Sprintf("🔑 正在登录设备 [https://%s]...", device.IP))

		// 登录并获取 Token
		url := fmt.Sprintf("https://%s", device.IP)
		token := Login(url, device.Username, device.Password, output, device.Name)
		if token == "" {
			appendOutput(output, fmt.Sprintf("❌ 登录失败: %s", device.Name))
			continue
		}

		// 逐个 IP 添加到黑名单
		for _, ip := range ips {
			// 如果 IP 地址没有 CIDR 后缀,添加 "/32"
			if !strings.Contains(ip, "/") {
				ip += "/32"
			}

			// 打印调试日志,确认每个 IP 地址
			appendOutput(output, fmt.Sprintf("🔍 封禁 IP: %s", ip))

			// 准备请求 payload
			payload := map[string]interface{}{
				"color":       0,
				"device_mask": []int{224},
				"items": []map[string]interface{}{
					{
						"type":        0,
						"ip":          ip, // 这里单独封禁每个 IP 地址
						"timeout":     336,
						"time_type":   "3600",
						"device_mask": []int{224},
						"comment":     "自动封禁",
					},
				},
				"method": "add",
			}

			// 打印调试日志,确认请求 Payload
			payloadBytes, _ := json.Marshal(payload)
			// appendOutput(output, fmt.Sprintf("🔍 请求 Payload: %s", string(payloadBytes)))

			// 请求封禁
			req, err := http.NewRequest("POST", url+"/api/v1/security/iplist/save", bytes.NewBuffer(payloadBytes))
			if err != nil {
				appendOutput(output, fmt.Sprintf("❌ 创建封禁请求失败: %s", err.Error()))
				continue
			}
			req.Header.Set("Authorization", "Bearer "+token)
			req.Header.Set("Content-Type", "application/json")

			// 执行请求
			resp, err := insecureClient.Do(req)
			if err != nil {
				appendOutput(output, fmt.Sprintf("❌ 封禁请求失败: %s", err.Error()))
				continue
			}
			defer resp.Body.Close()

			// // 打印调试日志,确认响应代码
			// appendOutput(output, fmt.Sprintf("🔍 响应状态: %s", resp.Status))

			// 解析响应
			body, _ := ioutil.ReadAll(resp.Body)
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				appendOutput(output, fmt.Sprintf("❌ 解析封禁响应失败: %s", err.Error()))
				continue
			}

			// // 打印调试日志,确认响应结果
			// appendOutput(output, fmt.Sprintf("🔍 响应结果: %v", result))

			// 根据结果显示成功或失败
			if success, ok := result["success"].(bool); ok && success {
				appendOutput(output, fmt.Sprintf("✅ 添加成功: %s", ip))
			} else {
				appendOutput(output, fmt.Sprintf("❌ 添加失败: %v", result["msg"]))
			}
		}
	}
}

// DeleteBlacklist 删除 IP 从黑名单
func DeleteBlacklist(devices []K01Device, selected []int, ipList []string, outputBox *widget.Entry) {
	for _, idx := range selected {
		device := devices[idx] // 获取当前选择的设备
		url := fmt.Sprintf("https://%s", device.IP)

		// 输出正在登录设备信息
		appendOutput(outputBox, fmt.Sprintf("🔄 正在登录设备【%s】...", device.Name))

		// 登录设备,获取 token
		token := Login(url, device.Username, device.Password, outputBox, device.Name)
		if token == "" {
			appendOutput(outputBox, fmt.Sprintf("❌ 设备【%s】登录失败,无法删除黑名单", device.Name))
			continue
		}

		// 遍历 IP 列表,直接进行删除操作
		for _, ip := range ipList {
			apiUrl := fmt.Sprintf("%s/api/v1/security/iplist/save", url)
			payload := fmt.Sprintf(`{
				"color": 0,
				"items": [{"id": "%s/32;0;0", "type": 0, "ip": "%s/32", "device_mask": [224]}],
				"method": "delete"
			}`, ip, ip)

			// 创建请求
			req, err := http.NewRequest("POST", apiUrl, bytes.NewBuffer([]byte(payload)))
			if err != nil {
				appendOutput(outputBox, "❌ 创建删除请求失败: "+err.Error())
				continue
			}
			req.Header.Set("Authorization", "Bearer "+token)
			req.Header.Set("Content-Type", "application/json")

			// 发送请求
			resp, err := insecureClient.Do(req)
			if err != nil {
				appendOutput(outputBox, "❌ 删除请求失败: "+err.Error())
				continue
			}
			defer resp.Body.Close()

			// 读取响应
			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				appendOutput(outputBox, "❌ 读取删除响应失败: "+err.Error())
				continue
			}

			// 解析响应
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				appendOutput(outputBox, "❌ 解析删除响应失败: "+err.Error())
				continue
			}

			// 检查删除是否成功
			if success, ok := result["success"].(bool); ok && success {
				appendOutput(outputBox, fmt.Sprintf("✅ 删除成功,IP: %s", ip))
			} else {
				// 如果失败,输出错误信息
				if msg, ok := result["msg"].(string); ok {
					appendOutput(outputBox, fmt.Sprintf("❌ 删除失败: %s", msg))
				} else {
					appendOutput(outputBox, "❌ 删除失败,未知错误")
				}
			}
		}
	}

关于api的调用的话是有其自己的参数规则的,因为认证方式是https,所以关于身份认证的话需要忽略ssl证书。因为考虑到删除函数的逻辑需先查询要封禁的黑名单是否在黑名单列表,所以这里就执行的是直接删除,否则就删除失败,但是对于功能性的查询模块的功能是有的,由于网盾k01的产品特性,一点发现全网阻断,所以这里基本上用不到删除黑名单的功能。

QAX网神防火墙

网神联动利用的是ssh登录执行命令,所以这里权限也相对来说比较大,对于蓝队来讲不建议增加其它代码功能,原理也是利用策略引用地址簿进行封禁

func AddIPsToQAX(cfg QAXConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置奇安信设备 %s ===\n\n", cfg.DeviceIP))

	config := &ssh.ClientConfig{
		User: cfg.Username,
		Auth: []ssh.AuthMethod{
			ssh.Password(cfg.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         15 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
	if err != nil {
		return fmt.Sprintf("SSH连接失败: %v\n", err)
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return fmt.Sprintf("创建会话失败: %v\n", err)
	}
	defer session.Close()

	stdin, err := session.StdinPipe()
	if err != nil {
		return fmt.Sprintf("获取输入管道失败: %v\n", err)
	}

	modes := ssh.TerminalModes{
		ssh.ECHO:          0,
		ssh.TTY_OP_ISPEED: 14400,
		ssh.TTY_OP_OSPEED: 14400,
	}
	if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
		return fmt.Sprintf("设置终端失败: %v\n", err)
	}
	if err := session.Shell(); err != nil {
		return fmt.Sprintf("启动shell失败: %v\n", err)
	}

	commands := []string{
		"config terminal",
		fmt.Sprintf("object address %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network %s 32", ip))
	}
	commands = append(commands, "exit")

	for _, cmd := range commands {
		fmt.Fprintf(stdin, "%s\n", cmd)
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
	}
	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

向地址簿中添加ip地址核心代码

commands := []string{
	"config terminal",
	fmt.Sprintf("object address %s", cfg.AddressGroup),
}
for _, ip := range ips {
	commands = append(commands, fmt.Sprintf("network %s 32", ip))
}
commands = append(commands, "exit")

目前我也是查询了多款网神防火墙的手册,关于地址簿添加这块儿的命令版本是没有变化的,基本上支持大多数版本型号。

SXF_AF和WAF

这里SXF的设备是有两种模式可供选择的,但是目前SXF的ssh登录一般是由两段密码组成还有可能经常性的修改,所以这里使用的API的方式向地址簿中添加要封禁的IP地址,但是这里的话,策略务必要配置正确。

// 新增的结构体,用于表示包含 devices 字段的 JSON 数据结构
type DeviceList struct {
	Devices []SXFDevice `json:"devices"`
}

// 登录响应结构体
type loginResponse struct {
	Code int `json:"code"`
	Data struct {
		LoginResult struct {
			Token string `json:"token"`
		} `json:"loginResult"`
	} `json:"data"`
	Message string `json:"message"`
}

// 添加 IP 请求结构体
type ipRange struct {
	Start string `json:"start"`
}

type addIPRequest struct {
	IPRanges []ipRange `json:"ipRanges"`
}

// ✅ 防冲突:明确为 SXF 的登录函数
// 登录获取 token
func SXFLogin(device SXFDevice) (string, error) {
	// 打印设备信息,确保 IP、用户名和密码正确
	fmt.Printf("设备信息: IP=%s, Username=%s, Password=%s\n", device.IP, device.Username, device.Password)
	url := fmt.Sprintf("https://%s/api/v1/namespaces/public/login", device.IP)

	payload := map[string]string{
		"name":     device.Username,
		"password": device.Password,
	}
	jsonData, _ := json.Marshal(payload)

	client := &http.Client{
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
	}
	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)

	// 打印响应内容进行调试
	fmt.Println("响应体内容:", string(body))

	var result struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
		Data    struct {
			LoginResult struct {
				Token string `json:"token"`
			} `json:"loginResult"`
		} `json:"data"`
	}

	if err := json.Unmarshal(body, &result); err != nil {
		return "", fmt.Errorf("解析登录响应失败: %v", err)
	}

	// 检查返回的 code
	if result.Code != 0 {
		return "", fmt.Errorf("登录失败: %s", result.Message)
	}

	// 返回 token
	return result.Data.LoginResult.Token, nil
}

// 加载 JSON 文件并解析为设备切片
// 加载 JSON 文件并解析为设备切片
func loadSXFDevices(filePath string) ([]SXFDevice, error) {
	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("设备配置文件加载失败: %v", err)
	}

	// 输出加载的 JSON 数据(用于调试)
	fmt.Printf("加载的 JSON 数据: %s\n", string(data))

	var devices []SXFDevice
	err = json.Unmarshal(data, &devices)
	if err != nil {
		return nil, fmt.Errorf("设备解析失败: %v", err)
	}

	// 输出解析后的设备信息(调试)
	fmt.Printf("加载的设备数量: %d\n", len(devices))
	for _, device := range devices {
		// 输出每台设备的详细信息,特别是 AddressGroup 字段
		fmt.Printf("设备信息:Name=%s, IP=%s, AddressGroup=%s\n", device.Name, device.IP, device.AddressGroup)
	}

	return devices, nil
}

// ✅ 添加多个 IP 到 SXF 地址组(支持日志回调)
// 在 AddToSXFBlacklist 函数内部,确保请求头正确设置
// 设备添加到黑名单的函数
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
// 添加多个 IP 到 SXF 地址组(支持日志回调)
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
func AddToSXFBlacklist(devices []SXFDevice, ips []string, logFn func(string)) error {

	for _, device := range devices {
		// 1. 登录
		token, err := SXFLogin(device)
		if err != nil {
			logFn(fmt.Sprintf("【%s】❌ 登录失败: %v", device.Name, err))
			continue
		}
		logFn(fmt.Sprintf("【%s】✅ 登录成功", device.Name))

		// 2. 添加每个IP
		for _, ip := range ips {
			ipRanges := []map[string]string{{"start": ip, "end": ip}}
			requestData := map[string]interface{}{"ipRanges": ipRanges}
			requestDataJSON, _ := json.Marshal(requestData)

			// 3. 发送请求
			url := fmt.Sprintf(
				"https://%s/api/v1/namespaces/public/ipgroups/%s?_arrayop=add",
				device.IP,
				device.AddressGroup,
			)
			req, _ := http.NewRequest("PATCH", url, bytes.NewBuffer(requestDataJSON))
			req.Header = http.Header{
				"token":        []string{token},
				"Content-Type": []string{"application/json"},
			}

			client := &http.Client{
				Transport: &http.Transport{
					TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
				},
			}
			resp, err := client.Do(req)
			if err != nil {
				logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %v", device.Name, ip, err))
				continue
			}
			defer resp.Body.Close()

			// 4. 处理响应
			body, _ := io.ReadAll(resp.Body)
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				logFn(fmt.Sprintf("【%s】❌ IP %s 响应解析失败: %v", device.Name, ip, err))
				continue
			}

			if code, ok := result["code"].(float64); ok && code == 0 {
				logFn(fmt.Sprintf("【%s】✅ IP %s 已添加到地址组[%s]",
					device.Name, ip, device.AddressGroup))
			} else {
				msg, _ := result["message"].(string)
				logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %s", device.Name, ip, msg))
			}
		}
	}
	return nil
}

如果想要扩展功能的话不建议使用API去扩展,毕竟太麻烦了,各种参数比较麻烦,没有ssh的方式登录使用命令操作简便。

整体界面效果

总结

其它厂商的防火墙确实没找到手册,有的是找到手册没有测试的设备,目前上述的设备和代码是完全没有问题的,如果想添加其它厂商的设备,可以给我操作手册,我这边更新新版本发布。每次更新后的授权时间是3个月

直接导航——即在浏览器中手动输入域名访问网站的行为——正面临前所未有的风险:一项新研究发现,绝大多数"停放"域名(主要是过期或闲置域名,以及热门网站的常见拼写错误)现在都被配置为重定向访问者至传播诈骗和恶意软件的网站。

2025年10月,模仿FBI网络犯罪投诉中心网站的相似域名曾显示无威胁的停放页面(左图),而移动用户则被立即导向欺诈内容(右图)。图片来源:Infoblox。

当互联网用户尝试访问过期域名或意外导航至相似的"域名抢注"网站时,通常会被导向域名停放公司的占位页面。这些公司通过展示付费第三方网站的链接,试图从错误流量中获利。

十年前,访问这些停放域名后被重定向至恶意网站的概率相对较低:2014年研究人员发现(PDF),无论访问者是否点击停放页面上的链接,停放域名将用户重定向至恶意网站的概率不足5%。

但在过去几个月的系列实验中,安全公司Infoblox的研究人员表示,他们发现情况现已完全逆转,恶意内容目前已成为停放网站的常态。

"在大规模实验中,我们发现超过90%的情况下,停放域名的访问者会被导向非法内容、诈骗、恐吓软件和杀毒软件订阅服务或恶意软件。这是因为停放公司将'点击'出售给广告商,而广告商又经常将这些流量转售给第三方。"Infoblox研究人员在今日发布的论文中写道。

Infoblox发现,如果访问者使用虚拟专用网络(VPN)或非住宅IP地址访问停放网站,这些网站会显示正常内容。例如,Scotiabank.com客户若将域名误输为scotaibank[.]com,使用VPN时会看到正常停放页面,但使用住宅IP地址访问则会被重定向至试图传播诈骗、恶意软件或其他不良内容的网站。需要强调的是,仅需使用住宅IP地址的移动设备或台式电脑访问拼写错误的域名,就会触发这种重定向。

据Infoblox调查,scotaibank[.]com的所有者拥有近3000个仿冒域名组合,包括gmail[.]com——该域名已被证实配置了用于接收邮件的独立邮件服务器。这意味着如果您在发送邮件给Gmail用户时不小心遗漏了"gmail.com"中的字母"l",这封邮件不会消失或退回,而是直接落入诈骗者手中。报告指出,该域名近期还被用于多起商业邮件入侵攻击,通过附带木马恶意软件的"付款失败"诱饵进行欺诈。

Infoblox发现该特定域名持有者(通过公共DNS服务器torresdns[.]com暴露)针对数十个顶级网站建立了域名抢注页面,包括Craigslist、YouTube、Google、Wikipedia、Netflix、TripAdvisor、Yahoo、eBay和Microsoft。这些抢注域名的无害化列表可在此处查看(所列域名中的点号已替换为逗号)。

Infoblox威胁研究员David Brunsdon表示,停放页面会让访问者经历一系列重定向链,同时通过IP地理位置、设备指纹识别和Cookie持续分析用户系统,以确定最终重定向目标。

"通常在威胁到达前会存在重定向链——涉及停放公司外部的一到两个域名,"Brunsdon说。"每次交接时设备都会被反复分析,然后被传递到恶意域名,或者如果判定不值得攻击,则转向Amazon.com或Alibaba.com等诱饵页面。"

访问scotaibank.com时的重定向路径样本。每个分支包含观察到的系列域名,包括颜色编码的着陆页。图片来源:Infoblox。

Infoblox指出,另一个控制domaincntrol[.]com的威胁行为体(该域名与GoDaddy名称服务器仅差一个字符)长期利用DNS配置中的拼写错误将用户导向恶意网站。但最近几个月发现,这种恶意重定向仅发生在访问者使用Cloudflare DNS解析器(1.1.1.1)查询错误配置域名时,其他所有访问者只会收到拒绝加载的页面。

研究人员发现,甚至知名政府域名的变体也已成为恶意广告网络的目标。

"当我们有研究人员尝试向FBI网络犯罪投诉中心(IC3)举报犯罪时,他们意外访问了ic3[.]org而非ic3[.]gov,"报告指出。"他们的手机很快被重定向到虚假的'Drive订阅已过期'页面。他们还算幸运只遇到诈骗;根据我们的研究,他们同样可能轻易遭遇信息窃取程序或木马病毒。"

Infoblox报告强调,他们追踪的恶意活动无法归因于任何已知方,研究中提及的域名停放或广告平台与其记录的恶意广告行为无关。

但报告最终指出,尽管停放公司声称只与顶级广告商合作,但这些域名的流量经常被转售给联盟网络,经过多次倒手后,最终广告商与停放公司已不存在直接业务关系。