当前位置:首页>笔记分享>Java笔记>Java多线程(二)

Java多线程(二)

七、线程池

1. 线程池概念

首先有关线程的使用会出现两个问题:

  1. 线程是宝贵的内存资源、单个线程约占1MB空间,过多分配易造成内存溢出。
  2. 频繁的创建及销毁线程会增加虚拟机回收频率、资源开销,造成性能下降。

基于如上的问题,出现了线程池:

  • 线程容器可设定线程分配的数量
  • 将预先创建的线程对象存入池中并重用线程池中的线程对象
  • 避免频繁的创建和销毁

2. 线程池原理

假如线程池里固定有三个线程,有四个任务。线程池中的三个线程分别完成三个任务,第四个任务则进入等待状态,线程执行完前三个任务后继续执行第四个任务。

img

  • 将任务提交给线程池,由线程池分配线程、运行任务,并在当前任务结束后复用线程。

3. 创建线程池

创建线程池过程(重要)

  1. 创建池子ExecutorService executorService=Executors.newFixedThreadPool(10);
  2. 执行任务
    • Runnable接口执行任务executorService.execute(Runnable任务);
    • Callable接口执行任务executorService.submit(Callable任务);
  3. 关闭线程池executorService.shutdown();

常用方法

  • corePoolSize;核心池的大小
  • maximumPoolSize;最大线程数
  • keepAliveTime;:线程没有任务时最多保持多久会终止
ExecutorService executorService=Executors.newFixedThreadPool(10);
//创建一个有十个线程的池子
  • 通过newFixedThreadPool(int nThreads)获得固定数量的线程池。参数:指定线程池中线程的数量。

  • 通过newCachedThreadPool()获得动态数量的线程池,如不够则创建新的,没有上限。

package com.gong.Demo03;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestPool {
    public static void main(String[] args) {
        TestRunnable myRunnable = new TestRunnable();
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        executorService.execute(myRunnable);
        executorService.execute(myRunnable);
        executorService.execute(myRunnable);
        executorService.execute(myRunnable);
        executorService.shutdown();
    }
}
class TestRunnable implements Runnable{

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"执行了");
    }
}

这里之后?

4. Callable接口

public interface Callable<V>{
    public V call() throws Exception;
}
  • JDK1.5加入,与Runnable接口类似,实现之后代表一个线程任务。
  • Callable具有泛型返回值、可以声明异常。

与Runnable接口的区别

  1. Callable接口中call方法有返回值,Runnable接口中run方法没有返回值。
  2. Callable接口中call方法有声明异常,Runnable接口中run方法没有异常。
/**
 * 演示Callable接口的使用
 * 功能需求:使用Callable实现1-100的和。
 */
public class Demo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1.创建Callable对象
        Callable<Integer> callable=new Callable<Integer>() {
            private int sum=0;
            @Override
            public Integer call() throws Exception {
                for(int i=1;i<=100;i++) {
                    sum+=i;
                }
                return sum;
            }
        };
        //2.Thread的构造方法中没有带Callable的构造方法
        //需要把Callable对象转成可执行任务,FutureTask表示将要执行的任务
        //该类实现了RunnableFuture<V>接口,而该接口又继承了Runnable类
        FutureTask<Integer> task=new FutureTask<Integer>(callable);

        //3.创建线程对象
        Thread thread=new Thread(task);
        //4.启动线程
        thread.start();
        //5.获取结果(等待call方法执行完毕,才会返回)
        Integer sum=task.get();
        System.out.println("结果是"+sum);
    }
}

5. Callable结合线程池使用

/**
 * 使用线程池计算1-100的和
 */
public class Demo2 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1.创建线程池
        ExecutorService executorService=Executors.newFixedThreadPool(1);
        //2.提交任务,Future表示将要执行任务的结果;
        //submit可以传入一个Callable<T>对象
        Future<Integer> future=executorService.submit(new Callable<Integer>() {
            private int sum=0;
            @Override
            public Integer call() throws Exception {
                System.out.println(Thread.currentThread().getName()+"开始计算。。");
                for(int i=1;i<=100;i++) {
                    sum+=i;
                    Thread.sleep(10);
                }
                return sum;
            }
        });
        //3.获取任务的结果(等待任务完成才会返回)
        System.out.println(future.get());
        //4.关闭线程池
        executorService.shutdown();
    }
}

6. Future接口

  • Future表示将要完成任务的结果

演示一个案例:使用两个线程,并发计算1-50、51-100的和,再进行汇总统计。

/**
 * 演示Future接口的使用
 */
public class Demo3 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1.创建线程池
        ExecutorService executorService=Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Integer> future1=executorService.submit(new Callable<Integer>() {
            int sum=0;
            @Override
            //计算1-50的和
            public Integer call() throws Exception {
                for(int i=1;i<=50;i++) {
                    sum+=i;
                }
                System.out.println("1-50的和计算完毕。");
                return sum;
            }
        });
        Future<Integer> future2=executorService.submit(new Callable<Integer>() {
            int sum=0;
            @Override
            //计算51-100的和
            public Integer call() throws Exception {
                for(int i=51;i<=100;i++) {
                    sum+=i;
                }
                System.out.println("51-100的和计算完毕。");
                return sum;
            }
        });
        //3.获取结果
        System.out.println(future1.get()+future2.get());
        //4.关系线程池
        executorService.shutdown();
    }
}
  • 表示ExecutorService.submit()所返回的状态结果就是call的返回值
  • 方法V get()以阻塞形式等待Future中的异步处理结果call的返回值)。

7. 线程的同步与异步

  • 同步

    形容一次方法调用,同步一旦开始,调用者必须等待该方法返回,才能继续。

    当主线程调用子线程执行任务时,必须等到子线程返回结果后才能继续。

  • 异步

    形容一次方法调用,异步一旦开始就像是一次消息传递,调用者告知之后立刻返回。二者竞争时间片,并发执行。异步有多条执行路径

img



八、线程安全的集合

下图中蓝色的表示线程安全的集合,绿色表示现代开发中已经很少使用的线程安全的集合。

  • Collection体系集合

  • Map安全集合体系

在多线程中使用线程不安全的集合会出现异常。在JDK1.5之前,可以使用Collections中的工具类方法。

Collections工具类中提供了多个可以获得线程安全集合的方法:

  • public static <T> Collection<T> synchronizedCollection(Collection<T> c)
  • public static <T> List<T> synchronizedList(List<T> list)
  • public static <T> Set<T> synchronizedSet(Set<T> s)
  • public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
  • public static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s)
  • public static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V>)

以上为JDK1.2提供,接口单一、维护性高,但性能没有提升,均以synchronized实现。

public class Demo1 {
    public static void main(String[] args) {
        //1.使用ArrayList(不经过2步骤则报异常)
        ArrayList<String> arrayList=new ArrayList<String>();

        //2 使用Collections中的线程安全方法转成线程安全的集合
        List<String> synList=Collections.synchronizedList(arrayList);
        //3 使用并发包里提供的集合
        //OnWriteArrayList<String> arrayList2=new OnWriteArrayList<String>();

        //创建线程
        for(int i=0;i<20;i++) {
            int temp=i;
            new Thread(new Runnable() {         
                @Override
                public void run() {
                    for(int j=0;j<10;j++) {
                        synList.add(Thread.currentThread().getName()+":"+temp);
                        System.out.println(synList.toString());
                    }
                }
            }).start();
        }
    }
}

把ArrayList转成线程安全的集合后程序正常运行,结果不再演示。

1. OnWriteArrayList集合

  • 线程安全的ArrayList,加强版的读写分离。
  • 写有锁,读无锁,读写之间不堵塞,优于读写锁。
  • 写入时,先一个容器副本、再添加新元素,最后替换引用。所以说它是用空间换安全的一种方式。
  • 使用ArrayList无异。
/**
 * 演示OnWriteArrayList的使用
 */
public class Demo2 {
    public static void main(String[] args) {
        //1.创建集合
        OnWriteArrayList<String> list=new OnWriteArrayList<String>();
        //2.使用多线程操作
        ExecutorService eService=Executors.newFixedThreadPool(5);
        //3.提交任务
        for(int i=0;i<5;i++) {
            eService.submit(new Runnable() {            
                @Override
                public void run() {
                    for(int j=0;j<10;j++) {
                        list.add(Thread.currentThread().getName()+"..."+new Random().nextInt(1000));
                    }
                }
            });         
        }
        //4.关闭线程池
        eService.shutdown();
        //等所有线程都执行完毕
        while(!eService.isTerminated());
        //5.打印结果
        System.out.println("元素个数:"+list.size());
        for (String string : list) {
            System.out.println(string);
        }
    }
}

结果如下,没有问题:

元素个数:50
pool-1-thread-2...222
pool-1-thread-2...688
pool-1-thread-2...770
    ......
pool-1-thread-4...568
pool-1-thread-4...537
pool-1-thread-4...413

2. OnWriteArrayList源码分析

  • final transient ReentrantLock lock = new ReentrantLock();

    此集合所使用的的锁lock是重入锁ReentrantLock。

  • private transient volatile Object[] array;

    此集合实际存储的数组array。

  • 在上节中调用的无参构造方法创建的是一个空的数组。

public OnWriteArrayList() {
    setArray(new Object[0]);
}
final void setArray(Object[] a) {
    array = a;
}
  • add(E)添加元素是先把原来的数组到一个长度加1的新数组里,然后对新数组进行操作,最后再把新数组赋给原数组。这个操作上了锁。
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.Of(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
  • remove(int)删除元素同样是复制原数组到一个长度减1的新数组里,然后对新数组进行操作,最后再把新数组赋给原数组。这个操作也上了锁。
public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;
        if (numMoved == 0)
            setArray(Arrays.Of(elements, len - 1));
        else {
            Object[] newElements = new Object[len - 1];
            System.array(elements, 0, newElements, 0, index);
            System.array(elements, index + 1, newElements, index,
                             numMoved);
            setArray(newElements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

有关数组修改的操作都上了锁,也就说写操作是互斥访问的。

有关读操作的代码都是直接进行了访问,没有上锁,也就是说在写的同时可以读。

private E get(Object[] a, int index) {
    return (E) a[index];
}

3. OnWriteArraySet集合

  • 线程安全的Set,底层使用OnWriteArrayList实现。

唯一不同在于,使用addIfAbsent()添加元素,会遍历数组,如果已有元素(比较依据是equals),则不添加(扔掉副本)。

//演示OnWriteArraySet的使用
public class Demo3 {
    public static void main(String[] args) {
        OnWriteArraySet<String> set=new OnWriteArraySet<String>();
        set.add("tang");
        set.add("he");
        set.add("yu");
        set.add("wang");
        set.add("tang");//重复元素,添加失败
        System.out.println(set.size());
        System.out.println(set.toString());
    }
}

这个set集合是顺序输出的,结果如下:

4
[tang, he, yu, wang]

4. OnWriteArraySet源码分析

  • private final OnWriteArrayList<E> al

这个集合实际上使用的就是OnWriteArrayList集合。

它的无参构造方法new的就是OnWriteArrayList对象,所以它是有序的。

public OnWriteArraySet() {
    al = new OnWriteArrayList<E>();
}

添加元素的操作和OnWriteArrayList大同小异。

public boolean add(E e) {
    return al.addIfAbsent(e);
}
public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
    addIfAbsent(e, snapshot);
}

这是一个三元表达式,意思是存在相同元素返回false,否则添加元素。

先进入indexOf方法查看源码:

private static int indexOf(Object o, Object[] elements,
                           int index, int fence) {
    if (o == null) {
        for (int i = index; i < fence; i++)
            if (elements[i] == null)
                return i;
    } else {
        for (int i = index; i < fence; i++)
            if (o.equals(elements[i]))
                return i;
    }
    return -1;
}

add方法是添加单个元素,index参数就是0,这个方法就是在遍历数组,如果数组中已经存在相同元素则返回数组下标,注意看它的比较依据是equals方法;如果不存在则返回-1。

addIfAbsent所返回的三元表达式中,如果indexOf方法返回数组下标,则返回false,表示已经存在相同元素,添加失败;否则返回-1执行addIfAbsent(e, snapshot),进入该方法:

private boolean addIfAbsent(E e, Object[] snapshot) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] current = getArray();
        int len = current.length;
        if (snapshot != current) {
            // Optimize for lost race to another addXXX operation
            int common = Math.min(snapshot.length, len);
            for (int i = 0; i < common; i++)
                if (current[i] != snapshot[i] && eq(e, current[i]))
                    return false;
            if (indexOf(e, current, common, len) >= 0)
                return false;
        }
        Object[] newElements = Arrays.Of(current, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

我们可以忽略if语句,重点关注它的添加操作,发现它也将原数组到长度加一的新数组中,再对新数组进行操作,这个写操作上了锁。其他的写方法都调用了OnWriteArrayList的方法,同样是写操作上锁,读操作可以同时执行。

九、Queue接口(队列)

Collection的子接口,表示队列FIFO(First In First Out),先进先出。

常用方法:

  • 抛出异常:

    • boolean add(E e)

    顺序添加一个元素(到达上限后,再添加则会抛出异常)。

    • E remove()

    获得第一个元素并移除(如果队列没有元素时,则抛出异常)。

    • E element()

    获得第一个元素但不移除(如果队列没有元素时,则抛异常)。

  • 返回特殊值:(建议使用以下方法

    • boolean offer(E e)

    顺序添加一个元素(到达上限后,再添加则会返回false)。

    • E poll()

    获得第一个元素并移除(如果队列没有元素时,则返回null)。

    • E peek()

    获得第一个元素但不移除(如果队列没有元素时,则返回null)。

//演示Queue实现类的使用
public class Demo4 {
    public static void main(String[] args) {
        //创建队列
        Queue<String> queue=new LinkedList<String>();
        //入队
        queue.offer("tang");
        queue.offer("he");
        queue.offer("yu");
        queue.offer("wang");
        queue.offer("fan");
        System.out.println("队首元素:"+queue.peek());
        System.out.println("元素个数:"+queue.size());
        //出队
        int size=queue.size();
        for(int i=0;i<size;i++) {
            System.out.println(queue.poll());
        }
        System.out.println("出队完毕:"+queue.size());
    }
}

需要注意的是因为LinkedList是线程不安全的集合,所以不能在多线程的环境中使用。该程序输出如下:

队首元素:tang
元素个数:5
tang
he
yu
wang
fan
出队完毕:0

1. ConcurrentLinkedQueue类

  • Queue接口的实现类。线程安全、可高效读写的队列,高并发下性能最好的队列。

  • 无锁、CAS(Compare and Swap)比较交换算法,修改的方法包含三个核心参数(V,E,N)。

  • V:要更新的变量;E:预期值;N:新值。

  • 只有当V==E,V=N;否则表示V已被更新过,则取消当前操作。

    也就是说假如当前值V是80,要将其改成100,先将V读取出来,读取的V就是预期值;如果预期值E和V相等,就把V的值更新成新值100;如果不等,说明中间有其他线程更新了V,就取消当前操作。

//演示线程安全的队列
public class Demo5 {
    public static void main(String[] args) throws InterruptedException {
        //创建安全队列
        ConcurrentLinkedQueue<Integer> queue=new ConcurrentLinkedQueue<Integer>();
        //两个线程执行入队操作
        Thread t1=new Thread(new Runnable() {       
            @Override
            public void run() {
                for(int i=1;i<=5;i++) {
                    queue.offer(i);
                }
            }
        });
        Thread t2=new Thread(new Runnable() {       
            @Override
            public void run() {
                for(int i=6;i<=10;i++) {
                    queue.offer(i);
                }
            }
        });
        //启动线程
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        for(int i=1;i<=10;i++) {
            System.out.println(queue.poll());
        }
    }
}

因为是两个线程同时添加,所以结果不是顺序的:

1
6
2
3
4
7
5
8
9
10

2. BlockingQueue接口(阻塞队列)

  • Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法。

  • 方法

    • void put(E e)

    将指定元素插入此队列中,如果没有可用空间,则等待。

    • E take()

    获取并移除此队列头部元素,如果没有可用元素,则等待。

  • 可用于解决生产者消费者问题

2.1 阻塞队列(实现类)

  • ArrayBlockingQueue

    数组结构实现,有界队列。

  • LinkedBlockingQueue

    链表结构实现,有界队列。默认上限Integer.MAX_VALUE

通过一个小程序演示一下所谓的阻塞:

public class Demo6 {
    public static void main(String[] args) throws InterruptedException {
        //创建一个有界队列
        ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue<Integer>(3);
        //添加数据使用put
        arrayBlockingQueue.put(1);
        arrayBlockingQueue.put(2);
        arrayBlockingQueue.put(3);
        System.out.println(arrayBlockingQueue.size());
        System.out.println(arrayBlockingQueue.toString());
        arrayBlockingQueue.put(4);
        System.out.println("我不会被执行。");
    }
}

该程序执行后可以通过控制台看见程序并没有结束,也没有打印最后一句话,说明当前线程(主线程)被阻塞了:

3
[1, 2, 3]

2.2 重写生产者消费者问题

public class Demo7 {
    public static void main(String[] args) {
        //创建队列
        ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(6);
        //创建两个线程
        Thread t1=new Thread(new Runnable() {       
            @Override
            public void run() {
                for(int i=1;i<=30;i++) {
                    try {
                        queue.put(i);
                        System.out.println("生产者生产了一个产品,产品ID:"+i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }               
                }
            }
        });
        Thread t2=new Thread(new Runnable() {       
            @Override
            public void run() {
                for(int i=1;i<=30;i++) {
                    try {
                        queue.take();
                        System.out.println("消费者消费了一个产品,产品ID:"+i);
                    } catch (InterruptedException e) {
                        // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }                   
                }
            }
        });
        t1.start();
        t2.start();
    }
}

需要注意的是插入队尾的方法是put,删除队首元素的方法是take。结果运行如下:

生产者生产了一个产品,产品ID:1
生产者生产了一个产品,产品ID:2
生产者生产了一个产品,产品ID:3
生产者生产了一个产品,产品ID:4
生产者生产了一个产品,产品ID:5
生产者生产了一个产品,产品ID:6
消费者消费了一个产品,产品ID:1
    ......
消费者消费了一个产品,产品ID:25
消费者消费了一个产品,产品ID:26
消费者消费了一个产品,产品ID:27
消费者消费了一个产品,产品ID:28
消费者消费了一个产品,产品ID:29
消费者消费了一个产品,产品ID:30

十、ConcurrentHashMap

  • 初始容量默认为16段(Segment),使用分段锁设计。每一段都对应着一个哈希表。
  • 不对整个Map加锁,而是为每个Segment加锁。对一个Segment的操作不影响其他Segment。
  • 当多个对象存入同一个Segment时,才需要互斥。
  • 最理想状态为16个对象分别存入16个Segment,并行数量16。
  • 使用方式与HashMap无异。

注:在JDK1.8之后,ConcurrentHashMap不再采用分段锁,而是采用无锁算法CAS。

//演示线程安全的Map
public class Demo8 {
    public static void main(String[] args) {
        //创建集合
        ConcurrentHashMap<String, Integer> hashMap=new ConcurrentHashMap<String, Integer>();
        //使用多线程添加数据
        for(int i=0;i<5;i++) {
            new Thread(new Runnable() {             
                @Override
                public void run() {
                    for(int k=0;k<10;k++) {
                        hashMap.put(Thread.currentThread().getName(), k);
                        System.out.println(hashMap);
                    }
                }
            }).start();
        }
    }
}

没有问题,不再演示结果。

给TA打赏
共{{data.count}}人
人已打赏
Java笔记多线程

Java多线程(一)

2021-9-13 11:16:39

Java笔记

JavaSE——IO流

2021-9-15 23:39:24

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
购物车
优惠劵
有新私信 私信列表
搜索