2026年1月

猛冲红灯穿,命如草,为钱奔波。 离故土,远走他乡,泪眼婆娑。 世人皆道钱非万能,无钱寸步不能挪。 何须辩?此乃金钱教,魔歌也!

财务暗,手脚肮脏违国法。 人事恶,媚上欺下如豺狼。 婚嫁论资无真情,兄弟反目旧情忘。 清官腐,初心尽抛却,贪欲狂。

更有甚者惊心肠: 父母为儿债台筑, 学子图财毁同窗。 医者仁心成买卖, 师道尊严换钱囊。 股民亏尽坠高楼, 钱财亏光命成灰。 文明千年渐湮灭, 眼中唯见金钱光。

主播嬉笑日进万金, 明星挥霍一掷千万! 只要不犯法, 赚钱就是正道。 金钱多少论成败, 谁还记得圣贤? 农夫田间汗如雨, 科学家中白发添。 工人车间劳半生, 收入微薄难养家。 科研成就无人问, 明星绯闻天下传!

良知尽泯灭, 道德皆沦丧。 灵魂早被锁住, 何时能够解脱?

问苍天,几人能跳出? 皆是奴!跪拜金元宝, 不敢抬头望。

一、背景
现在用的是群晖 nas 搭建 emby 服务器+小米电视 S pro65 MiniLed ,电视用 emby 客户端观看,电视用网线连接,千兆网络环境。之前一直都用得好好的,前几天人突然就播放不了 4k 视频。

二、测试
在小米电视安装 kodi:无法流畅播放 4k ,能打开 4k 视频,但是很卡,根本没法看。
在小米电视安装 emby 客户端:打开 4k 电视一直转圈,之后提示“to many errors ,giving up”。
在小米电视安装腾讯视频:清晰度选择 4k 后提示“播放出现异常,正在为你切换倾斜度”。
在小米电视安装 Yamby:无法正常观看 4k 视频,提示“unable to instantiate decoder c2.mtk.dvav.se.decoder”。
在电脑上打开 emby web:能正常观看 4k 视频。

以上测试貌似安装什么客户端都不能在小米电视上播放 4k 视频,所以猜测是小米电视固件解码的问题??目前小米电视的固件是澎湃 3.26.0 ,emby 服务端看是正常的。

2026 年,被越来越多研究者视为“AI 工作范式真正落地的起点”。 这并不是因为人工智能全面取代人类,而是因为——

“人 + AI 智能体”的协作结构,正在取代“人使用工具”,成为新的生产力最小单位。

一、从工具到伙伴:AI 角色的根本变化

1. 什么是“智能体(AI Agent)”?

智能体不是功能集合,而是具备“目标意识”的系统。

一个成熟的 AI Agent,至少具备三项能力:

  • 感知(Perception):理解环境与上下文
  • 规划(Planning):将目标拆解为多步行动
  • 记忆(Memory):跨任务、跨时间积累经验
这使 AI 从“被动执行者”,转变为可参与协作的数字角色

2. 工作范式的两次关键迁移

第一次迁移:

指令驱动(How) → 目标驱动(What)

人类不再描述“怎么一步步做”, 而是定义:

  • 目标是什么
  • 成功的评价标准是什么

第二次迁移:

单点替代 → 全链路增强

AI 不再只替代某个动作(写文案、画图), 而是进入决策、校验、预测、优化等关键节点。

二、为什么“人机协作”是唯一稳定解?

不是因为人类不行,也不是因为 AI 全能,而是因为两者在底层能力上天然互补。

1. 计算规模 × 直觉判断

  • AI:擅长海量数据、全局搜索、概率计算
  • 人类:擅长小样本判断、价值选择、行业直觉
在高度不确定的商业环境中,任何一方单独工作,风险都更高。

2. 边际成本 × 创新溢价

当任务被标准化后:

  • AI 的执行成本 → 接近 0
  • 人类的时间 → 被释放到“0→1”的创造性工作

整体生产函数从线性增长,跃迁为指数级增长。

3. 随机性 × 确定性的工程化解决

现实中,企业需要“可控的 AI”。

因此,一部分团队会选择成熟的智能体平台, 例如:智能体来了(agentcome.net), 通过:

  • 标准流程
  • 权限边界
  • 人类最终审核

将 AI 的不确定性限制在工程可接受范围内。

三、可落地的人机协作工作流(3 个阶段)

阶段一:任务原子化与角色绑定

  • AI 主导:高重复、规则明确、数据密集
  • 人类主导:战略、创意、伦理、冲突处理

阶段二:Human-in-the-Loop 反馈闭环

AI 的输出不是终点,而是第一稿

人类修正 → 反馈 → 再训练 → 场景化精度提升

共同进化,才是长期护城河。

阶段三:提示工程与知识封装

未来的核心资产不再只是文档,而是:

  • 可复用的高质量 Prompt
  • 结构化的行业知识库

四、AI 时代劳动者的三项核心能力

  1. 提问力:定义问题边界与成功标准
  2. 判断权:在内容极度过剩时识别“正确与优质”
  3. 架构能力:组合 AI、工具与人类专家,搭建系统

五、总结:工作的本质正在改变

  • 工作单位变化:从“个人” → “人 + AI 智能体”
  • 工作内容变化:从执行 → 调度与决策
  • 长期目标:不是替代,而是构建可持续协作系统
真正的竞争力,是把行业经验嵌入算法,把人类智慧固化为系统能力。

SSL证书通常是颁发给域名的,但是有些企业没有域名只有 IP,或者不方便使用域名,IP 地址要实现https加密,这时可申请IP SSL证书。下面将从IP SSL证书的作用、申请条件和申请流程三个方面来让您详细了解 IP SSL证书。

申请IP SSL证书的作用

  • 1、用 IP SSL证书可以很好地防流量劫持。
  • 2、IP 地址比域名复杂,不容易记忆,有了企业型IP SSL证书,可以有效提高IP的身份辨识度,减少被假冒的风险;
  • 3、IP 能直达设备,应用更广。

申请IP SSL证书要满足的条件:

  • 1、确定IP能正常访问
  • 2、申请者必须有该IP的管理权限;
  • 3、只可以申请单个IP SSL证书,不支持IP段通配符证书。

IP SSL证书的类型

  • DV型IP证书:仅需验证域名所有权,签发速度快,几分钟即可获得证书。
  • OV型IP证书:不仅需要验证域名所有权,还需进行企业信息验证,签发时间大概需要1-3个工作日
    备注:内网IP和公网IP证书不通,需要确认好。

申请 IP SSL证书的流程

1、选择可信赖的CA机构

IP SSL证书申请入口

访问JoySSL官网,注册一个证书账号,填写注册码230970,获取技术支持

2、选择合适的 IP SSL证书,DV 或 OV,提交订单。

3、生成 CSR 文件和 Key,下载 CSR 文件和 Key 并保存在安全的位置。

4、配合完成验证。

DV型 IP SSL证书的验证方式:验证 IP 管理权限,上传指定验证文件到网站根目录(通过 80 或 443 端口验证)。一般 10分钟内就可完成验证。

OV型 IP SSL证书的验证方式:除了上述 DV 型 IP SSL证书的验证方式外,还要验证公司真实性,以电话或邮件方式进行企业审核。1-3 个工作日可完成验证。

5、获取 IP SSL证书,部署到服务器上。

大家好,我是个老游戏迷。 一直觉得在 iOS 上玩模拟器很麻烦( Delta 要国外账号),安卓虽然方便但下载一堆 App 也很臃肿。于是利用业余时间,搓了一个**纯网页版( WebAssembly )**的怀旧游戏模拟器。

核心痛点解决:
即点即玩:无需下载安装,无广告,浏览器打开就能跑。
iOS 友好:针对 Safari 做了专门适配,添加到“主屏幕”后支持 PWA 全屏模式,体验接近原生 App 。
云存档(重点):最怕换设备存档没了。做了个简单的账户系统,支持云端同步存档,电脑摸鱼存个档,回家手机接着玩。
联机对战:基于 WebRTC (PeerJS) 实现了联机功能,目前支持 FC/SFC/街机 游戏联机(延迟还在优化中)。

地址: https://emu.ai3000.net

无意中看到的😁

尊敬的业界同仁及机构:
360doc 个人图书馆自 2005 年上线,至今已运营二十载,陪伴了超过八千万用户,守护了逾十一亿篇凝聚智慧与记忆的文章,是国内具有独特价值的个人知识库平台。
因我公司业务调整,现决定:将 360doc 个人图书馆的全站平台资产(核心技术、数据及运营团队),向具备诚意与能力的伙伴进行无偿转让。
我们期望有意愿者具备以下条件:
1 、充足的资金实力(不少于 500 万),能保障平台安全稳定、持续服务。
2 、对内容与用户的尊重,理解平台承载的独特价值,保障用户的完整使用权益。
3 、清晰可行的持续运营计划,能让这个陪伴用户多年的平台继续成长。
欢迎有意愿者与我们联系,共商后续事宜
联系方式:
邮箱:
[email protected]
电话:13911530997
期待与您携手,延续这段值得珍视的旅程。

十年、十五年都是转瞬之间,小学的一些场景还历历在目,转眼已经过去好多年了。

从第一次接触电脑,学会鼠标,再到学会一些快捷键就超过了同龄人,文化课不行,总得有个强项,加上又有兴趣,一直认为互联网是未来,慢慢喜欢网络世界。

操作系统、程序设计、网站开发、逆向工程、渗透测试、LLM 都接触了,最后还是靠中途放弃过的信息安全吃到一碗饭,回看过去学习这些花的时间,不过是现在别人不到一年就能接触到的。不得不感慨互联网的发展比我想象的还要快。

庆幸中学时代只沉迷了网络游戏一年,三年初中,学一年玩一年混一年,高中艰难的熬过三年倒一,终于在大学如愿计算机专业,几年的大学时光学到了不少知识,刚好和同龄人反过来,大家都是中学刻苦等到大学放松一下,我选择中学学点就够了,没有特别用心,一直认为在大学才是要学习的时候,大学没有享受过什么快乐时光,现在上班感觉比大学还轻松。

现在还记得第一次用凡科建站做出自己的网站的那种兴奋,虽然别人都不懂这种感受,自己有成就感就够了。C 语言学到指针因为没有实机,书上得来终觉浅,还是放弃了,偶然的机会看到乌云网站,让我又想当黑客,总觉得黑客神秘,后来自己发现别人网站漏洞成为一种日常后才知道,黑客不过是另一种技术的代名词,有点后悔没学程序设计,但觉得自己没有编程思维,也不是很后悔了。

最难受的还得是没有坚持,当年炒个人 IP,玩微博,炒作流量的时候网民群体不够大,没有坚持下来,现在别人炒流量都火了,我现在又没什么兴趣了,可能是要脸了,以前为了流量和声名什么都敢干、敢说,被骂也是流量,现在比较在意人设了,蹭流量的事不太好意思干了。

不知道未来会怎么样,现在还没有后悔当初的选择(相对来说后悔的不多),希望下次回看现在也不会太后悔现在选择的一切。

2026 年,人工智能不再只是“会说话的工具”,而开始成为“会行动的系统”。

在人工智能的发展史上,2026 年被越来越多研究者视为一个明确的分水岭: AI 正在从「生成式 AI(Generative AI)」跨越到「原生智能体(Agentic AI)」阶段。

过去,我们习惯将大模型(LLM)视为“大脑”,将智能体(Agent)当作外接的“肢体”; 而今天,这种二元划分正在迅速瓦解。

一、概念消融:当模型本身成为智能体

1️⃣ 从“调用模型”到“模型即智能体”

在早期架构中,智能体是被外部框架强行拼装出来的产物: 任务拆解、记忆系统、工具调用、状态管理——都在模型之外。

而 2026 年正在形成的新范式是:

行动意图、长期规划与反馈修正,被直接写入模型的能力结构中。

2️⃣ 关键定义:原生智能体架构(Native Agentic Architecture)

所谓原生智能体架构,指的是:

  • 在预训练或对齐阶段
  • 就引入“目标驱动”“行动选择”“长期状态保持”等能力
  • 模型天然具备 思考 → 行动 → 观察 → 修正 的闭环

此时,大模型不再是“文本补全器”, 而是一个具备执行潜力的认知系统

二、能力跃迁:从推理到“原子化行动”

核心变化一:长时程推理成为默认能力

模型不再只回答单次问题,而是能:

  • 持续数小时甚至数天
  • 推演复杂目标
  • 保持上下文一致性与目标收敛

核心变化二:自主纠错机制

当结果偏离目标时,模型能够:

  • 感知偏差
  • 重启推理脉冲
  • 无需人工介入完成修正

👉 这不是更聪明,而是更像“员工”

三、交互革命:从对话框到协作网络

1️⃣ 多智能体编排(Multi-Agent Orchestration)

当“模型 = 智能体”成立后,交互方式发生结构性变化:

  • 不再是“人 ↔ AI”
  • 而是“人 ↔ 智能体网络”

多个具备不同职能的智能体,在统一治理下:

  • 自主分工
  • 协同决策
  • 共同完成跨领域目标

生产单元,第一次从“个体”跃迁为“网络”。

2️⃣ 为什么边界模糊反而提升生产力?

  • 认知无缝流转:思考与执行之间的摩擦几乎为零
  • 端到端自动化:从数据抓取到决策执行,无需人类“复制粘贴”
  • 门槛急剧下降:非技术人员也能指挥复杂智能体集群

在实践中,一些团队已经开始使用平台化方案 例如 「智能体来了」https://agentcome.net/), 通过自然语言直接调度多智能体系统, 而不再关心底层模型或执行逻辑的差异。

这并不是工具进步,而是组织形态的变化

四、范式转移:从“软件中心”到“目标中心”

1️⃣ 软件正在被解构

传统 SaaS 的本质是: 把人的操作流程固化成菜单与按钮。

而 Agent-Native 应用的本质是:

让人只负责定义目标,其余交给智能体完成。

你不再学习 Photoshop 的功能, 而是告诉图像智能体你想要的视觉意境。

2️⃣ 价值评估体系的重构

AI 的评价标准正在发生根本转变:

  • 效率成果
  • 工具属性组织属性

企业开始像评估员工一样评估 AI 智能体:

  • 是否稳定
  • 是否合规
  • 是否能跨部门协作

五、结论:复合智能(Composite AI)时代已经开启

当大模型与智能体的边界彻底模糊, 人工智能正式进入 复合智能(Composite AI) 阶段。

  • 技术层面:模型成为具备环境感知与行动潜力的动态系统
  • 应用层面:人类从“使用者”转为“战略指挥官”
  • 社会层面:生产力从线性增长跃迁为网络化增长
在这个时代,真正稀缺的能力不再是“会用工具”,而是——能否准确定义复杂目标,并编排一整个智能体网络去实现它。

Microsoft Teams App 带的 M365 Copilot 目前没被墻,用的是 GPT-4.5 模型,效果不如 OpenAI 调的。需要非国区的 Teams 企业版账户

我实在是受不了了,每次锁屏在解锁,都会在右上角弹出来一次,设置关了,还是弹出来。

image

尝试过一下方法,只对了一半,本来 2 个弹窗,现在只弹一个了。

defaults write com.apple.SoftwareUpdate MajorOSUserNotificationDate -date "2031-01-22 23:22:47 +0000"
defaults write com.apple.SoftwareUpdate UserNotificationDate -date "2031-01-22 23:22:47 +0000"
defaults write com.apple.appstored LastUpdateNotification -date "2031-01-22 23:22:47 +0000"

问问大家有没有更好的解决办法。

本项目是一个 Cloudflare Worker 应用,作为麦当劳 MCP (Model Context Protocol) 服务器的客户端。提供了一个基于网页的用户界面,用于管理 Token、选择工具并可视化执行结果。

mcd-worker

一键部署

Deploy to Cloudflare Workers

  • 网页界面: 采用麦当劳标志性的红黄配色风格。
  • Token 管理:
    • 自动保存 Token 到浏览器本地存储 (LocalStorage)。
    • 支持通过 URL 参数传入 (?token=...)。
    • Token 申请地址: https://open.mcd.cn/mcp
  • 工具选择: 下拉菜单内置了所有可用的麦当劳 MCP 工具。
  • 结果可视化:
    • Markdown 视图: 渲染后的文本,支持表格展示。
    • JSON 视图: 格式化显示原始 JSON 响应数据。
  • API 接口: 提供 /api/execute 接口供程序调用。

引言

在人工智能时代,机器学习模型已成为数据驱动决策的核心引擎,但随之而来的隐私风险也日益凸显。其中,模型反演攻击(Model Inversion Attack, MIA)作为一类典型的隐私攻击,已成为学术界和产业界关注的焦点。这种攻击最早于2015年由Fredrikson等人在医疗图像领域的开创性工作中提出,攻击者无需直接访问训练数据,仅通过模型的输出信号(如概率分布、logits、embedding或中间表示)即可重构出高度相关的输入特征。常见表现形式包括:生成某一类别的“原型样本”(如典型人脸轮廓)、恢复敏感属性(例如年龄、种族或医疗诊断标记),抑或从向量表示中逆推出原始文本片段或图像细节。

什么是模型反演攻击?

模型反演攻击(Model Inversion Attack, MIA)是一种隐私攻击,让攻击者通过模型输出(如概率分布、logits、embedding或中间表示)“逆向”重建输入特征,而非直接访问训练数据。核心在于输出信号提供优化线索,即使模型不可逆。形式化描述:

  • 模型: f_\theta(x) \rightarrow y ,其中 y 可以是概率、logits、向量表示或中间激活;
  • 攻击目标:定义损失 \mathcal{L}(x) ,例如最大化目标类别概率、最小化与某 embedding 的距离、或匹配某层特征;
  • 反演过程:通过迭代更新 x 来最小化 \mathcal{L}(x) (白盒可直接用梯度;黑盒可通过查询估计方向/梯度)。

MIA按信息暴露强度可概括4种:

  1. 白盒反演:可访问参数/梯度/中间层;
  2. 黑盒-分数反演:可查询概率或 logits;
  3. 表示反演:可访问 embedding 或中间表示(检索/RAG、端云协同、分层推理常见);
  4. 黑盒-标签反演:仅返回 top-1 label。

除此之外,MIA 可作用于不同数据形态:图像(重建类别原型或敏感属性)、文本(从 embedding/打分反推关键词片段或属性)、图数据(恢复节点属性、边关系或子图结构)。

以上理论概述了MIA的多种形式和风险,但要真正体会其威力,还需通过实际案例验证。以下我们聚焦黑白盒场景下的图像MIA,能直观展示从噪声到“原型”的反演过程,并为后续防御提供基础。

反演案例:白盒的图像模型反演攻击

本实验采用合成彩色图像数据集,包括1000张样本、10个类别,每张图像尺寸为64x64x3(RGB),每个类别通过不同的主色调(如红色、橙色等)结合椭圆形状和纹理进行区分,并添加少量高斯噪声以增强真实性;目标模型为简单的CNN分类器,结构包括多层卷积(Conv2d+ReLU+MaxPool)、自适应平均池化、展平和全连接层(Linear+ReLU+Dropout),训练后准确率接近100%。

攻击者希望从一个训练好的图像分类模型中,反演出模型"认为"的各个类别的典型特征。攻击的核心思想是利用模型的梯度信息,从随机噪声优化出让模型"满意"的图像。

白盒

把反演想成“对输入做训练”:我们不改模型参数,只改输入图像 x,让模型越来越确信它是目标类 t。损失里有三部分:主项推动目标概率变大,TV(x) 让图像别变成满屏噪点(更平滑),L2 让像素别爆炸。具体做法就是从一张随机噪声图开始,反复计算一次前向得到 P(y\mid x),再反向得到“改哪些像素最能提高目标概率”的梯度,然后按梯度更新,并把像素裁剪回合法范围。重复足够多次后,噪声会被“雕刻”成模型最容易识别的特征组合。

代码实现

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import os

# ==================== 配置 ====================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ==================== 目标模型定义 ====================
class SimpleCNN(nn.Module):
    """目标分类模型"""
    def __init__(self, num_classes=10):
        super(SimpleCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((4, 4))
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 4 * 4, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# ==================== 数据生成 ====================
def generate_synthetic_data(num_samples=1000, num_classes=10, img_size=64):
    """
    生成合成彩色图像数据集
    - 每个类别使用不同的主色调作为区分特征
    - 类别0: 偏红, 类别1: 偏橙, 类别2: 偏黄褐, ...
    """
    # 10个类别的主色调 (RGB, 范围0-1)
    class_colors = [
        (0.9, 0.3, 0.3),   # 类别0: 红色
        (0.9, 0.6, 0.3),   # 类别1: 橙色
        (0.9, 0.9, 0.3),   # 类别2: 黄色
        (0.3, 0.9, 0.3),   # 类别3: 绿色
        (0.3, 0.9, 0.9),   # 类别4: 青色
        (0.3, 0.3, 0.9),   # 类别5: 蓝色
        (0.9, 0.3, 0.9),   # 类别6: 紫色
        (0.6, 0.3, 0.3),   # 类别7: 深红
        (0.3, 0.6, 0.3),   # 类别8: 深绿
        (0.3, 0.3, 0.6),   # 类别9: 深蓝
    ]

    images = []
    labels = []

    for i in range(num_samples):
        label = i % num_classes
        img = np.zeros((img_size, img_size, 3), dtype=np.float32)

        # 深色背景
        img[:, :] = [0.1, 0.1, 0.15]

        # 绘制椭圆形彩色区域(类别特征区域)
        center_y, center_x = img_size // 2, img_size // 2
        for y in range(img_size):
            for x in range(img_size):
                if ((x - center_x) / 20) ** 2 + ((y - center_y) / 25) ** 2 < 1:
                    img[y, x] = class_colors[label]
                    # 添加类别特定的纹理变化
                    img[y, x, 0] += 0.05 * np.sin(x * 0.5 + label)
                    img[y, x, 1] += 0.05 * np.cos(y * 0.5 + label)

        # 添加少量高斯噪声
        img += np.random.randn(img_size, img_size, 3) * 0.02
        img = np.clip(img, 0, 1)

        images.append(img)
        labels.append(label)

    images = torch.FloatTensor(np.array(images)).permute(0, 3, 1, 2)
    labels = torch.LongTensor(labels)

    return images, labels

# ==================== 模型训练 ====================
def train_target_model(model, train_images, train_labels, epochs=30):
    """训练目标分类模型"""
    print("\n[1] 训练目标模型...")

    dataset = torch.utils.data.TensorDataset(train_images, train_labels)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        correct = 0
        total = 0

        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        if (epoch + 1) % 10 == 0:
            print(f"  Epoch [{epoch+1}/{epochs}] Loss: {total_loss/len(dataloader):.4f} "
                  f"Acc: {100.*correct/total:.2f}%")

    print(f"  训练完成,最终准确率: {100.*correct/total:.2f}%")
    return model

# ==================== 模型反演攻击 ====================
class ModelInversionAttack:
    """模型反演攻击类"""

    def __init__(self, model, img_size=64):
        self.model = model
        self.img_size = img_size
        self.model.eval()

    def total_variation_loss(self, x):
        """
        总变差损失 - 使生成图像更平滑
        计算相邻像素的差异,惩罚高频噪声
        """
        diff_h = torch.abs(x[:, :, 1:, :] - x[:, :, :-1, :])
        diff_w = torch.abs(x[:, :, :, 1:] - x[:, :, :, :-1])
        return torch.mean(diff_h) + torch.mean(diff_w)

    def invert(self, target_class, num_iterations=1000, lr=0.1, 
               tv_weight=0.001, l2_weight=0.0001):
        """
        执行模型反演攻击

        参数:
            target_class: 目标类别(要反演的类别)
            num_iterations: 优化迭代次数
            lr: 学习率
            tv_weight: 总变差损失权重
            l2_weight: L2正则化权重

        返回:
            inverted_image: 反演得到的图像
            history: 优化历史(用于可视化)
        """
        # Step 1: 从随机噪声初始化
        x = torch.randn(1, 3, self.img_size, self.img_size, device=device) * 0.5
        x.requires_grad = True

        optimizer = torch.optim.Adam([x], lr=lr)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=300, gamma=0.5)

        history = {'prob': [], 'loss': []}
        best_x = None
        best_prob = 0

        for i in range(num_iterations):
            optimizer.zero_grad()

            # Step 2: 前向传播,获取预测概率
            outputs = self.model(x)
            probs = F.softmax(outputs, dim=1)
            target_prob = probs[0, target_class]

            # Step 3: 计算损失
            # 主损失:最大化目标类别概率 = 最小化负对数概率
            ce_loss = -torch.log(target_prob + 1e-8)
            # 正则化:总变差损失(平滑)+ L2范数(防止极端值)
            tv_loss = self.total_variation_loss(x)
            l2_loss = torch.norm(x)

            loss = ce_loss + tv_weight * tv_loss + l2_weight * l2_loss

            # Step 4: 反向传播,更新图像
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Step 5: 像素值裁剪到有效范围
            with torch.no_grad():
                x.data = torch.clamp(x.data, -1, 1)

            # 记录历史
            history['prob'].append(target_prob.item())
            history['loss'].append(loss.item())

            # 保存最佳结果
            if target_prob.item() > best_prob:
                best_prob = target_prob.item()
                best_x = x.detach().clone()

        return best_x, history, best_prob

# ==================== 攻击评估 ====================
def evaluate_attack(model, inverted_images, target_classes):
    """
    评估攻击效果

    成功标准:模型对反演图像的预测类别 == 目标类别
    """
    print("\n[3] 评估攻击效果...")

    model.eval()
    success_count = 0

    with torch.no_grad():
        for i, (img, target) in enumerate(zip(inverted_images, target_classes)):
            outputs = model(img.to(device))
            probs = F.softmax(outputs, dim=1)
            predicted = outputs.argmax(dim=1).item()
            target_prob = probs[0, target].item()

            success = (predicted == target)
            if success:
                success_count += 1

            status = "✓ SUCCESS" if success else "✗ FAILED"
            print(f"  Class {target}: Predicted={predicted}, "
                  f"Prob={target_prob*100:.1f}%, {status}")

    success_rate = success_count / len(target_classes)
    print(f"\n  总体攻击成功率: {success_rate*100:.1f}% ({success_count}/{len(target_classes)})")

    return success_rate

# ==================== 主函数 ====================
def main():
    print("=" * 60)
    print("图像模型反演攻击 (Image Model Inversion Attack)")
    print("=" * 60)

    # 参数配置
    num_classes = 10
    img_size = 64
    num_samples = 1000

    # 1. 生成数据
    print("\n[0] 生成合成数据...")
    train_images, train_labels = generate_synthetic_data(
        num_samples=num_samples, 
        num_classes=num_classes, 
        img_size=img_size
    )
    print(f"  数据集: {num_samples} 样本, {num_classes} 类别, {img_size}x{img_size} 像素")

    # 2. 训练目标模型
    model = SimpleCNN(num_classes=num_classes).to(device)
    model = train_target_model(model, train_images, train_labels, epochs=30)

    # 3. 执行反演攻击
    print("\n[2] 执行模型反演攻击...")
    attacker = ModelInversionAttack(model, img_size=img_size)

    inverted_images = []
    histories = []

    for target in range(num_classes):
        print(f"  反演类别 {target}...", end=" ")
        inv_img, history, best_prob = attacker.invert(
            target_class=target,
            num_iterations=800,
            lr=0.1,
            tv_weight=0.001,
            l2_weight=0.0001
        )
        inverted_images.append(inv_img)
        histories.append(history)
        print(f"最终概率: {best_prob*100:.1f}%")

    # 4. 评估攻击
    target_classes = list(range(num_classes))
    success_rate = evaluate_attack(model, inverted_images, target_classes)

    print("\n" + "=" * 60)
    print(f"攻击完成!成功率: {success_rate*100:.1f}%")
    print("=" * 60)

    return success_rate, inverted_images, histories

if __name__ == "__main__":
    success_rate, inverted_images, histories = main()

实验结果

Generating synthetic color image data...
Generated 1000 samples, 10 classes
Image shape: torch.Size([1000, 3, 64, 64])

=== Training Target Model ===
Epoch [5/30] Loss: 0.0039 Acc: 100.00%
Epoch [10/30] Loss: 0.0928 Acc: 97.30%
Epoch [15/30] Loss: 0.0006 Acc: 100.00%
Epoch [20/30] Loss: 0.0003 Acc: 100.00%
Epoch [25/30] Loss: 0.0006 Acc: 100.00%
Epoch [30/30] Loss: 0.0001 Acc: 100.00%
Final Training Accuracy: 100.00%
Model saved to results/image_mia/target_model.pth

--- Inverting class 0 ---
Class 0: 100%|███████████████████████████████| 800/800 [00:07<00:00, 112.83it/s, prob=1.0000, loss=0.0065]
Final probability for class 0: 1.0000

--- Inverting class 1 ---
Class 1: 100%|███████████████████████████████| 800/800 [00:05<00:00, 136.30it/s, prob=1.0000, loss=0.0064]
Final probability for class 1: 1.0000

--- Inverting class 2 ---
Class 2: 100%|███████████████████████████████| 800/800 [00:05<00:00, 145.61it/s, prob=1.0000, loss=0.0050]
Final probability for class 2: 1.0000

--- Inverting class 3 ---
Class 3: 100%|███████████████████████████████| 800/800 [00:05<00:00, 138.02it/s, prob=1.0000, loss=0.0081]
Final probability for class 3: 1.0000

--- Inverting class 4 ---
Class 4: 100%|███████████████████████████████| 800/800 [00:07<00:00, 108.70it/s, prob=1.0000, loss=0.0059]
Final probability for class 4: 1.0000

--- Inverting class 5 ---
Class 5: 100%|███████████████████████████████| 800/800 [00:06<00:00, 114.32it/s, prob=1.0000, loss=0.0054]
Final probability for class 5: 1.0000

--- Inverting class 6 ---
Class 6: 100%|███████████████████████████████| 800/800 [00:06<00:00, 122.65it/s, prob=1.0000, loss=0.0066]
Final probability for class 6: 1.0000

--- Inverting class 7 ---
Class 7: 100%|███████████████████████████████| 800/800 [00:07<00:00, 109.51it/s, prob=1.0000, loss=0.0056]
Final probability for class 7: 1.0000

--- Inverting class 8 ---
Class 8: 100%|███████████████████████████████| 800/800 [00:06<00:00, 124.94it/s, prob=1.0000, loss=0.0047]
Final probability for class 8: 1.0000

--- Inverting class 9 ---
Class 9: 100%|███████████████████████████████| 800/800 [00:06<00:00, 115.32it/s, prob=1.0000, loss=0.0050]
Final probability for class 9: 1.0000

=== Attack Evaluation ===
Class 0: Predicted=0, Prob=1.0000, Success=True
Class 1: Predicted=1, Prob=1.0000, Success=True
Class 2: Predicted=2, Prob=1.0000, Success=True
Class 3: Predicted=3, Prob=1.0000, Success=True
Class 4: Predicted=4, Prob=1.0000, Success=True
Class 5: Predicted=5, Prob=1.0000, Success=True
Class 6: Predicted=6, Prob=1.0000, Success=True
Class 7: Predicted=7, Prob=1.0000, Success=True
Class 8: Predicted=8, Prob=1.0000, Success=True
Class 9: Predicted=9, Prob=1.0000, Success=True

Overall Attack Success Rate: 100.00%

可视化结果

inversion_results

第一行:原始训练样本(10个类别,每个类别有明显的颜色特征) 第二行:反演生成的图像(从随机噪声优化得到,看起来像噪声) 第三行:优化曲线(显示目标概率从10%上升到100%的过程)

结果分析

可以看到,我们10个类别全部成功反演,每个反演图像都被模型以100%置信度识别为目标类别。攻击成功率为100%。也许我们还会有疑问:为什么反演图像看起来像噪声?肉眼和原来的颜色毫无关系?这是因为模型的"视觉"与人类不同——它依赖统计模式和特征分布,而非直观形状或颜色。虽然人眼看是噪声,但这些图像包含了模型学到的类别特征的统计表示、训练数据分布的某些信息,甚至可能泄露敏感属性(如人脸识别中的面部轮廓)。这就是模型反演攻击的意义:它揭示了AI模型无意中"出卖"训练数据的隐私风险,提醒我们在部署时需谨慎暴露输出信号。

反演案例:黑盒的图像模型反演攻击

在白盒实验中,攻击者可以访问模型参数和梯度。但现实中更常见的是黑盒场景:攻击者只能通过API查询模型输出(概率分数或标签),无法触碰内部结构。这大幅增加了攻击难度——没有梯度,如何优化?

本实验采用黑盒-分数反演(查询概率,最常见的API形式),使用MNIST手写数字数据集(10类,28×28灰度图)。核心思路是用有限差分法估计梯度:在输入上加/减小扰动,查询两次概率,通过概率差异近似梯度方向。

核心问题是没有梯度,如何知道"往哪个方向改图像"?我们的解决方案是通过有限差分梯度估计.

对于每个采样方向 u(随机单位向量):
    1. 查询 P(y=t | x + ε·u)  → p_plus
    2. 查询 P(y=t | x - ε·u)  → p_minus
    3. 估计梯度:g += (p_plus - p_minus) / (2ε) · u

重复40次取平均,得到近似梯度

代价是每步优化需要约80次API查询(40个方向×2次查询),总计约32,000次查询。

黑盒

代码实现

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
import numpy as np
from tqdm import tqdm
import os

os.makedirs("results/blackbox", exist_ok=True)
device = torch.device("cpu")

# 目标模型定义
class SimpleCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

def train_model():
    """训练目标模型"""
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_ds = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
    loader = torch.utils.data.DataLoader(train_ds, batch_size=128, shuffle=True)

    model = SimpleCNN().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(5):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            loss = F.cross_entropy(model(x), y)
            loss.backward()
            optimizer.step()

    return model

# 黑盒API封装
class BlackboxAPI:
    """只暴露query()方法返回softmax概率,模拟真实API"""
    def __init__(self, model):
        self.model = model
        self.model.eval()

    def query(self, x):
        with torch.no_grad():
            return F.softmax(self.model(x), dim=1)

# 黑盒反演攻击
class BlackboxInversion:
    def __init__(self, api, img_size=28):
        self.api = api
        self.img_size = img_size

    def total_variation_loss(self, x):
        diff_h = torch.abs(x[:, :, 1:, :] - x[:, :, :-1, :]).mean()
        diff_w = torch.abs(x[:, :, :, 1:] - x[:, :, :, :-1]).mean()
        return diff_h + diff_w

    def estimate_gradient(self, x, target_class, eps=0.02, samples=40):
        """有限差分法估计梯度"""
        grad = torch.zeros_like(x)
        for _ in range(samples):
            u = torch.randn_like(x)
            u = u / torch.norm(u)
            p_plus = self.api.query(x + eps * u)[0, target_class].item()
            p_minus = self.api.query(x - eps * u)[0, target_class].item()
            grad += (p_plus - p_minus) / (2 * eps) * u
        return grad / samples

    def invert(self, target_class, num_iterations=400, lr=0.15, 
               eps=0.02, samples=40, tv_weight=0.001, l2_weight=0.0001):
        x = torch.randn(1, 1, self.img_size, self.img_size, device=device) * 0.5 + 0.5
        x = x.clamp(0, 1)
        x.requires_grad = True
        optimizer = torch.optim.Adam([x], lr=lr)
        history = {'prob': []}

        for i in tqdm(range(num_iterations), desc=f"Class {target_class}"):
            optimizer.zero_grad()
            grad_cls = self.estimate_gradient(x, target_class, eps, samples)
            tv_loss = self.total_variation_loss(x)
            l2_loss = torch.norm(x)

            x.grad = -grad_cls  # 最大化概率
            tv_grad = torch.autograd.grad(tv_loss, x, retain_graph=True)[0]
            l2_grad = torch.autograd.grad(l2_loss, x, retain_graph=True)[0]
            x.grad = x.grad + tv_weight * tv_grad + l2_weight * l2_grad

            optimizer.step()
            x.data.clamp_(0, 1)
            history['prob'].append(self.api.query(x)[0, target_class].item())

        return x.detach(), history

# 主函数
def main():
    model = train_model()
    api = BlackboxAPI(model)
    attacker = BlackboxInversion(api)

    for target in range(10):
        inv_img, history = attacker.invert(target_class=target)
        print(f"Class {target}: Final prob = {history['prob'][-1]:.4f}")

if __name__ == "__main__":
    main()

实验结果

=== 训练目标模型 ===
  Epoch [1/5] Loss: 0.1604 Acc: 95.06%
  Epoch [2/5] Loss: 0.0455 Acc: 98.58%
  Epoch [3/5] Loss: 0.0312 Acc: 99.03%
  Epoch [4/5] Loss: 0.0235 Acc: 99.24%
  Epoch [5/5] Loss: 0.0171 Acc: 99.47%

=== 执行黑盒反演攻击 ===

--- 反演类别 0 ---
Class 0: 100%|████████████████| 400/400 [00:19<00:00, prob=0.9321]
类别 0 最终概率: 0.9321, 查询次数: 32400

--- 反演类别 1 ---
Class 1: 100%|████████████████| 400/400 [00:16<00:00, prob=0.9566]
类别 1 最终概率: 0.9566, 查询次数: 32400

... (类别2-8省略)

--- 反演类别 9 ---
Class 9: 100%|████████████████| 400/400 [00:14<00:00, prob=0.9494]
类别 9 最终概率: 0.9494, 查询次数: 32400

=== 攻击评估 ===
Class 0: Predicted=0, Prob=0.9321, ✓ SUCCESS
Class 1: Predicted=1, Prob=0.9566, ✓ SUCCESS
Class 2: Predicted=2, Prob=0.9621, ✓ SUCCESS
Class 3: Predicted=3, Prob=0.9728, ✓ SUCCESS
Class 4: Predicted=4, Prob=0.9625, ✓ SUCCESS
Class 5: Predicted=5, Prob=0.9773, ✓ SUCCESS
Class 6: Predicted=6, Prob=0.9443, ✓ SUCCESS
Class 7: Predicted=7, Prob=0.9339, ✓ SUCCESS
Class 8: Predicted=8, Prob=0.9694, ✓ SUCCESS
Class 9: Predicted=9, Prob=0.9494, ✓ SUCCESS

总体攻击成功率: 100.0% (10/10)

结果分析

黑盒攻击虽然查询代价增加40倍,但仍然100%成功,说明即使只暴露API概率输出,模型反演攻击仍然可行。

防御

防御MIA的目标可概括为两点:降低输出信号的可优化性(减少攻击反馈),并提高攻击成本与可检测性。以下从输出侧、访问侧和训练侧分类,列出实用方法。

输出最小化

  • 仅返回top-k标签:避免全概率向量,只返top-1/ top-k(k<5)标签+粗粒度置信(e.g., 高/中/低)。


    • 原理:破坏连续反馈,梯度估计失效。
    • 效果:黑盒分数攻击难度翻倍。
    • 代价:损失解释性,适用于生产API。
    • 输出扰动:在概率/logits上加噪声(e.g., Laplace噪声)。

    • 原理:引入不确定性,干扰优化过程。

    • 效果:降低成功率20-30%(Astra Security)。
    • 代价:轻微精度降(<1%)。

访问控制与审计

针对黑盒查询密集型攻击,提高成本。

  • 速率限流与配额:限制QPS/日查询量(e.g., 1000/天)。


    • 原理:MIA需数万查询,限流迫使攻击中断。
    • 效果:黑盒攻击成本增10倍(Practical DevSecOps)。
    • 代价:影响高频用户,需分级授权。
    • 异常检测与审计:监控查询模式(e.g., 重复优化同一类)。

    • 原理:MIA行为特征明显(如高频扰动)。

    • 效果:实时封禁,结合ML检测准确率>90%(Medium文章)。
    • 代价:需日志系统,隐私合规。

训练侧策略

  • 差分隐私(DP)训练:添加噪声到梯度(e.g., DP-SGD)。


    • 原理:界定隐私预算(ε<1),防止过拟合敏感特征。
    • 效果:攻击重构准确率降50%+(Witness.ai)。
    • 代价:模型精度降2-5%,训练时间增。
    • 表示层约束:用对抗训练或降维(如PCA)减少embedding敏感信息。

    • 原理:模糊表示向量,逆映射难度增。

    • 效果:针对表示反演有效(ComSoc期刊)。
    • 代价:需重新训练,任务性能微降。
    • 联邦学习:分布式训练,仅聚合梯度。

    • 原理:数据不集中,减少单点泄露。

    • 效果:多方场景下MIA风险降(Defence.AI)。
    • 代价:通信开销大,适用于分布式系统

前言

Vector无论是add方法还是get方法都加上了synchronized修饰,当多线程读写List必须排队执行,很显然这样效率比较是低下的,CopyOnWriteArrayList是读写分离的,好处是提高线程访问效率。

CopyOnWrite容器即写时复制的容器。通俗的理解是当往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器里的值Copy到新的容器,然后再往新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读 要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

底层原理

  1. CopyOnWriteArrayList的动态数组机制 -- 它内部有个volatile数组(array)来保持数据。在“添加/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给volatile数组。这就是它叫做CopyOnWriteArrayList的原因!
  2. 每一个CopyOnWriteArrayList都和一个监视器锁lock绑定,通过lock,实现了对CopyOnWriteArrayList的互斥添加/删除。

类的继承关系

CopyOnWriteArrayList实现了List接口,List接口定义了对列表的基本操作;同时实现了RandomAccess接口,表示可以随机访问(数组具有随机访问的特性);同时实现了Cloneable接口,表示可克隆;同时也实现了Serializable接口,表示可被序列化。

public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {}

类的内部类

  • COWIterator类

COWIterator表示迭代器,其也有一个Object类型的数组作为CopyOnWriteArrayList数组的快照,这种快照风格的迭代器方法在创建迭代器时使用了对当时数组状态的引用。此数组在迭代器的生存期内不会更改,因此不可能发生冲突,并且迭代器保证不会抛出 ConcurrentModificationException。创建迭代器以后,迭代器就不会反映列表的添加、移除或者更改。在迭代器上进行的元素更改操作(remove、set 和 add)不受支持。这些方法将抛出 UnsupportedOperationException。

static final class COWIterator<E> implements ListIterator<E> {
    /** Snapshot of the array */
    // 快照
    private final Object[] snapshot;
    /** Index of element to be returned by subsequent call to next.  */
    // 游标
    private int cursor;
    // 构造函数
    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }
    // 是否还有下一项
    public boolean hasNext() {
        return cursor < snapshot.length;
    }
    // 是否有上一项
    public boolean hasPrevious() {
        return cursor > 0;
    }
    // next项
    @SuppressWarnings("unchecked")
    public E next() {
        if (! hasNext()) // 不存在下一项,抛出异常
            throw new NoSuchElementException();
        // 返回下一项
        return (E) snapshot[cursor++];
    }

    @SuppressWarnings("unchecked")
    public E previous() {
        if (! hasPrevious())
            throw new NoSuchElementException();
        return (E) snapshot[--cursor];
    }
    
    // 下一项索引
    public int nextIndex() {
        return cursor;
    }
    
    // 上一项索引
    public int previousIndex() {
        return cursor-1;
    }

    /**
        * Not supported. Always throws UnsupportedOperationException.
        * @throws UnsupportedOperationException always; {@code remove}
        *         is not supported by this iterator.
        */
    // 不支持remove操作
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
        * Not supported. Always throws UnsupportedOperationException.
        * @throws UnsupportedOperationException always; {@code set}
        *         is not supported by this iterator.
        */
    // 不支持set操作
    public void set(E e) {
        throw new UnsupportedOperationException();
    }

    /**
        * Not supported. Always throws UnsupportedOperationException.
        * @throws UnsupportedOperationException always; {@code add}
        *         is not supported by this iterator.
        */
    // 不支持add操作
    public void add(E e) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        Object[] elements = snapshot;
        final int size = elements.length;
        for (int i = cursor; i < size; i++) {
            @SuppressWarnings("unchecked") E e = (E) elements[i];
            action.accept(e);
        }
        cursor = size;
    }
}

类的属性

属性中有一个可重入锁,用来保证线程安全访问,还有一个Object类型的数组,用来存放具体的元素。当然,也使用到了反射机制和CAS来保证原子性的修改lock域。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    // 版本序列号
    private static final long serialVersionUID = 8673264195747942595L;
    // 可重入锁
    final transient ReentrantLock lock = new ReentrantLock();
    // 对象数组,用于存放元素
    private transient volatile Object[] array;
    // 反射机制
    private static final sun.misc.Unsafe UNSAFE;
    // lock域的内存偏移量
    private static final long lockOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CopyOnWriteArrayList.class;
            lockOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("lock"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

类的构造函数

  • 默认构造函数
public CopyOnWriteArrayList() {
    // 设置数组
    setArray(new Object[0]);
}
  • CopyOnWriteArrayList(Collection<? extends E>)
public CopyOnWriteArrayList(Collection<? extends E> c) {
    Object[] elements;
    if (c.getClass() == CopyOnWriteArrayList.class) // 类型相同
        // 获取c集合的数组
        elements = ((CopyOnWriteArrayList<?>)c).getArray();
    else { // 类型不相同
        // 将c集合转化为数组并赋值给elements
        elements = c.toArray();
        // c.toArray might (incorrectly) not return Object[] (see 6260652)
        if (elements.getClass() != Object[].class) // elements类型不为Object[]类型
            // 将elements数组转化为Object[]类型的数组
            elements = Arrays.copyOf(elements, elements.length, Object[].class);
    }
    // 设置数组
    setArray(elements);
}

该构造函数的处理流程如下

  1. 判断传入的集合c的类型是否为CopyOnWriteArrayList类型,若是,则获取该集合类型的底层数组(Object[]),并且设置当前CopyOnWriteArrayList的数组(Object[]数组),进入步骤③;否则,进入步骤②
  2. 将传入的集合转化为数组elements,判断elements的类型是否为Object[]类型(toArray方法可能不会返回Object类型的数组),若不是,则将elements转化为Object类型的数组。进入步骤③
  3. 设置当前CopyOnWriteArrayList的Object[]为elements。
  • CopyOnWriteArrayList(E[]):该构造函数用于创建一个保存给定数组的副本的列表。
public CopyOnWriteArrayList(E[] toCopyIn) {
    // 将toCopyIn转化为Object[]类型数组,然后设置当前数组
    setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

核心函数分析

对于CopyOnWriteArrayList的函数分析,主要明白Arrays.copyOf方法即可理解CopyOnWriteArrayList其他函数的意义。

copyOf函数

该函数用于复制指定的数组,截取或用 null 填充(如有必要),以使副本具有指定的长度。

public static <T,U> T[] copyOf(U[] original, int newLength, Class<? extends T[]> newType) {
    @SuppressWarnings("unchecked")
    // 确定copy的类型(将newType转化为Object类型,将Object[].class转化为Object类型;
    // 判断两者是否相等,若相等,则生成指定长度的Object数组
    // 否则,生成指定长度的新类型的数组)
    T[] copy = ((Object)newType == (Object)Object[].class)
        ? (T[]) new Object[newLength]
        : (T[]) Array.newInstance(newType.getComponentType(), newLength);
    // 将original数组从下标0开始,复制长度为(original.length和newLength的较小者),复制到copy数组中(也从下标0开始)
    System.arraycopy(original, 0, copy, 0,
                        Math.min(original.length, newLength));
    return copy;
}

add函数

public boolean add(E e) {
    // 可重入锁
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 元素数组
        Object[] elements = getArray();
        // 数组长度
        int len = elements.length;
        // 复制数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 存放元素e
        newElements[len] = e;
        // 设置数组
        setArray(newElements);
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

此函数用于将指定元素添加到此列表的尾部,处理流程如下

  1. 获取锁(保证多线程的安全访问),获取当前的Object数组,获取Object数组的长度为length,进入步骤②。
  2. 根据Object数组复制一个长度为length+1的Object数组为newElements(此时,newElements[length]为null),进入下一步骤。
  3. 将下标为length的数组元素newElements[length]设置为元素e,再设置当前Object[]为newElements,释放锁,返回。这样就完成了元素的添加。

addIfAbsent方法

该函数用于添加元素(如果数组中不存在,则添加;否则,不添加,直接返回),可以保证多线程环境下不会重复添加元素。

private boolean addIfAbsent(E e, Object[] snapshot) {
    // 重入锁
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 获取数组
        Object[] current = getArray();
        // 数组长度
        int len = current.length;
        if (snapshot != current) { // 快照不等于当前数组,对数组进行了修改
            // Optimize for lost race to another addXXX operation
            // 取较小者
            int common = Math.min(snapshot.length, len);
            for (int i = 0; i < common; i++) // 遍历
                if (current[i] != snapshot[i] && eq(e, current[i])) // 当前数组的元素与快照的元素不相等并且e与当前元素相等
                    // 表示在snapshot与current之间修改了数组,并且设置了数组某一元素为e,已经存在
                    // 返回
                    return false;
            if (indexOf(e, current, common, len) >= 0) // 在当前数组中找到e元素
                // 返回
                return false;
        }
        // 复制数组
        Object[] newElements = Arrays.copyOf(current, len + 1);
        // 对数组len索引的元素赋值为e
        newElements[len] = e;
        // 设置数组
        setArray(newElements);
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

该函数的流程如下:

  1. 获取锁,获取当前数组为current,current长度为len,判断数组之前的快照snapshot是否等于当前数组current,若不相等,则进入步骤2;否则,进入步骤4
  2. 不相等,表示在snapshot与current之间,对数组进行了修改(如进行了add、set、remove等操作),获取长度(snapshot与current之间的较小者),对current进行遍历操作,若遍历过程发现snapshot与current的元素不相等并且current的元素与指定元素相等(可能进行了set操作),进入步骤5,否则,进入步骤3
  3. 在当前数组中索引指定元素,若能够找到,进入步骤5,否则,进入步骤4
  4. 复制当前数组current为newElements,长度为len+1,此时newElements[len]为null。再设置newElements[len]为指定元素e,再设置数组,进入步骤5
  5. 释放锁,返回。

set函数

此函数用于用指定的元素替代此列表指定位置上的元素,也是基于数组的复制来实现的。

public E set(int index, E element) {
    // 可重入锁
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 获取数组
        Object[] elements = getArray();
        // 获取index索引的元素
        E oldValue = get(elements, index);

        if (oldValue != element) { // 旧值等于element
            // 数组长度
            int len = elements.length;
            // 复制数组
            Object[] newElements = Arrays.copyOf(elements, len);
            // 重新赋值index索引的值
            newElements[index] = element;
            // 设置数组
            setArray(newElements);
        } else {
            // Not quite a no-op; ensures volatile write semantics
            // 设置数组
            setArray(elements);
        }
        // 返回旧值
        return oldValue;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

remove函数

此函数用于移除此列表指定位置上的元素。

public E remove(int index) {
    // 可重入锁
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 获取数组
        Object[] elements = getArray();
        // 数组长度
        int len = elements.length;
        // 获取旧值
        E oldValue = get(elements, index);
        // 需要移动的元素个数
        int numMoved = len - index - 1;
        if (numMoved == 0) // 移动个数为0
            // 复制后设置数组
            setArray(Arrays.copyOf(elements, len - 1));
        else { // 移动个数不为0
            // 新生数组
            Object[] newElements = new Object[len - 1];
            // 复制index索引之前的元素
            System.arraycopy(elements, 0, newElements, 0, index);
            // 复制index索引之后的元素
            System.arraycopy(elements, index + 1, newElements, index,
                                numMoved);
            // 设置索引
            setArray(newElements);
        }
        // 返回旧值
        return oldValue;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

处理流程如下

  1. 获取锁,获取数组elements,数组长度为length,获取索引的值elements[index],计算需要移动的元素个数(length - index - 1),若个数为0,则表示移除的是数组的最后一个元素,复制elements数组,复制长度为length-1,然后设置数组,进入步骤③;否则,进入步骤②
  2. 先复制index索引前的元素,再复制index索引后的元素,然后设置数组。
  3. 释放锁,返回旧值

CopyOnWriteArrayList是Fail Safe的

采用安全失败机制的集合容器,在遍历时不是直接在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历。java.util.concurrent包下的容器都是安全失败,可以在多线程下并发使用,并发修改。

原理:由于迭代时是对原集合的拷贝进行遍历,所以在遍历过程中对原集合所作的修改并不能被迭代器检测到,所以不会触发Concurrent Modification Exception。

缺点:基于拷贝内容的优点是避免了Concurrent Modification Exception,但同样地,迭代器并不能访问到修改后的内容,即:迭代器遍历的是开始遍历那一刻拿到的集合拷贝,在遍历期间原集合发生的修改迭代器是不知道的。

Vector无论是add方法还是get方法都加上了synchronized修饰,当多线程读写List必须排队执行,很显然这样效率比较是低下的,CopyOnWriteArrayList是读写分离的,好处是提高线程访问效率。

缺陷和使用场景

  • CopyOnWriteArrayList的写效率比Vector慢。当CopyOnWriteArrayList写元素时是通过备份数组的方式实现的,当多线程同步激烈,数据量较大时会不停的复制数组,内存浪费严重。如果原数组的内容比较多的情况下,可能导致young gc或者full gc
  • 弱一致性:不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求;

小结: CopyOnWriteArrayList合适读多写少的场景,例如黑名单白名单等

「与女约 过女 女给人马酒食 极欲 十日而更」(出自《史记 陆贾传》)

如上,古文本身没啥特别,有趣的地方在于用现代文的语感去看,容易产生误会。


故事:
陆贾是古代的一个富翁,家财万贯,富甲一方。他有五个儿子,因此涉及财产分配,防止兄弟相争的问题。陆贾想出了一个绝妙的方案,这个方案就是本贴开篇那句古文。成语“陆贾分金”也是说的这个故事。

仅限广西移动
打开中国移动 APP
首页直接搜索 10 元 300GB
可随时退订

10 元 300g,居然有 3000 分钟通话和 1500 条短信。而且还有 5G-A 速率包,qci6+千兆

image

我的工作流是一个围绕 superpowers 插件Loop,superpowers 的理念是:先思考再动手。当你提出一个需求,不会急于写代码,而是先退一步问你"你真正想要实现什么",通过对话梳理出完整的设计方案,再分步执行。

核心设计是 masterworker 分离。

  • 脑暴会话 (master):专注于思考和设计,输出高质量的设计文档和执行计划
  • 执行会话 (worker):专注于代码实现,执行详细的计划

1、需求录入 - 首先我会在 Zed 上进行需求录入,采用 md 格式。这一步非常重要,我大概有 30% 的时间花在需求录入上,我会把能想到的关于此需求的背景、最终目标、可行的技术方案、风险点、外部 API 文档等等一切资源,都在需求文档中说明。对于需求文档,我不会太在意格式,会有比较多口语化的表达。

2、脑暴阶段 - 把需求 MD 喂给 Claude,调用 /superpowers:brainstorm 和 claude 进行思维碰撞。这个阶段不写任何代码,只讨论设计方案和实现细节,最终输出 design.mdimplement.md,保证最终的实现方案是完美符合我的预期的。

3、 执行阶段 - 这里我会选择新起一个 ClaudeCode 会话,而不是在脑暴会话中进行代码实现。新会话的好处:一、原先脑暴会话已经经过多轮对话了,一般情况下上下文会比较满,新会话响应更快,并且不会“犯傻”;二、implement.md 足够详细,无需额外上下文

4、 CodeReview - 在 Zed 中进行代码审查和功能验收。关于代码审查,对于一些代码细节和实现原理,这里我会使用 zed-agent 来辅助我进行代码 review,当然,你也可以在终端新建一个 ClaudeCode 会话或者使用 Zed 的 Claude Agent。原则是尽量不在脑暴和执行会话中引入太多不必要的问题,保持这两个会话的「干净」。发现问题后,将改进项写入新的需求 MD

5、 LOOP - 改进项 MD 喂回脑暴会话,开始下一轮脑暴迭代

非常简单,但是效果超群。充分的前期设计可以提升 AI 的效率和质量,避免多次的来回拉扯。

举个真实案例:我用这套工作流将个人博客从 Quarz 框架迁移到 Astro 框架。脑暴阶段确认好设计方案后,我让 Claude 执行计划,然后就去睡午觉了。醒来发现 Claude Code 已经完美完成任务——中间零中断,一次成功,共计 5000+ 行代码变更。

下载新游戏的时候发现速度特别快,150 MB/s 附近来回跳,看流量监控发现下载流量都来自 IP 的 1xxxx~2xxxx 端口,同时频繁请求一个 gstore.val.smogfly.com 的接口,打开发现是专门做 PCDN 的。