标签 线程安全 下的文章

为什么不推荐使用Stack

Java已不推荐使用Stack,而是推荐使用更高效的ArrayDeque

为什么不推荐使用

  • 性能低:是因为 Stack 继承自 Vector, 而 Vector 在每个方法中都加了锁。由于需要兼容老的项目,很难在原有的基础上进行优化,因此 Vector 就被淘汰掉了,使用 ArrayListCopyOnWriteArrayList 来代替,如果在非线程安全的情况下可以使用 ArrayList,线程安全的情况下可以使用 CopyOnWriteArrayList
  • 破坏了原有的数据结构:栈的定义是在一端进行 push 和 pop 操作,除此之外不应该包含其他 入栈和出栈 的方法,但是 Stack 继承自 Vector,使得 Stack 可以使用父类 Vector 公有的方法。

为什么现在还在用

但是为什么还有很多人在使用 Stack。总结了一下主要有两个原因。

  • JDK 官方是不推荐使用 Stack,之所以还有很多人在使用,是因为 JDK 并没有加 deprecation 注解,只是在文档和注释中声明不建议使用,但是很少有人会去关注其实现细节
  • 在笔试面试需要做算法题的时候,更多关注点是在解决问题的算法逻辑思路上,并不会关注在不同语言下 Stack 实现细节,但是对于使用 Java 语言的业务开发者,不仅需要关注算法逻辑本身,也需要关注它的实现细节

为什么推荐使用 Deque 接口替换栈

如果 JDK 不推荐使用 Stack,那应该使用什么集合类来替换栈,一起看看官方的文档。

正如图中标注部分所示,栈的相关操作应该由 Deque 接口来提供,推荐使用 Deque 这种数据结构, 以及它的子类,例如 ArrayDeque。

val stack: Deque<Int> = ArrayDeque()

使用 Deque 接口来实现栈的功能有什么好处:

  • 速度比 Stack 快

这个类作为栈使用时可能比 Stack 快,作为队列使用时可能比 LinkedList 快。因为原来的 Java 的 Stack 继承自 Vector,而 Vector 在每个方法中都加了锁,而 Deque 的子类 ArrayDeque 并没有锁的开销。

  • 屏蔽掉无关的方法

原来的 Java 的 Stack,包含了在任何位置添加或者删除元素的方法,这些不是栈应该有的方法,所以需要屏蔽掉这些无关的方法。声明为 Deque 接口可以解决这个问题,在接口中声明栈需要用到的方法,无需管子类是如何是实现的,对于上层使用者来说,只可以调用和栈相关的方法。

Stack 和 ArrayDeque的 区别

集合类型数据结构是否线程安全
Stack数组
ArrayDeque数组

Stack 常用的方法如下所示:

操作方法
入栈push(E item)
出栈pop()
查看栈顶peek() 为空时返回 null

ArrayDeque 常用的方法如下所示:

操作方法
入栈push(E item)
出栈poll() 栈为空时返回 nullpop() 栈为空时会抛出异常
查看栈顶peek() 为空时返回 null

Queue介绍

Java里有一个叫做Stack的类,却没有叫做Queue的类(它是个接口名字)。当需要使用栈时,Java已不推荐使用Stack,而是推荐使用更高效的ArrayDeque;既然Queue只是一个接口,当需要使用队列时也就首选ArrayDeque了(次选是LinkedList)。

Queue

Queue接口继承自Collection接口,除了最基本的Collection的方法之外,它还支持额外的insertion, extraction和inspection操作。这里有两组格式,共6个方法,一组是抛出异常的实现;另外一组是返回值的实现(没有则返回null)。

Deque

Deque 是"double ended queue", 表示双向的队列,英文读作"deck". Deque 继承自 Queue接口,除了支持Queue的方法之外,还支持 insert , remove 和 examine操作,由于Deque是双向的,所以可以对队列的头和尾都进行操作,它同时也支持两组格式,一组是抛出异常的实现;另外一组是返回值的实现(没有则返回null)。共12个方法如下:

当把 Deque 当做FIFO的 queue 来使用时,元素是从 deque 的尾部添加,从头部进行删除的; 所以 deque 的部分方法是和 queue 是等同的。具体如下:

Deque的含义是“double ended queue”,即双端队列,它既可以当作栈使用,也可以当作队列使用。下表列出了Deque与Queue相对应的接口:

下表列出了Deque与Stack对应的接口:

上面两个表共定义了Deque的12个接口。添加,删除,取值都有两套接口,它们功能相同,区别是对失败情况的处理不同。一套接口遇到失败就会抛出异常,另一套遇到失败会返回特殊值( false 或 null )。除非某种实现对容量有限制,大多数情况下,添加操作是不会失败的。虽然Deque的接口有12个之多,但无非就是对容器的两端进行操作,或添加,或删除,或查看。

ArrayDeque和LinkedList是Deque的两个通用实现,由于官方更推荐使用AarryDeque用作栈和队列,加之上一篇已经讲解过LinkedList,本文将着重讲解ArrayDeque的具体实现

从名字可以看出ArrayDeque底层通过数组实现,为了满足可以同时在数组两端插入或删除元素的需求,该数组还必须是循环的,即循环数组(circular array),也就是说数组的任何一点都可能被看作起点或者终点。ArrayDeque是非线程安全的(not thread-safe),当多个线程同时使用的时候,需要程序员手动同步;另外,该容器不允许放入 null 元素。

上图中我们看到, head 指向首端第一个有效元素, tail 指向尾端第一个可以插入元素的空位。因为是循环数组,所以 head 不一定总等于0, tail 也不一定总是比 head 大。

方法剖析

addFirst()

addFirst(E e)的作用是在Deque的首端插入元素,也就是在head的前面插入元素,在空间足够且下标没有越界的情况下,只需要将elements[--head] = e即可。

实际需要考虑:

  1. 空间是否够用
  2. 下标是否越界的问题

上图中,如果head为0之后接着调用addFirst(),虽然空余空间还够用,但head为-1,下标越界了。

//addFirst(E e)
public void addFirst(E e) {
    if (e == null)//不允许放入null
        throw new NullPointerException();
    elements[head = (head - 1) & (elements.length - 1)] = e;//2.下标是否越界
    if (head == tail)//1.空间是否够用
        doubleCapacity();//扩容
}

上述代码可以看到, 空间问题是在插入之后解决的;首先,因为tail总是指向下一个可插入的空位,也就意味着elements数组至少有一个空位,所以插入元素的时候不用考虑空间问题。

下标越界的处理解决起来非常简单,head = (head - 1) & (elements.length - 1)就可以了,这段代码相当于取余,同时解决了head为负值的情况。因为elements.length必需是2的指数倍,elements - 1就是二进制低位全1,跟head - 1相与之后就起到了取模的作用,如果head - 1为负数(其实只可能是-1),则相当于对其取相对于elements.length的补码。

计算机里数值都是用补码表示的,如果是8位的,-1就是1111 1111,而 (elements.length - 1) 也是 1111 1111,因此两者相与也就是(elements.length - 1);

head = (head - 1) & (elements.length - 1) 最后再让算出的位置赋值给head,因此其实这段代码就是让head再从后往前赋值

扩容函数doubleCapacity(),其逻辑是申请一个更大的数组(原数组的两倍),然后将原数组复制过去。过程如下图所示:

图中可以看到,复制分两次进行,第一次复制head右边的元素,第二次复制head左边的元素。

//doubleCapacity()
private void doubleCapacity() {
    assert head == tail;
    int p = head;
    int n = elements.length;
    int r = n - p; // head右边元素的个数
    int newCapacity = n << 1;//原空间的2倍
    if (newCapacity < 0)
        throw new IllegalStateException("Sorry, deque too big");
    Object[] a = new Object[newCapacity];
    System.arraycopy(elements, p, a, 0, r);//复制右半部分,对应上图中绿色部分
    System.arraycopy(elements, 0, a, r, p);//复制左半部分,对应上图中灰色部分
    elements = (E[])a;
    head = 0;
    tail = n;
}

addLast()

addLast(E e)的作用是在Deque的尾端插入元素,也就是在tail的位置插入元素,由于tail总是指向下一个可以插入的空位,因此只需要elements[tail] = e;即可。插入完成后再检查空间,如果空间已经用光,则调用doubleCapacity()进行扩容。

public void addLast(E e) {
    if (e == null)//不允许放入null
        throw new NullPointerException();
    elements[tail] = e;//赋值
    if ( (tail = (tail + 1) & (elements.length - 1)) == head)//下标越界处理
        doubleCapacity();//扩容
}

pollFirst()

pollFirst()的作用是删除并返回Deque首端元素,也即是head位置处的元素。如果容器不空,只需要直接返回elements[head]即可,当然还需要处理下标的问题。由于ArrayDeque中不允许放入null,当elements[head] == null时,意味着容器为空。

public E pollFirst() {
    int h = head;
    E result = elements[head];
    if (result == null)//null值意味着deque为空
        return null;
    elements[h] = null;//let GC work
    head = (head + 1) & (elements.length - 1);//下标越界处理
    return result;
}

pollLast()

pollLast()的作用是删除并返回Deque尾端元素,也即是tail位置前面的那个元素。

public E pollLast() {
    int t = (tail - 1) & (elements.length - 1);//tail的上一个位置是最后一个元素
    E result = elements[t];
    if (result == null)//null值意味着deque为空
        return null;
    elements[t] = null;//let GC work
    tail = t;
    return result;
}

peekFirst()

peekFirst()的作用是返回但不删除Deque首端元素,也即是head位置处的元素,直接返回elements[head]即可。

public E peekFirst() {
    return elements[head]; // elements[head] is null if deque empty
}

peekLast()

peekLast()的作用是返回但不删除Deque尾端元素,也即是tail位置前面的那个元素。

public E peekLast() {
    return elements[(tail - 1) & (elements.length - 1)];
}

简介

BlockingCollection<T>.NET 中非常重要且实用的线程安全、阻塞式的生产者-消费者集合类,位于 System.Collections.Concurrent 命名空间。

BlockingCollection 不是队列,
而是一个“带阻塞语义的并发管道(Blocking Producer–Consumer Abstraction)”。
在并发集合外面,加了一层“阻塞 + 容量控制 + 完成语义”

什么是生产者-消费者模式?

// 生产者线程 → [BlockingCollection] → 消费者线程
// 1. 生产者添加项目,如果集合已满则阻塞等待
// 2. 消费者取出项目,如果集合为空则阻塞等待
// 3. 自动的线程同步和资源管理

核心定位与价值

BlockingCollection<T> 是一个包装器,它可以基于以下几种底层集合来工作(默认使用 ConcurrentQueue<T>):

底层集合类型默认有界(Bounded)特点
ConcurrentQueue<T>可选FIFO,性能最高
ConcurrentStack<T>可选LIFO
ConcurrentBag<T>可选无序,插入/取出最快
自定义 IProducerConsumerCollection<T>可选高度自定义

在多线程场景中,“生产者线程生产数据,消费者线程消费数据” 是高频场景(如日志收集、任务队列、消息处理)。若用普通集合(如List<T>)+ 手动锁实现,需处理:

  • 线程安全(加 lock );
  • 空集合时消费者等待(Monitor.Wait);
  • 满集合时生产者等待(Monitor.Wait);
  • 数据就绪时唤醒等待线程(Monitor.Pulse)。

BlockingCollection<T> 封装了上述所有逻辑,核心价值:

  • 开箱即用的阻塞逻辑:空集合消费阻塞、满集合生产阻塞;
  • 线程安全:所有操作(添加 / 移除 / 遍历)均线程安全;
  • 支持边界限制:可设置集合最大容量(满则阻塞生产者);
  • 支持取消 / 完成:可优雅停止生产 / 消费,避免线程卡死;
  • 灵活的底层存储:默认基于 ConcurrentQueue<T>(先进先出),也可指定 ConcurrentStack<T>/ConcurrentBag<T>

最常用的几种创建方式

// 1. 最常用:无界队列(推荐用于大多数场景)
var bc = new BlockingCollection<string>();

// 2. 有界队列(限制容量,生产者满时会阻塞)
var bcBounded = new BlockingCollection<string>(boundedCapacity: 100);

// 3. 指定底层集合 + 有界
var bcStack = new BlockingCollection<string>(
    new ConcurrentStack<string>(),
    boundedCapacity: 50);

// 4. 基于已有的集合(高级用法)
var queue = new ConcurrentQueue<string>();
var bcFromExisting = new BlockingCollection<string>(queue, 200);

核心 API 与基础使用

核心构造函数

  • BlockingCollection<T>(): 默认构造:无边界限制,底层用 ConcurrentQueue<T>
  • BlockingCollection<T>(int boundedCapacity): 指定最大容量(边界),满则生产者阻塞
  • BlockingCollection<T>(IProducerConsumerCollection<T>): 自定义底层存储(如ConcurrentStack<T>
  • BlockingCollection<T>(IProducerConsumerCollection<T>, int): 自定义存储 + 最大容量

核心方法 / 属性

  • Add(T item): 向集合添加元素:若集合满则阻塞,直到有空间
  • Add(T item, CancellationToken): 带取消令牌的 Add:可中途取消阻塞
  • Take(): 从集合移除并返回元素:若集合空则阻塞,直到有元素
  • Take(CancellationToken): 带取消令牌的 Take:可中途取消阻塞
  • TryAdd(T item, int millisecondsTimeout): 尝试添加:超时返回 false(非阻塞)
  • TryTake(out T item, int millisecondsTimeout): 尝试获取:超时返回 false(非阻塞)
  • CompleteAdding(): 标记 “添加完成”:后续 Add 会抛异常,Take 在集合空后退出
  • IsAddingCompleted: 判断是否已调用 CompleteAdding()
  • IsCompleted: 判断是否 “添加完成且集合为空”
  • BoundedCapacity: 集合最大容量(-1 表示无限制)

核心操作方法

public class CoreOperations
{
    public static void DemonstrateOperations()
    {
        var collection = new BlockingCollection<string>(boundedCapacity: 3);
        
        // 1. 添加项目
        collection.Add("项目1"); // 阻塞直到有空间
        
        // 2. 尝试添加(不阻塞)
        bool added = collection.TryAdd("项目2", millisecondsTimeout: 0);
        Console.WriteLine($"尝试添加结果: {added}");
        
        // 3. 带超时的添加
        bool addedWithTimeout = collection.TryAdd("项目3", 
            millisecondsTimeout: 1000); // 最多等待1秒
        Console.WriteLine($"带超时添加结果: {addedWithTimeout}");
        
        // 4. 取出项目(阻塞)
        string item1 = collection.Take(); // 阻塞直到有项目可取
        Console.WriteLine($"取出: {item1}");
        
        // 5. 尝试取出(不阻塞)
        bool taken = collection.TryTake(out string item2, millisecondsTimeout: 0);
        Console.WriteLine($"尝试取出结果: {taken}, 项目: {item2}");
        
        // 6. 查看但不移除
        bool peeked = collection.TryPeek(out string item3);
        Console.WriteLine($"查看结果: {peeked}, 项目: {item3}");
        
        // 7. 完成添加
        collection.CompleteAdding();
        Console.WriteLine($"IsAddingCompleted: {collection.IsAddingCompleted}");
        Console.WriteLine($"IsCompleted: {collection.IsCompleted}");
        
        // 8. 获取当前所有项目(不阻塞)
        string[] allItems = collection.ToArray();
        Console.WriteLine($"当前项目数: {allItems.Length}");
    }
}

基础示例:简单生产者 - 消费者

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BlockingCollectionBasicDemo
{
    static void Main()
    {
        // 创建阻塞集合,最大容量为5(满则生产者阻塞)
        var bc = new BlockingCollection<int>(5);

        // 1. 生产者线程:生产1-10的数字
        Task producer = Task.Run(() =>
        {
            for (int i = 1; i <= 10; i++)
            {
                bc.Add(i); // 满则阻塞
                Console.WriteLine($"生产者:添加 {i},当前集合数量:{bc.Count}");
                Thread.Sleep(100); // 模拟生产耗时
            }
            // 标记添加完成:消费者知道不会有新数据了
            bc.CompleteAdding();
            Console.WriteLine("生产者:完成所有生产,标记添加完成");
        });

        // 2. 消费者线程:消费所有数字
        Task consumer = Task.Run(() =>
        {
            // GetConsumingEnumerable():遍历集合,空则阻塞,直到CompleteAdding且空
            foreach (int item in bc.GetConsumingEnumerable())
            {
                Console.WriteLine($"消费者:消费 {item},当前集合数量:{bc.Count}");
                Thread.Sleep(500); // 模拟消费耗时(比生产慢,会导致集合堆积)
            }
            Console.WriteLine("消费者:所有数据消费完成");
        });

        // 等待所有任务完成
        Task.WaitAll(producer, consumer);
        bc.Dispose(); // 释放资源
    }
}

输出结果

生产者:添加 1,当前集合数量:1
生产者:添加 2,当前集合数量:2
生产者:添加 3,当前集合数量:3
生产者:添加 4,当前集合数量:4
生产者:添加 5,当前集合数量:5
消费者:消费 1,当前集合数量:4
生产者:添加 6,当前集合数量:5  // 消费后腾出空间,生产者继续添加
生产者:添加 7,当前集合数量:5  // 集合再次满,生产者阻塞
消费者:消费 2,当前集合数量:4
生产者:添加 8,当前集合数量:5
...(后续依次消费和生产)
生产者:完成所有生产,标记添加完成
消费者:消费 10,当前集合数量:0
消费者:所有数据消费完成

核心现象:

  • 集合容量设为 5,生产者添加到 5 个后阻塞,直到消费者消费 1 个腾出空间;
  • GetConsumingEnumerable() 自动处理阻塞逻辑,无需手动判断集合是否为空;
  • CompleteAdding() 后,消费者遍历完剩余数据即退出,不会无限阻塞。

高级用法详解

边界限制(Bounded Capacity)

通过构造函数指定 boundedCapacity,实现 “生产者限流”:

// 最大容量3,满则生产者阻塞
var bc = new BlockingCollection<string>(3);

// 生产者1:快速添加3个元素,第4个会阻塞
Task.Run(() =>
{
    bc.Add("A");
    bc.Add("B");
    bc.Add("C");
    Console.WriteLine("生产者1:已添加3个,准备添加第4个(会阻塞)");
    bc.Add("D"); // 阻塞,直到消费者消费一个
    Console.WriteLine("生产者1:第4个元素添加成功");
});

// 消费者1:2秒后消费一个元素
Task.Run(() =>
{
    Thread.Sleep(2000);
    var item = bc.Take();
    Console.WriteLine($"消费者1:消费 {item}");
});

取消阻塞(CancellationToken)

CancellationToken 中断阻塞的 Add/Take 操作,避免线程永久阻塞:

var cts = new CancellationTokenSource();
// 3秒后取消
cts.CancelAfter(3000);

var bc = new BlockingCollection<int>();

// 生产者:尝试添加,3秒后取消
Task.Run(() =>
{
    try
    {
        // 集合无边界,此处不会阻塞,但演示取消逻辑
        for (int i = 1; ; i++)
        {
            bc.Add(i, cts.Token);
            Console.WriteLine($"添加 {i}");
            Thread.Sleep(500);
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("生产者:添加操作被取消");
        bc.CompleteAdding();
    }
});

// 消费者:尝试消费,3秒后取消
Task.Run(() =>
{
    try
    {
        while (true)
        {
            int item = bc.Take(cts.Token);
            Console.WriteLine($"消费 {item}");
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("消费者:消费操作被取消");
    }
});

自定义底层存储

默认底层是 ConcurrentQueue<T>(FIFO),可指定 ConcurrentStack<T>(LIFO)或 ConcurrentBag<T>(无序):

// 底层用ConcurrentStack(栈:后进先出)
var bc = new BlockingCollection<int>(new ConcurrentStack<int>());

bc.Add(1);
bc.Add(2);
bc.Add(3);

// Take会获取最后添加的3(栈顶)
Console.WriteLine(bc.Take()); // 输出:3
Console.WriteLine(bc.Take()); // 输出:2
Console.WriteLine(bc.Take()); // 输出:1

多生产者 / 多消费者

BlockingCollection<T> 天然支持多生产者、多消费者并发操作,无需额外同步:

var bc = new BlockingCollection<int>(10);

// 3个生产者线程
for (int i = 0; i < 3; i++)
{
    int producerId = i + 1;
    Task.Run(() =>
    {
        for (int j = 1; j <= 5; j++)
        {
            int value = producerId * 100 + j;
            bc.Add(value);
            Console.WriteLine($"生产者{producerId}:添加 {value}");
            Thread.Sleep(100);
        }
    });
}

// 2个消费者线程
for (int i = 0; i < 2; i++)
{
    int consumerId = i + 1;
    Task.Run(() =>
    {
        foreach (var item in bc.GetConsumingEnumerable())
        {
            Console.WriteLine($"消费者{consumerId}:消费 {item}");
            Thread.Sleep(200);
        }
    });
}

// 等待所有生产者完成后标记添加完成
Task.Delay(2000).ContinueWith(_ => bc.CompleteAdding());

数据流水线(Pipeline)模式

public class DataPipelineExample
{
    public static void RunPipeline()
    {
        // 创建三个阶段的流水线
        var stage1 = new BlockingCollection<string>(boundedCapacity: 10);
        var stage2 = new BlockingCollection<string>(boundedCapacity: 10);
        var stage3 = new BlockingCollection<string>(boundedCapacity: 10);
        
        CancellationTokenSource cts = new CancellationTokenSource();
        
        // 阶段1:数据源
        var sourceTask = Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= 20; i++)
                {
                    string data = $"原始数据{i}";
                    stage1.Add(data, cts.Token);
                    Console.WriteLine($"阶段1: 产生 {data}");
                    Thread.Sleep(50);
                }
                
                stage1.CompleteAdding();
                Console.WriteLine("阶段1完成");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("阶段1被取消");
            }
        });
        
        // 阶段2:数据处理
        var processorTask = Task.Run(() =>
        {
            try
            {
                foreach (var item in stage1.GetConsumingEnumerable(cts.Token))
                {
                    string processed = $"处理过的[{item}]";
                    stage2.Add(processed, cts.Token);
                    Console.WriteLine($"阶段2: 处理 {item} -> {processed}");
                    Thread.Sleep(100);
                }
                
                stage2.CompleteAdding();
                Console.WriteLine("阶段2完成");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("阶段2被取消");
            }
        });
        
        // 阶段3:数据输出
        var outputTask = Task.Run(() =>
        {
            try
            {
                foreach (var item in stage2.GetConsumingEnumerable(cts.Token))
                {
                    string result = $"最终结果<{item}>";
                    stage3.Add(result, cts.Token);
                    Console.WriteLine($"阶段3: 输出 {item} -> {result}");
                    Thread.Sleep(80);
                }
                
                stage3.CompleteAdding();
                Console.WriteLine("阶段3完成");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("阶段3被取消");
            }
        });
        
        // 监控输出
        var monitorTask = Task.Run(() =>
        {
            int count = 0;
            foreach (var item in stage3.GetConsumingEnumerable())
            {
                count++;
                Console.WriteLine($"监控: 收到第{count}个结果: {item}");
            }
            
            Console.WriteLine($"监控: 总共收到 {count} 个结果");
        });
        
        // 运行5秒后取消
        Task.Run(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("\n流水线运行5秒,发送取消信号...");
            cts.Cancel();
        });
        
        try
        {
            Task.WaitAll(sourceTask, processorTask, outputTask, monitorTask, 10000);
        }
        catch (AggregateException ex)
        {
            Console.WriteLine($"任务异常: {ex.Flatten().Message}");
        }
        
        Console.WriteLine("流水线运行结束");
    }
}

使用场景

适合场景

  • CPU 线程池任务
  • 后台 Worker
  • 批处理系统
  • ETL 管道
  • 传统 Producer–Consumer

不适合场景

  • async/await
  • 高吞吐低延迟网络 IO
  • UI 线程
  • 实时系统

总结

  • BlockingCollection<T>.NET 官方的阻塞式线程安全集合,核心适配 “生产者 - 消费者” 模型;
  • 核心特性:空集合消费阻塞、满集合生产阻塞,支持边界限制、取消操作、自定义底层存储;
  • 核心 API:Add()(生产)、Take()(消费)、CompleteAdding()(标记生产完成)、GetConsumingEnumerable()(遍历消费);
  • 关键坑点:必须调用 CompleteAdding() 避免消费者永久阻塞,使用后需 Dispose 释放资源;
  • 适用场景:日志收集、任务队列、消息分发、多线程数据处理等生产者 - 消费者场景,优先使用而非手动实现。

InheritableThreadLocal相比ThreadLocal多一个能力:在创建子线程Thread时,子线程Thread会自动继承父线程的InheritableThreadLocal信息到子线程中,进而实现在在子线程获取父线程的InheritableThreadLocal值的目的。

关于ThreadLocal详细内容,可以看这篇文章:史上最全ThreadLocal 详解

和 ThreadLocal 的区别

举个简单的栗子对比下InheritableThreadLocal和ThreadLocal:

public class InheritableThreadLocalTest {    
    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();    
    private static final InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();    

    public static void main(String[] args) {        
        testThreadLocal();        
        testInheritableThreadLocal();    
    }    

    /**     * threadLocal测试     */    
    public static void testThreadLocal() {       
         // 在主线程中设置值到threadLocal        
         threadLocal.set("我是父线程threadLocal的值");        
         // 创建一个新线程并启动        
         new Thread(() -> {            
                 // 在子线程里面无法获取到父线程设置的threadLocal,结果为null            
                 System.out.println("从子线程获取到threadLocal的值: " + threadLocal.get());           }
         ).start();    
     }    
 
     /**     * inheritableThreadLocal测试     */  
    public static void testInheritableThreadLocal() {        
        // 在主线程中设置一个值到inheritableThreadLocal        
        inheritableThreadLocal.set("我是父线程inheritableThreadLocal的值");        
        // 创建一个新线程并启动        
        new Thread(() -> {            
                // 在子线程里面可以自动获取到父线程设置的inheritableThreadLocal    
                System.out.println("从子线程获取到inheritableThreadLocal的值: " + inheritableThreadLocal.get());        
            }).start();    
        }
    }

执行结果:

从子线程获取到threadLocal的值:null
从子线程获取到inheritableThreadLocal的值:我是父线程inheritableThreadLocal的值

可以看到子线程中可以获取到父线程设置的inheritableThreadLocal值,但不能获取到父线程设置的threadLocal值

实现原理

InheritableThreadLocal 的实现原理相当精妙,它通过在创建子线程的瞬间,“复制”父线程的线程局部变量,从而实现了数据从父线程到子线程的一次性、创建时的传递 。

其核心工作原理可以清晰地通过以下序列图展示,它描绘了当父线程创建一个子线程时,数据是如何被传递的:

sequenceDiagram
    participant Parent as 父线程
    participant Thread as Thread构造方法
    participant ITL as InheritableThreadLocal
    participant ThMap as ThreadLocalMap
    participant Child as 子线程

    Parent->>Thread: 创建 new Thread()
    Note over Parent,Thread: 关键步骤:初始化
    Thread->>Thread: 调用 init() 方法
    Note over Thread,ITL: 检查父线程的 inheritableThreadLocals
    Thread->>+ThMap: createInheritedMap(<br/>parent.inheritableThreadLocals)
    ThMap->>ThMap: 新建一个ThreadLocalMap
    loop 遍历父线程Map中的每个Entry
        ThMap->>+ITL: 调用 key.childValue(parentValue)
        ITL-->>-ThMap: 返回子线程初始值<br/>(默认返回父值,可重写)
        ThMap->>ThMap: 将 (key, value) 放入新Map
    end
    ThMap-->>-Thread: 返回新的ThreadLocalMap对象
    Thread->>Child: 将新Map赋给子线程的<br/>inheritableThreadLocals属性
    Note over Child: 子线程拥有父线程变量的副本

下面我们来详细拆解图中的关键环节。

### 核心实现机制

  1. **数据结构基础:Thread类内部维护了两个 ThreadLocalMap类型的变量 :

    • threadLocals:用于存储普通 ThreadLocal设置的变量副本。
    • inheritableThreadLocals:专门用于存储 InheritableThreadLocal设置的变量副本 。InheritableThreadLocal通过重写 getMapcreateMap方法,使其所有操作都针对 inheritableThreadLocals字段,从而与普通 ThreadLocal分离开 。
  2. 继承触发时刻:子线程的创建。继承行为发生在子线程被创建(即执行 new Thread())时。在 Thread类的 init方法中,如果判断需要继承(inheritThreadLocals参数为 true父线程(当前线程)的 inheritableThreadLocals不为 null,则会执行复制逻辑 。
  3. 复制过程的核心:createInheritedMap。这是实现复制的核心方法 。它会创建一个新的 ThreadLocalMap,并将父线程 inheritableThreadLocals中的所有条目遍历拷贝到新 Map 中。

    • Key的复制:Key(即 InheritableThreadLocal对象本身)是直接复制的引用。
    • Value的生成:Value 并非直接复制引用,而是通过调用 InheritableThreadLocalchildValue(T parentValue)方法来生成子线程中的初始值。默认实现是直接返回父值return parentValue;),这意味着对于对象类型,父子线程将共享同一个对象引用 。

关键特性与注意事项

  1. 创建时复制,后续独立:继承只发生一次,即在子线程对象创建的瞬间。此后,父线程和子线程对各自 InheritableThreadLocal变量的修改互不影响 。
  2. 在线程池中的局限性:这是 InheritableThreadLocal最需要警惕的问题。线程池中的线程是复用的,这些线程在首次创建时可能已经从某个父线程继承了值。但当它们被用于执行新的任务时,新的任务提交线程(逻辑上的“父线程”)与工作线程已无直接的创建关系,因此之前继承的值不会更新,这会导致数据错乱(如用户A的任务拿到了用户B的信息)或内存泄漏​ 。对于线程池场景,应考虑使用阿里开源的 TransmittableThreadLocal (TTL)​ 。
  3. 浅拷贝与对象共享:由于 childValue方法默认是浅拷贝,如果存入的是可变对象(如 MapList),父子线程实际持有的是同一个对象的引用。在一个线程中修改该对象的内部状态,会直接影响另一个线程 。若需隔离,可以重写 childValue方法实现深拷贝 。
  4. 内存泄漏风险:与 ThreadLocal类似,如果线程长时间运行(如线程池中的核心线程),并且未及时调用 remove方法清理,那么该线程的 inheritableThreadLocals会一直持有值的强引用,导致无法被GC回收。良好的实践是在任务执行完毕后主动调用 remove()

线程池中局限性

一般来说,在真实的业务场景下,没人会直接 new Thread,而都是使用线程池的,因此InheritableThreadLocal在线程池中的使用局限性要额外注意

首先,我们先理解 InheritableThreadLocal的继承前提

  • InheritableThreadLocal的继承只发生在 新线程被创建时(即 new Thread()并启动时)。在创建过程中,子线程会复制父线程的 InheritableThreadLocal值。
  • 在线程池中,线程是预先创建或按需创建的,并且会被复用。因此,继承只会在线程池创建新线程时发生,而不会在复用现有线程时发生。

再看线程池创建新线程的条件,对于标准的 ThreadPoolExecutor,新线程的创建遵循以下规则:

  1. 当前线程数 < 核心线程数:当提交新任务时,如果当前运行的线程数小于核心线程数,即使有空闲线程,线程池也会创建新线程来处理任务。此时,新线程会继承父线程(提交任务的线程)的 InheritableThreadLocal
  2. 当前线程数 >= 核心线程数 && 队列已满 && 线程数 < 最大线程数:当任务队列已满,且当前线程数小于最大线程数时,线程池会创建新线程来处理任务。同样,新线程会继承父线程的 InheritableThreadLocal

不会继承的场景

  • 线程复用:当线程池中有空闲线程时(例如,当前线程数 >= 核心线程数,但队列未满),任务会被分配给现有线程执行。此时,没有新线程创建,因此不会发生继承。现有线程的 InheritableThreadLocal值保持不变(可能是之前任务设置的值),这可能导致数据错乱(如用户A的任务看到用户B的数据)。
  • 线程数已达最大值:如果线程数已达最大线程数,且队列已满,新任务会被拒绝(根据拒绝策略),也不会创建新线程,因此不会继承。

不只是线程池污染,线程池使用 InheritableThreadLocal 还可能存在获取不到值的情况。例如,在执行异步任务的时候,复用了某个已有的线程A,并且当时创建该线程A的时候,没有继承InheritableThreadLocal,进而导致后面复用该线程的时候,从InheritableThreadLocal获取到的值为null:

public class InheritableThreadLocalWithThreadPoolTest {    
    private static final InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();    
    // 这里线程池core/max数量都只有2    
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            
        2,            
        2,            
        0L,            
        TimeUnit.MILLISECONDS,            
        new LinkedBlockingQueue<Runnable>(3000),            
        new ThreadPoolExecutor.CallerRunsPolicy()    
    );    
    
    public static void main(String[] args) {        
    // 先执行了不涉及InheritableThreadLocal的子任务初始化线程池线程 
           testAnotherFunction();        
           testAnotherFunction();        
           // 后执行了涉及InheritableThreadLocal
           testInheritableThreadLocalWithThreadPool("张三");        
           testInheritableThreadLocalWithThreadPool("李四");        
           threadPoolExecutor.shutdown();    
     }    
     
     /**     * inheritableThreadLocal+线程池测试     */    
        public static void testInheritableThreadLocalWithThreadPool(String param) {        
            // 1. 在主线程中设置一个值到inheritableThreadLocal        
             inheritableThreadLocal.set(param);        
            // 2. 提交异步任务到线程池        
            threadPoolExecutor.execute(() -> {            
            // 3. 在线程池-子线程里面可以获取到父线程设置的inheritableThreadLocal吗?            
                System.out.println("线程名: " + Thread.currentThread().getName() + ", 父线程设置的inheritableThreadLocal值: " + param + ", 子线程获取到inheritableThreadLocal的值: " + inheritableThreadLocal.get());        
            });        
            // 4. 清除inheritableThreadLocal        
            inheritableThreadLocal.remove();    
       }    
                   
       /**     * 模拟另一个独立的功能     */   
       public static void testAnotherFunction() {        
           // 提交异步任务到线程池        
           threadPoolExecutor.execute(() -> {            
           // 在线程池-子线程里面可以获取到父线程设置的inheritableThreadLocal吗?            
               System.out.println("线程名: " + Thread.currentThread().getName() + ", 线程池-子线程摸个鱼");        
           });    
       }
}

执行结果:

线程名:pool-1-thread-2,线程池-子线程摸个鱼
线程名:pool-1-thread-1,线程池-子线程摸个鱼
线程名:pool-1-thread-1,父线程设置的inheritableThreadLocal值:李四,子线程获取到inheritableThreadLocal的值:null
线程名:pool-1-thread-2,父线程设置的inheritableThreadLocal值:张三,子线程获取到inheritableThreadLocal的值:null

当然了,解决这个问题可以考虑使用阿里开源的 TransmittableThreadLocal (TTL),​或者在提交异步任务前,先获取线程数据,再传入。例如:

// 1. 在主线程中先获取inheritableThreadLocal的值
String name = inheritableThreadLocal.get();    
    
// 2. 提交异步任务到线程池        
threadPoolExecutor.execute(() -> {            
// 3. 在线程池-子线程里面直接传入数据  
System.out.println("线程名: " + Thread.currentThread().getName() + ", 父线程设置的inheritableThreadLocal值: " + param + ", 子线程获取到inheritableThreadLocal的值: " + name);        
            });        

与 ThreadLocal 的对比

特性ThreadLocalInheritableThreadLocal
数据隔离线程绝对隔离线程绝对隔离
子线程继承不支持支持(创建时)
底层存储字段Thread.threadLocalsThread.inheritableThreadLocals
适用场景线程内全局变量,避免传参父子线程间需要传递上下文数据