# 线程池

loading

# 一、为什么要用线程池

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1T3 时间的技术,从而提高服务器程序性能的。它把 T1T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有 T1T3 的开销了。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

下面演示我们自己如何实现一个线程池:

public class MyThreadPool {

    /**
     * 缺省线程数量
     */
    private static int WORK_COUNT = 5;

    /**
     * 缺省任务数量
     */
    private static int TASK_COUNT = 100;

    /**
     * 存放任务
     */
    private final BlockingQueue<Runnable> taskQueue;

    /**
     * 工作线程
     */
    private WorkThread[] workThreads;

    /**
     * 实际线程数量
     */
    private final int workNumber;

    public MyThreadPool() {
        this(TASK_COUNT, WORK_COUNT);
    }

    /**
     * 线程池构造方法
     *
     * @param taskCount  任务数量
     * @param workNumber 线程数量
     */
    public MyThreadPool(int taskCount, int workNumber) {
        if (taskCount <= 0) {
            taskCount = TASK_COUNT;
        }
        if (workNumber <= 0) {
            workNumber = WORK_COUNT;
        }
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        this.workNumber = workNumber;
        workThreads = new WorkThread[workNumber];

        // 准备好工作线程
        for (int i = 0; i < workNumber; i++) {
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
    }

    /**
     * 放入任务,加入队列
     *
     * @param task
     */
    public void execute(Runnable task) {
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 销毁线程
     */
    public void destroy() {
        System.out.println("destroy");
        for (int i = 0; i < workNumber; i++) {
            workThreads[i].stopWork();
            workThreads[i] = null;
        }
        taskQueue.clear();
    }

    /**
     * 内部类,工作线程的实现
     */
    private class WorkThread extends Thread {

        @Override
        public void run() {
            Runnable r;
            try {
                while (!isInterrupted()) {
                    r = taskQueue.take();
                    System.out.println(getId() + "(" + ((TestMyThreadPool.MyTask) r).getName() + ") is running ... ");
                    r.run();
                    // 方便 gc
                    r = null;
                }
            } catch (Exception ignored) {
            }
        }

        /**
         * 终止工作
         */
        public void stopWork() {
            interrupt();
        }
    }

    @Override
    public String toString() {
        return "MyThreadPool{workNumber: " + workNumber + ", wait task number: " + taskQueue.size() + "}";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

自定义的线程池中有一个存放任务的阻塞队列 taskQueue,里面存放所有需要执行的任务 Runnable,默认初始化大小是100。workThreads 是用来存放执行任务的线程数组,默认初始化的大小是5,workNumber 存放实际的线程数量。WorkThread 工作线程通过 taskQueue.take() 阻塞方法从阻塞队列中获取任务,如果能获取到任务就执行,获取不到任务时,除非线程被中断,否则一直阻塞。

测试方法:

public class TestMyThreadPool {

    public static void main(String[] args) throws InterruptedException {
        // 创建3个线程的线程池
        MyThreadPool threadPool = new MyThreadPool(0, 3);
        threadPool.execute(new MyTask("testA"));
        threadPool.execute(new MyTask("testB"));
        threadPool.execute(new MyTask("testC"));
        threadPool.execute(new MyTask("testD"));
        threadPool.execute(new MyTask("testE"));
        System.out.println(threadPool);
        Thread.sleep(10000);
        // 所有线程都执行完成才 destroy()
        threadPool.destroy();
        System.out.println(threadPool);
    }

    /**
     * 任务类
     */
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(r.nextInt(1000) + 2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId() + " has InterruptedException, isInterrupted() =  "
                        + Thread.currentThread().isInterrupted());
            }
            System.out.println("任务 " + name + " 完成");
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

执行结果:

MyThreadPool{workNumber: 3, wait task number: 4}
13(testB) is running ... 
14(testC) is running ... 
12(testA) is running ... 
任务 testC 完成
14(testD) is running ... 
任务 testA 完成
12(testE) is running ... 
任务 testB 完成
任务 testD 完成
任务 testE 完成
destroy
MyThreadPool{workNumber: 3, wait task number: 0}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 二、ThreadPoolExecutor 的类关系

大致明白如何自己实现一个线程池,那么接下来看看 jdk 中和线程池有关的类。

ThreadPoolExecutor类关系

Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。

ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()submit() 的扩展,可以说是真正的线程池接口。

AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法。

ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。

ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期执行"的功能。

ScheduledThreadPoolExecutor 是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutorTimer 更灵活,功能更强大。

# 三、线程池的创建各个参数含义

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
1
2
3
4
5
6
7

# 1、corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize

如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

如果执行了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程。

# 2、maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize

# 3、keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用。

TimeUnitkeepAliveTime 的时间单位。

workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的 corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能。

# 4、workQueue

用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于1,使用无界队列时 maximumPoolSize 将是一个无效参数。
  3. 由于1和2,使用无界队列时 keepAliveTime 将是一个无效参数。
  4. 更重要的,使用无界队列可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。

所以一般会使用,ArrayBlockingQueueLinkedBlockingQueueSynchronousQueuePriorityBlockingQueue

# 5、threadFactory

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

# 6、RejectedExecutionHandler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

  1. AbortPolicy:直接抛出异常,默认策略;
  2. CallerRunsPolicy:用调用者所在的线程来执行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  4. DiscardPolicy:直接丢弃任务。

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

线程池的使用:

public class UseThreadPool {

    /**
     * 没有返回值
     */
    private static class Worker implements Runnable {
        private String taskName;
        private Random r = new Random();

        public Worker(String taskName) {
            this.taskName = taskName;
        }

        public String getName() {
            return taskName;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " process the task : " + taskName);
            SleepTool.ms(r.nextInt(100) * 5);
        }
    }

    /**
     * 有返回值
     */
    private static class CallWorker implements Callable<String> {

        private String taskName;
        private Random r = new Random();

        public CallWorker(String taskName) {
            this.taskName = taskName;
        }

        public String getName() {
            return taskName;
        }

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " process the task : " + taskName);
            return Thread.currentThread().getName() + ":" + r.nextInt(100) * 5;
        }

    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 0; i <= 6; i++) {
            Worker worker = new Worker("worker " + i);
            System.out.println("A new worker has been added : " + worker.getName());
            threadPool.execute(worker);
        }

        for (int i = 0; i <= 6; i++) {
            CallWorker callWorker = new CallWorker("CallWorker " + i);
            System.out.println("A new callWorker has been added : " + callWorker.getName());
            Future<String> result = threadPool.submit(callWorker);
            System.out.println(result.get());
        }
        // 只终止空闲的线程
        // threadPool.shutdown();
        // 终止全部线程
        threadPool.shutdownNow();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

执行结果:

A new worker has been added : worker 0
A new worker has been added : worker 1
A new worker has been added : worker 2
A new worker has been added : worker 3
A new worker has been added : worker 4
A new worker has been added : worker 5
A new worker has been added : worker 6
A new callWorker has been added : CallWorker 0
pool-1-thread-1 process the task : worker 0
pool-1-thread-2 process the task : worker 1
pool-1-thread-1 process the task : worker 2
pool-1-thread-1 process the task : worker 3
pool-1-thread-2 process the task : worker 4
pool-1-thread-1 process the task : worker 5
pool-1-thread-2 process the task : worker 6
pool-1-thread-2 process the task : CallWorker 0
pool-1-thread-2:125
A new callWorker has been added : CallWorker 1
pool-1-thread-2 process the task : CallWorker 1
pool-1-thread-2:155
A new callWorker has been added : CallWorker 2
pool-1-thread-2 process the task : CallWorker 2
pool-1-thread-2:410
A new callWorker has been added : CallWorker 3
pool-1-thread-2 process the task : CallWorker 3
pool-1-thread-2:235
A new callWorker has been added : CallWorker 4
pool-1-thread-2 process the task : CallWorker 4
pool-1-thread-2:65
A new callWorker has been added : CallWorker 5
pool-1-thread-2 process the task : CallWorker 5
pool-1-thread-2:215
A new callWorker has been added : CallWorker 6
pool-1-thread-2 process the task : CallWorker 6
pool-1-thread-2:5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 四、扩展线程池

# 1、自定义线程池中线程的创建方式

如何我们想自定义线程池中线程的创建方式,比如将线程设置为守护线程,该怎么做呢?

public class ThreadPoolAdv {

    private static class Worker implements Runnable {
        private String taskName;
        private Random r = new Random();

        public Worker(String taskName) {
            this.taskName = taskName;
        }

        public String getName() {
            return taskName;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " process the task : " + taskName);
            SleepTool.ms(r.nextInt(100) * 5);
        }
    }

    private static class MyThreadFactory implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "Mark_" + count.getAndIncrement());
            // 设置守护线程
            t.setDaemon(true);
            System.out.println("create " + t);
            return t;
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                new MyThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 0; i <= 6; i++) {
            Worker worker = new Worker("worker " + i);
            System.out.println("A new worker has been added : " + worker.getName());
            threadPool.execute(worker);
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

在创建线程池的时候多了一个参数 new MyThreadFactory(),在这个自定义类中自己指定了创建线程的规则。

执行结果:

A new worker has been added : worker 0
create Thread[Mark_0,5,main]
A new worker has been added : worker 1
create Thread[Mark_1,5,main]
A new worker has been added : worker 2
A new worker has been added : worker 3
A new worker has been added : worker 4
A new worker has been added : worker 5
A new worker has been added : worker 6
1
2
3
4
5
6
7
8
9

可以看出,当主线程结束的时候,线程池也立刻结束了,并没有将所有待执行的任务执行完毕。

# 2、在任务执行前后做自定义逻辑

如何在任务执行的前后做一点我们自己的业务工作呢? JDK 的线程池已经为我们预留了接口,在线程池核心方法中,有2个方法是空的,beforeExecute()afterExecute() ,它们就是给我们预留的。还有一个线程池退出时会调用的方法 terminated()

public class ThreadPoolExt {

    private static class Worker implements Runnable {
        private String taskName;
        private Random r = new Random();

        public Worker(String taskName) {
            this.taskName = taskName;
        }

        public String getName() {
            return taskName;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " process the task : " + taskName);
            SleepTool.ms(r.nextInt(100) * 5);
        }
    }

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy()) {

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("Ready execute " + ((Worker) r).getName());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("Complete execute " + ((Worker) r).getName());
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i <= 6; i++) {
            Worker worker = new Worker("worker " + i);
            System.out.println("A new worker has been added : " + worker.getName());
            threadPool.execute(worker);
        }
        threadPool.shutdown();
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

执行结果:

A new worker has been added : worker 0
A new worker has been added : worker 1
A new worker has been added : worker 2
A new worker has been added : worker 3
A new worker has been added : worker 4
A new worker has been added : worker 5
A new worker has been added : worker 6
Ready execute worker 1
Ready execute worker 0
pool-1-thread-2 process the task : worker 1
pool-1-thread-1 process the task : worker 0
Complete execute worker 1
Ready execute worker 2
pool-1-thread-2 process the task : worker 2
Complete execute worker 2
Ready execute worker 3
pool-1-thread-2 process the task : worker 3
Complete execute worker 0
Ready execute worker 4
pool-1-thread-1 process the task : worker 4
Complete execute worker 3
Ready execute worker 5
pool-1-thread-2 process the task : worker 5
Complete execute worker 4
Ready execute worker 6
pool-1-thread-1 process the task : worker 6
Complete execute worker 6
Complete execute worker 5
线程池退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

可以看到,每个任务执行前后都会调用 beforeExecute()afterExecute() 方法。相当于执行了一个切面。而在调用 shutdown() 方法后则会调用 terminated() 方法。

# 五、线程池的工作机制

  • 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  • 如果运行的线程等于或多于 corePoolSize,则将任务加入阻塞队列 BlockingQueue
  • 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务。
  • 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。

# 六、提交任务

  • execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

# 七,关闭线程池

可以通过调用线程池的 shutdown()shutdownNow() 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt() 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别: shutdownNow() 首先将线程池的状态设置成 STOP ,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown() 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断** 所有没有正在执行任务的线程 **。

只要调用了这两个关闭方法中的任意一个,isShutdown() 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed() 方法会返回 true 。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown() 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow() 方法。

# 八,合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:

  • 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。

首先可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数。

# 1、CUP 密集型

CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu + 1 个线程的线程池。

# 2、IO 密集型

IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2 * Ncpu 个线程的线程池。

对于 IO 型的任务的最佳线程数,有个公式可以计算: Nthreads = NCPU * UCPU * (1 + W / C),其中:

  • NCPU 是处理器的核的数目

  • UCPU 是期望的 CPU 利用率(该值应该介于0和1之间)

  • W / C 是等待时间与计算时间的比率

等待时间与计算时间我们在 Linux 下使用相关的 vmstat 命令或者 top 命令查看。

# 3、混合型

混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

# 4、优先级不同

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。

# 5、执行时间不同

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

# 6、依赖数据库

依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

假设,我们现在有一个 Web 系统,里面使用了线程池来处理业务,在某些情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。

如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

# 九、预定义线程池

# 1、FixedThreadPool

创建使用固定线程数的 FixedThreadPoolAPI。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。FixedThreadPoolcorePoolSizemaximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads

当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把 keepAliveTime 设置为0L,意味着多余的空闲线程会被立即终止。FixedThreadPool 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。

# 2、SingleThreadExecutor

创建使用单个线程的 SingleThreadExecutorAPI,用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

corePoolSizemaximumPoolSize 被设置为 1。其他参数与 FixedThreadPool 相同。SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。

# 3、CachedThreadPool

创建一个会根据需要创建新线程的 CachedThreadPoolAPI。大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

corePoolSize 被设置为 0,即 corePool 为空,maximumPoolSize 被设置为 Integer.MAX_VALUE 。这里把 keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

FixedThreadPoolSingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列。 CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPoolmaximumPool 是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创建过多线程而耗尽CPU` 和内存资源。

# 4、WorkStealingPool

利用所有运行的处理器数目来创建一个工作窃取的线程池,使用 forkjoin 实现

# 5、ScheduledThreadPoolExecutor

使用工厂类 Executors 来创建。Executors 可以创建2种类型的 ScheduledThreadPoolExecutor,如下:

  • ScheduledThreadPoolExecutor

包含若干个线程的 ScheduledThreadPoolExecutor

  • SingleThreadScheduledExecutor

只包含一个线程的 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

SingleThreadScheduledExecutor 适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

# (1) 提交定时任务

  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

向定时任务线程池提交一个延时 Runnable 任务(仅执行一次)

public class ScheduledCase {

    public static void main(String[] args) {

        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);

        // 延时 Runnable 任务(仅执行一次)
        schedule.schedule(() -> System.out.println("the task only run once !"), 3000, TimeUnit.MILLISECONDS);
    }
}
1
2
3
4
5
6
7
8
9
10

执行结果:

the task only run once !
1
  • public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);

向定时任务线程池提交一个延时的 Callable 任务(仅执行一次)

  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

向定时任务线程池提交一个固定延时间隔执行的任务。固定延时间隔的任务是指每次执行完任务以后都延时一个固定的时间。由于操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间是不确定的,也就导致了每次任务的执行周期存在一定的波动。

public class ScheduledCase {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
        // 固定延时间隔执行的任务
        schedule.scheduleWithFixedDelay(() -> {
            System.out.println("fixDelay start," + ScheduleWorker.format.format(new Date()));
            SleepTool.second(2);
            System.out.println("fixDelay end," + ScheduleWorker.format.format(new Date()));
        }, 1000, 3000, TimeUnit.MILLISECONDS);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

任务类:

public class ScheduleWorker implements Runnable {

    /**
     * 普通任务类型
     */
    public final static int NORMAL = 0;

    /**
     * 会抛出异常的任务类型
     */
    public final static int HAS_EXCEPTION = -1;

    /**
     * 抛出异常但会捕捉的任务类型
     */
    public final static int PROCESS_EXCEPTION = 1;

    public static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private int taskType;

    public ScheduleWorker(int taskType) {
        this.taskType = taskType;
    }

    @Override
    public void run() {
        if (taskType == HAS_EXCEPTION) {
            System.out.println(format.format(new Date()) + ", exception has been made");
            throw new RuntimeException("ExceptionHappen");
        } else if (taskType == PROCESS_EXCEPTION) {
            try {
                System.out.println("process exception ... " + format.format(new Date()));
                throw new RuntimeException("ProcessException");
            } catch (RuntimeException e) {
                System.out.println("ProcessException caught");
            }
        } else {
            System.out.println("normal ..." + format.format(new Date()));
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

执行结果:

fixDelay start,2020-03-15 22:47:15
fixDelay end,2020-03-15 22:47:17
fixDelay start,2020-03-15 22:47:20
fixDelay end,2020-03-15 22:47:22
fixDelay start,2020-03-15 22:47:25
fixDelay end,2020-03-15 22:47:27
fixDelay start,2020-03-15 22:47:30
fixDelay end,2020-03-15 22:47:32
...
1
2
3
4
5
6
7
8
9

第一个任务延迟1s后执行,每一个任务执行需要2s,每一个任务执行完毕后等待3s继续执行下一个任务。

  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

向定时任务线程池提交一个固定时间间隔执行的任务。固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从理论上讲是确定的,当然执行任务的时间不能超过执行周期。

# (2) 定时任务超时问题

scheduleAtFixedRate 中,若任务处理时长超出设置的定时频率时长,本次任务执行完才开始下次任务,下次任务已经处于超时状态,会马上开始执行。

若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务会在定时器等待频率时长后执行。

例如:

设置定时任务每 60s 执行一次,那么从理论上应该第一次任务在第 0s 开始, 第二次任务在第 60s 开始,第三次任务在 120s 开始,但实际运行时第一次任务时长 80s,第二次任务时长 30s,第三次任务时长 50s,则实际运行结果为:

第一次任务第 0s 开始,第 80s 结束; 第二次任务第 80s 开始,第 110s 结束(上次任务已超时,本次不会再等待 60s, 会马上开始); 第三次任务第 120s 开始,第 170s 结束. 第四次任务第 180s 开始 .....

下面是一个例子:

public class ScheduledCase {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
        // 固定时间间隔执行的任务,从理论上说第二次任务在 6000ms 后执行,第三次在 6000*2ms 后执行
        schedule.scheduleAtFixedRate(new ScheduleWorkerTime(),
                0, 6000, TimeUnit.MILLISECONDS);
    }
}
1
2
3
4
5
6
7
8
9

任务类:

public class ScheduleWorkerTime implements Runnable {

    /**
     * 工作8秒
     */
    public final static int LONG_8 = 8;

    /**
     * 工作2秒
     */
    public final static int SHORT_2 = 2;

    /**
     * 工作5秒
     */
    public final static int NORMAL_5 = 5;

    public static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static AtomicInteger count = new AtomicInteger(0);

    @Override
    public void run() {
        if (count.get() == 0) {
            System.out.println("LONG_8 ... begin: " + format.format(new Date()));
            SleepTool.second(LONG_8);
            System.out.println("LONG_8 ... end: " + format.format(new Date()));
            count.incrementAndGet();
        } else if (count.get() == 1) {
            System.out.println("SHORT_2 ... begin: " + format.format(new Date()));
            SleepTool.second(SHORT_2);
            System.out.println("SHORT_2 ... end: " + format.format(new Date()));
            count.incrementAndGet();
        } else {
            System.out.println("NORMAL_5 ... begin: " + format.format(new Date()));
            SleepTool.second(NORMAL_5);
            System.out.println("NORMAL_5 ... end: " + format.format(new Date()));
            count.incrementAndGet();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

执行结果:

LONG_8 ... begin: 2020-03-15 22:51:16
LONG_8 ... end: 2020-03-15 22:51:24
SHORT_2 ... begin: 2020-03-15 22:51:24
SHORT_2 ... end: 2020-03-15 22:51:26
NORMAL_5 ... begin: 2020-03-15 22:51:28
NORMAL_5 ... end: 2020-03-15 22:51:33
NORMAL_5 ... begin: 2020-03-15 22:51:34
NORMAL_5 ... end: 2020-03-15 22:51:39
NORMAL_5 ... begin: 2020-03-15 22:51:40
NORMAL_5 ... end: 2020-03-15 22:51:45
...
1
2
3
4
5
6
7
8
9
10
11

按照设定的规则应当是:固定时间间隔执行的任务,从理论上说第二次任务在 6000ms 后执行,第三次在 6000*2ms 后执行,但是第一个任务执行了8s,所以第二个任务在第一个任务完成后(相当于已经晚于预期了),直接执行了;而第二个任务执行完成后还没有到第三个任务预期执行的开始时间,所以等待了2s后开始执行了第三个任务。

# (3) 定时任务抛异常问题

在上面的 ScheduleWorker 类中,使用了 taskType 来定义了任务的类型,一共可以有3种类型:普通任务(上面演示的)、抛异常不处理的任务以及抛异常处理的任务。

下面看看后面两种任务类型在定时任务中的执行情况。

先看看抛异常的任务:

public class ScheduledCase {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
        // 固定时间间隔执行的任务,开始执行后就触发异常,next周期将不会运行
        schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.HAS_EXCEPTION),
    }
}
1
2
3
4
5
6
7
8

执行结果:

2020-03-15 23:04:17, exception has been made
1

可以看出,任务在第一次执行抛出异常后,就停止了,后面也不会再执行了,并且连异常错误信息都没有打印。

再看看抛异常处理的任务:

public class ScheduledCase {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
        // 固定时间间隔执行的任务,虽然抛出了异常,但被捕捉了,next周期继续运行
        schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.PROCESS_EXCEPTION),
                0, 3000, TimeUnit.MILLISECONDS);
    }
}
1
2
3
4
5
6
7
8
9

执行结果:

process exception ... 2020-03-15 23:05:34
ProcessException caught
process exception ... 2020-03-15 23:05:37
ProcessException caught
process exception ... 2020-03-15 23:05:40
ProcessException caught
...
1
2
3
4
5
6
7

可以看出异常在任务类中被捕获了,所以任务还可以得以继续执行,并且输出了在人物类中捕获异常时的日志。

# 6、CompletionService

CompletionService 实际上可以看做是 ExecutorBlockingQueue 的结合体。CompletionService 在接收到要执行的任务时,通过类似 BlockingQueueput()take() 获得任务执行的结果。

CompletionService 的一个实现是 ExecutorCompletionServiceExecutorCompletionService 把具体的计算任务交给 Executor 完成。

在实现上,ExecutorCompletionService 在构造函数中会创建一个 BlockingQueue(使用的基于链表的 LinkedBlockingQueue),该 BlockingQueue 的作用是保存 Executor 执行的结果。

当提交一个任务到 ExecutorCompletionService 时,首先将任务包装成 QueueingFuture,它是 FutureTask 的一个子类,然后改写 FutureTaskdone() 方法,之后把 Executor 执行的计算结果放入 BlockingQueue 中。

ExecutorService 最主要的区别在于 submit()task 不一定是按照加入时的顺序完成的。CompletionServiceExecutorService 进行了包装,内部维护一个保存 Future 对象的 BlockingQueue。只有当这个 Future 对象状态是结束的时候,才会加入到这个 Queue 中,take() 方法其实就是 Producer-Consumer 中的 Consumer。它会从 Queue 中取出 Future 对象,如果 Queue 是空的,就会阻塞在那里,直到有完成的 Future 对象加入到 Queue 中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

下面是一个例子:

public class CompletionCase {

    private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors() * 10;

    /**
     * 方法一、自己写集合来实现获取线程池中任务的返回结果
     *
     * @throws Exception
     */
    private void testByQueue() throws Exception {
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        // 队列,拿任务的执行结果
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<>();

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
            queue.add(future);
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            int sleptTime = queue.take().get();
            //System.out.println(" slept "+sleptTime+" ms ...");
            count.addAndGet(sleptTime);
        }

        // 关闭线程池
        pool.shutdown();
        System.out.println("(Queue) all tasks sleep time: " + count.get() + "ms, and spend time: "
                + (System.currentTimeMillis() - start) + "ms");
    }

    /**
     * 方法二、使用 CompletionService 来实现获取线程池中任务的返回结果
     *
     * @throws Exception
     */
    private void testByCompletion() throws Exception {
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(pool);

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            completionService.submit(new WorkTask("ExecTask" + i));
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            int sleptTime = completionService.take().get();
            count.addAndGet(sleptTime);
        }

        // 关闭线程池
        pool.shutdown();
        System.out.println("(CompletionService) all tasks sleep time: " + count.get() + "ms, and spend time: "
                + (System.currentTimeMillis() - start) + "ms");

    }

    public static void main(String[] args) throws Exception {
        CompletionCase t = new CompletionCase();
        t.testByQueue();
        t.testByCompletion();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

任务类:

public class WorkTask implements Callable<Integer> {

    private String name;

    public WorkTask(String name) {
        this.name = name;
    }

    @Override
    public Integer call() {
        int sleepTime = new Random().nextInt(1000);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回给调用者的值
        return sleepTime;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

方法 testByQueue() 中,自己创建一个集合来保存 Future 存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为 take() 方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。

方法 testByCompletion() 中,使用 CompletionService 来维护处理线程不的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。

执行结果:

(Queue) all tasks sleep time: 58289ms, and spend time: 5505ms
(CompletionService) all tasks sleep time: 60732ms, and spend time: 5413ms
1
2

可以看出,使用 CompletionService 虽然任务的合计时间比使用 Queue 要多出来 2443ms,但是总耗时却少了 92ms。


上次更新: 2020-08-21 09:02:51(10 小时前)