深入理解并发编程-高并发高性能数据结构


1. 并发集合

如果想要通过ArraysList的 add()增加数据,又要通过list.toString()方法获取全部的数据内容,所以此时就可能存在设计上的问题了,将会产生并发的修改异常。ArraysList的设计原理部分存在一个内部的itr的实现,这个实现继承了Iterator的接口

private class Itr implements Iterator<E> {
    int cursor;       // index of next element to return
    int lastRet = -1; // index of last element returned; -1 if no such
    int expectedModCount = modCount;//求模的计数
}

AbstractList中发现其定义了这个属性

protected transient int modCount = 0;
final void checkForComodification() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}
public void add(int index, E element) {
    rangeCheckForAdd(index);
    checkForComodification();
    root.add(offset + index, element);
    updateSizeAndModCount(1);
}

在每一次保存数据的时候都会修改modCount(),这个属性的含义的作用在于要进行一个修改的计数的统计,如果此时修改了100次数据,则这个统计一定会随之增长。

checkForComodification()方法进行了输出前的检查,而这个检查主要是判断当前的输出时的数据的修改次数是否与其内部修改的次数相同。那么如果不同一的时候就认为已经产生了多线程的修改异常。

注:这些集合类之所以与Collections中提供的类不同,是因为Collections所提供的集合类的底层都是基于同步实现的,而同步就意味着性能下降,并不能完美解决这个问题。

2.并发单值集合

在类集之中单值集合只有两种类型ListSet,在JUC中提供了CopyOnWriteArrayListCopyOnWriteSet

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    final transient Object lock = new Object();
    private transient volatile Object[] array;
}

该类实现了List<>子接口,(传统的ArrayList都继承了AbstractList抽象类),然后还存在有一个Lock的属性内容,而这个属性主要用于锁定处理。

public boolean add(E e) {
    synchronized (lock) {
        Object[] es = getArray();//返回array,获取当前保存的全部数组元素项
        int len = es.length;
        es = Arrays.copyOf(es, len + 1);//数组拷贝,在原始数组上做一个变更
        es[len] = e;//增加新元素
        setArray(es);//重新保存数组内容
        return true;
    }
}

这时候就没有了之前ArrayList类内部所存在的modCount属性,因为每一次的处理都是直接进行的复制操作。

static final class COWIterator<E> implements ListIterator<E> {
        private final Object[] snapshot;
        private int cursor;
        COWIterator(Object[] es, int initialCursor) {
            cursor = initialCursor;
            snapshot = es;
        }
        public boolean hasNext() {
            return cursor < snapshot.length;
        }
        public boolean hasPrevious() {
            return cursor > 0;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (! hasNext())
                throw new NoSuchElementException();
            return (E) snapshot[cursor++];
        }

        @SuppressWarnings("unchecked")
        public E previous() {
            if (! hasPrevious())
                throw new NoSuchElementException();
            return (E) snapshot[--cursor];
        }

        public int nextIndex() {
            return cursor;
        }

        public int previousIndex() {
            return cursor - 1;
        }
}

这个时候就是对数组的内容进行了检查,也没有了最终进行处理判断的繁琐逻辑。

CopyOnWriteArrayList虽然可以保证线程的处理安全性,但是其内部是依据数组拷贝的处理形式完成的。

在每一次进行数据添加的时候都会将原始的数组拷贝为一个新数组进行修改,随后再将新数组的引用交给原来的数组,而为了保证该操作的同步性,在add()方法中会采用同步锁的方式进行处理。

由于数据修改的时候与输出的时候使用了不同的数组,这样就可以解决ArrayList集合同步的设计问题。

但是这样的设计操作由于每次数据存储的时候都需要进行新数组的的拷贝,所以肯定会带来很大的GC压力,所以并不适合于高并发修改的处理场景,但是在一定数据量的情况下可以保证较高的数据读取性能。

CopyOnWriteArraySet是基于CopyOnWriteArrayList进行操作的。

3. ConcurrentHashMap

  • Map集合可以实现二元偶对象的存储,而在实际的开发中,Map集合最重要的一点是根据KEY来获取对应的VALUE数据,也就是查询操作要比写入操作使用更多,考虑到性能的问题,较为常用的HashMap子类,但是如果在多线程下环境,HashMap是无法实现安全的线程操作的,会抛出多线程修改异常。
  • 为了进一步提高Map集合在并发处理下的性能操作,J.U.C提供了ConCurrentHashMap子类,该类的实现模式与HashMap相同,存储的形式采用了数组+链表(红黑树)的结构。在进行数据写入的时候采用了CASsynchronized实现了并发安全。而在数据读取的时候,则会根据哈希值获取哈希桶并结合链表或者红黑树的方式进行实现。
final Node<K,V> nextNode() {
    Node<K,V>[] t;
    Node<K,V> e = next;
    if (modCount != expectedModCount)
        //由于涉及到了modCount
        //因此整个的集合实际上都存在着一个核心的问题
        //无法在输出的时候修改数据
        throw new ConcurrentModificationException();
    if (e == null)
        throw new NoSuchElementException();
    if ((next = (current = e).next) == null && (t = table) != null) {
        do {} while (index < t.length && (next = t[index++]) == null);
    }
    return e;
}

concurrentHashMap之中是基于了哈希桶的形式来完成了数据的存储分类(依据HashCode来决定保存在哪一个哈希桶之中),这个桶直接决定了数据同步的环境,只有该桶中的数据可以进行更新的独占锁配置,而其他的桶不会受到影响。

但是在其早期的实现上,它是依靠分段Segament分段的结构来实现资源的同步处理的。

通过Segament进行资源保护的时候,会依据互斥锁的结构来进行资源同步的修改,也就是说属于当前段的Map集合会受到这种锁的限制。如果不是该段的Map集合,那么就不会受到这种锁的约束,保证了更新的安全,保证了数据读取的性能。

4. 跳表集合

  • 数组的最快查找方式=>二分查找法

  • 完整的节点数据结构=>红黑树查找法,因为红黑树按照平衡二叉树的设置保持节点的平衡,可以保持查询性能

  • 普通的链表=>跳表的结构

数组是一种常见的线性结构,如果在进行索引查询的时候,其时间复杂度为O(1),但是在进行数据内容查询的时候,就必须基于有序存储并结合二分法进行查找,这样操作的时间复杂度为O(log2n)

但是在很多情况下对于数组:由于其固定长度的限制,所以开发中会通过链表来解决,但是如果想要进一步提升链表的查询性能,就必须采用跳表结构来处理,而跳表结构的本质是需要提供有一个有序的链表集合,并从中依据二分法的原理抽取出一些样本数据,通过数据的抽样操作来选择一个数据的操作基点,在每一次进行数据查询的时候是依据这个基点来进行判断,而后对样本数据的范围进行查询。

在JUC中有提供一个ConcurrentSkipListMapConcurrentSkipListSet实现了跳表的操作。

4.1 跳表实现原理

跳表(skip list)对标的是平衡树(AVL Tree)和二分查找,是一种插入/删除/搜索都是logn的数据结构,但是要注意的是跳表里面存储的数据必须是有序的。

简单地说,就是为原始链表增加索引,一般来说,如果想要对一个一维的数据结构的增删查改进行提速的话,最有效的方法就是给它增维,增加一个维度,这个维度能够包含有多的信息来帮助低维数据的查找。

这里话来演示一下如何查找8

首先它会先查找最高级索引,1=>7=>10

如何就会定位到7这个索引,然后拿到这个索引后,去找到下一级索引7=>9

然后就会开始在最原始链表中查找7=>8,发现找到了,查找结束

索引的高度:logn,总体时间复杂度为logn

5. 阻塞队列

利用队列可以实现数据缓冲的目的,生产者与消费者之间可以依靠队列进行存储。在项目中包含有一些核心的处理资源,为了提高资源的处理性能往往会采用多线程的方式进行处理,但是如果无节制的持续进行线程的创建,最终就会导致核心资源处理性能的下降。

为了保证在高并发下的资源处理性能,最佳的做法就是引入一个FIFO的缓冲队列,这样就可以减少高并发时出现的资源消耗过度的问题。

以生产者消费者的操作为例,现在假设生产者线程过多,而消费者线程较少,就会出现生产者大量停滞的问题,而在未增加消费者线程的环境下,就可以通过一个队列进行生产数据的存储,但是这时的消费者就需要不断的进行队列的轮询,以便及时的获取数据。

为了解决项目开发之中这种多线程存储关系队列的操作问题,提供了阻塞队列的概念,可以将这种队列理解为自动的队列,当进行数据写入的时候,如果发现队列已经满了,则不再写入,自动进入阻塞状态,而在进行数据读取的时候,发现队列是空的,则也不再直接读取,而是进入到阻塞状态,等待有数据之后自动唤醒。

  • BlockingQueue接口是实现阻塞队列的主要接口,而且该接口属于Queue的子接口

  • ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。

  • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。

  • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。

  • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。

  • SynchronousQueue: 一个不存储元素的阻塞队列。

  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。

  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。

方式 抛出异常 不会抛出异常,有返回值 阻塞等待 超时等待
添加 add() offer() put() offer(obj,timeout,TimeUnit)
移除 remove() poll() take() offer(timeout,TimeUnit)
判断队列首部 element() peek() - -

5.1 BlockingQueue

BlockingQueue属于单端阻塞队列,所有的数据将按照FIFO算法进行保存与获取,BlockingQueue提供有如下几个子类:ArrayBlockingQueue(数组结构)LinkedBlockingQueue(链表单端阻塞队列)PriorityBlockingQueue(优先级阻塞队列)SynchronousQueue(同步队列)

  • ArrayBlockingQueue子类中是存在有一个互斥锁的,而后由于队列需要考虑到空以及满的状态,因此内部实现了两个condition,用来在的情况下对对应的线程进行操作
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
  • LinkedBlockingQueue
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
  • PriorityBlockingQueue
private final ReentrantLock lock = new ReentrantLock();
public PriorityBlockingQueue(int initialCapacity,
                                Comparator<? super E> comparator) {
       if (initialCapacity < 1)
           throw new IllegalArgumentException();
       this.comparator = comparator;
       this.queue = new Object[Math.max(1, initialCapacity)];
   }

以上的三种阻塞队列的实现都是基于锁实现的。每一次处理的时候都需要不断的考虑锁定以及解锁的设计问题。

由于链表单端阻塞队列优先级队列均是更改了内部存储对象的方式,因此实现效果上大同小异,最重要的是其线程阻塞与唤醒机制的使用

注意:SynchronousQueue是不太一样的,它被称为单一队列,这个队列只能够保存单个数据的内容,有点像之前的Exchanger

同步队列内部依靠的是一个Transfer抽象类完成的,该类提供有队列存储实现子类与栈实现子类,并且数的存储与获取都是通过transfer() 来完成的,对于公平与非公平的定义是利用其不同的实现子类来实现的。

5.2 TransferQueue

SynchronousQueue此类在进行数据处理的时候,不需要不断地考虑锁定以及解锁的设计问题。其内部是基于Transfer来实现的

SynchronousBlockingQueue类的实现可以避免锁的存在机制对代码所带来的性能影响,但是这个实现类只能够保存单个数据。

TransferQueue接口中提供了一个最重要的transfer()方法,该方法可以直接实现put生产线程与take消费线程之间的转换处理,提供更加高效的数据处理。

同时JUC中由提供了一个LinkedTransferQueue实现子类,这个类基于链表的方式的进行实现,同时基于CAS操作形式实现了无阻塞的数据处理,可以将其理解为阻塞队列中的LinkedBlockingQueue多数据存储+SynchronousQueue的无锁转换结合体,其内部维护一个完整的数据链表。

同时每当用户进行数据生产put或者数据消费take的时候,都会进行HEAD节点的轮询判断,如果发现当前队列中存在消费线程,那么就会将数据直接交给消费线程给取走,如果没有的话则会将其保存在链表的尾部,这样可以实现更加高效的处理。

特别是在消费者数量较多的时候,这样的处理可以提供消费处理的性能

传统的链表形式实现的队列采用的机制是统一向队列尾部进行数据的存储,而如果使用的了TransferQueue接口实现,那么就会直接在当前的首部进行判断,如果是获取数据,那么这些数据就不再存储在队列节点之中了。

public void put(E e) {
    xfer(e, true, ASYNC, 0);
}
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

5.3 BlockingDeque

BlockingDeque就属于一个双端队列,而后实现的子类:LinkedBlockingDeque。它是一种基于互斥锁实现的链表队列。

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    private static final long serialVersionUID = -387911632671998426L;
    
    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
    
    transient Node<E> first;
    transient Node<E> last;
    private transient int count;
    private final int capacity;
    
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        addAll(c);
    }

6. 延迟队列

DelayQueue是在阻塞队列之中提供的一个类型,这个队列称为延迟队列

延迟队列最大的特点就在于,如果到设定的时间了,就会自动地弹出数据

之前学习的阻塞队列的实现子类是需要开发者通过线程的操作来获取队列内容的,但是延迟队列是可以自己进行数据弹出的。延迟队列的这种设计模式就使得开发者只能够接收弹出的数据项。

其数据项会包括具体数据内容以及操作的时间(最大的操作范围),这个操作时间就是最终完成工作预计所花费的时间,而延迟队列会根据这个时间来进行内容的弹出。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>{}

所有类集里面提供的队列结构Queue都会存在一个AbstractQueue抽象父类进行一些操作的公共实现

public interface Delayed extends Comparable<Delayed> {//提供了可比较器
    long getDelay(TimeUnit unit);//设置延迟时间的单位
}

延迟项本身属于Comparable接口实例,这是因为DelayQueue内部有一个排序的操作结构,那么必须要提供比较器的支持。

private final transient ReentrantLock lock = new ReentrantLock();
   private final PriorityQueue<E> q = new PriorityQueue<E>();

可以看到延迟队列里面含有一个优先级队列,可见其是基于优先级队列实现的。

6.1 延迟队列的基本操作

延迟队列的实现需要通过Delay接口定义具体的队列存储项,这样在进行队列数据获取的时候,会自动进行延迟时间的判断,以达到弹出的目的

class Employee implements Delayed { 			// 延迟队列项
	private String name; 			// 雇员姓名
	private String task; 			// 雇员任务
	private long start = System.currentTimeMillis();		// 任务开始时间
	// 每个人对时间的控制不同,使用TimeUnit进行时间单元配置,不管什么单元最终都以毫秒为主
	private long delay; 			// 从进入到离开的延迟时间
	public Employee(String name, String task, long delay, TimeUnit timeUnit) {
		this.name = name; 				
		this.task = task; 			
		this.delay = TimeUnit.MILLISECONDS.convert(delay, timeUnit); // 以毫秒为单位进行存储
	}
	@Override
	public long getDelay(TimeUnit unit) { 		// 失效时间计算
		return unit.convert((this.start + this.delay) - System.currentTimeMillis(),  TimeUnit.MILLISECONDS);
	}
	@Override
	public int compareTo(Delayed o) { 		// 队列排序
		return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
	}
}

6.2 数据缓存

延迟队列最大的特点是到时间后自动弹出,而这一个自动弹出的机制如果想要充分的发挥,最大的特点就在于数据缓存的操作上。

程序执行的过程中是需要进行CPU资源的抢占的,为了便于CPU运算时的数据读取,在运算之前需要将所有的数据由存储介质(磁盘、网络)加载到内存之中。I/O的性能就决定了整个应用程序的处理性能。

要想提高程序的并发处理,那么最佳的做法就是减少操作系统中的IO操作,而将所需要的核心数据保存在内存之中,即:部分数据的缓存。 数据缓存是直接在JVM的堆内存中直接开辟了一块完整的空间,但是这样一来也会出现一个新的问题:内存保存的数据量会造成内存的溢出,所以应该做一个缓存的定期清理工作,此时就可以基于延迟队列的操作实现

如果想要进行数据的存储,那么一定是需要提供有一个存储环境,这个存储环境必须充分的考虑到代码之中的数据保存的有效性问题,那么自然就会选择Map集合,使用Map集合最大的操作特点就是可以根据KEY来获取VALUE的数据项。那么此时又会存在一个问题,如何保证快速的读取呢?

JDK1.8之后提供了红黑树的处理结构,这个结构可以保证获取内容的性能的稳定。同时缓存又有可能进行多个线程的并发处理,这样的环境就应该考虑ConcurrentMap集合

但是要是无限制的追加缓存存储的话,那么最终是会造成内存溢出的,因此需要进行定期的清理,则可以考虑通过延迟队列来进行清理处理,延迟队列里面的数据一旦弹出之后,则自动地通过Map集合来进行清除。

6.3 范例:使用延迟队列实现数据缓存

package cache;

import java.security.Key;
import java.sql.Time;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 缓存类
 *
 * @author: 张庭杰
 * @date: 2022年10月07日 18:27
 */
public class Cache<K,V> {
    //缓存的内部必须要考虑到多线程下的资源安全以及性能操作
    private Map<K,V> cacheObjects = new ConcurrentHashMap<>();//并发集合
    //牵扯到数据清除的问题,那么可以考虑通过延迟队列来实现
    private BlockingQueue<DelayedItem<Pair>> queue = new DelayQueue();
    //设置缓存的时间,可以设置为一个常量来进行定义,本次定义为2s
    private static final long DELAY_SECOND = 2;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    public void put(K key,V value){
        //内容设置
        //缓存的同时返回之前的数据
        V oldValue = this.cacheObjects.put(key,value);
        //所有的数据的内容是被保存在队列之中的,如果重复了,那么应该更新时间
        if(oldValue !=  null){
            //数据重复保存
            this.queue.remove(oldValue);
        }
        try {
            this.queue.put(new DelayedItem<Pair>(new Pair(key,value),2,TimeUnit.MINUTES));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public V get(K key){
        return this.cacheObjects.get(key);
    }

    public Cache(){//要想进行清理的最佳方法是启动一个线程
        Thread thread =  new Thread(()->{
            while (true){//持续地清除
                try {
                    DelayedItem<Pair> item = this.queue.take();//获取弹出的数据
                    if(item != null){
                        Pair pair = item.getItem();//获取数据内容
                        Cache.this.cacheObjects.remove(pair.key,pair.value);//彻底删除数据
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //这个线程负责数据的清理,但清理的工作都属于后台的工作,后台工作需要线程
        thread.setDaemon(true);
        thread.start(); 

    }


    //实现一个内部类,对键值对进行处理,封装K-V关系
    private class Pair{//封装K-V类
        private K key;
        private V value;
        //构造函数
        public Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }
    //将K-V类继续包装,使得其满足Delayed项的定义
    private class DelayedItem<T> implements Delayed{
        private T item;//缓存项
        private long delay;//缓存的时间
        private long start;//开始的时间

        public DelayedItem(T item, long delay, TimeUnit unit) {
            this.item = item;
            this.delay = TimeUnit.MINUTES.convert(delay,unit);//将它转为合理的时间
            this.start = System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert((this.delay+this.start)-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
        }

        public T getItem() {
            return item;
        }
    }
}

缓存的实现要点如下:

  • 时间单位的统一
  • 优先级队列的定义
  • 键值对封装
  • 过期缓存 回收守护线程

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录