阅读 29 SEO

java项目中的多线程实践记录

项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,数据库的分批写入,大文件的分段下载等,主要涉及到多线程的一些知识,本文通过实例代码给大家介绍的非常详细,需要的朋友参考下

项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,数据库的分批写入,大文件的分段下载等。 通常会使用spring自带的线程池处理,做到对线程的定制化处理和更好的可控,建议使用自定义的线程池。 主要涉及到的几个点:

1. 自定义线程工厂(ThreadFactoryBuilder),主要用于线程的命名,方便追踪

2. 自定义的线程池(ThreadPoolExecutorUtils),可以按功能优化配置参数

3. 一个抽象的多线程任务处理接口(OperationThreadService)和通用实现(OperationThread)

4. 统一的调度实现(MultiThreadOperationUtils)

核心思想:分治归并,每个线程计算出自己的结果,最后统一汇总。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
 
/**
 * description: 自定义实现的线程池,遵循alibaba编程规范,使用ThreadPoolExecutor创建线程池使用
 * 设置更有描述意义的线程名称,默认的ThreadFactory,它给线程起名字大概规律就是pool-m-thread-n,如pool-1-thread-1。
 * 当分析一个thread dump时,很难知道线程的目的,需要有描述意义的线程名称来分析追踪问题
 * 设置线程是否是守护线程,默认的ThreadFactory总是提交非守护线程
 * 设置线程优先级,默认ThreadFactory总是提交的一般优先级线程
 * <p>
 * CustomThreadFactoryBuilder类实现了一种优雅的Builder Mechanism方式去得到一个自定义ThreadFactory实例。
 * ThreadFactory接口中有一个接受Runnable类型参数的方法newThread(Runnable r),
 * 业务的factory逻辑就应该写在这个方法中,去配置线程名称、优先级、守护线程状态等属性。
 * 原文链接:https://blog.csdn.net/zombres/article/details/80497515
 *
 * @author Hlingoes
 * @date 2019/12/22 0:45
 */
public class ThreadFactoryBuilder {
    private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);
 
    private String nameFormat = null;
    private boolean daemon = false;
    private int priority = Thread.NORM_PRIORITY;
 
    public ThreadFactoryBuilder setNameFormat(String nameFormat) {
        if (nameFormat == null) {
            throw new NullPointerException();
        }
        this.nameFormat = nameFormat;
        return this;
    }
 
    public ThreadFactoryBuilder setDaemon(boolean daemon) {
        this.daemon = daemon;
        return this;
    }
 
    public ThreadFactoryBuilder setPriority(int priority) {
        if (priority < Thread.MIN_PRIORITY) {
            throw new IllegalArgumentException(String.format(
                    "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));
        }
 
        if (priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(String.format(
                    "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));
        }
 
        this.priority = priority;
        return this;
    }
 
    public ThreadFactory build() {
        return build(this);
    }
 
    private static ThreadFactory build(ThreadFactoryBuilder builder) {
        final String nameFormat = builder.nameFormat;
        final Boolean daemon = builder.daemon;
        final Integer priority = builder.priority;
        final AtomicLong count = new AtomicLong(0);
 
        return (Runnable runnable) -> {
            Thread thread = new Thread(runnable);
            if (nameFormat != null) {
                thread.setName(String.format(nameFormat, count.getAndIncrement()));
            }
            if (daemon != null) {
                thread.setDaemon(daemon);
            }
            thread.setPriority(priority);
            thread.setUncaughtExceptionHandler((t, e) -> {
                String threadName = t.getName();
                logger.error("error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
            });
            return thread;
        };
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.*;
 
/**
 * description: 创建通用的线程池
 * <p>
 * corePoolSize:线程池中核心线程数量
 * maximumPoolSize:线程池同时允许存在的最大线程数量
 * 内部处理逻辑如下:
 * 当线程池中工作线程数小于corePoolSize,创建新的工作线程来执行该任务,不管线程池中是否存在空闲线程。
 * 如果线程池中工作线程数达到corePoolSize,新任务尝试放入队列,入队成功的任务将等待工作线程空闲时调度。
 * 1. 如果队列满并且线程数小于maximumPoolSize,创建新的线程执行该任务(注意:队列中的任务继续排序)。
 * 2. 如果队列满且线程数超过maximumPoolSize,拒绝该任务
 * <p>
 * keepAliveTime
 * 当线程池中工作线程数大于corePoolSize,并且线程空闲时间超过keepAliveTime,则这些线程将被终止。
 * 同样,可以将这种策略应用到核心线程,通过调用allowCoreThreadTimeout来实现。
 * <p>
 * BlockingQueue
 * 任务等待队列,用于缓存暂时无法执行的任务。分为如下三种堵塞队列:
 * 1. 直接递交,如SynchronousQueue,该策略直接将任务直接交给工作线程。如果当前没有空闲工作线程,创建新线程。
 * 这种策略最好是配合unbounded线程数来使用,从而避免任务被拒绝。但当任务生产速度大于消费速度,将导致线程数不断的增加。
 * 2. 无界队列,如LinkedBlockingQueue,当工作的线程数达到核心线程数时,新的任务被放在队列上。
 * 因此,永远不会有大于corePoolSize的线程被创建,maximumPoolSize参数失效。
 * 这种策略比较适合所有的任务都不相互依赖,独立执行。
 * 但是当任务处理速度小于任务进入速度的时候会引起队列的无限膨胀。
 * 3. 有界队列,如ArrayBlockingQueue,按前面描述的corePoolSize、maximumPoolSize、BlockingQueue处理逻辑处理。
 * 队列长度和maximumPoolSize两个值会相互影响:
 * 长队列 + 小maximumPoolSize。会减少CPU的使用、操作系统资源、上下文切换的消耗,但是会降低吞吐量,
 * 如果任务被频繁的阻塞如IO线程,系统其实可以调度更多的线程。
 * 短队列 + 大maximumPoolSize。CPU更忙,但会增加线程调度的消耗.
 * 总结一下,IO密集型可以考虑多些线程来平衡CPU的使用,CPU密集型可以考虑少些线程减少线程调度的消耗
 *
 * @author Hlingoes
 * @citation https://blog.csdn.net/wanghao112956/article/details/99292107
 * @citation https://www.jianshu.com/p/896b8e18501b
 * @date 2020/2/26 0:46
 */
public class ThreadPoolExecutorUtils {
    private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);
 
    public static int defaultCoreSize = Runtime.getRuntime().availableProcessors();
    private static int pollWaitingTime = 60;
    private static int defaultQueueSize = 10 * 1000;
    private static int defaultMaxSize = 4 * defaultCoreSize;
    private static String threadName = "custom-pool";
 
    /**
     * description: 创建线程池
     *
     * @param waitingTime
     * @param coreSize
     * @param maxPoolSize
     * @param queueSize
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/4/12
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int coreSize, int maxPoolSize, int queueSize) {
        pollWaitingTime = waitingTime;
        defaultCoreSize = coreSize;
        defaultMaxSize = maxPoolSize;
        defaultQueueSize = queueSize;
        return getExecutorPool();
    }
 
    /**
     * description: 创建线程池
     *
     * @param waitingTime
     * @param queueSize
     * @param maxPoolSize
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize, int maxPoolSize) {
        pollWaitingTime = waitingTime;
        defaultQueueSize = queueSize;
        defaultMaxSize = maxPoolSize;
        return getExecutorPool();
    }
 
    /**
     * description: 创建线程池
     *
     * @param waitingTime
     * @param queueSize
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize) {
        pollWaitingTime = waitingTime;
        defaultQueueSize = queueSize;
        return getExecutorPool();
    }
 
    /**
     * description: 创建线程池
     *
     * @param waitingTime
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime) {
        pollWaitingTime = waitingTime;
        return getExecutorPool();
    }
 
    /**
     * description: 创建线程池
     *
     * @param
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/6/6
     */
    public static ThreadPoolExecutor getExecutorPool() {
        return getExecutorPool(threadName);
    }
 
    /**
     * description: 创建线程池
     *
     * @param
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(String threadName) {
        ThreadFactory factory = new ThreadFactoryBuilder()
                .setNameFormat(threadName + "-%d")
                .build();
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(defaultQueueSize);
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(defaultCoreSize,
                defaultMaxSize, 60, TimeUnit.SECONDS, queue, factory,
                (r, executor) -> {
                    /**
                     * 自定义的拒绝策略
                     * 当提交给线程池的某一个新任务无法直接被线程池中“核心线程”直接处理,