Executor

概念

线程池接口,不能被直接使用,需要实现类

继承结构如下:

Excutor类及其相关子类.png
Excutor类及其相关子类.png

Executor使用示例

  • newCachedThreadPool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
     ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println(Thread.currentThread().getName() + " begin: " + System.currentTimeMillis());
    Thread.sleep(1000);
    System.out.println("end: " + System.currentTimeMillis());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
  • newCachedThreadPool(ThreadFactory threadFactory): 参数ThreadFactory示例,而ThreadFactory是一个接口,因此可以实现该接口,重写newThread方法,实现我们想要的功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
     public static void main(String[] args) {
    CustomizeFactory customizeFactory=new CustomizeFactory();
    ExecutorService executorService = Executors.newCachedThreadPool(customizeFactory);
    executorService.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println(Thread.currentThread().getName() + " begin: " + System.currentTimeMillis());
    Thread.sleep(1000);
    System.out.println("end: " + System.currentTimeMillis());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });

    }

    static class CustomizeFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setName("自定义的线程工厂");
    return thread;
    }
    }
  • newFixedThreadPool(int nThreads): 有界的线程池,可以设置线程池中的线程最大数量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
     public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 5; i++) {
    executorService.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println(Thread.currentThread().getName() + " begin: " + System.currentTimeMillis());
    Thread.sleep(1000);
    System.out.println("end: " + System.currentTimeMillis());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    }
    }

    示例中即使execute了大于三次,实际上也只会使用三个线程

  • newFixedThreadPool(int nThreads, ThreadFactory threadFactory):可以自定义线程工厂,并设置线程池最大个数

  • newSingleThreadExecutor:创建单一线程,实际上是调用ThreadPoolExecutor并指定corePoolSize和maximumPoolSize都为1实现的

ExecutorService

使用示例

  • invokeAny:取得第一个完成任务的结果值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class ExecutorServiceDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    List callableList = new ArrayList<>();
    callableList.add(new CallableA());
    callableList.add(new CallableB1());

    ExecutorService executorService = Executors.newCachedThreadPool();
    String value = executorService.invokeAny(callableList);
    System.out.println("结果为: "+value);

    }

    static class CallableA implements Callable {

    @Override
    public Object call() throws Exception {
    System.out.println("CallableA " + Math.random());
    return "callableA";
    }
    }

    static class CallableB1 implements Callable {

    @Override
    public Object call() throws Exception {
    System.out.println("CallableB1 " + Math.random());
    return "CallableB1";
    }
    }

    }
  • invokeAny(Collection<? extends Callable> tasks,

    long timeout, TimeUnit unit):在指定时间内取得第一个先执行任务的结果值
  • invokeAll(Collection<? extends Callable> tasks):返回所有任务的执行结果

  • invokeAll(Collection<? extends Callable> tasks,

    long timeout, TimeUnit unit):如果全部任务在指定时间内没有完成,则抛出异常

ThreadPoolExecutor

概念与作用

Executor的可以直接实例化的实现类,用于创建线程池对象

使用

构造方法:

构造方法一:

1
2
3
4
5
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

构造方法二:

1
2
3
4
5
6
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)

构造方法三:

1
2
3
4
5
6
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

构造方法四:

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePooSize:核心线程数最大值
  • maximumPoolSize:线程池最大线程数
  • keepAliveTime:线程数大于corePoolSize时,超过该时间,将会删除,为0时,线程执行完任务会立即在队列中删除
  • unit: 时间单位
  • workQueue:执行前用于保持任务的队列
  • threadFactory:线程工厂
  • handler:处理任务被拒绝执行时的行为,RejectedExecutionHandler是一个接口,如果我们想自定义拒绝处理行为,那么可以类似这样:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
       static class CustomizeExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    System.out.println("被拒绝执行了");
    }
    }

    // 在创建线程池时使用
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 9999L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    threadPoolExecutor.setRejectedExecutionHandler(new CustomizeExecutionHandler());

其他方法

  • shutdown:执行完当前任务后,主线程结束,不在添加新的任务,队列中任务还是会正常执行的
  • shutdownNow:中断所有任务,队列中未执行的线程不再执行,同时会返回未被执行的任务列表,如果在shutdownNow方法后面又execute线程,将会抛出异常
  • isShutdown:判断线程池是否关闭
  • awaitTermination(long timeout, TimeUnit unit):如果有任务正在执行,阻塞线程,在指定时间之后,再去判断线程池是否已经关闭了,否则不阻塞
  • allowCoreThreadTimeOut(boolean value) :设置核心线程是否具有超时效果,超时时间由创建线程池时指定

几个要点说明

  • 1、创建的线程数量小于corePoolSize,那么即使设置了超时时间,该参数也不会起作用
  • 2、创建的线程大于corePoolSize,并且小于等于最大线程数,workQueue使用的时LinkedBlockDeue队列,那么maximumPoolSize和keepAliveTime可以被忽略
  • 3、创建的线程大于corePoolSize,并且小于等于最大线程数,workQueue使用的时SynchronousQueue队列,那么maximumPoolSize和keepAliveTime会起作用

执行流程图

执行流程图
执行流程图