当前位置: 首页 > news >正文

3-1多线程-线程池

java线程池ThreadPoolExecutor

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

1、ThreadPoolExecutor的构造函数

public ThreadPoolExecutor(
	int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
}

构造函数的参数含义如下:

corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到WorkQueue任务队列中
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的WorkQueue任务队列的类型,决定线程池会开辟的最大线程数量
keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
unit:keepAliveTime的单位
workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种
threadFactory:线程工厂,用于创建线程,一般用默认即可
handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务
  1、拒绝策略AbortPolicy:如果队列满了,最新的提交任务将会被拒绝,并抛出RejectedExecution异常
  2、CallerRunsPolicy:它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务,从而降低调用者的调用速度
  3、DiscardOldestPolicy:将会丢弃最老的任务,保存最新插入的任务
  4、DiscardPolicy:将会悄悄的丢弃提交的任务,而不报任何异常

2、workQueue任务队列

2-1、直接提交队列

直接提交队列(SynchronousQueue):它是同步Queue,内部没有存储,阻塞队列,是一个没有数据缓冲的BlockingQueue,它没有容量,只能存一个取一个

void put(E o):向队列提交一个元素,阻塞直到其他线程take或者poll此元素
boolean offer(E o):向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者"碰巧"有poll操作,那么将返回true,否则返回false
E take():获取并删除一个元素,阻塞直到有其他线程offer/put.
boolean poll():获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者"碰巧"有offer操作,那么将返回true,否则返回false.

使用场景:

生产者消费者模式

2-2、有界的任务队列

有界的任务队列ArrayBlockingQueue

案例演示:

ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

结果分析:

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限

2-3、无界的任务队列:

无界的任务队列LinkedBlockingQueue

BlockingQueue<Object> bq1=new LinkedBlockingQueue<Object>(10000);
public LinkedBlockingQueue() {
   this(Integer.MAX_VALUE);
}

案例演示:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

结果分析:

使用无界任务队列,当然也不是真的无界,这个值是2的31次方-1。线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题

2-4、优先任务队列

优先任务队列PriorityBlockingQueue

public class zzzz {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000,
                TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 20; i++) {
            pool.execute(new ThreadTask(i));
        }
    }
}
class ThreadTask implements Runnable, Comparable<ThreadTask> {
    private int priority;

    public ThreadTask(int priority) {
        this.priority = priority;
    }
    //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    public int compareTo(ThreadTask o) {
        return this.priority > o.priority ? -1 : 1;
    }
    public void run() {
        //让线程阻塞,使后续任务进入缓存队列
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("priority:" + this.priority + ",ThreadName:" + Thread.currentThread().getName());
    }
}

结果分析:

可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。
通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

3、拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但这种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。

ThreadPoolExecutor自带的拒绝策略如下:

1、AbortPolicy拒绝策略:如果队列满了,最新的提交任务将会被拒绝,并抛出RejectedExecutionException异常
2、CallerRunsPolicy:它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者(把任务队列中的任务放在调用者线程),使用调用者的线程来执行任务,从而降低调用者的调用速度
3、DiscardOldestPolicy:将会丢弃最老的任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交,从而保存最新插入的任务
4、DiscardPolicy:直接丢弃任务,而不报任何异常,业务场景中需允许任务的丢失

以上内置的策略均实现了RejectedExecutionHandler接口

案例:自定义拒绝策略

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo3 {
    public static int j=1;
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

        for (int i=20;i>=1;i--){

            int finalI = i;
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(finalI *100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"--------"+j++);
                }
            });
        }
    }
}

可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;

int  n = Runtime.getRuntime().availableProcessors();
最佳线程数目 = cpu的核数*2

相关文章:

  • Linux-远程管理命令
  • mysql:数据库调优策略,sql调优
  • HighCharts结构及详细配置(中文对比)
  • 面试之 Python 框架 Flask、Django、DRF
  • 2023年FOF/MOM基金研究报告
  • 在CentOS-6.9配置apache服务(1)---基于个人主页的身份验证
  • End-to-End Entity Resolution for Big Data: A Survey Matching部分学习笔记
  • 自动驾驶感知——红外传感器
  • stream操作常用API 示例详解
  • IB EE 学习干货,从选学科/课题/写稿/对稿/交稿几个方面入手分享
  • 集成学习面试常见问题
  • [C++][原创]jsoncpp用法及其注意事项
  • 芒果改进YOLOv7系列:结合最新Wise-IoU损失函数,涨点神器|超越CIoU, SIoU性能,助力YOLOv7模型涨点1.4%,最新目标检测的损失函数
  • 【靶机】vulnhub靶机clover:1
  • 状态空间模型与卡尔曼滤波
  • 【快速开始】vuejs环境搭建第一个项目
  • 聊聊关于矩阵反向传播的梯度计算
  • 测试岗外包4年终上岸,这段日子说起来都是泪啊
  • linux secure boot(安全启动)下为内核模块签名
  • 解决数据兼容性问题
  • 电加热油锅炉工作原理_电加热导油
  • 大型电蒸汽锅炉_工业电阻炉
  • 燃气蒸汽锅炉的分类_大连生物质蒸汽锅炉
  • 天津市维修锅炉_锅炉汽化处理方法
  • 蒸汽汽锅炉厂家_延安锅炉厂家
  • 山西热水锅炉厂家_酒店热水 锅炉
  • 蒸汽锅炉生产厂家_燃油蒸汽发生器
  • 燃煤锅炉烧热水_张家口 淘汰取缔燃煤锅炉
  • 生物质锅炉_炉
  • 锅炉天然气_天燃气热风炉