feitu的博客

Java多线程的学习和使用 1.基本用法

引言

Java多线程的从最初的Thread & Runnable,到JDK1.8中的parallel stream,几乎每个版本都有重大且深刻的变化,本文是本人对Java多线程的学习和使用的整理和记录。

需求场景

为了更好的说明,本文引入了一个来自于实际的、经过简化了的需求场景: 有一个User对象,User对象有2个属性,id和value,数据库中有user表作为对应,UserService中有一个doSth方法,doSth方法每次都要到数据库读取user的value值,在其基础上随机增加一个数值,然后更新到数据库中,也就是每次doSth方法要读写数据库。

1
2
3
4
5
6
7
8
9
10
public class User {
private Long id;
private int value;
//setters & getters
}
public class UserService {
public void doSth(User user){
//若干数据库操作
}
}

现在需求如下:对所有的User数据进行doSth操作,由于User数据量较大,需要用到多线程来处理。在下文对多线程各个方面的说明中,均使用这个需求来进行举例。

不使用线程

首先是不使用线程的方式,最简单,当然这样效率也是最低的

1
2
3
for(User user : users){
userService.doSth(user);
}

使用Thread实现多线程

使用原始的Thread实现多线程,这里直接的想法可能是为每一个User对象创建一个Thread进行运行,但是实际上这样肯定行不通,虽然线程相对于进程而言是轻量级的,但是如此多的线程几乎一定会导致内存不足,而且线程之间的竞争关系,也会导致性能的不升反降,除此之外,不同平台的JVM对能创建的线程数量也是有限制的。一个合理的方式,将users分成多个部分,每个线程处理一部分。
首先定义一个Runnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class UserListRunnable implements Runnable{
private List<User> users;
private UserService userService;
public UserListRunnable(List<User> users, UserService userService){
this.users = users;
this.userService = userService;
}
@Override
public void run() {
for(User u : users){
userService.doSth(u);
}
}
}

在主线程中,将users分到多个list,分别创建并启动Thread

1
2
3
4
5
6
7
8
9
10
11
12
List<Thread> threads = new ArrayList<>();
for(List list : userList){
Thread t = new Thread(new UserListRunnable(list, userService));
threads.add(t);
}
for(Thread t : threads){
t.start();
}
for(Thread t : threads){
t.join();
}

在这里,我们希望让主线程等待所有的线程运行结束后再继续,为了做到这一点,使用了for循环对所有Thread执行了join方法,这里要用两个for循环,而不能写成下面的样子:

1
2
3
4
for(Thread t : threads){
t.start();
t.join();
}

如果这样写,当一个线程启动之后,立刻调用join,这时候主线程不会进入执行for循环进入下一个线程,而是等待当前这个线程结束之后才进入下一个线程,相当于多个线程一个接一个(而不是同时)的执行了一遍。

为了保证所有的子线程都结束,这里也可以在所有的子线程上用遍历调用isAlive,直到全部返回false。

Java有一个叫做线程组的东西,听名字似乎可以用在上面的场景中,实际上这个东西已经被废弃了,把它当做Java的黑历史就好了。

使用CyclicBarrier控制线程的执行

在java1.5中,JDK引入了一批关于多线程的API,多线程的操作变得更加灵活和易用,例如在上面这个多个线程的例子中,为了让主线程等待所有线程结束,使用在每个线程上调用join的方式,在JDK1.5中,有了更多的选择,例如CyclicBarrier和CountDownLatch。

CyclicBarrier的用法,简单来说就是要求规定数量线程都达到某种状态时,才能继续向下运行,举例说明使用CyclicBarrier确保所有子线程的结束后主线程才进行运行。

首先改写UserListRunnable,加入一个CyclicBarrier,并在super.run()之后,调用CyclicBarrier的await方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class UserListWithCyclicBarrierRunnable extends UserListRunnable {
private CyclicBarrier barrier;
public UserListWithCyclicBarrierRunnable(List<User> users, UserService userService, CyclicBarrier barrier) {
super(users, userService);
this.barrier = barrier;
}
@Override
public void run() {
super.run();
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
logger.error("error...", e);
}
}
}

在主线程中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER + 1);
for(List list : userList){
Thread t = new Thread(new UserListWithCyclicBarrierRunnable(list, userService, barrier));
threads.add(t);
}
for(Thread t : threads){
t.start();
}
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
logger.error("error...", e);
}

需要注意在创建CyclicBarrier时,传入的线程数目是子线程数目+1,而且不只UserListWithCyclicBarrierRunnable在run方法中调用了barrier.await(); 主线程也调用了这个方法,这是由于如果不在主线程上加入这个方法,就只有子线程分别相互等待,而主线程不会等待子线程。

使用CountDownLatch控制线程的执行

类似的,使用CountDownLatch的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class UserListWithCountDownLatchRunnable extends UserListRunnable {
private CountDownLatch countDown;
public UserListWithCountDownLatchRunnable(List<User> users, UserService userService, CountDownLatch countDown) {
super(users, userService);
this.countDown = countDown;
}
@Override
public void run() {
super.run();
countDown.countDown();
}
}

主线程中

1
2
3
4
5
6
7
8
9
10
11
12
13
for(List list : userList){
Thread t = new Thread(new UserListWithCountDownLatchRunnable(list, userService, countDown));
threads.add(t);
}
for(Thread t : threads){
t.start();
}
try {
countDown.await();
} catch (InterruptedException e) {
logger.error("error...", e);
}

在这个例子里,主线程中的countDown一直等待子线程中的countDown倒计时到0,因此UserListWithCountDownLatchRunnable的初始化方法中参数是线程数目。

使用ExecutorService

Java1.5中,还加入了一个非常重要的并发工具:ExecutorService,现在我们利用ExecutorService实现上面的需求。
首先创建一个Runnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UserRunnable implements Runnable{
private User user;
private UserService userService;
public UserRunnable(User user, UserService userService){
this.user = user;
this.userService = userService;
}
@Override
public void run() {
userService.doSth(user);
}
}

由于ExecutorService是重复使用Thread的,所以即使每一个Runnable每次只处理一个User对象,也不出现Thread过多的情况。

我们采用固定数量线程的ExecutorService。

1
2
3
4
5
6
7
8
9
10
ExecutorService service = Executors.newFixedThreadPool(THREAD_NUMBER);
for (User user : users) {
service.submit(new UserRunnable(user, userService));
}
service.shutdown();
try {
service.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.error("error... ", e);
}

可以看到,使用ExecutorService管理线程,比自己创建并管理Thread要便捷的多。

使用fork/join框架

在使用ExecutorService的时候,我们没有手工创建任何thread,thread的创建和管理全部委托给了Executor,我认为这是Java在多线程上的一次重大进步。在上面需要人工创建Thread的例子里,需要用户自己考虑Thread对象的生命周期,而现在,仿佛Thread根本不存在。

Java1.7又为多线程的引入了新的工具:fork/join框架,详细熟悉linux编程的同学对这两个单词不会陌生。Java1.7中的这个框架,其作用简单说,就是把一个冗长的任务分解为一些简洁的子任务,计算结束之后再将结果合并在一起。

本文一开始中引入的需求,其实不太适合用这个框架处理,下面的示例主要是为了演示其用法。

首先需要定一个RecursiveAction,其逻辑是:如果users的size大于某个数值(这个例子中数值取10),则将users平分为2个list,分别继续执行RecursiveAction;如果users的size已经被分解到不大于这个数值,则处理list中的User,由于这个例子没有返回值,因此只展示了fork部分,没有使用join合并结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class UserRecursiveAction extends RecursiveAction{
private final Logger logger = LoggerFactory.getLogger(UserRecursiveAction.class);
private static final int size = 10;
private List<User> users;
private UserService userService;
public UserRecursiveAction(List<User> users, UserService userService ) {
this.users = users;
this.userService = userService;
}
@Override
protected void compute() {
if(users.size() > size){
int half = users.size() / 2;
invokeAll(
new UserRecursiveAction(users.subList(0, half), userService),
new UserRecursiveAction(users.subList(half, users.size()), userService));
return;
}
for(User u : users){
userService.doSth(u);
}
}
}

使用ForkJoinPool执行

1
2
3
4
5
6
7
8
9
UserRecursiveAction action = new UserRecursiveAction(users, userService);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(action);
pool.shutdown();
try {
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.error("error...", e);
}

和ExecutorService相比,fork/join进一步简化了多线程的写法,使用fork/join时,几乎感觉不到Thread,Runnable的存在,不过这不是fork/join框架最显著的特点,其内部实现了一个叫做“工作窃取(Work-Stealing)”的算法,这种算法简单来说,就是当一些线程同时工作时,那些率先完成任务的线程会去那些还没完成任务的线程中窃取任务,也就说先干完活的会去帮助那些还没干完活的干活(太敬业了),这样一来,整体的效率就提高了,“分而治之”的策略和work-stealing算法是fork/join区别于之前的最大特点。

在进入java1.8之前,我觉得有必要简要说明下两个概念:并发(multi-thread)和并行(parallel),通俗的将,并发是多个线程以抢占资源(这里的资源经常指的是CPU时钟)的方法运行,虽然是多个线程,但是同一时间只有一个线程占有CPU;而并行指的是多个线程在多个CPU上同时运行。Java中,既有并发,也有并行,但是从Java1.7开始,我感觉似乎开始更强调并行,在Java1.8中更加明显。

使用parallel stream

进入1.8时代的Java,和以往有了很大不同,这个版本引入了很多新的特种,虽然java.util.concurrent包下并没有太多变化,但是在stream中的parallel模式,极大的改变了Java多线程代码的编写方式。

把上面的需求用stream实现,代码大概是这样的:

1
2
3
users.parallelStream().forEach(
u -> userService.doSth(u)
);

是的,你没有看错,这样的几行代码就可以了,没有Thread,没有fork/join,看不多任何多线程的痕迹,那个parallelStream搞定了一切,JDK1.8万岁!parallelStream万岁!

等等,冷静一下,stream的parallel就真的能让多线程开发简单如此丧心病狂的程度吗?客观的说,处理上面这个需求中,确实只需要这几行代码就足够了,也不会有什么问题,但不是所有的场景都会如此简单。stream的parallel天生的提供了并行性,但是parallel丝毫不会为你的程序保证线程安全,并发的底层实现机制是线程,而线程的安全依然需要开发人员自己实现。

例如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ParallelStreamWithList {
private List<Object> objects;
private void init(){
objects = new ArrayList<>();
LongStream.range(0, 100).parallel().forEach(
l -> {
objects.add(new Object());
}
);
}
public static void main(String[] args) {
ParallelStreamWithList psl = new ParallelStreamWithList();
psl.init();
System.out.println(psl.objects.size());
}
}

这段代码逻辑很简单,向一个list中放Object,重点在于放的过程是并行的,那么,请问打印出的objects size是多少? 把这段程序运行几次,会发现运气好的话可能会看到100,大多数情况下会看到一个小于100的数字,或者下面这样的异常:

1
2
3
4
5
6
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 49
at java.util.ArrayList.add(ArrayList.java:459)
at cn.feitu.javaparallel.ParallelStreamWithList.lambda$init$0(ParallelStreamWithList.java:18)
at cn.feitu.javaparallel.ParallelStreamWithList$$Lambda$1/668386784.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(ForEachOps.java:225)
......

奇怪吗?对于不熟悉线程安全的开发人员来说,这确实很奇怪;对于在多线程编程上吃过亏的同学来说,这个结果可能并不让人吃惊。之所以出现这个结果,是由于ArrayList不是线程安全的,parallel下objects的操作会出现各种各样的线程问题。正确的写法应该将

1
objects = new ArrayList<>();

改为

1
objects = Collections.synchronizedList(new ArrayList<>());

使用一个同步的list就不会出现这种问题了(多说一句,在这个例子里,这样处理就可以了,但并非所有涉及到list的多线程场合,都是synchronizedList一下就可以的,很多时候还有使用内置锁机制,即synchronized)。

总结和心得

如果总一下的话,从最初的Thread到最近的parallelSteam,java的多线程编程发生了巨大的变化,这个变化的外因是多核CPU的普及使得并行算法越来越重要,而Java多线程本身的变化趋势也很明显,即使用方式上从复杂到简单,这一点,从上文的各种实例中可以清晰的看到。但是,多线程的开发涉及的领域很多,虽然Java不断的为开发者提供更加便捷的工具,想要写出安全的多线程代码依然不是轻而易举就可以做到的。

代码说明

本文涉及的到的代码放在github的这个repo
clone代码之后,运行之前需要创建数据库并修改ctx.xml中的jdbc配置,代码是用maven管理的,在项目目录下执行mvn test即可运行