七、线程池
1. 线程池概念
首先有关线程的使用会出现两个问题:
- 线程是宝贵的内存资源、单个线程约占1MB空间,过多分配易造成内存溢出。
- 频繁的创建及销毁线程会增加虚拟机回收频率、资源开销,造成性能下降。
基于如上的问题,出现了线程池:
- 线程容器,可设定线程分配的数量。
- 将预先创建的线程对象存入池中,并重用线程池中的线程对象。
- 避免频繁的创建和销毁。
2. 线程池原理
假如线程池里固定有三个线程,有四个任务。线程池中的三个线程分别完成三个任务,第四个任务则进入等待状态,线程执行完前三个任务后继续执行第四个任务。
- 将任务提交给线程池,由线程池分配线程、运行任务,并在当前任务结束后复用线程。
3. 创建线程池
创建线程池过程(重要)
- 创建池子
ExecutorService executorService=Executors.newFixedThreadPool(10);
- 执行任务
- Runnable接口执行任务
executorService.execute(Runnable任务);
- Callable接口执行任务
executorService.submit(Callable任务);
- Runnable接口执行任务
- 关闭线程池
executorService.shutdown();
常用方法
corePoolSize;
核心池的大小maximumPoolSize;
最大线程数keepAliveTime;
:线程没有任务时最多保持多久会终止
ExecutorService executorService=Executors.newFixedThreadPool(10);
//创建一个有十个线程的池子
-
通过
newFixedThreadPool(int nThreads)
获得固定数量的线程池。参数:指定线程池中线程的数量。 -
通过
newCachedThreadPool()
获得动态数量的线程池,如不够则创建新的,没有上限。
package com.gong.Demo03;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestPool {
public static void main(String[] args) {
TestRunnable myRunnable = new TestRunnable();
ExecutorService executorService= Executors.newFixedThreadPool(10);
executorService.execute(myRunnable);
executorService.execute(myRunnable);
executorService.execute(myRunnable);
executorService.execute(myRunnable);
executorService.shutdown();
}
}
class TestRunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行了");
}
}
这里之后?
4. Callable接口
public interface Callable<V>{
public V call() throws Exception;
}
- JDK1.5加入,与Runnable接口类似,实现之后代表一个线程任务。
- Callable具有泛型返回值、可以声明异常。
与Runnable接口的区别:
- Callable接口中call方法有返回值,Runnable接口中run方法没有返回值。
- Callable接口中call方法有声明异常,Runnable接口中run方法没有异常。
/**
* 演示Callable接口的使用
* 功能需求:使用Callable实现1-100的和。
*/
public class Demo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1.创建Callable对象
Callable<Integer> callable=new Callable<Integer>() {
private int sum=0;
@Override
public Integer call() throws Exception {
for(int i=1;i<=100;i++) {
sum+=i;
}
return sum;
}
};
//2.Thread的构造方法中没有带Callable的构造方法
//需要把Callable对象转成可执行任务,FutureTask表示将要执行的任务
//该类实现了RunnableFuture<V>接口,而该接口又继承了Runnable类
FutureTask<Integer> task=new FutureTask<Integer>(callable);
//3.创建线程对象
Thread thread=new Thread(task);
//4.启动线程
thread.start();
//5.获取结果(等待call方法执行完毕,才会返回)
Integer sum=task.get();
System.out.println("结果是"+sum);
}
}
5. Callable结合线程池使用
/**
* 使用线程池计算1-100的和
*/
public class Demo2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1.创建线程池
ExecutorService executorService=Executors.newFixedThreadPool(1);
//2.提交任务,Future表示将要执行任务的结果;
//submit可以传入一个Callable<T>对象
Future<Integer> future=executorService.submit(new Callable<Integer>() {
private int sum=0;
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+"开始计算。。");
for(int i=1;i<=100;i++) {
sum+=i;
Thread.sleep(10);
}
return sum;
}
});
//3.获取任务的结果(等待任务完成才会返回)
System.out.println(future.get());
//4.关闭线程池
executorService.shutdown();
}
}
6. Future接口
- Future:表示将要完成任务的结果。
演示一个案例:使用两个线程,并发计算1-50、51-100的和,再进行汇总统计。
/**
* 演示Future接口的使用
*/
public class Demo3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1.创建线程池
ExecutorService executorService=Executors.newFixedThreadPool(2);
//2.提交任务
Future<Integer> future1=executorService.submit(new Callable<Integer>() {
int sum=0;
@Override
//计算1-50的和
public Integer call() throws Exception {
for(int i=1;i<=50;i++) {
sum+=i;
}
System.out.println("1-50的和计算完毕。");
return sum;
}
});
Future<Integer> future2=executorService.submit(new Callable<Integer>() {
int sum=0;
@Override
//计算51-100的和
public Integer call() throws Exception {
for(int i=51;i<=100;i++) {
sum+=i;
}
System.out.println("51-100的和计算完毕。");
return sum;
}
});
//3.获取结果
System.out.println(future1.get()+future2.get());
//4.关系线程池
executorService.shutdown();
}
}
- 表示
ExecutorService.submit()
所返回的状态结果,就是call的返回值。 - 方法
V get()
以阻塞形式等待Future中的异步处理结果(call的返回值)。
7. 线程的同步与异步
-
同步
形容一次方法调用,同步一旦开始,调用者必须等待该方法返回,才能继续。
当主线程调用子线程执行任务时,必须等到子线程返回结果后才能继续。
-
异步
形容一次方法调用,异步一旦开始就像是一次消息传递,调用者告知之后立刻返回。二者竞争时间片,并发执行。异步有多条执行路径。
八、线程安全的集合
下图中蓝色的表示线程安全的集合,绿色表示现代开发中已经很少使用的线程安全的集合。
-
Collection体系集合
-
Map安全集合体系
在多线程中使用线程不安全的集合会出现异常。在JDK1.5之前,可以使用Collections中的工具类方法。
Collections工具类中提供了多个可以获得线程安全集合的方法:
public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <T> Set<T> synchronizedSet(Set<T> s)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
public static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s)
public static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V>)
以上为JDK1.2提供,接口单一、维护性高,但性能没有提升,均以synchronized实现。
public class Demo1 {
public static void main(String[] args) {
//1.使用ArrayList(不经过2步骤则报异常)
ArrayList<String> arrayList=new ArrayList<String>();
//2 使用Collections中的线程安全方法转成线程安全的集合
List<String> synList=Collections.synchronizedList(arrayList);
//3 使用并发包里提供的集合
//OnWriteArrayList<String> arrayList2=new OnWriteArrayList<String>();
//创建线程
for(int i=0;i<20;i++) {
int temp=i;
new Thread(new Runnable() {
@Override
public void run() {
for(int j=0;j<10;j++) {
synList.add(Thread.currentThread().getName()+":"+temp);
System.out.println(synList.toString());
}
}
}).start();
}
}
}
把ArrayList转成线程安全的集合后程序正常运行,结果不再演示。
1. OnWriteArrayList集合
- 线程安全的ArrayList,加强版的读写分离。
- 写有锁,读无锁,读写之间不堵塞,优于读写锁。
- 写入时,先一个容器副本、再添加新元素,最后替换引用。所以说它是用空间换安全的一种方式。
- 使用ArrayList无异。
/**
* 演示OnWriteArrayList的使用
*/
public class Demo2 {
public static void main(String[] args) {
//1.创建集合
OnWriteArrayList<String> list=new OnWriteArrayList<String>();
//2.使用多线程操作
ExecutorService eService=Executors.newFixedThreadPool(5);
//3.提交任务
for(int i=0;i<5;i++) {
eService.submit(new Runnable() {
@Override
public void run() {
for(int j=0;j<10;j++) {
list.add(Thread.currentThread().getName()+"..."+new Random().nextInt(1000));
}
}
});
}
//4.关闭线程池
eService.shutdown();
//等所有线程都执行完毕
while(!eService.isTerminated());
//5.打印结果
System.out.println("元素个数:"+list.size());
for (String string : list) {
System.out.println(string);
}
}
}
结果如下,没有问题:
元素个数:50
pool-1-thread-2...222
pool-1-thread-2...688
pool-1-thread-2...770
......
pool-1-thread-4...568
pool-1-thread-4...537
pool-1-thread-4...413
2. OnWriteArrayList源码分析
-
final transient ReentrantLock lock = new ReentrantLock();
此集合所使用的的锁lock是重入锁ReentrantLock。
-
private transient volatile Object[] array;
此集合实际存储的数组array。
-
在上节中调用的无参构造方法创建的是一个空的数组。
public OnWriteArrayList() {
setArray(new Object[0]);
}
final void setArray(Object[] a) {
array = a;
}
add(E)
添加元素是先把原来的数组到一个长度加1的新数组里,然后对新数组进行操作,最后再把新数组赋给原数组。这个操作上了锁。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.Of(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
remove(int)
删除元素同样是复制原数组到一个长度减1的新数组里,然后对新数组进行操作,最后再把新数组赋给原数组。这个操作也上了锁。
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.Of(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.array(elements, 0, newElements, 0, index);
System.array(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
有关数组修改的操作都上了锁,也就说写操作是互斥访问的。
有关读操作的代码都是直接进行了访问,没有上锁,也就是说在写的同时可以读。
private E get(Object[] a, int index) {
return (E) a[index];
}
3. OnWriteArraySet集合
- 线程安全的Set,底层使用OnWriteArrayList实现。
唯一不同在于,使用addIfAbsent()
添加元素,会遍历数组,如果已有元素(比较依据是equals),则不添加(扔掉副本)。
//演示OnWriteArraySet的使用
public class Demo3 {
public static void main(String[] args) {
OnWriteArraySet<String> set=new OnWriteArraySet<String>();
set.add("tang");
set.add("he");
set.add("yu");
set.add("wang");
set.add("tang");//重复元素,添加失败
System.out.println(set.size());
System.out.println(set.toString());
}
}
这个set集合是顺序输出的,结果如下:
4
[tang, he, yu, wang]
4. OnWriteArraySet源码分析
private final OnWriteArrayList<E> al
这个集合实际上使用的就是OnWriteArrayList集合。
它的无参构造方法new的就是OnWriteArrayList对象,所以它是有序的。
public OnWriteArraySet() {
al = new OnWriteArrayList<E>();
}
添加元素的操作和OnWriteArrayList大同小异。
public boolean add(E e) {
return al.addIfAbsent(e);
}
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
这是一个三元表达式,意思是存在相同元素返回false,否则添加元素。
先进入indexOf方法查看源码:
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
if (o == null) {
for (int i = index; i < fence; i++)
if (elements[i] == null)
return i;
} else {
for (int i = index; i < fence; i++)
if (o.equals(elements[i]))
return i;
}
return -1;
}
add方法是添加单个元素,index参数就是0,这个方法就是在遍历数组,如果数组中已经存在相同元素则返回数组下标,注意看它的比较依据是equals方法;如果不存在则返回-1。
在addIfAbsent
所返回的三元表达式中,如果indexOf方法返回数组下标,则返回false,表示已经存在相同元素,添加失败;否则返回-1执行addIfAbsent(e, snapshot)
,进入该方法:
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.Of(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
我们可以忽略if语句,重点关注它的添加操作,发现它也将原数组到长度加一的新数组中,再对新数组进行操作,这个写操作上了锁。其他的写方法都调用了OnWriteArrayList的方法,同样是写操作上锁,读操作可以同时执行。
九、Queue接口(队列)
Collection的子接口,表示队列FIFO(First In First Out),先进先出。
常用方法:
-
抛出异常:
boolean add(E e)
顺序添加一个元素(到达上限后,再添加则会抛出异常)。
E remove()
获得第一个元素并移除(如果队列没有元素时,则抛出异常)。
E element()
获得第一个元素但不移除(如果队列没有元素时,则抛异常)。
-
返回特殊值:(建议使用以下方法)
boolean offer(E e)
顺序添加一个元素(到达上限后,再添加则会返回false)。
E poll()
获得第一个元素并移除(如果队列没有元素时,则返回null)。
E peek()
获得第一个元素但不移除(如果队列没有元素时,则返回null)。
//演示Queue实现类的使用
public class Demo4 {
public static void main(String[] args) {
//创建队列
Queue<String> queue=new LinkedList<String>();
//入队
queue.offer("tang");
queue.offer("he");
queue.offer("yu");
queue.offer("wang");
queue.offer("fan");
System.out.println("队首元素:"+queue.peek());
System.out.println("元素个数:"+queue.size());
//出队
int size=queue.size();
for(int i=0;i<size;i++) {
System.out.println(queue.poll());
}
System.out.println("出队完毕:"+queue.size());
}
}
需要注意的是因为LinkedList是线程不安全的集合,所以不能在多线程的环境中使用。该程序输出如下:
队首元素:tang
元素个数:5
tang
he
yu
wang
fan
出队完毕:0
1. ConcurrentLinkedQueue类
-
Queue接口的实现类。线程安全、可高效读写的队列,高并发下性能最好的队列。
-
无锁、CAS(Compare and Swap)比较交换算法,修改的方法包含三个核心参数(V,E,N)。
-
V:要更新的变量;E:预期值;N:新值。
-
只有当V==E,V=N;否则表示V已被更新过,则取消当前操作。
也就是说假如当前值V是80,要将其改成100,先将V读取出来,读取的V就是预期值;如果预期值E和V相等,就把V的值更新成新值100;如果不等,说明中间有其他线程更新了V,就取消当前操作。
//演示线程安全的队列
public class Demo5 {
public static void main(String[] args) throws InterruptedException {
//创建安全队列
ConcurrentLinkedQueue<Integer> queue=new ConcurrentLinkedQueue<Integer>();
//两个线程执行入队操作
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
for(int i=1;i<=5;i++) {
queue.offer(i);
}
}
});
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
for(int i=6;i<=10;i++) {
queue.offer(i);
}
}
});
//启动线程
t1.start();
t2.start();
t1.join();
t2.join();
for(int i=1;i<=10;i++) {
System.out.println(queue.poll());
}
}
}
因为是两个线程同时添加,所以结果不是顺序的:
1
6
2
3
4
7
5
8
9
10
2. BlockingQueue接口(阻塞队列)
-
Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法。
-
方法
void put(E e)
将指定元素插入此队列中,如果没有可用空间,则等待。
E take()
获取并移除此队列头部元素,如果没有可用元素,则等待。
-
可用于解决生产者、消费者问题。
2.1 阻塞队列(实现类)
-
ArrayBlockingQueue
数组结构实现,有界队列。
-
LinkedBlockingQueue
链表结构实现,有界队列。默认上限
Integer.MAX_VALUE
。
通过一个小程序演示一下所谓的阻塞:
public class Demo6 {
public static void main(String[] args) throws InterruptedException {
//创建一个有界队列
ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue<Integer>(3);
//添加数据使用put
arrayBlockingQueue.put(1);
arrayBlockingQueue.put(2);
arrayBlockingQueue.put(3);
System.out.println(arrayBlockingQueue.size());
System.out.println(arrayBlockingQueue.toString());
arrayBlockingQueue.put(4);
System.out.println("我不会被执行。");
}
}
该程序执行后可以通过控制台看见程序并没有结束,也没有打印最后一句话,说明当前线程(主线程)被阻塞了:
3
[1, 2, 3]
2.2 重写生产者消费者问题
public class Demo7 {
public static void main(String[] args) {
//创建队列
ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(6);
//创建两个线程
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
for(int i=1;i<=30;i++) {
try {
queue.put(i);
System.out.println("生产者生产了一个产品,产品ID:"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
for(int i=1;i<=30;i++) {
try {
queue.take();
System.out.println("消费者消费了一个产品,产品ID:"+i);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
}
});
t1.start();
t2.start();
}
}
需要注意的是插入队尾的方法是put,删除队首元素的方法是take。结果运行如下:
生产者生产了一个产品,产品ID:1
生产者生产了一个产品,产品ID:2
生产者生产了一个产品,产品ID:3
生产者生产了一个产品,产品ID:4
生产者生产了一个产品,产品ID:5
生产者生产了一个产品,产品ID:6
消费者消费了一个产品,产品ID:1
......
消费者消费了一个产品,产品ID:25
消费者消费了一个产品,产品ID:26
消费者消费了一个产品,产品ID:27
消费者消费了一个产品,产品ID:28
消费者消费了一个产品,产品ID:29
消费者消费了一个产品,产品ID:30
十、ConcurrentHashMap
- 初始容量默认为16段(Segment),使用分段锁设计。每一段都对应着一个哈希表。
- 不对整个Map加锁,而是为每个Segment加锁。对一个Segment的操作不影响其他Segment。
- 当多个对象存入同一个Segment时,才需要互斥。
- 最理想状态为16个对象分别存入16个Segment,并行数量16。
- 使用方式与HashMap无异。
注:在JDK1.8之后,ConcurrentHashMap不再采用分段锁,而是采用无锁算法CAS。
//演示线程安全的Map
public class Demo8 {
public static void main(String[] args) {
//创建集合
ConcurrentHashMap<String, Integer> hashMap=new ConcurrentHashMap<String, Integer>();
//使用多线程添加数据
for(int i=0;i<5;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for(int k=0;k<10;k++) {
hashMap.put(Thread.currentThread().getName(), k);
System.out.println(hashMap);
}
}
}).start();
}
}
}
没有问题,不再演示结果。