tomcat线程池

tomcat线程池和普通的线程池设计上有所区别,下面主要来看看它是如何设计的

tomcat中线程池的创建

<span>org.apache.tomcat.util.net.AbstractEndpoint#createExecutor</span>

tomcat&#x521B;&#x5EFA;&#x7EBF;&#x7A0B;&#x6C60;
public void createExecutor() {
  internalExecutor = true;
  // &#x4EFB;&#x52A1;&#x961F;&#x5217;&#x548C;&#x666E;&#x901A;&#x7684;&#x961F;&#x5217;&#x6709;&#x6240;&#x533A;&#x522B;&#xFF0C;&#x540E;&#x7EED;&#x5206;&#x6790;
  TaskQueue taskqueue = new TaskQueue();
  // &#x7EBF;&#x7A0B;&#x5DE5;&#x5382;&#x7528;&#x4E8E;&#x521B;&#x5EFA;&#x7EBF;&#x7A0B;  &#x672C;&#x5730;&#x9879;&#x76EE;name=http-nio-port-exec-&#x5E8F;&#x53F7;
  TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
  // &#x521B;&#x5EFA;&#x7EBF;&#x7A0B;&#x6C60;&#xFF0C;&#x6CE8;&#x610F;&#x8FD9;&#x4E2A;ThreadPoolExecutor&#x548C;java.util.concurrent&#x5305;&#x4E0B;&#x7684;ThreadPoolExecutor&#x6709;&#x6240;&#x533A;&#x522B;
  executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
  // &#x7ED9;&#x4EFB;&#x52A1;&#x961F;&#x5217;&#x8BBE;&#x7F6E;&#x7EBF;&#x7A0B;&#x6C60;&#xFF0C;&#x7528;&#x4E8E;&#x540E;&#x7EED;&#x4EFB;&#x52A1;&#x6765;&#x4E86;&#x5224;&#x65AD;&#x662F;&#x521B;&#x5EFA;&#x7EBF;&#x7A0B;&#x6267;&#x884C;&#x8FD8;&#x662F;&#x5C06;&#x7EBF;&#x7A0B;&#x6DFB;&#x52A0;&#x5230;&#x4EFB;&#x52A1;&#x961F;&#x5217;
  taskqueue.setParent( (ThreadPoolExecutor) executor);
}

tomcat的ThreadPoolExecutor

tomcat的ThreadPoolExecutor实际上继承了java包的ThreadPoolExecutor再其上定制了一些功能

submittedCount:记录了线程池中正有多少线程在执行任务(还没执行完)

lastContextStoppedTime:记录上次上下文停止的时间

lastTimeThreadKilledItself:记录线程上一次为防止内存泄漏自我kill的时间

构造方法:调用了父类ThreadPoolExecutor,同时调用了prestartAllCoreThreads方法,再完成线程池的创建后预热核心线程,使得任务到来时能够直接执行任务,不用再花时间去创建线程,提高了效率。

execute方法:执行executor方法时首先将submittedCount加1,再调用父类的executor方法执行任务。若抛出RejectedExecutionException异常则再回尝试将任务添加到任务队列汇中

afterExecute:重写父类方法,任务执行完成后调用afterExecute钩子方法将submittedCount减1,再尝试停止线程

contextStopping:若容器上下文停止,则会记录lastContextStoppedTime为当前时间并中断正在运行的线程。调用currentThreadShouldBeStopped方法的时候会判断线程TaskThread创建的时间是否在lastContextStoppedTime之前,表示当前线程是在上一个上下文运行期间创建,则会尝试kill线程

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    /**
     * The string manager for this package.

     */
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");

    /**
     * The number of tasks submitted but not yet finished. This includes tasks
     * in the queue and tasks that have been handed to a worker thread but the
     * latter did not start executing the task yet.

     * This number is always greater or equal to {@link #getActiveCount()}.

     */
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);

    /**
     * Most recent time in ms when a thread decided to kill itself to avoid
     * potential memory leaks. Useful to throttle the rate of renewals of
     * threads.

     */
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);

    /**
     * Delay in ms between 2 threads being renewed. If negative, do not renew threads.

     */
    private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        prestartAllCoreThreads();
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        prestartAllCoreThreads();
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        prestartAllCoreThreads();
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
        prestartAllCoreThreads();
    }

    public long getThreadRenewalDelay() {
        return threadRenewalDelay;
    }

    public void setThreadRenewalDelay(long threadRenewalDelay) {
        this.threadRenewalDelay = threadRenewalDelay;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();

        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }

    /**
     * If the current thread was started before the last time when a context was
     * stopped, an exception is thrown so that the current thread is stopped.

     */
    protected void stopCurrentThreadIfNeeded() {
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                        System.currentTimeMillis() + 1)) {
                    // OK, it's really time to dispose of this thread

                    final String msg = sm.getString(
                                    "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                                    Thread.currentThread().getName());

                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }

    protected boolean currentThreadShouldBeStopped() {
        if (threadRenewalDelay >= 0
            && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            if (currentTaskThread.getCreationTime() <
                    this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }

    public int getSubmittedCount() {
        return submittedCount.get();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <code>Executor</code> implementation.

     * If no threads are available, it will be added to the work queue.

     * If the work queue is full, the system will wait for the specified
     * time and it throw a RejectedExecutionException if the queue is still
     * full after that.

     *
     * @param command the runnable task
     * @param timeout A timeout for the completion of the task
     * @param unit The timeout time unit
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution - the queue is full
     * @throws NullPointerException if command or unit is null
     */
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

    public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());

        // save the current pool parameters to restore them later
        int savedCorePoolSize = this.getCorePoolSize();
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        if (taskQueue != null) {
            // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
            // checks that queue.remainingCapacity()==0. I did not understand
            // why, but to get the intended effect of waking up idle threads, I
            // temporarily fake this condition.

            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }

        // setCorePoolSize(0) wakes idle threads
        this.setCorePoolSize(0);

        // TaskQueue.take() takes care of timing out, so that we are sure that
        // all threads of the pool are renewed in a limited time, something like
        // (threadKeepAlive + longest request time)

        if (taskQueue != null) {
            // ok, restore the state of the queue and pool
            taskQueue.setForcedRemainingCapacity(null);
        }
        this.setCorePoolSize(savedCorePoolSize);
    }

    private static class RejectHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r,
                java.util.concurrent.ThreadPoolExecutor executor) {
            throw new RejectedExecutionException();
        }

    }

}</runnable></runnable></runnable></runnable>

tomcat的TaskQueue

首先简单翻一下注释,为线程池运行专门设计的队列,使用该队列线程池如果有空闲队列则会创建线程池执行任务而不是将任务放到任务队列中。

它继承了LinkedBlockingQueue无界队列,容量为Integer.MAX_VALUE。

在execute方法中可以看到,当线程池线程大于核心数量的时候,会执行任务队列的offer方法,下来来分析下TaskQueue的offer方法:

tomcat线程池

1.若parent为空也就是未给队列设置线程池,则调用父类offer方法,将任务添加到队列中

2.线程池当前线程数量等于线程池最大数量,无法添加更多的线程,调用父类offer方法,将任务添加到队列中

3.线程池正在执行任务的线程数量小于等于线程池已有的线程数量,说明当前线程池有空闲线程,调用父类offer方法,将任务添加到队列中,等待线程从队列中取任务运行

4.线程池线程数量小于线程池最大数量说明还可以增加线程,返回false,运行addWorker(command,false)向线程池添加非核心线程运行任务

5.都不满足,调用父类offer方法,将任务添加到队列中

从上面的分析我们可以看到该任务队列TaskQueue和普通的任务队列不一样,当线程池的线程数量小于最大线程数量时,任务不会添加到任务队列中,而是会添加非核心线程来运行任务,当线程池线程数量达到最大数量时,才会将任务添加到任务队列中

当然TaskQueue也有force方法直接调用父类offer方法将任务添加到任务队列中

/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.

 */
public class TaskQueue extends LinkedBlockingQueue<runnable> {

    private static final long serialVersionUID = 1L;

    private transient volatile ThreadPoolExecutor parent = null;

    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the  listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getpoolsize())) return super.offer(o); if we have less threads than maximum force creation of a new thread (parent.getpoolsize()<parent.getmaximumpoolsize()) false; reached here, need to add it the queue } @override public runnable poll(long timeout, timeunit unit) throws interruptedexception { unit); (runnable="=" null && parent !="null)" poll timed out, gives an opportunity stop current needed avoid memory leaks. parent.stopcurrentthreadifneeded(); runnable; take() (parent parent.currentthreadshouldbestopped()) poll(parent.getkeepalivetime(timeunit.milliseconds), timeunit.milliseconds); yes, this may (in case timeout) which normally does not occur with but threadpoolexecutor implementation allows super.take(); int remainingcapacity() (forcedremainingcapacity threadpoolexecutor.setcorepoolsize checks that remainingcapacity="=0" allow interrupt idle i don't see why, hack conform "requirement" forcedremainingcapacity.intvalue(); super.remainingcapacity(); void setforcedremainingcapacity(integer forcedremainingcapacity) this.forcedremainingcapacity="forcedRemainingCapacity;" }< code></=(parent.getpoolsize()))></runnable>

当有请求来时,从socket获取到可读事件并将socket封装成一个任务(任务主要是解析请求然后下发到servlet执行),然后调用线程池的execute方法

    /**
     * Process the given SocketWrapper with the given status. Used to trigger
     * processing as if the Poller (for those endpoints that have one)
     * selected the socket.

     *
     * @param socketWrapper The socket wrapper to process
     * @param event         The socket event to be processed
     * @param dispatch      Should the processing be performed on a new
     *                          container thread
     *
     * @return if processing was triggered successfully
     */
    public boolean processSocket(SocketWrapperBase<s> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<s> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }</s></s>

Original: https://www.cnblogs.com/monianxd/p/16579339.html
Author: 默念x
Title: tomcat线程池

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/621766/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 当你想静下来的时候,你就可以静下来。

    当你想静下来的时候,你就可以静下来。1,2年前,我有时还在为当时选的专业恼悔,因为继续教育是同事推荐的,最后同事给我的消息是,他在疫情后去其他公司,做人工智能的公司,拿月薪20K,…

    数据库 2023年6月11日
    084
  • java-配置tomcat服务器启动出现闪退解决办法

    1.配置tomcat服务器注意的地方:1.1下载tomcat软件,选择绿色免安装版,解压即可使用。1.2tomcat服务器是java语言编写的,想要运行tomcat需要java环境…

    数据库 2023年6月11日
    0101
  • JUC学习笔记(七)

    1.1、读写锁介绍 现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取…

    数据库 2023年6月6日
    0101
  • jar工具常用命令

    参考链接:https://www.ibm.com/developerworks/cn/java/j-jar/index.html Original: https://www.cnb…

    数据库 2023年6月9日
    068
  • 01-MySQL主从复制

    问题导入 在之前项目的基础功能实现中,后台管理和移动端在进行数据访问的时候,都是直接操作数据库MySQL。此时的系统有且仅有一台MySQL服务器,则可能会出现如下问题 ①、读和写所…

    数据库 2023年5月24日
    068
  • HTML&CSS-盒模型运用居中方式合集

    { margin: 0; padding: 0; list-style: none; 清除浏览器默认样式 .father1 { width: 400px; height: 400p…

    数据库 2023年6月11日
    096
  • 面试必问之 CopyOnWriteArrayList,你了解多少?

    一、摘要 在介绍 CopyOnWriteArrayList 之前,我们一起先来看看如下方法执行结果,代码内容如下: public static void main(String[]…

    数据库 2023年6月14日
    078
  • 服务器部署 Vue 和 Django 项目的全记录

    本篇记录我在一个全新服务器上部署 Vue 和 Django 前后端项目的全过程,内容包括服务器初始配置、安装 Django 虚拟环境、python web 服务器 uWSGI 和反…

    数据库 2023年6月14日
    090
  • python 学习笔记(十二)–Django 基本知识点小结

    构造函数格式: 作用: 向客户端浏览器返回相应,同时携带响应体内容。 参数: –content:表示返回的内容。 –status_code:返回的HTTP响…

    数据库 2023年6月16日
    067
  • Activiti7 多实例子流程

    顾名思义,子流程是一个包含其他活动、网关、事件等的活动,这些活动本身形成了一个流程,该流程是更大流程的一部分。 使用子流程确实有一些限制: 一个子流程只能有一个none类型的启动事…

    数据库 2023年6月14日
    0152
  • Linux中MySQL的安装以及卸载

    一.MySQL MySQL 是一种开放源代码的关系型数据库管理系统,开发者为瑞典MySQL AB公司。在2008年1月16号被Sun公司收购。而2009年,SUN又被Oracle收…

    数据库 2023年5月24日
    0101
  • Springboot学习笔记(一)—— 安装

    springboot越来越流行了,相比较于springMVC,springboot采用了一种约定大于配置的理念,可以一键安装,一键运行,一键部署,内置tomcat,省去了一大堆配置…

    数据库 2023年6月9日
    096
  • MYSQL(进阶篇)——一篇文章带你深入掌握MYSQL

    MYSQL(进阶篇)——一篇文章带你深入掌握MYSQL 我们在上篇文章中已经学习了MYSQL的基本语法和概念 在这篇文章中我们将讲解底层结构和一些新的语法帮助你更好的运用MYSQL…

    数据库 2023年6月14日
    093
  • VS code 每次退出都要重新下载解决方案

    VS code 每次退出都要重新下载解决方案 打开文件-首选项-设置 在搜索栏输入Extensions: Auto Update 然后把所有打钩的取消 ,退出vs code 的时候…

    数据库 2023年6月16日
    093
  • 翻译 | 解读首部 Kubernetes 纪录片

    Honeypot.io 自诩为欧洲最大的技术人才招聘平台,同时提供开发者视频网站,又被称其为 ” 开发者的 Netflix“。2022 年 1 月,该公司与…

    数据库 2023年5月24日
    077
  • spring的自动注入

    Spring自动注入 spring的ioc 在刚开始学习spring的时候肯定都知道spring的两个特点:ioc,aop,控制反转和切面编程,这篇就只说说ioc ioc是什么:在…

    数据库 2023年6月16日
    078
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球