首页 > 编程学习 > 《Java并发编程的艺术》读书笔记 - 第六章 - Java并发容器和框架

目录

前言 

ConcurrentHashMap 的实现原理

锁分段技术

ConcurrentHashMap 的结构

JDK1.7 

JDK1.8

Java中的阻塞队列

Java 中提供的 7 个阻塞队列

ArrayBlockingQueue

LinkedBlockingQueue

PriorityBlockingQueue

DelayQueue

SynchronousQueue 

LinkedTransferQueue

LinkedBlockingDeque

Fork/Join 框架

工作窃取算法

使用Fork/Join 框架


前言 

Java程序员进行并发编程时,相比于其他语言的程序员而言要倍感幸福,因为并发大师Doug Lea(道格·利)不遗余力地为 Java 开发者提供了非常多的并发容器和框架。

ConcurrentHashMap 的实现原理

ConcurrentHashMap是线程安全且高效的HashMap,使用它一般基于以下两个原因:

  • 常规的HashMap在并发情况下不安全(JDK1.7还有死链问题)
  • 线程安全的HashTable效率非常低下(使用synchronized关键字保证线程安全)

锁分段技术

在ConcurrentHashMap中,首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的似乎还,其他段的数据也能被其他线程访问。 

ConcurrentHashMap 的结构

JDK1.7 

JDK1.8

图片来源:百度

如上图所示,JDK1.7中ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry 则用于存储键值对数据。一个ConcurrentHashMap里包含一个 Segment 数组。Segment 的结构和HashMap类似,是一种数组和链表结构。一个 Segment 里包含一个HashEntry数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个HashEntry数组里的元素,当对 HashEntry 数组的数据进行修改时,必须先获得与它对应的  Segment 锁。 而JDK1.8中将锁的粒度更为细化,相当于对每一个Node节点添加了一把与之对应的锁,另外在Node节点基础上引入了TreeBin。


Java中的阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列:

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满
  • 支持阻塞的移除方法:意思是当队列空时,获取元素的线程会等待队列变为非空

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。

图片来源:百度

注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用 put 或 offer 方法永远不会被阻塞,而且使用 offer 方法时永远返回 true。

Java 中提供的 7 个阻塞队列

ArrayBlockingQueue

一个由数组结构组成的有界阻塞队列,按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。 

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。 

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
两个使用场景:

  • 缓存系统的设计:如果能从DelayQueue中取到数据说明缓存已过期
  • 定时任务调度:如果能从DelayQueue中取到数据说明可以开始执行任务

SynchronousQueue 

SynchronousQueue 是一个不存储元素的阻塞队列。每一个put 操作必须等待一个 take 操作,否则不能继续添加元素。默认情况下线程采用非公平性策略访问队列,同时它支持公平性访问队列。

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞TransferQueue队列。相较于其它队列,它多了 transfer 和 tryTransfer 两个方法。

  • transfer:如果当前有消费者正在等待接收元素,该方法可以把生产者传入的元素立刻传输给消费者。如果没有,则会将元素放在队列的 tail 节点。
  • tryTransfer:用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。与transfer相比少了放入队列的过程。

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移除元素。


Fork/Join 框架

Fork/Join 框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

图片来源:百度

工作窃取算法

工作窃取算法是指某个线程从其他队列里窃取任务来执行。通常会使用双端队列来完成此算法,被窃取的任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
优点:充分利用线程进行并行计算,减少了线程间的竞争。

缺点:在双端队列里只有一个任务时存在竞争。并且该算法消耗了更多的系统资源,比如创建了多个线程和多个双端队列。

使用Fork/Join 框架

使用Fork/Join框架计算 0 - 100 的和

public class ForkJoinDemo extends RecursiveTask<Integer> {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 生成一个计算任务,负责计算 0 - 100的和
        ForkJoinDemo task = new ForkJoinDemo(0, 100);
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);
        Integer sum = result.get();
        System.out.println("0 - 100 的和为: " + sum);
    }

    // 任务执行阈值
    private static final int THRESHOLD = 2;

    private int start;
    private int end;
    public ForkJoinDemo(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // 两数之差小于等于阈值就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 否则分解任务
            int middle = (start + end) / 2;
            ForkJoinDemo leftTask = new ForkJoinDemo(start, middle);
            ForkJoinDemo rightTask = new ForkJoinDemo(middle + 1, end);
            // 执行子任务
            leftTask.fork();
            rightTask.fork();
            // 等待子任务执行完成
            Integer leftResult = leftTask.join();
            Integer rightResult = rightTask.join();
            // 合并子任务
            sum = leftResult + rightResult;
        }

        return sum;
    }
}

Copyright © 2010-2022 dgrt.cn 版权所有 |关于我们| 联系方式