多线程使用2
文章目录
- 10.使用ReentrantLock和Condition
- 11.使用ReadWriteLock
- 12.使用StampedLock
- 13.使用java.util.concurrent包提供的线程安全的集合
- 14.使用Atomic
- 15.使用线程池
- 16.Callable接口和Future接口
- 17.使用CompletableFuture
- 18.使用Fork/Join线程池
- 19.使用ThreadLocal
10.使用ReentrantLock和Condition
从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。
我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是非公平锁,当使用notify唤醒时,所有锁都会竞争,二是获取时必须一直等待,没有额外的尝试机制。
https://blog.csdn.net/qq_27184497/article/details/118460141
java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁
使用多个Condition 可以使得线程的控制粒度更精细;那么是怎么个精细法呢?接下来我们来模拟一下,使用 condition 实现三个(或者三个以上)线程打印ABC ABC…;这样保证了每个线程都能执行到;不会出现饿死的情况,因为让线程按照自己想要的顺序执行, 所以也不会出现一直被生产者抢到的情况;
实现方式的思路是这样的:
1、启动的时候A线程先执行,
2、然后A线程挂起,启动B线程,
3、B线程挂起,启动C线程
4、C线程挂起,在启动A线程;
以上的方式就相当于启动了一个轮询,每个线程执行完成后调用下一个线程;
package com.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 开启三个线程 ABC,按顺序打印ABC ABC....一直循环,使用Condition实现
*/
public class ConditionTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
ConditionTest test = new ConditionTest();
// 开启线程A-----------------------------
test.newThread(lock,"A",condition2,condition1);
// 开启线程B-----------------------------
test.newThread(lock,"B",condition3,condition2);
// 开启线程C-----------------------------
test.newThread(lock,"C",condition1,condition3);
}
/**
* 开启新线程
* @param name 线程名称
* @param signalCondition 唤醒的线程
* @param awaitCondition 等待的线程
*/
public void newThread(Lock lock,String name,Condition signalCondition,Condition awaitCondition){
new Thread(() -> {
for (; ; ) {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
signalCondition.signal(); // 唤醒等待的线程
awaitCondition.await();// 让线程进入等待状态
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}
}, name).start();
}
}
不用condition,使用 ReentrantLock 的公平锁
package com.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 开启三个线程 ABC,按顺序打印ABC ABC....一直循环,使用公平锁实现
*/
public class ConditionTest_1 {
public static void main(String[] args) {
// 构造函数的参数设为true表示公平锁
Lock lock = new ReentrantLock(true);
ConditionTest_1 test = new ConditionTest_1();
// 开启线程A-----------------------------
test.newThread(lock,"A");
// 开启线程B-----------------------------
test.newThread(lock,"B");
// 开启线程C-----------------------------
test.newThread(lock,"C");
}
/**
* 开启新线程
* @param name 线程名称
*/
public void newThread(Lock lock,String name){
new Thread(() -> {
for (; ; ) {
lock.lock();
System.out.println(Thread.currentThread().getName());
lock.unlock();
}
}, name).start();
}
}
11.使用ReadWriteLock
ReentrantLock使得在读和写时只能有一个线程占有资源。但是如果很多个线程只读取数据,不修改数据(如论坛加载评论)就不会导致读取的数据出现逻辑错误。这种读多写少的情况可以使用ReadWriteLock。
只允许一个线程写入(其他线程既不能写入也不能读取);
没有写入时,多个线程允许同时读(提高性能)。
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}
public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}
12.使用StampedLock
如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。
StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。
可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。
StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
13.使用java.util.concurrent包提供的线程安全的集合
我们在前面已经通过ReentrantLock和Condition实现了一个BlockingQueue:
public class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
BlockingQueue的意思就是说,当一个线程调用这个TaskQueue的getTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。
因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue。
除了BlockingQueue外,针对List、Map、Set、Deque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:
interface、non-thread-safe、thread-safe
List、ArrayList、CopyOnWriteArrayList
Map、HashMap、ConcurrentHashMap
Set、HashSet / TreeSet、CopyOnWriteArraySet
Queue、ArrayDeque / LinkedList、ArrayBlockingQueue / LinkedBlockingQueue
Deque、ArrayDeque / LinkedList、LinkedBlockingDeque
使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:
Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");
因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:
Map<String, String> map = new HashMap<>();
改为:
Map<String, String> map = new ConcurrentHashMap<>();
就可以了。
java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:
Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。
14.使用Atomic
使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:
原子操作实现了无锁的线程安全;
适用于计数器,累加器等。
Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。
我们以AtomicInteger为例,它提供的主要操作有:
- 增加值并返回新值:int addAndGet(int delta)
- 加1后返回新值:int incrementAndGet()
- 获取当前值:int get()
- 用CAS方式设置:int compareAndSet(int expect, int update)
Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。
如果我们自己通过CAS编写incrementAndGet(),它大概长这样:
public int incrementAndGet(AtomicInteger var) {
int prev, next;
do {
prev = var.get();
next = prev + 1;
} while ( ! var.compareAndSet(prev, next));
return next;
}
CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do … while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。
我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:
class IdGenerator {
AtomicLong var = new AtomicLong(0);
public long getNextId() {
return var.incrementAndGet();
}
}
通常情况下,我们并不需要直接用do … while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。
在高度竞争的情况下,还可以使用Java 8提供的LongAdder和LongAccumulator。
15.使用线程池
Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。
如果可以复用一组线程:
┌─────┐ execute ┌──────────────────┐
│Task1 │──────> │ ThreadPool │
├─────┤ │┌───────┐┌───────┐│
│Task2 │ ││ Thread1 ││ Thread2 │ │
├─────┤ │└───────┘└───────┘│
│Task3 │ │┌───────┐┌───────┐│
├─────┤ ││ Thread3 ││ Thread4 ││
│Task4 │ │└───────┘└───────┘│
├─────┤ └──────────────────┘
│Task5 │
├─────┤
│Task6 │
└─────┘
那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。
简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);
因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:
FixedThreadPool:线程数固定的线程池;
CachedThreadPool:线程数根据任务动态调整的线程池;
SingleThreadExecutor:仅单线程执行的线程池。
创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:
// thread-pool
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// 创建一个固定大小的线程池:
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
es.submit(new Task("" + i));
}
// 关闭线程池:
es.shutdown();
}
}
class Task implements Runnable {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("start task " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("end task " + name);
}
}
我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。
线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。
如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因此,想创建指定动态范围的线程池,可以这么写:
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。
创建一个ScheduledThreadPool仍然是通过Executors类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
我们可以提交一次性任务,它会在指定延迟后只执行一次:
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任务以固定的每3秒执行,我们可以这样写:
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
如果任务以固定的3秒为间隔执行,我们可以这样写:
// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:
而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:
因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。
细心的童鞋还可以思考下面的问题:
在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?
如果任务抛出了异常,后续任务是否继续执行?
Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。
小结
JDK提供了ExecutorService实现了线程池功能:
线程池内部维护一组线程,可以高效执行大量小任务;
Executors提供了静态方法创建不同类型的ExecutorService;
必须调用shutdown()关闭ExecutorService;
ScheduledThreadPool可以定期调度多个任务。
16.Callable接口和Future接口
Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:
class Task implements Callable<String> {
public String call() throws Exception {
return longTimeCalculation();
}
}
并且Callable接口是一个泛型接口,可以返回指定类型的结果。
现在的问题是,如何获得异步执行的结果?
如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
一个Future接口表示一个未来可能会返回的结果,它定义的方法有:
get():获取结果(可能会等待)
get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
cancel(boolean mayInterruptIfRunning):取消当前任务;
isDone():判断任务是否已完成。
17.使用CompletableFuture
18.使用Fork/Join线程池
19.使用ThreadLocal
对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:
public void process(User user) {
checkPermission();
doWork();
saveStatus();
sendResponse();
}
然后,通过线程池去执行这些任务。
观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?
process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:
public void process(User user) {
checkPermission(user);
doWork(user);
saveStatus(user);
sendResponse(user);
}
但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:
void doWork(User user) {
queryStatus(user);
checkStatus();
setNewStatus(user);
log();
}
这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。
给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。
Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。
ThreadLocal实例通常总是以静态字段初始化如下:
static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
它的典型使用方式如下:
void processUser(user) {
try {
threadLocalUser.set(user);
step1();
step2();
} finally {
threadLocalUser.remove();
}
}
通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:
void step1() {
User u = threadLocalUser.get();
log();
printUser();
}
void log() {
User u = threadLocalUser.get();
println(u.name);
}
void step2() {
User u = threadLocalUser.get();
checkUser(u.id);
}
注意到普通的方法调用一定是同一个线程执行的,所以,step1()、step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。
实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:
Object threadLocalValue = threadLocalMap.get(Thread.currentThread());
因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。
最后,特别注意ThreadLocal一定要在finally中清除:
try {
threadLocalUser.set(user);
...
} finally {
threadLocalUser.remove();
}
这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {…}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:
public class UserContext implements AutoCloseable {
static final ThreadLocal<String> ctx = new ThreadLocal<>();
public UserContext(String user) {
ctx.set(user);
}
public static String currentUser() {
return ctx.get();
}
@Override
public void close() {
ctx.remove();
}
}
使用的时候,我们借助try (resource) {…}结构,可以这么写:
try (var ctx = new UserContext("Bob")) {
// 可任意调用UserContext.currentUser():
String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象
这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {…}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。
小结
ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;
ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);
使用ThreadLocal要用try … finally结构,并在finally中清除。