JUC自定义线程池练习

JUC自定义线程池练习

JUC自定义线程池练习

首先上面该线程池的大致流程

自定义阻塞队列

  • 首先定义一个双向的队列和锁一定两个等待的condition
  • 本类用lock来控制多线程下的流程执行
  • take和push方法就是死等,调用await就是等,后面优化为限时等待
  • take调用后取出阻塞队列的task后会调用fullWaitSet的signal方法来唤醒因为阻塞队列满了的线程将task放入阻塞队列。
@Slf4j
class TaskQueue {

    // 双向的阻塞队列
    private Deque deque;
    // 队列最大容量
    private int capacity;

    // 锁
    private ReentrantLock lock = new ReentrantLock();
    // 消费者任务池空的等待队列
    private Condition emptyWaitSet = lock.newCondition();
    // 生产者任务池满的等待队列
    private Condition fullWaitSet = lock.newCondition();

    public TaskQueue(int capacity) {
        this.capacity = capacity;
        deque = new ArrayDeque<>(capacity);
    }

    // 死等take,即从阻塞队列取出任务
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("取走任务");
            T task = deque.pollFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 线程添加任务,属于是死等添加
    public void push(T task) {
        lock.lock();
        try {
            while (deque.size() >= capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("添加任务");
            deque.offerLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }

}

优化,死等优化为超时等

  • awaitNanos方法返回的是等待的剩余时间,如果已经等了base时间就会返回0,如果没有就会返回大于0即还没有等待的时间,防止虚假唤醒导致重新等待时间加长。当然在本题的设计中不会出现虚假唤醒的情况。
public T poll(Long timeout,TimeUnit unit) {
    lock.lock();
    try {
        long base = unit.toNanos(timeout);
        while (deque.isEmpty()) {
            try {
                if (base

线程池类

  • 成员变量如下,对于Worker就工作线程
@Slf4j
class ThreadPool {
    // 阻塞队列大小
    private int capacity;
    // 阻塞队列
    private TaskQueue taskQueue;
    // 工作线程
    private HashSet workerSet = new HashSet<>();
    // 核心数
    private int coreNum;
    // 超时等待时间
    private long timeout;
    // 超时等待单位
    private TimeUnit unit;
    // 拒绝策略
    private RejectPolicy rejectPolicy;

    // 线程对象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            // 就是线程把当前分配的任务做完,然后还要去阻塞队列找活干,没活就退出
            // taks 如果不为空就执行然后讲其置为空,后续再次进入循环后会从阻塞队列中再次取出task,
            // 如果不为空就继续执行,但是因为take死等,会导致无法结束
            // 使用了这个超时等的方法,当无法取出时就会退出程序
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("开始执行任务");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 当没有任务可执行,线程自动销毁,由于这是根据对象来销毁,且hashset无序,所以这里无需保证其的线程安全。
            workerSet.remove(this);
        }
    }

    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 当线程数大于核心数,就将任务放入阻塞队列
     * 否则创建线程进行处理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        // 需要synchronized关键字控制多线程下对执行方法的执行,保证共享变量workerSet安全。
        synchronized (workerSet) {
            // 如果已经存在的工作线程已经大于核心数,就不适合在进行创建线程了,创太多线程对于执行并不会加快,反而会因为线程不断切换而拖累CPU的执行。
            if (workerSet.size() >= coreNum) {
                taskQueue.push(runnable);
            } else {
                // 如果工作线程小于核心数就可创建一个worker线程来工作
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

测试类

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任务{}", j);
            });
        }
    }

}

优化—拒绝策略

我们没有进行优化的就是当任务太多导致阻塞线程也满了,此时任务线程就会进行阻塞,直到等到有人在线程池中取走任务。也就是push方法,我们在旧的方法中仍采用的是死等的方法。

但是方法中有很多死等,超时等,放弃任务,抛出异常,让调用者自己执行任务等等方法。

我们就可用讲其进行抽象,把操作交给调用者。

定义了如下的函数式接口,即为拒绝策略。

@FunctionalInterface
interface RejectPolicy{
    void reject(TaskQueue taskQueue,T task);
}

将在TaskQueue任务队列中定义不同的策略,我们只要传入这个函数式接口的实现对象就可用实现定制拒绝的策略。

在TaskQueue类添加一个方法,用来调用拒绝策略

public void tryAndAdd(T task,RejectPolicy rejectPolicy){
    lock.lock();
    try {
        if (deque.size() >= capacity) {
            rejectPolicy.reject(this,task);
        }else{
            log.debug("添加任务");
            deque.offerLast(task);
            emptyWaitSet.signal();
        }
    } finally {
        lock.unlock();
    }
}

更改了构造方法的线程池类,这样就可用传入一个自定义的拒绝策略。

@Slf4j
class ThreadPool {
    // 阻塞队列大小
    private int capacity;
    // 阻塞队列
    private TaskQueue taskQueue;
    // 工作线程
    private HashSet workerSet = new HashSet<>();
    // 核心数
    private int coreNum;
    // 超时等待时间
    private long timeout;
    // 超时等待单位
    private TimeUnit unit;
    // 拒绝策略
    private RejectPolicy rejectPolicy;

    // 线程对象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("开始执行任务");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            workerSet.remove(this);
        }
    }

    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 当线程数大于核心数,就将任务放入阻塞队列
     * 否则创建线程进行处理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        synchronized (workerSet) {
            if (workerSet.size() >= coreNum) {
                taskQueue.tryAndAdd(runnable,rejectPolicy);
            } else {
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

将启动类修改如下

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            // 采用死等的方法,当然我们可用在taskQueue中定义更多的方法让调用者选择
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任务{}", j);
            });
        }
    }

}

这样我们就完成了自定义的线程池。

Original: https://www.cnblogs.com/duizhangz/p/16259337.html
Author: 大队长11
Title: JUC自定义线程池练习

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/599170/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 剑指 Offer II 091. 粉刷房子

    剑指 Offer II 091. 粉刷房子 动态规划当前粉刷房子的花费可以由上一家粉刷房子的花费推导出来,所以可以使用动态规划求解这道题。首先确定dp数组的含义,每个房子都可以被粉…

    数据库 2023年6月16日
    0101
  • Python–异常

    1 # -*- coding:utf-8 -*- 2 name = ‘tj’ 3 try: 4 int(name) 5 except (IndexError,KeyError) a…

    数据库 2023年6月9日
    071
  • Java8Stream流

    Stream流呢,以前我也有所了解,像一些面试题中也出现过,Java8的新特性,有一块就是这个Stream操作集合,而且在看一些项目中也使用的比较多。但总感觉自己学的一知半解,所以…

    数据库 2023年6月11日
    079
  • CSS样式

    css概述 层叠样式表(cascading style sheet) 层叠是指==将多个样式施加在一个元素(标签)上== 作用: 美化页面 将html代码与样式代码分离 好处: 功…

    数据库 2023年6月16日
    092
  • 项目中所用到的mysql重复过滤

    问题:首先用户会本地上传一批号码(可能重复)到我们项目,通过解析文件,把号码入库(只验证是不是号码其他不做改动)到号码表,然后对号码进行去重操作. 表结构为:主键(id),号码(m…

    数据库 2023年6月11日
    076
  • 为什么我选择MySQL Workbench・一

    一、官方 官方提供的工具必然有其优势。 MySQL Workbench有两个版本,社区版和商业版。社区版是免费的。 二、第一个选择 使用MySQL之前用的是SQL Server而微…

    数据库 2023年6月9日
    070
  • springcloud Alibaba 阿里组件 nacos注册中心 gateway网关 flowable流程引擎 vue.js前后分离

    springcloud + springcloud Alibaba + flowable 流程引擎 1.代码生成器: 正反双向freemaker模版技术 ,0个代码不用写,生成完整…

    数据库 2023年6月6日
    092
  • 一次较波折的MySQL调优

    春节假期的一天,阳光明媚,春暖花开,恰逢冬奥会开幕,想着这天一定是生肖吉日,就能顺风顺水了。没想到,我遇到了一位客户,有点波折。 [En] Spring Festival holi…

    数据库 2023年5月24日
    078
  • BigDecimal 设置小数位数、小数比例转换整数

    控制小数位数 DecimalFormat decimalFormat = new DecimalFormat("0.00"); decimalFormat.fo…

    数据库 2023年6月6日
    087
  • 第五届蓝帽杯-溯源取证wp

    直接使用vmdk文件创建虚拟机,结果弹出提示 点击全部允许之后,进不去系统,到达了initramfs页面 由于提供的vmdk文件只有1G大小,将其转回raw文件,发现有10个G的大…

    数据库 2023年6月11日
    0110
  • Hadoop集群模式安装笔记

    前言 Hadoop集群= HDFS集群+ YARN集群特点:两个集群逻辑上分离,通常物理上在一起;并且都是标准的主从架构集群 Hadoop安装 &#x65B9;&#…

    数据库 2023年6月11日
    083
  • URL解码时,为什么将加号解码为空?

    以下代码在.NET Framework 2.0 中测试。 先看一个例子: test.aspx页面: 当参数 parameters 输出到页面后,值已经不为”A+B&#8…

    数据库 2023年6月11日
    055
  • 正则表达式=Regex=regular expression

    正则表达式=Regex=regular expression 反向引用*2 \index索引引用 \b(\w+)\b\s+\1\b \k \b(? 数量符/限定符62 贪婪Gree…

    数据库 2023年6月15日
    065
  • Centos7开放及查看端口

    Centos7开放及查看端口 1、开放端口 firewall-cmd –zone=public –add-port=5672/tcp –permanent # &#x…

    数据库 2023年6月6日
    0102
  • Java基础十—JavaIO

    CPU指令与内核态、用户态 在操作系统中,CPU负责执行指令,这些指令有些来自应用程序,有些是来自底层系统。有些指令是非常危险的,如清除内存,网络连接等等,如果错误调用的话有可能导…

    数据库 2023年6月6日
    0115
  • Sql的字符串匹配 like

    患者信息表: Patients +————–+———+ | Column Name | Type | +————–+——–…

    数据库 2023年6月14日
    079
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球