深入理解并发编程-锁机制理论与应用


1. TimeUnit

在Java里面为了进一步简化这种时间的计算管理,提供了一个专属的TimeUnit工具类,该类是一个枚举类

public enum TimeUnit extends Enum<TimeUnit>

枚举类的特点:除了可以定义一系列的常量之外,还可以实现接口,定义方法,或者是定义抽象方法,是一个强大的多例设计模式

long hour = 1;
//需要将一个小时的单元转变为秒
System.out.println(TimeUnit.SECONDS.convert(hour, TimeUnit.HOURS));//小时数,原来的单位
  • 在JDK1.8之后提供了一个Duration的操作类(间隔),这个类在实际的开发之中经常使用到,例如在Spring中进行任务的间隔配置中会被使用到
Duration duration = Duration.ofHours(2).plusHours(2);
System.out.println(TimeUnit.SECONDS.convert(duration));//TimeUnit可以进行各种时间的转换
//计算18天后的日期
long currentTimeMillis = System.currentTimeMillis();
long l = currentTimeMillis + TimeUnit.MILLISECONDS.convert(180, TimeUnit.DAYS);
System.out.println(l);
System.out.println(new SimpleDateFormat("yyyy-MM-dd").format(new Date(l)));
try {
    TimeUnit.SECONDS.sleep(500);//休眠500秒,同理的,也可以使用其他时间单位的休眠
} catch (InterruptedException e) {
    e.printStackTrace();
}

在未来进行多线程的开发之中只要是牵扯到时间的问题的时候,最好都使用TimeUnit这个时间单位进行操作

2.ThreadFactory

既然现在学习的是面向对象的编程语言,那么就需要进行不断的解耦和设计的考虑,传统的创建线程的方式是直接通过new来进行线程对象的构建,所以在JUC里面改进了对于线程的获取机制,提供了工厂类的标准接口

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

在这个对象中对Runnable接口进行包装

public class MyThreadFactory implements ThreadFactory{
    private final static ThreadFactory INSTANCE = new MyThreadFactory();
    private MyThreadFactory(){};//工厂类在定义的时候一定不需要实例化对象
    private static int count = 0;
    public static ThreadFactory getInstance(){//返回工厂单例
        return INSTANCE;
    }
    @Override
    public Thread newThread(Runnable r){//包装Runnable接口
        try{
            Class<?> clazz = Class.forName("java.lang.Thread");
            Constructor<?> constructor = clazz.getConstructor(Runnable.class, String.class);
            return (Thread) constructor.newInstance(r,"yep"+count++);
        }catch (Exception e){
            return null;
        }
    }
}

只要在后续的开发之中见到了ThreadFactory,直接表示的就是子线程的创建。

3. 原子操作类

利用多线程实现一个银行存款的操作机制,这个操作机制可以通过多个线程向同一个银行账户进行业务的处理

public class Bank {
    public static int money = 0;//模拟银行的存款
    public static void main(String[] args) throws InterruptedException {
        int[] data = new int[]{100,200,300};//要存款的数据总额
        for (int i = 0; i < data.length; i++) {//进行存款线程的配置
            final int temp = i;
            new Thread(()->{
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money += data[temp];//模拟村矿
            },String.valueOf(temp)).start();
        }
        //等待两秒的时间,让子线程跑完,结果是600
        TimeUnit.SECONDS.sleep(2);
        System.out.println(money);
    }
}

此时的操作出现了一个不同步的设计问题,这个时候按照最为传统的思路进行解决,那么必须使用同步方法来解决这个问题

public static synchronized void save(int m){
    money += m;
}

这种问题也有弊端,也就是随意一个普通的数据操作都需要经过同步方法的修饰后才能使用,太过于繁琐。

public static AtomicInteger money = new AtomicInteger(0);//模拟银行的存款
money.addAndGet(data[temp]);

利用原子类解决了synchronized同步的设计问题

public class AtomicInteger{
    private volatile int value;//进行快速的值的更新
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
        public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    public final int addAndGet(int delta) {
        return U.getAndAddInt(this, VALUE, delta) + delta;
    }
    
}

原子操作类并没有使用到传统的同步机制,而是通过了一种CAS机制来完成的。

  • 基本类型:AtomicIntegerAtomicLongAtomicBoolean ;
  • 数组类型:AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray ;
  • 引用类型:AtomicReferenceAtomicStampedReferenceAtomicMarkableReference ;
  • 对象的属性修改类型:AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicReferenceFieldUpdater

所有的原子类具有同步的支持,但是考虑到性能问题,没有使用到synchronized关键字来实现,是依靠底层来实现的

3.1 基础类型的原子操作类

  • atomicLong类型的常用方法

在观察原子类操作的过程之中,会存在一个CAS(compareAndSet方法),是首字母的摘取为主命名的。这个方法是整个JUC之中实现数据同步处理而且又能兼顾性能的唯一支持

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
    private static final long VALUE = U.objectFieldOffset(AtomicLong.class, "value");
    private volatile long value;//可以实现内存的快速处理
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

long属于64位的长度,如果运行在了32位的系统之中,那么就需要有两位去描述long的数据类型,而在进行数据修改的时候必须考虑到2位的数据同时修改完成,才可以称为正确的修改。

  • CAS机制解析
public final long addAndGet(long delta) {//增加方法的实现,依靠的是Unsafe类的处理机制
    return U.getAndAddLong(this, VALUE, delta) + delta;
}
   
@HotSpotIntrinsicCandidate
   public final long getAndAddLong(Object o, long offset, long delta) {
       long v;
       do {
           v = getLongVolatile(o, offset);
       } while (!weakCompareAndSetLong(o, offset, v, v + delta));
       return v;
   }
//此时的操作方法是由硬件CPU的指令完成处理的,它不再是通过Java的运行机制来完成,这样的优势是在于速度快
//同时又避免在内存之中的互相拷贝所带来的额外开销
   @HotSpotIntrinsicCandidate
   public native long getLongVolatile(Object o, long offset);

   @HotSpotIntrinsicCandidate
   public final boolean weakCompareAndSetLong(Object o, long offset,
                                              long expected,
                                              long x) {
       return compareAndSetLong(o, offset, expected, x);
   }

   @HotSpotIntrinsicCandidate
   public final native boolean compareAndSetLong(Object o, long offset,
                                                 long expected,
                                                 long x);
  • 在AtomicLong类中还提供有一个compareAndSet()方法,该方法的主要作用是进行数据内容的修改,但是在修改之前需要首先判断当前所保存的数据是否和指定的内容相同,如果相同,则允许修改,如果不同则不允许修改

只有在CAS操作比较成功之后才会进行内容的修改,而如果此时的比较失败是不会进行内容的修改的。

这是一种乐观锁的机制,所谓的乐观锁就是一般不会发生任何问题,不需要进行非常严格的处理,而悲观锁认为临界资源在每时每刻都发生着抢占的问题,一般在多线程中采取的是synchronized进行同步实现

compareAndSet()数据修改操作方法在JUC中被称为CAS机制,CAS(compare and swap)是一条CPU并发原语,它的功能是判断内存中某个位置的值是否为预期值,如果是则将其更为新的值,否则不进行修改,这个过程属于原子性操作。

在多线程进行数据修改得时候,为了保证数据修改的正确性,常规的做法就是使用synchronized同步锁,但是这种锁属于悲观锁,每一个线程都需要在操作之前锁定当前的内存区域,而后才可以进行处理,这样一来在高并发环境下就会严重影响程序性能。

而CAS采用的是一种乐观锁机制,其最大的特点就是不进行强制性的同步处理,而是为了保证数据修改的正确性添加了一些比较数据(比如compareAndSet()在修改之前需要比较值与预设的值是否相同),采用的是一种冲突重试的处理机制,这样可以有效地避免线程阻塞的问题,在并发竞争不是很激烈的情况下,CAS可以获得较好的处理性能,而JDK9及其之后的版本为了进一步提升CAS的操作性能,追加了硬件处理指令集的支持,可以得到更好的处理性能。

3.2 数组类型的原子操作类

  • 数组类型:AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray

可以看到这些方法都是基于乐观锁的同步机制来实现的

使用示例

String[] strings = new String[]{"aaa","bbb","ccc"};
AtomicReferenceArray<String> atomicReferenceArray = new AtomicReferenceArray<>(strings);
//实现实例化处理操作
System.out.println(atomicReferenceArray.length());
atomicReferenceArray.compareAndSet(2,"ccc","aaa");
for(int i = 0;i<atomicReferenceArray.length();i++){
    System.out.println(atomicReferenceArray.get(i)+" ");
}
public class AtomicReferenceArray<E> implements java.io.Serializable {
    private static final long serialVersionUID = -6209656149925076980L;
    private static final VarHandle AA
        = MethodHandles.arrayElementVarHandle(Object[].class);
    
    private final Object[] array; // must have exact type Object[]
    public AtomicReferenceArray(int length) {
        array = new Object[length];
    }
    public AtomicReferenceArray(E[] array) {
        // Visibility guaranteed by final field guarantees
        this.array = Arrays.copyOf(array, array.length, Object[].class);
    }
}
public final boolean compareAndSet(int i, E expectedValue, E newValue) {
    return AA.compareAndSet(array, i, expectedValue, newValue);
}

可见在当前原子数组操作类之中所提供的CAS的极值并不是由Unsafe提供的,而是由VarHandle类来实现的。

这个类的操作本质是上依靠的是java.lang.invoke开发包中的VarHandle类完成处理的,该类主要用于动态操作数组元素或者数组的成员属性。VarHandle类提供了一些列的标准内存屏障操作,可以用于更细粒度的控制指令排序,在安全性、可用性以及性能方面都要优于已有的程序类库,同时可以和任何类型的变量进行关联操作

VarHandle是一个性能上更加强大,同时处理安全性更加稳定的一个特殊工具类。

String[] strings = new String[]{"aaa","bbb","ccc"};
VarHandle varHandle = MethodHandles.arrayElementVarHandle(String[].class);//句柄
varHandle.compareAndSet(strings,2,"ccc","aaa");//直接修改数据
System.out.println(Arrays.toString(strings));

在VarHandle也可以实现反射。

//此时通过类的查找机制查找Book类中的Title属性,同时设置好Title属性的对应的数据类型
VarHandle varHandle = MethodHandles.lookup().findVarHandle(Book.class, "title", String.class);
//所有类的对象属性操作前提就是要进行有效的对象实例化处理
Book book = Book.class.getDeclaredConstructor().newInstance();
varHandle.set(book,"aaa");
System.out.println(book.title);

3.3 引用类型的原子操作类

引用类在程序的开发之中也是需要进行同步处理的,在一个多线程的操作类之中,你需要引用其他类型的对象,这个时候就需要进行引用的原子类操作的使用。

按照java的基本概念来说,引用数据类型和基本数据类型是完全不同的,毕竟存在了堆栈关系,而基本数据类型是以数据的形式存储的,这实际上对于CAS的操作而言是存在一个局限性的

  • 这时候或许会有一个想法,对于匿名对象而言,其存储空间是不在堆上的,但是其比较原理应该还是CAS,比较对象,那么我只要编写hashCodeequals()方法,那么是否就能够支持其比较了呢?

这实际上是不行的,我们知道引用实际上是一个地址指向,如果是new出来的两个对象,尽管它的内部的值是完全一样的,但是实际上这两个对象的地址指向不同,而CAS正是比较这两个对象的引用地址,如果是这样的话,其CAS肯定是不会成功的。

在JUC包里面实际上提供有三种类型

  • AtomicReference(引用类型的原子类)

这个类可以直接实现引用数据类型的存储,在进行修改的时候可以实现线程安全的更新操作,更新的实现原理还是CAS

  • AtomicStampedReference(带有引用版本号(时间戳)的原子类)

在之前的原子操作类之中,是直接根据对象的地址数值来进行CAS处理,而这个类是在地址数值之上又增加了时间戳的处理,在数值相同的基础上,还要匹配时间戳。

  • AtomicMarkableReference(带有标记的原子类)

实际上不管是版本戳还是标记的原子引用类型最终都是为了去解决项目之中可能出现的ABA数据的错乱问题

CAS 在修改变量值时,会先检查该变量的值是否和预期值一致,若一致则修改,引发的ABA问题的情况是:如一个变量初始值为A,被另外一个线程修改成B,再由B修改为A,此时使用CAS进行操作就检查不出变量的变化轨迹,并对该变量修改,这就是ABA问题,其前提条件是“节点可以被循环使用”。

但是对于下面的场景可能就是严重的事故了。

例如:

小牛取款,由于机器不太好使,多点了几次取款操作。后台threadA和threadB工作,
此时threadA操作成功(100->50),threadB阻塞。正好牛妈打款50元给小牛(50->100),
threadC执行成功,之后threadB运行了,又改为(100->50)。
牛气冲天,lz钱哪去了???

3.4 属性修改原子操作类

在一个类之中可能会存在有若干个不同的属性,但是有可能在进行线程同步处理的时候不是该类中所有的属性都会被进行所谓的同步操作,而是只有部分的属性需要进行同步的处理操作,所以在JUC提供的原子类型中包含有属性修改器,利用属性修改器就可以安全地修改属性的内容

  • AtomicIntegerFieldUpdater

  • AtmoicLongFieldUpdater

  • AtomicReferenceFieldUpdater

@CallerSensitive
public static <U> AtomicLongFieldUpdater<U> newUpdater(Class<U> tclass,
                                                       String fieldName) {
    Class<?> caller = Reflection.getCallerClass();
    if (AtomicLong.VM_SUPPORTS_LONG_CAS)
        return new CASUpdater<U>(tclass, fieldName, caller);
    else
        return new LockedUpdater<U>(tclass, fieldName, caller);
}
class Book {
	// 必须使用volatile关键字定义,否则将出现IllegalArgumentException异常
	private volatile long id; 				// 图书ID
	private String title; 				// 图书名称
	private double price; 				// 图书价格
	public Book(long id, String title, double price) { 	// 双参构造
		this.id = id; 			// 属性赋值
		this.title = title; 			// 属性赋值
		this.price = price; 			// 属性赋值
	}
	public void setId(long id) {
		AtomicLongFieldUpdater<Book> atoLong = AtomicLongFieldUpdater
						.newUpdater(Book.class, "id"); 	// 获取待更新属性
		atoLong.compareAndSet(this, this.id, id); 		// CAS操作
	}
}

4. 并发计算

  • 使用原子操作类可以保证多线程并发访问下的数据操作安全性,而为了进一步加强多线程下的计算操作
  • 所以从JDK 1.8之后开始提供有累加器DoubleAccumulatorLongAccumulator)和加法器DoubleAdderLongAdder)的支持,但是对于原子性的累加器只适合于进行基础的数据统计,并不适用于其他更加细粒度的操作
DoubleAccumulator da = new DoubleAccumulator((x, y) -> x + y, 1.1); // 累加器
		System.out.println("【累加器】原始存储内容:" + da.doubleValue()); // 原始内容
		da.accumulate(20); 		// 数据累加计算
		System.out.println("【累加器】累加计算结果:" + da.get());	// 获取数据
DoubleAdder da = new DoubleAdder();	// 定义加法器
da.add(10); 			// 数据执行加法
da.add(20); 			// 数据执行加法
da.add(30); 			// 数据执行加法
System.out.println(da.sum());	// 数据累加
 

5. Random与多线程

java.util.Random可以实现随机数的生成处理,而其在随机数生成时所依靠的是一个seed种子数,如果现在使用的是单线程开发,那么这样的设计没有任何问题。

然而在多线程开发过程中就会出现若干个不同的线程使用同一个种子数来生成随机数,这样就会出现若干个线程竞争同一种子数而带来性能下降的问题

为了解决Random在多线程操作下的性能问题,在J.U.C中提供了ThreadLocalRandom操作类(该类为Random子类),利用该类可以为每一个线程保存不同的种子数,从而解决了Random种子数更新所带来的性能问题。

相比较于Random操作来讲,它可以针对于每一个不同的线程保存各自的因子,从而实现准确的随机数生成,这种机制等同于每一个线程保存一个Random

6. 线程锁简介

在多线程并发访问处理下,如果想要保证操作资源的线程安全性,则必须对资源的处理使用synchronized关键字来进行标注,同时还需要通过线程等待与唤醒的机制来实现多个线程的协作处理,但是这样的实现机制是在过于繁琐,而且容易发生死锁,为了更好的解决线程同步处理的操作问题,在JUC包中提供了一个新的锁处理机制,为了实现这一机制,扩充了两个新的锁接口

  • Lock接口:支持各种不同语义的锁规则
    • **公平锁:**不同线程获取锁的机制是公平的
    • 非公平锁:不同线程获取锁的机制是不公平的,允许竞争
    • 可重入锁:指同一个锁能被一个线程多次获取,可重入锁最大的作用是避免死锁
  • ReadWriteLock:针对线程的读或者写提供不同的锁处理机制 ,在数据读取的时候采用共享锁,数据修改的时候独占锁,这样就可以保证数据访问的性能。
public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

最终再进行读写锁处理的时候,都是通过Lock接口提供的方法来实现最终的锁定操作,但是这些操作都是接口,接口在实际的使用之中必须考虑到其具体的实现子类。例如ReentranLock(互斥锁,独占锁,这个是Lock接口的实现子类)、(ReentranReadWriteLock互斥读写锁,是ReadWirteLock接口的实现子类)

7. AQS

  • 源代码组成
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
}
  • Sync组成

该类的主要作用是保存所有等待获取锁的线程队列,而对于锁的获取又分为两种类型

公平锁(FairSync子类):按照通过CLH等待线程先来先得的规则,公平地获取锁

非公平锁(NonFairSync子类):当线程要获取锁的时候,它会无视CLH等待队列而直接获取锁

abstract static class Sync extends AbstractQueuedSynchronizer {}
  • AbstractQueuedSynchronizer组成
public abstract class AbstractQueuedSynchronizer //AQS
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {}

AQS的本质就是一个执行队列,所有的待执行的线程全部都保存在一个队列之中,实际上这样做的目的是为了解决死锁,在JUC的AQS中提供了CLH队列

AbstractQueuedSynchronizer实现了一个FIFO的线程等待队列,而后会根据不同应用场景来实现具体的独占锁模式(多线程更新数据的时候需要单个线程运行)或者是共享锁模式(多线程数据读取时,读线程不需要同步,只需要和写线程做同步处理即可)

CLH(Craig,Landin,and Hagersten)锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

static final class Node {//所有的待执行线程都在队列之中等待调度
    	volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
} 
  • AbstractOwnableSynchronizer组成
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {}

8. ReentrantLock

**互斥锁(ReentrantLock)**:称为独占锁,一旦获取到了锁之后,其他的线程都不允许进行操作,实际上在之前学习过的synchronized就属于一种独占锁

使用ReentrantLock最大的特点是可以避免传统的线程与唤醒机制的繁琐处理,开发者只需要通过lock()方法即可实现资源锁定,而在资源锁定过程中其他未竞争到的资源的线程则自动进入等待状态。

等到当前的线程调用unlock()方法后,才会让出当前的互斥锁资源,并根据默认的竞争策略(公平机制与非公平机制)唤醒其他等待线程,所以ReentrantLock属于一个可重用锁,这就意味着该锁可以被线程重复获取

  • 多个线程抢占互斥资源

  • 互斥锁的抢夺与等待

所有的等待线程都会保存在AQS等待队列的节点之中,其中的公平机制还需要由Lock的接口子类来决定

public ReentrantLock() {
       sync = new NonfairSync();//非公平机制锁
   }
   public ReentrantLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();//通过判断来进行赋值
   }

在进行线程锁定的过程之中(lock()方法的调用),那么最终依靠的是一个线程锁定数量的控制,每一次锁定的时候都会调用一个锁定计数的方法acquire(1),一旦有一个线程被锁定了,这个时候计数就会+1,CLH就依靠数量是否为0来确定是否有锁定的线程,从而解决线程死锁的问题。

而在每一次通过unlock()方法解锁的时候,会调用release(1),将线程解锁

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {//while(true)
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

9. ReentranReadWriteLock

ReentranLock是一种完全独占的操作锁,无论是面对读或者写的时候都属于独占的操作形式,为了进一步提高读写操作的性能,在JUC里面就涉及一个读写互斥锁,最大的特点是提供了两把锁:一把是读锁,属于共享锁,另外一把是写锁,是独占锁

它的完整规则是:读读不互斥读写互斥写写互斥。它适用于多读的业务场景,使用它可以有效的提高程序的执行性能,也能避免读取到操作了一半的临时数据。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}
class Account { 				// 账户信息类
	private String name; 						// 账户名称
	private int asset; 			// 账户资金
	private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读写锁
	public Account(String name, int asset) { 						// 构造方法
		this.name = name; 		// 属性赋值
		this.asset = asset; 			// 属性赋值
	}
	public void save(int asset) {	// 写锁操作方法
		this.readWriteLock.writeLock().lock();	// 独占的写锁
		try {
			this.asset += asset; 		// 账户资金累加
			TimeUnit.SECONDS.sleep(1); 		// 模拟延迟
			System.out.println("【" + Thread.currentThread().getName() + 
				"】修改银行资产,当前的资产为:" + (this.asset / 100.0));
		} catch (Exception e) {
		} finally {
			this.readWriteLock.writeLock().unlock();		// 释放写锁
		}
	}
	public String toString() { 		// 读锁操作方法
		this.readWriteLock.readLock().lock();	// 共享读锁
		try {
			TimeUnit.MILLISECONDS.sleep(100); 		// 模拟延迟
			return "〖" + Thread.currentThread().getName() + "〗账户名称:" + this.name + 
					"、账户余额:" + (this.asset / 100.0);
		} catch (Exception e) {
			return null;
		} finally {
			this.readWriteLock.readLock().unlock();		// 释放读锁
		}
	}
}
  • 读写锁的均衡问题

使用读写锁进行多线程访问控制的时候,独占锁与共享锁之间是属于互斥的关系的,如果在线程个数平衡的环境下,使用读写锁进行操作没有任何问题,但是如果此时读取的线程很多,而写入的线程很少,则可能出现写入线程出现饥饿的现象,写入线程很有可能长时间无法竞争到资源的控制权,而一直处于空闲状态。

10. StampedLock

StampedLock是一个依据时间戳来实现锁定机制的处理机制,使用这个锁可以有效的解决读写资源存在的不平衡的设计问题。

在该类中支持有三种锁的处理模式,分别为写锁、悲观锁、乐观锁,每一个完整的StampedLock是由版本和模式两个部分所组成,在获取相关锁时会返回有一个数字标记戳,用于控制锁的状态,并利用该标记戳实现解锁的处理。

悲观锁就是如果抢占到了资源一定要时刻进行同步处理,锁定目标内存区域,而如果使用了乐观锁至少还有释放的机会。

那么由于悲观锁在每次操作的时候都会强制性的直接锁定,其实对于资源的切换操作其实并不是很方便,那么如果此时使用的是乐观锁,可以考虑在读的时候假设要发生写操作,那么就让出一下资源,将读锁变为写锁出现。

class Account { 			
	public void save(int asset) {		// 写锁操作方法
		long stamp = this.stampedLock.writeLock();	// 获取写锁的标记戳
		try {
			this.asset += asset; 	// 账户资金累加
			TimeUnit.SECONDS.sleep(1); 		// 模拟延迟
		} catch (Exception e) {
		} finally {
			this.stampedLock.unlockWrite(stamp); 	// 释放写锁
		}
	}
	public String toString() { // 读锁操作方法
		long stamp = this.stampedLock.readLock(); 	// 获取读锁标记戳
		try {
			TimeUnit.MILLISECONDS.sleep(100); 		// 模拟延迟
		} catch (Exception e) {
			return null;
		} finally {
			this.stampedLock.unlockRead(stamp); 	// 释放读锁
		}
	}
}
  • 乐观锁

所谓的乐观锁模式,指的是在当前应用中读线程数量多于线程数量的情况下,可以乐观的认为,数据写入与读取同时发生的几率很小,因此不需要使用悲观读取锁完全锁定,在读取时如果发现有写入的执行变更,可以及时采取相应的措施,(重新读取变更后的数据或者抛出一个读取异常),这样就可以避免悲观读锁的独占操作所带来的性能问题。

public void save(int asset) {		// 写锁操作方法
	long stamp = this.stampedLock.writeLock();	// 获取写锁标记戳
	boolean flag = true; 				// 强制转换处理标记
	try { //  
		long writeStamp = this.stampedLock.tryConvertToWriteLock(stamp);
		while (flag) { 	// 转换尝试
			if (writeStamp != 0) { 	// 锁标记戳转换成功
				stamp = writeStamp; 	// 标记戳替换
				this.asset += asset; 	// 修改内容
				TimeUnit.SECONDS.sleep(2); // 休眠1秒的时间,模拟延迟
				flag = false;
			} else { 		// 转换失败
				this.stampedLock.unlockRead(stamp); 	// 释放掉读锁标记戳
				writeStamp = this.stampedLock.writeLock();	// 获取写锁标记戳
				stamp = writeStamp; 	// 保存标记戳
			}
		}
	} catch (Exception e) {
	} finally {
		this.stampedLock.unlockWrite(stamp); // 释放写锁
	}
}
public String toString() { 		// 读锁操作方法
	// 返回一个有效的乐观锁读取标记戳,如果独占锁已锁定,则返回的戳记为0
	long stamp = this.stampedLock.tryOptimisticRead();	// 获取读锁标记戳
	try {
		int current = this.asset; 		// 在输出之前获取原始内容
		TimeUnit.MILLISECONDS.sleep(500); 	// 休眠1秒的时间,模拟延迟
		if (!this.stampedLock.validate(stamp)) { 	// 标记戳不正确
			stamp = this.stampedLock.readLock();	// 获取读锁标记戳
			try {
				current = this.asset; 	// 存款金额有可能发生改变
			} finally {
				this.stampedLock.unlockRead(stamp); 	// 释放读锁
			}
		}
}

11. Condition

Obejct类之中所提供的wait()notify()方法进行等待与唤醒的处理,而后在传统的Thread类之中,又提供了暂停和恢复的功能,可是对于等待和唤醒机制的操作是比较简单的。

public Condition newCondition();

Lock接口之中存在有一个ReentranLock子类,提供了一个newCondition接口。

  • 该方法底层是由sync.newCondition()来实现的。
  • 在实现类中发现其手工实例化了一个Condition对象
public class ConditionObject implements Condition, java.io.Serializable {
    //是基于队列来进行实现的
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

Condition内部实际上是将所有的等待的操作线程保存在了队列之中,因为可以依靠AQS提供的CLH机制实现死锁线程的控制

Lock lock = new ReentrantLock();		// 实例化Lock接口对象
Condition condition = lock.newCondition();		// 创建一个新的锁控制
lock.lock();			// 锁定主线程
try {
    new Thread(() -> { 		// 启动一个子线程
        lock.lock();			// 同步锁
        try {
            condition.signal();// 【恢复】唤醒等待的主线程
        } catch (Exception e) {
        } finally {
            lock.unlock();		// 解除锁定
        }
    }, "子线程").start();			// 子线程启动
    condition.await();			// 【挂起】主线程等待
} finally {
    lock.unlock();				// 释放外部锁
}

我们看到,这个实例程序中,在线程中使用Condition的时候,必须先将当前线程锁定,为什么呢?因为其内部是基于AQS实现的,如果当前线程没有在AQS里面,那么就无法执行唤醒、等待的操作,因为操作都是基于AQS实现的。

否则的话,因为该线程没有加入到AQS中,监控器没有监控到它的状态,无法执行线程的操作(会抛出非法监控状态的异常)。

12. LockSupport

针对于resume()、stop()等已经过时的方法,新版本提供了LockSupport类,这个类中的方法的作用与上面的方法相同。

LockSupport又被称为阻塞原语,其最重要的一点是因为它的实现并不是依靠Java代码实现的,而是通过UnSafe类转化为了硬件指令。

类中提供的方法都是static类型的

Thread mainThread = Thread.currentThread();				// 获得主线程
new Thread(() -> { 			// 启动一个子线程
    try {
        TimeUnit.SECONDS.sleep(2); 		// 让子线程先运行2秒
    } catch (Exception e) {
    } finally {
        LockSupport.unpark(mainThread); 		// 解锁主线程
    }
}, "子线程").start();					// 子线程启动
LockSupport.park(mainThread); 				// 主线程暂停执行

13. Semaphore(信号量)

在大部分的应用场景下,很多资源实际上都属于有限提供的,在面对大规模并发访问的应用环境中,为了合理的安排有限资源的调度,提供了Semaphore处理类

public void acquire() throws InterruptedException {//尝试获取到一个资源
    sync.acquireSharedInterruptibly(1);
}
Semaphore semaphore = new Semaphore(2,false);
if(semaphore.availablePermits() >=1){
    semaphore.acquire();
    //do something
    semaphore.release();
}

14. CountDownLatch

CountDownLatch:是一种基于倒计数同步的线程管理机制,例如,在对未归队的人员进行一个统计,每当归队一位,就进行计数的减少,一直到计数为0的时候才进行后续的活动。

在主线程派生出三个子线程,而后主线程必须在这三个子线程全部执行完毕后再继续向下执行,所以此时就可以基于CountDownLatch设置等待的线程数为3,每当一个子线程执行完毕后就进行一个计数减1操作。

//实现计数的同步
CountDownLatch countDownLatch = new CountDownLatch(3);
countDownLatch.await();
for (int i = 0; i < 3; i++) {
    new Thread(()->{
        countDownLatch.countDown();
    },"111").start();
}

需要注意的是CountDownLatch没有实现公平与非公平的机制,所以其内部只是创建了一个Sync子类,因为实际上并没有涉及到竞争,该类的构造方法之中仅仅只是实现了一个AQS的实现子类

15. CyclicBarrier

简单来说就是等待足够的数量后才能继续向下执行。

CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
for (int i = 0; i < 3; i++) {
    new Thread(()->{
        try {
            cyclicBarrier.await();//等待,凑够了2个等待的线程(包括自己)
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    },"111").start();
}

但是这样的话,如果长时间等不到下一个线程的到来,等待池中的线程就会一直等待,针对这种情况可以设置一个过期时间

for (int i = 0; i < 3; i++) {
    final int temp  = i;
    new Thread(()->{
        try {
            if(temp == 2){
                cyclicBarrier.reset();//取消等待
            }else{
                cyclicBarrier.await();//等待,凑够了2个等待的线程
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    },"111").start();
}

此外还提供有一个完整线程任务的处理,当已经等待了足够线程数量之后,可以开始执行某些任务

public CyclicBarrier(int parties, Runnable barrierAction) {
    //其中barrierAction是一个处理事件
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
    System.out.println("开始业务处理");
});

CyclicBarrier的实现就好比栅栏一样,这样可以保证若干个线程的并行执行,同时还可以利用方法更新屏障点的状态进行更加方便的控制

16. Exchanger

生产者与消费者的通讯模型是多线程设计与开发的经典案例,其中为了实现生产者与消费者操作的同步处理,需要在公共空间内进行一系列的同步、等待与唤醒操作,为了更加便于这种公共交换空间的设计,利用该类的exchange()方法可以自动的实现不同线程操作的数据同步处理。

Exchanger是一个实现若干线程间彼此协作的通讯类,操作其的生产者与消费者线程同一使用exchange()方法进行数据的交换,如果消费者线程先执行了exchange()但是又没有生产,则会自动进入到阻塞状态,并等待生产者生产数据后再解决当前的阻塞状态,反之亦然。

int repeat = 2; 		// 生产以及消费的次数
Exchanger<String> exc = new Exchanger<>(); 	// 定义一个交换空间
new Thread(() -> {
	for (int y = 0; y < repeat; y++) { 		// 循环生产数据
		if (y % 2 == 0) {
			info = "李兴华高薪就业编程训练营:edu.yootk.com"; // 生产数据
		} else {
			info = "沐言科技:www.yootk.com"; 		// 生产数据
		}
		try {
			TimeUnit.SECONDS.sleep(1);
			exc.exchange(info); 	// 数据存储
		} catch (InterruptedException e) {}
	}
}, "信息生产者").start();		// 线程启动
new Thread(() -> {
	for (int y = 0; y < repeat; y++) { 		// 循环获取数据
		try {
			TimeUnit.SECONDS.sleep(1);
			String info = exc.exchange(null); 	// 数据获取
		} catch (InterruptedException e) {}
	}
}, "信息

17. CompletableFuture

Callale接口需要与Future接口整合在一起之后,最后再进行最终的异步操作调用。

Future可以实现异步计算操作,虽然Future的相关方法提供了异步任务的执行能力,但是对于线程执行结果的获取只能够采用阻塞或者轮询的方式进行处理,阻塞的方式与多线程异步处理的初衷产生了分歧,轮询这种方法会造成CPU资源的浪费,同时也无法及时的得到结果,为了解决这些设计问题,提供了CompletableFuture实现子类,该类可以帮助开发者简化异步编程,同时又可以结合函数式编程模式,利用回调的方式进行异步处理计算操作

    CompletableFuture<String> completableFuture = new CompletableFuture<>();
    for(int i = 0;i<2;i++){
        new Thread(()->{
            System.out.println("开始,炮兵就位");
            completableFuture.get();//当get()了东西,该线程就解除阻塞
        }).start();
    }
    completableFuture.complete("开炮");//传递信息
}

以上的处理是基于同步执行的,除了可以通过complete()方式解除所有线程的阻塞状态之外,在CompletableFuture中也可以通过runAsync()方法定义一个异步任务的处理线程,并且在该线程执行完成后才会解除所有子线程的阻塞状态。

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
    System.out.println("命令发出");
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});//异步线程不考虑返回值
completableFuture.get();//异步处理过程没有返回值

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录