线程池


一. 线程池简介

  • 线程池的概念

    • 线程池就是首先创建一些线程,它们的集合称为线程池。
    • 使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
  • 线程池的工作机制:

    • 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程。
    • 一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
  • 使用线程池的原因:

    • 当使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。而通过线程池使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务。
    • 多线程运行时,系统不断的启动和关闭新线程,成本非常高,会过渡消耗系统资源,以及过渡切换线程的危险,从而可能导致系统资源的崩溃。这时,线程池就是最好的选择了。

二. 四种常见的线程池

线程池的返回值ExecutorService简介:

ExecutorService是Java提供的用于管理线程池的接口。该接口的两个作用:控制线程数量和重用线程

而4种常用的线程池返回值都是ExecutorService!

2.1 Executors.newCacheThreadPool()

可缓存线程池,先查看池中有没有以前建立的线程,如果有,就直接使用。如果没有,就建一个新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务。

package com.xsh;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author : xsh
 * @create : 2020-03-30 - 20:01
 * @describe:
 */
public class ThreadPoolExecutorTest1 {
    public static void main(String[] args) {
        //创建一个可缓存线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            try {
                //sleep可明显看到使用的是线程池里面以前的线程,没有创建新的线程
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    //打印正在执行的缓存线程信息
                    System.out.println(Thread.currentThread().getName()+"正在被执行");
                }
            });
        }
    }
}

运行结果(同一线程,未创建新线程):

线程池为无限大,当执行当前任务时上一个任务已经完成,会复用执行上一个任务的线程,而不用每次新建线程。

2.2 Executors.newFixedThreadPool(int n)

创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程。

package com.xsh;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author : xsh
 * @create : 2020-03-30 - 20:25
 * @describe:
 */
public class ThreadPoolExecutorTest2 {
    public static void main(String[] args) {
        //创建一个可重用固定个数的线程,重用个数为3
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            fixedThreadPool.execute(new Runnable(){
                public void run() {
                    try {
                        //打印正在执行的缓存线程信息
                        System.out.println(Thread.currentThread().getName()+"正在被执行");
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

运行结果(重复使用指定数量的线程):

因为线程池大小为3,每个任务输出打印结果后sleep 2秒,所以每两秒打印3个结果。 定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

2.3 Executors.newScheduledThreadPool(int n)

创建一个定长线程池,支持定时及周期性任务执行。

定时执行任务,即指定时间后执行线程内的任务:

package com.xsh;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author : xsh
 * @create : 2020-03-30 - 20:35
 * @describe:
 */
public class ThreadPoolExecutorTest3 {
    public static void main(String[] args) {
        //创建一个定长线程池,支持定时及周期性任务执行——延迟执行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        //延迟5秒执行
        scheduledThreadPool.schedule(new Runnable() {
            public void run() {
                System.out.println("延迟5秒执行");
            }
        }, 5, TimeUnit.SECONDS);//5秒后执行run( )方法
    }
}

周期性任务,即指定时间间隔后重复执行任务:

package com.xsh;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author : xsh
 * @create : 2020-03-30 - 20:43
 * @describe:
 */
public class ThreadPoolExecutorTest4 {
    public static void main(String[] args) {
        //创建一个定长线程池,支持定时及周期性任务执行——定期执行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        //延迟1秒后每3秒执行一次
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("延迟1秒后每3秒执行一次");

            }
        }, 1, 3, TimeUnit.SECONDS);
    }
}

运行结果:

2.4 Executors.newSingleThreadExecutor()

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

package com.xsh;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author : xsh
 * @create : 2020-03-30 - 20:51
 * @describe:
 */
public class ThreadPoolExecutorTest5 {
    public static void main(String[] args) {
        //创建一个单线程化的线程池
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                public void run() {
                    try {
                        //结果依次输出,相当于顺序执行各个任务
                        System.out.println(Thread.currentThread().getName()+"正在被执行,打印的值是:"+index);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

运行结果(保证使用一个线程,顺序执行任务):

三. Java中的ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

构造器中各个参数的含义:

  • corePoolSize:核心池的大小。

    • 在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。
    • 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。

    • 默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。
    • 但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;
    PriorityBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;
    

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;

  • handler:表示当拒绝处理任务时的策略,有以下四种取值:

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService。AbstractExecutorService源码:

public abstract class AbstractExecutorService implements ExecutorService {
 
     
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

ExecutorService接口源码:

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

而ExecutorService又是继承了Executor接口。

Executor接口源码:

public interface Executor {
    void execute(Runnable command);
}

查看以上列举的各类源码,可以得出ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了:

  • Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  • 然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  • 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  • ThreadPoolExecutor继承了类AbstractExecutorService。

而在ThreadPoolExecutor类中有几个非常重要的方法:

  • execute():实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  • submit():在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果

  • shutdown()和shutdownNow()是用来关闭线程池的。

  • 还有getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法。

四. 线程池实现原理

4.1 线程池状态

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

下面的几个static final变量表示runState可能的几个取值:

  • 当创建线程池后,初始时,线程池处于RUNNING状态;

  • 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

  • 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

  • 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

4.2 ThreadPoolExecutor成员变量

ThreadPoolExecutor类中其他的一些比较重要成员变量:

//任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
//线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
 
private volatile long  keepAliveTime;//线程存货时间   
private volatile boolean allowCoreThreadTimeOut;//是否允许为核心线程设置存活时间
private volatile int   corePoolSize;//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;//线程池最大能容忍的线程数
 
private volatile int   poolSize;//线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数
  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

4.3 线程池中的线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化所有核心线程
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); 
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))
        ++n;
    return n;
}

任务缓存队列及排队策略

任务缓存队列,即workQueue,它用来存放等待执行的任务。

workQueue的类型为BlockingQueue,通常可以取下面三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

任务拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程池的关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

完整案例

package com.xsh;

import java.util.concurrent.*;

/**
 * @author : xsh
 * @create : 2020-03-30 - 21:11
 * @describe:
 */
public class ThreadPoolExecutorTest6 {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for(int i=0;i<15;i++){
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                    executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}

执行结果:

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

不提倡直接使用ThreadPoolExecutor创建线程池,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

方法实现:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

 从方法的具体实现来看,实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

五. 缓冲队列BlockingQueue和自定义线程池ThreadPoolExecutor

缓冲队列BlockingQueue简介:

​ BlockingQueue是双缓冲队列。BlockingQueue内部使用两条队列,允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率。

常用的几种BlockingQueue:

  • ArrayBlockingQueue(int i):规定大小的BlockingQueue,其构造必须指定大小。其所含的对象是FIFO顺序排序的。
  • LinkedBlockingQueue()或者(int i):大小不固定的BlockingQueue,若其构造时指定大小,生成的BlockingQueue有大小限制,不指定大小,其大小有Integer.MAX_VALUE来决定。其所含的对象是FIFO顺序排序的。
  • PriorityBlockingQueue()或者(int i):类似于LinkedBlockingQueue,但是其所含对象的排序不是FIFO,而是依据对象的自然顺序或者构造函数的Comparator决定。
  • SynchronizedQueue():特殊的BlockingQueue,对其的操作必须是放和取交替完成。

自定义线程池(ThreadPoolExecutor和BlockingQueue连用):

自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池。

​ 常见的构造函数:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

自定义实现:

package com.xsh;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author : xsh
 * @create : 2020-03-30 - 21:44
 * @describe:
 */


class TempThread implements Runnable {

    @Override
    public void run() {
        // 打印正在执行的缓存线程信息
        System.out.println(Thread.currentThread().getName() + "正在被执行");
        try {
            // sleep一秒保证3个任务在分别在3个线程上执行
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

public class ThreadPoolExecutorTest7 {
    
    public static void main(String[] args) {
        // 创建数组型缓冲等待队列
        BlockingQueue<Runnable> bq = new ArrayBlockingQueue<Runnable>(10);
        // ThreadPoolExecutor:创建自定义线程池,池中保存的线程数为3,允许最大的线程数为6
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(3, 6, 50, TimeUnit.MILLISECONDS, bq);

        // 创建3个任务
        Runnable t1 = new TempThread();
        Runnable t2 = new TempThread();
        Runnable t3 = new TempThread();
        // Runnable t4 = new TempThread();
        // Runnable t5 = new TempThread();
        // Runnable t6 = new TempThread();

        // 3个任务在分别在3个线程上执行
        tpe.execute(t1);
        tpe.execute(t2);
        tpe.execute(t3);
        // tpe.execute(t4);
        // tpe.execute(t5);
        // tpe.execute(t6);

        // 关闭自定义线程池
        tpe.shutdown();
    }
}

运行结果:

基础知识
源码分析
  • 作者:管理员(联系作者)
  • 发表时间:2020-03-31 00:18
  • 版权声明:自由转载-非商用-非衍生-保持署名(null)
  • undefined
  • 评论