深入理解并发编程-线程池与分支任务流


1. 线程池

1.1 子线程调度

多线程是JAVA中最为核心的部分,并且也提供了合理的多线程应用开发框架,开发者可以很方便的通过主线程创建自己所需要的子线程,每一个子线程都需要等待操作系统的执行调度。

如果一个应用中的子线程数量过多,那么最终的结果就是调度时间加长,线程竞争剧烈,最终整个系统的性能严重下降。

1.2 线程处理架构

为了解决子线程过多所造成的资源损耗的问题,在JDK1.5之后提供了线程池的概念。

利用线程池可以有效的分配物理线程(内核线程)和用户线程之间的资源

在系统内部根据需要创建若干个核心线程数量(CorePool),而后每一个要操作的任务子线程去竞争者若干个核心线程的数量,而当核心线程已被占满的时候,将通过阻塞队列来保存等待执行的子线程,如此就可以保证子线程的有序创建。

注意区分任务线程内核线程之间的关系,任务线程:是被创建出来,但是不一定开始执行里面的任务代码,内核线程:是获得了CPU资源,可以开始执行任务代码的线程。

比如说,电脑是4核8线程的,我的用户现在创建了200个线程(任务线程),最终也是在针对这8个物理线程的资源抢占,这些任务线程彼此之间要进行资源的抢占,会通过各种机制去抢占这8个物理的线程资源,如果说此时的任务线程过多,如果全部都参与到了抢占的操作,那么整个的系统的资源分配就会出现问题。

这时候最佳的做法是采用一个延迟队列的结构,为每个线程分配时间片,利用延迟队列进行资源的分配。

而线程池可以完成的工作就是固定物理线程的数量(一般的配置都是内核线程数量*2,这是由Netty决定的),而后再定义延迟队列的大小,因为延迟队列越大,可以保存的待执行任务越多,但是如果太大了,因为一旦抢占不到资源,就会出现等待时间超长的问题。

1.3 线程池的创建与使用

为了便于线程池的创建管理,提供了一个Executors工具类,开发者利用该类方法创建如下四类线程池

  • 无大小限制的线程池:如果现在执行任务的线程不足了,那么将一直进行新的线程的创建
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
ExecutorService executorService = Executors.newCachedThreadPool();//创建无限大小的线程池
for (int i = 0; i < 10; i++) {
    //创建10个线程池
    executorService.submit(()->{
        System.out.println("111");
    });
}

所谓无限大小的线程池指的是如果发现线程池之中的线程数量不足了,那么就会自动创建一个新的线程,而后将这个新创建的线程保存在线程池之中。由于这种操作会无限制的增长,如果超过了其允许的线程个数,那么也会出现性能的瓶颈。这种操作只是提供了一个线程的统一管理。

  • 创建固定大小的线程池:线程池的容量是固定的,如果达到了指定的容量上限,那么任务线程将进入到阻塞队列之中进行保存,当有了空余的核心线程后才会进行具体的执行调度
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);//创建无限大小的线程池 16
for (int i = 0; i < 100; i++) {
    //创建10个线程
    executorService.submit(()->{
        System.out.println("111");
    });
}

由于当前本机的物理线程的数量为16,因此最终可以开辟的内核线程线程池的数量为32,所有的线程任务都会去抢占这32个线程的资源,哪一个线程任务抢占到了此资源就可以执行当前的线程处理操作。

  • 单线程池:不管有多少个用户线程,该线程池中只提供有一个可用线程
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

单线程池有什么作用?能否使用一个线程代替?

如果在没有任何突发情况下,单个线程的确可以代替单线程池,但是之所以去使用单线程池的本质上来讲是有一个前提:线程池可以自动维护单线程,如果你的线程池之中的单线程已经由于某些原因导致中断了,线程池会自动的进行一个单线程的恢复,这就是最大的区别。

以上三种线程池都有一个特点,所有创建的线程池的类型都使用ExecutorService接口实例进行描述

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
}
  • 定时调度池:实现线程定时任务的处理执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

这种调度的处理操作需要设置一个调度的任务线程,而这个任务的线程可以使用Runnable或者Callable来实现,最重要的一点是所有的调度任务可以进行延迟的配置,并且设置循环调用的间隔

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(6);
for (int i = 0; i < 10; i++) {
    scheduledExecutorService.schedule(()->{
        System.out.println("111");
    },5, TimeUnit.SECONDS);//5秒后开始执行此任务
}

循环调度(间隔调度)模式

public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(6);
    for (int i = 0; i < 10; i++) {
        scheduledExecutorService.scheduleAtFixedRate(()->{
            System.out.println("111");
        },5, 2,TimeUnit.SECONDS);//初始延迟5秒,每两秒做一次调度
    }
}

Callable实现线程池的调度

ExecutorService executorService = Executors.newFixedThreadPool(2);//创建两个线程
Set<Callable<String>> allThreads = new HashSet<>();//线程的绑定
for (int i = 0; i < 10; i++) {
    final int temp = i;
    allThreads.add(()->{
        return String.format("%s\n%s",Thread.currentThread().getName(),String.valueOf(temp));
    });
}
List<Future<String>> futures = executorService.invokeAll(allThreads);
for (Future<String> future : futures) {
    System.out.println(future.get());
}

以上操作实现了传统多线程与线程池之间的关联,需要注意的是在整个线程池之中,所有的用户创建的线程任务(Runnable或者Callable实现类)都要在线程池之中竞争内核线程资源,分为任务和执行两部分。

执行线程的数量会由线程池自行维护,如果发现线程消失了,则会自动进行创建。

2. CompletionService

在线程池的开发处理中,如果使用了Callable接口则需要进行异步任务结果的接收

为了便于异步数据的返回,在J.U.C中提供了一个CompletionService操作接口,

该接口可以将所有异步任务的执行结果保存到阻塞队列之中,而后再利用阻塞队列实现结果的获取

异步任务调度

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    //Future接收异步返回结果
}
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}
ExecutorService service = Executors.newFixedThreadPool(2);
//线程池由CompletionService来管理,所有的线程任务交由此接口进行操作
CompletionService<String> completionService = new ExecutorCompletionService<String>(service);
for (int i = 0; i < 10; i++) {
    completionService.submit(new TaskItem());
}
for (int i = 0; i < 10; i++) {
    System.out.println(completionService.take().get());
}
service.shutdown();

此时的线程池大小为2,所以每一次只有两个线程可以被调度,而后被调度执行完成的线程会自动地保存在阻塞队列中

3. ThreadPoolExcutor

开发之中必定会需要有线程池的创建,但是所有的线程池可以看见的操作都是基于Executors来进行实现的。

创建定长线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

创建单线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

3.1 ThreadPoolExector构造方法(七大参数)

七大参数

public ThreadPoolExecutor(int corePoolSize,	  //内核线程数量
                          int maximumPoolSize,//线程池中的最大的大小
                          long keepAliveTime, //线程池存活时间
                          TimeUnit unit,	  //时间单元(这两个可以合并成一个来记忆其实)
                          BlockingQueue<Runnable> workQueue//线程工作队列(核心)
                          ThreadFactory threadFactory,//线程工厂,统一线程创建逻辑 
                          RejectedExecutionHandler handler//线程拒绝策略
                         )

3.2 四大拒绝策略

在创建ThreadPoolExecutor类对象实例的时候,需要通过RejectedExecutionHandler接口配置拒绝策略,而所谓的拒绝策略指的就是在线程池以及任务队列被占满的时候对于新任务的处理形式,在ThreadPoolExecutor定义时有四个内置拒绝策略

  • AbortPolicy(默认拒绝策略):当线程添加到线程池之中被拒绝的时候,如果被拒绝了就会抛出一个导致执行中断的异常。
  • CallerRunsPolicy:当任务被拒绝的时候将会使用调用者的线程进行任务处理,调度这个线程的线程去执行任务。
  • DiscardPolicy:当线程任务被拒绝的时候,直接丢弃此任务
  • DiscardOldestPolicy:当线程任务被拒绝的时候,线程池会自动放弃等待队列之中等待时间最长的任务,并且将被拒绝的任务添加到阻塞队列中

而在之前Executor中创建出来的线程池之所以察觉不到拒绝策略的原因是其内部使用的是LinkedBlockingQueue,任务队列不会被占满,默认构造方法中,构造的是2^31-1个任务,如果已经存储了这么多个任务的时候,大概率就产生内存溢出的异常,轮不到这个策略抛出的异常了。当然这并不是说不会发生了,只是在一般情况下,一般机器不会发生。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public static void main(String[] args) throws Exception { 				BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1); // 设置阻塞队列
	// 手工创建线程池,该线程池的大小为2,如果线程池已满则使用当前线程处理新的任务
	ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, queue,Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
	for (int x = 0; x < 5; x++) {
		executor.submit(() -> {
			try {
				System.out.println("【" + Thread.currentThread().getName() + "】处理YOOTK任务");
				TimeUnit.SECONDS.sleep(3); 	// 增加任务执行时间
			} catch (InterruptedException e) {}
		});
	}
	executor.shutdown();			// 关闭线程池
}

此时线程的任务是由主方法分配的,如果线程任务被线程池拒绝了,那么就会被交给主线程来处理。

4. ForkJoinPool

4.1 分支任务简介

ForkJoin(分支任务) 可以将一个复杂的业务计算进行拆分,交由多个CPU进行并行计算,这样就可以提供程序的执行性能。

  • 分解(fork):将一个大型任务拆分为若干个小任务在框架中执行
  • 合并(Join):主任务等待多个子任务执行完毕后进行结果的合并。

4.2 分支任务与工作窃取

从JDK1.7开始提供了ForkJoinPool的任务框架,并且其在已有的线程池的概念的基础上进行了扩展,同时考虑到服务的处理性能,引入了工作窃取Work Stealing的机制。

这样可以在进行线程分配的同时自动分配与之数量相等的任务队列,所有新加入的任务会被平均的分配到对应的任务队列之中。

不同的线程处理各自的任务队列,当某一个线程的任务队列已经提前完成的时候,会从其他线程的队列尾部窃取未完成的任务。

这个图是任务的初始平均分配

这个图是任务的窃取

从原理上来看,其实ForkJoinPool实际上是线程池的一种变种的应用,是基于线程池的一种分支计算的应用。

为了实现分支任务线程池的功能,在JUC中提供了一个ForkJoinPool工具类,这个类是ExecutorService线程池操作类的子类,并在ForkJoinPool类中自动提供有一个WorkQueue内部类以实现所有工作队列的维护。

在分支任务中会存在有多个工作线程,而每一个工作线程全部由ForkJoinWorkerThreadFactory接口进行创建规范化管理,是一个实现子类。

程序可以通过newThread()方法来创建线程对象,同时在每一个工作线程对象中都会保存有一个WorkQueue对象的引用。也就是不同工作线程维护各自的任务队列。

5. ForkJoinTask

同时需要注意的是,所有的具体任务的配置都是由ForkJoinTask抽象类来定义的。同时其内部会直接提供有完整的ForkJoinWorkerThreadFactory工厂接口实例。

任务配置主要有三种

  • RecursiveTask:有返回值的任务
  • RecursiveAction:没有返回值的任务
  • CountedCompleter:数量计算有关的任务,在子任务停顿或者阻塞的情况下使用
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {}
//实现了Future<V>异步处理返回调用

5.1 RecursiveTask

所有的分支任务的处理执行都需要有具体的任务的处理类。

每一个分支任务在执行时可以直接将分支计算的结果进行返回,这时就需要通过RecursiveTask继承实现。

该类中提供有一个compute()计算方法,在每次分支处理时都会递归调用此方法实现计算,下面将基于分支计算的处理形式实现一个数据累加的操作。

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;
    V result;
    protected abstract V compute();//最终的结果通过此方法进行返回

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        return true;
    }

}
  • 第一个任务:计算1~50
    • 第一个子分支:1~25
    • 第二个子分支:25~50
  • 第二个任务:计算51~100
    • 第一个子分支:51~76
    • 第二个子分支:76-100
package threadDemo;
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Integer> {//实现数据的累加计算
    private static final int THRESHOLD = 25;//根据分支阈值进行运算
    private int start;//开始计算的数值
    private int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        //所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
        int sum =0;//保存最终的结果
        boolean isFork = (end - start) <= THRESHOLD;//是否需要进行分支
        if(isFork){//开启分支计算
            for(int i = start;i<=end;i++){
                sum += i;
            }
            System.out.println("start"+start+":"+sum+Thread.currentThread().getName());
        }else{//否则就是合并处理
            int mid = (start+end)/2;
            SumTask leftTask = new SumTask(this.start,mid);
            SumTask rightTask = new SumTask(mid+1,this.end);
            leftTask.fork();//开启分支
            rightTask.fork();//开启右分支
            sum = leftTask.join()+rightTask.join();
        }
        return sum;
    }
}

所有的分支任务的内部本质上包裹的还是线程池,如果要进行分支过多的创建,最终导致的结果就是线程资源的耗尽,所以为了保护电脑硬件不透支,使用的还是内置的CPU内核数量进行的线程池配置。

5.2 RecursiveAction

RecursiveTask的特点是执行分支任务后可以返回分支计算后的结果。但是很多时候的分支任务是不需要进行计算结果的返回的,只是需要执行某些操作。

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;
    protected abstract void compute();
    public final Void getRawResult() { return null; }
    protected final void setRawResult(Void mustBeNull) { }
    protected final boolean exec() {
        compute();
        return true;
    }
}
public class CountSave{
    private Lock lock = new ReentrantLock();
    private int sum = 0;
    public CountSave(){
        this.sum = 0;
    }
    public void add(int num){
        this.lock.lock();
        try {
            this.sum += num;
        }finally {
            this.lock.unlock();
        }
    }
    public int getSum(){
        return this.sum;
    }

}
public class SumTask extends RecursiveAction {//实现数据的累加计算
    private static final int THRESHOLD = 25;//根据分支阈值进行运算
    private int start;//开始计算的数值
    private int end;
    private CountSave countSave;

    public SumTask(int start, int end,CountSave countSave) {
        this.start = start;
        this.end = end;
        this.countSave = countSave;//保存累加处理
    }

    @Override
    protected void compute() {
        //所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
        int sum =0;//保存最终的结果
        boolean isFork = (end - start) <= THRESHOLD;//是否需要进行分支
        if(isFork){//开启分支计算
            for(int i = start;i<=end;i++){
                sum += i;
            }
            this.countSave.add(sum);//保存累加结果
            System.out.println("start"+start+":"+sum+Thread.currentThread().getName());
        }else{//否则就是合并处理
            int mid = (start+end)/2;
            SumTask leftTask = new SumTask(this.start,mid,this.countSave);
            SumTask rightTask = new SumTask(mid+1,this.end,this.countSave);
            leftTask.fork();//开启分支
            rightTask.fork();//开启右分支
        }
    }
}

5.3 CountedCompleter分支任务

为了更好的解决分支任务阻塞的操作能力,因此对ForkJoinTask扩充了一个新的CountedCOmpleter子类,该类的基本实现与之前的任务结构相同,唯一的区别是该类中可以挂起指定的任务数量,同时在结束的时候也可以基于挂起的任务数量来实现任务完成状态的判断。

public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
    public abstract void compute();
}

使用CountedCompleter进行开发操作的时候,有一个执行任务的回调处理机制,只要分支处理完成,就可以触发这个回调的操作。可以触发一些分支的后续的线程的释放处理能力

	@Override
	public void compute() {
		int sum = 0;
		boolean isFork = (end - start) <= THRESHOLD; 	// 分支判断
		if (isFork) {			// 不开启新的分支
			for (int i = start; i <= end; i++) {	// 循环处理
				sum += i; 	// 数据累加计算
			}
			this.result.addAndGet(sum); 	// 保存累加结果
			super.tryComplete();		// onCompletion()钩子触发,去执行onCompletion()
		} else {			// 分支开启
			int middle = (start + end) / 2; 	// 计算中间值
			SumTask left = new SumTask(this.result, this.start, middle); // 分支任务
			SumTask right = new SumTask(this.result, middle + 1, this.end); // 分支任务
			left.fork();		// 任务执行
			right.fork();		// 任务执行
		}
	}
	@Override
	public void onCompletion(CountedCompleter<?> caller) { 		// 回调处理
		System.out.println("【" + Thread.currentThread().getName() + "】start = " + this.start + "、end = " + this.end); // 输出分支信息
	}
}
public class YootkDemo { 	// 李兴华高薪就业编程训练营
	public static void main(String[] args) throws Exception { 	// 沐言科技:www.yootk.com
		AtomicInteger result = new AtomicInteger(0); // 保存计算结果
		SumTask task = new SumTask(result, 0, 100); // 分支任务
		task.addToPendingCount(1); 	// 设置挂起线程数量
		ForkJoinPool pool = new ForkJoinPool();	// 定义分支任务池
		pool.submit(task); 		// 提交任务
		while (task.getPendingCount() != 0) { 		// 任务未执行完
			TimeUnit.MILLISECONDS.sleep(100); 		// 等待任务执行完毕
			if (result.get() != 0) { 			// 获取计算结果
				System.out.println("计算结果:" + result.get());/ 获取计算结果
				break;
			}
		}
	}

6. ForkJoinPool.ManagerBlocking

6.1 分支任务阻塞

使用分支业务可以充分的发挥出电脑的硬件处理性能,然而在进行分支处理的时候,有可能所处理的业务会造成阻塞的情况出现。假设现在只设置有2个核心线程,但是却产生了6个分支,这样一来只能有2个线程执行,而其他的任务则必须进行工作线程资源的等待,从而出现严重的性能问题。

为了解决这种情况下的分支性能处理问题,在ForkJoinPool中提供了ManagerBlocker阻塞管理接口,开发者可以利用此接口明确告诉ForkJoinPool可能产生阻塞的操作,而后会依据ManageBlocker接口所提供的方法来判断当前线程池的运行情况,如此发现此时线程池资源已经耗尽,但是还有未执行的任务的时候,就会自动的在线程池中进行核心线程的补充,从而实现分支快速处理的需求。

static class SumHandleManagedBlocker implements ForkJoinPool.ManagedBlocker {
		private Integer result; 		// 结果保存
		private int start; 		// 累加开始值
		private int end; 			// 累加结束值
		private Lock lock; 			// 独占锁
		public SumHandleManagedBlocker(Lock lock, int start, int end) {
			this.start = start; 	// 属性赋值
			this.end = end; 	// 属性赋值
			this.lock = lock; 	// 属性赋值
		}
		@Override
		public boolean block() throws InterruptedException {//处理延迟任务
			int sum = 0;
			this.lock.lock();	// 同步锁
			try {
				for (int i = start; i <= end; i++) { 		// 循环处理
					TimeUnit.MILLISECONDS.sleep(100); 	// 每次休眠10毫秒
					sum += i; 		// 数据累加
				}
			} finally {
				this.result = sum; 		// 保存计算结果
				this.lock.unlock();		// 解除同步锁
			}
			System.out.println("【" + Thread.currentThread().getName() +  "】处理数据累加业务,start = " + this.start + "、end = “ + this.end + "、sum = " + sum); // 输出分支信息
			return result != null; 	// 返回true继续保持阻塞状态
		}
		@Override
		public boolean isReleasable() { 	// 返回false会创建补偿线程
			return this.result != null; 	// 阻塞解除判断
		}
	}
}

7. Phaser

  • Phaser是在JDK1.7之后引入的一个同步处理工具类,主要用于分阶段的任务处理上,可以理解为CountDownLatchCyclicBarrier的功能集合,同时又支持有良好的分层计算(分支计算处理)能力。

  • 在Phaser中是根据阶段(phase)的概念来进行处理的,所有的阶段需要达到指定的参与者线程之后才可以进行阶段的进阶处理,现在假设定义了两个参与者,则其进阶过程是在两个参与者线程全部达到之后进行的。

Phaser phaser = new Phaser(2);//定义两个任务
System.out.println(phaser.getPhase());//初始阶段的配置项
for (int i = 0; i < 2; i++) {
    //循环创建线程
    new Thread(()->{
        System.out.println(Thread.currentThread().getName());
        phaser.arriveAndAwaitAdvance();//就位处理
    }).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(phaser.getPhase());

所有的执行阶段的控制,全都是由Phaser类来完成的,每当触发了任务的执行,就表示阶段的增加

Phaser phaser = new Phaser(2);//定义两个任务
System.out.println(phaser.getPhase());//初始阶段的配置项
for (int i = 0; i < 2; i++) {
    //循环创建线程
    new Thread(()->{
        System.out.println(Thread.currentThread().getName());
        phaser.arrive();
    }).start();
}
phaser.awaitAdvance(phaser.getPhase());
System.out.println(phaser.getPhase());
int repeat = 2;//执行的轮数的配置
Phaser phaser = new Phaser(){
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {//回调处理
        System.out.println("on advance");
        return phase +1 >= repeat || registeredParties == 0;
    }
};
//任务量到了就执行这个方法
for (int i = 0; i < 2; i++) {
    phaser.register();
    new Thread(()->{//每个线程都在持续执行
        while (!phaser.isTerminated()){
            phaser.arriveAndAwaitAdvance();//等待其他线程就位
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1111"+Thread.currentThread().getName());
        }
    }).start();
}

Phaser是一种更高级的多任务的处理,因为在整个的Phaser内部维持了一个链表。可以考虑通过链表的形式设置分层的Phaser处理,就相当于维持了一棵树的形式。

class Tasker implements Runnable {
	private final Phaser phaser; 			// 保存当前的Phaser对象
	public Tasker(Phaser phaser) {
		this.phaser = phaser; 		// 属性赋值
		this.phaser.register();			// 注册参与者线程
	}
	@Override
	public void run() {
		while (!phaser.isTerminated()) { 		// 未终止则持续执行
			this.phaser.arriveAndAwaitAdvance();			// 等待其它参与者线程
			System.out.println("【" + Thread.currentThread().getName() + "】YOOTK业务处理。");
		}
	}
}
public class YootkDemo { 		// 李兴华高薪就业编程训练营
	private static final int THRESHOLD = 2; 			// 每个Phaser对应的任务数
	public static void main(String[] args) throws Exception { 	// 沐言科技:www.yootk.com
		int repeat = 2; 				// 重复周期
		Phaser phaser = new Phaser() {
			protected boolean onAdvance(int phase, int registeredParties) { // 进阶处理
				System.out.println("【onAdvance()处理】进阶处理操作。phase = " + phase + 
					"、registeredParties = " + registeredParties);
				return phase + 1 >= repeat || registeredParties == 0; // 终止控制
			};
		};
		Tasker[] taskers = new Tasker[5]; 	// 子线程数组
		build(taskers, 0, taskers.length, phaser); 		// 配置Phaser层级
		for (int i = 0; i < taskers.length; i++) { 		// 执行任务
			new Thread(taskers[i], "子线程 - " + i).start();	// 线程启动
		}
	}
	private static void build(Tasker[] taskers, int low,  int high, Phaser parent) { 	// 构建Phaser层级
		if (high - low > THRESHOLD) { 	// 超过阈值
			for (int x = low; x < high; x += THRESHOLD) {		// 部分数组循环
				int limit = Math.min(x + THRESHOLD, high); 	// 获取最小值
				build(taskers, x, limit, new Phaser(parent)); 	// 设置层级关系
			}
		} else { 	// 任务配置
			for (int x = low; x < high; ++x) { 		// 循环创建任务
				taskers[x] = new Tasker(parent); 		// 实例化任务对象
			}
		}
	}
}

8. 响应式编程

8.1 响应式编程简介

随着互联网技术的不断发展,用户需要更快的响应处理速度与稳定可靠的运行环境,即使在出现了错误之后也可以优雅的面对失败。

同时要求构建的系统具有灵活、松散耦合以及可伸缩性的特点。所以在这一的背景下,响应式编程(Reactive Programming)开始应用于项目开发之中。

在响应式编程之中主要实现了一种基于数据流Data Stream和变化传递Propagation of Change的声明式Declarative的编程范式

8.2 响应式数据流模式

在响应式编程中需要基于响应式数据流(Reactive Steams,异步非阻塞式数据流传输)实现数据的传输,在该数据流的模型之中需要提供有一个发布者(Publisher)和一个订阅者(Subscriber),而发布者和订阅者之间可以实现数据流的直接传输,也可以通过处理器Processor实现两者之间的数据流处理。在整个的处理过程中,Processor既可以作为订阅者也可以作为发布者。

8.3 响应式编程中的变化传递

在传统的命令式编程的开发模型中,一个经典案例是实现两个变量内容的计算,常用的命令格式为sum = a+b,这样就会根据变量a和变量b的内容一次性得到sum的结果。而在计算完成后如果变量a的内容进行了修改则也不会影响到最终的sum结果。

但是在响应式编程的情况下,即便已经得到了sum的计算结果,而在变量a发生了改变之后,也会影响到sum的内容,这一点就称为变化传递

8.4 SubmissionPublier

在响应流的开发中最为重要的就是数据发布者,Publisher只是定义了发布者的核心操作方法,而具体的数据发布处理操作都是由SubmissionPublier类来实现的

public static interface Publisher<T> {
       public void subscribe(Subscriber<? super T> subscriber);
   }
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {}

数据传输缓冲

/** The largest possible power of two array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;

static final int INITIAL_CAPACITY = 32;

消费处理

public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
    if (consumer == null)
        throw new NullPointerException();
    CompletableFuture<Void> status = new CompletableFuture<>();//异步处理
    subscribe(new ConsumerSubscriber<T>(status, consumer));//自动找到一个consumer的订阅者
    return status;
}

而后在当前进行消费处理的时候就可以发现其内部提供有一个ConsumerSubScriber类型

static final class ConsumerSubscriber<T> implements Subscriber<T> {}

响应式的数据发布

List<String> data  = List.of("111","111");
SubmissionPublisher publisher = new SubmissionPublisher<>();
CompletableFuture consume = publisher.consume(System.out::println);
data.forEach((s)->{
    publisher.submit(s);//发布数据
});
publisher.close();
if(consume!=null){
    try {
        consume.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

发布者与订阅者之间是存在有缓冲区的,但是如果说此时发布者和订阅者采用了同频的方式进行数据的处理,一般来说缓冲区的空间是足够使用的。但是如果发布者的数据流处理要远远高于订阅者,这个时候缓冲区就很快被占满,一种理想的情况是建立一个无界的缓冲区

这个问题的解决方案称为背压策略(BackPressure)。在该策略下,订阅者会主动告知发布者减慢发送数据的速率,以便发布者准保号足够大的缓冲空间后再进行元素的处理。

8.5 Reactive编程模型

基于发布者与订阅者的响应式通讯模型

public class Book {
    private String name;
    private Integer price;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getPrice() {
        return price;
    }

    public void setPrice(Integer price) {
        this.price = price;
    }

    public Book(String name, Integer price) {
        this.name = name;
        this.price = price;
    }
    static class BookCreator{//创造者模式
        public static List<Book> getBooks(){
            ArrayList<Book> books = new ArrayList<>();
            books.add(new Book("1",1));
            books.add(new Book("1",1));
            books.add(new Book("1",1));
            books.add(new Book("1",1));
            books.add(new Book("1",1));
            books.add(new Book("1",1));
            return books;
        }
    }
}

订阅者

public class BookSubscribe implements Flow.Subscriber<Book>{
    private Integer count = 0;
    private Flow.Subscription subscription;//订阅控制
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        //由于不确定后面会同时返回多少个数据,此时的目的是触发数据的接受
        this.subscription.request(1);//从发布者之中获取一个数据项
    }

    @Override
    public void onNext(Book item) {//这个操作方法不是一次性完成的
        System.out.println(item);
        this.count++;
        this.subscription.request(1);//再次接受
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println(throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("处理完成");
    }
}

数据发布处理

public static void main(String[] args) {
    SubmissionPublisher<Book> publisher = new SubmissionPublisher<>();
    BookSubscribe subscribe = new BookSubscribe();
    publisher.subscribe(subscribe);
    List<Book> books = Book.BookCreator.getBooks();//获取创建的集合
    books.stream().forEach((book)->{
        publisher.submit(book);
    });
    while (books.size() != subscribe.getCount()){
        try {
            //数据量没有消费完成
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    publisher.close();
}

8.6 Processor数据转换处理

除了使用发布者与订阅者的直连模式之外,还可以在两者之间引入一个Processor转换处理器,利用转换处理器可以接收发布者发送的数据,随后将该数据转换处理后再发送给订阅者。

转换器同时实现了发布者与订阅者的操作

Flow.Processor接口并没有定义任何的抽象方法,该接口同时继承了Flow.PublisherFlow.Subscriber两个父接口。

所以该接口中同时拥有发布者与订阅者的方法标准,而为了简化发布者的定义,可以创建一个MessageProcessor处理类,该类除了实现Processor父接口之外,还可以继承SubmissionPublisher父类(Publisher接口子类),这样就可以直接利用已有类的方法实现发布者的功能

// Processor<T,R>同时实现了Subscriber<T>与Publisher<R>父接口
class MessageProcessor extends SubmissionPublisher<Message> 
		implements Processor<Book, Message> { // 数据转换器
	private Subscription subscription; 			// 订阅者处理
	private Function<Book, Message> function; 			// 转换处理接口
	public MessageProcessor(Function<Book, Message> function) {
		this.function = function;
	}
	@Override
	public void onSubscribe(Subscription subscription) { 		// 订阅开启
		this.subscription = subscription;
		subscription.request(1); 				// 抓取数据
	}
	@Override
	public void onNext(Book item) { 				// 数据处理
		super.submit(this.function.apply(item)); 			// 数据转换后发出
		this.subscription.request(1); 				// 抓取数据
	}
	@Override
	public void onError(Throwable throwable) { 			// 错误处理
		throwable.printStackTrace();
	}
	@Override
	public void onComplete() {} 					// 操作完成
}
SubmissionPublisher<Book> publisher = new SubmissionPublisher<>(); // 创建发布者
MessageProcessor processor = new MessageProcessor(item -> {
    return new Message(item.toString(), "李兴华"); 		// Book转为Message
}); 					// 创建Processor处理器
MessageSubscriber sub = new MessageSubscriber();		// 订阅者
publisher.subscribe(processor); 			// 配置订阅者
processor.subscribe(sub); 				// 配置发布者
List<Book> books = Book.BookDataCreator.getBooks();		// 获取Book集合
books.stream().forEach(book -> publisher.submit(book)); 	// 数据迭代
while (books.size() != sub.getCounter()) { 			// 结束判断
    TimeUnit.SECONDS.sleep(1); 		// 主线程等待
}
publisher.close();

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录