美团动态线程池实践思路,开源了

大家好,今天我们来聊一个比较实用的话题,动态可监控的线程池实践,全新开源项目(DynamicTp)地址在文章末尾,欢迎交流学习。

写在前面

稍微有些Java编程经验的小伙伴都知道,Java的精髓在juc包,这是大名鼎鼎的Doug Lea老爷
子的杰作,评价一个程序员Java水平怎么样,一定程度上看他对juc包下的一些技术掌握的怎么样,这也是面试中的基本上必问的一些技术点之一。

juc包主要包括:

1.原子类(AtomicXXX)
2.锁类(XXXLock)
3.线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
4.任务执行器类(Executor体系类,包括今天的主角ThreadPoolExecutor)
5.并发集合类(ConcurrentXXX、CopyOnWriteXXX)相关集合类
6.阻塞队列类(BlockingQueue继承体系类)
7.Future相关类
8.其他一些辅助工具类

多线程编程场景下,这些类都是必备技能,会这些可以帮助我们写出高质量、高性能、少bug的代码,同时这些也是Java中比较难啃的一些技术,需要持之以恒,学以致用,在使用中感受他们带来的奥妙。

上边简单罗列了下juc包下功能分类,这篇文章我们主要来介绍动态可监控线程池的,所以具体内容也就不展开讲了,以后有时间单独来聊吧。看这篇文章前,希望读者最好有一定的线程池ThreadPoolExecutor使用经验,不然看起来会有点懵。

如果你对ThreadPoolExecutor不是很熟悉,推荐阅读下面两篇文章

javadoop: https://www.javadoop.com/post/java-thread-pool

美团技术博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

背景

使用ThreadPoolExecutor过程中你是否有以下痛点呢?

1.代码中创建了一个ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
2.凭经验设置参数值,上线后发现需要调整,改代码重启服务,非常麻烦
3.线程池相对开发人员来说是个黑盒,运行情况不能感知到,直到出现问题

如果你有以上痛点,这篇文章要介绍的动态可监控线程池(DynamicTp)或许能帮助到你。

如果看过ThreadPoolExecutor的源码,大概可以知道其实它有提供一些set方法,可以在运行时动态去修改相应的值,这些方法有:

public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。

综上,我们总结出以下的背景

  • 广泛性:在Java开发中,想要提高系统性能,线程池已经是一个90%以上的人都会选择使用的基础工具
  • 不确定性:项目中可能会创建很多线程池,既有IO密集型的,也有CPU密集型的,但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数
  • 无感知性,线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理
  • 高可用性,配置变更需要及时推送到客户端;需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以大幅度减少开发量及接入难度

简介

我们基于配置中心对线程池ThreadPoolExecutor做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包括(队列容量、线程池活性、拒绝触发等);同时也会定时采集线程池指标数据供监控平台可视化使用。使我们能时刻感知到线程池的负载,根据情况及时调整,避免出现问题影响线上业务。

    |  __ \                            (_) |__   __|
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __
    | |  | | | | | '_ \ / _ | '_  _ | |/ __| | '_ \
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/
             __/ |                              | |
            |___/                               |_|
     :: Dynamic Thread Pool ::

特性

  • 参考美团线程池实践 ,对线程池参数动态化管理,增加监控、报警功能
  • 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入starter即可食用
  • 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,默认支持Nacos、Apollo,同时也提供SPI接口可自定义扩展实现
  • 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警),默认支持企业微信、钉钉报警,同时提供SPI接口可自定义扩展实现
  • 内置线程池指标采集功能,支持通过MicroMeter、JsonLog日志输出、Endpoint三种方式,可通过SPI接口自定义扩展实现
  • 集成管理常用第三方组件的线程池,已集成SpringBoot内置WebServer(Tomcat、Undertow、Jetty)的线程池管理

架构设计

主要分四大模块

  • 配置变更监听模块: 1.监听特定配置中心的指定配置文件(默认实现Nacos、Apollo),可通过内部提供的SPI接口扩展其他实现 2.解析配置文件内容,内置实现yml、properties配置文件的解析,可通过内部提供的SPI接口扩展其他实现 3.通知线程池管理模块实现刷新
  • 线程池管理模块: 1.服务启动时从配置中心拉取配置信息,生成线程池实例注册到内部线程池注册中心中 2.监听模块监听到配置变更时,将变更信息传递给管理模块,实现线程池参数的刷新 3.代码中通过getExecutor()方法根据线程池名称来获取线程池对象实例
  • 监控模块: 实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的SPI接口扩展其他实现 1.默认实现Json log输出到磁盘 2.MicroMeter采集,引入MicroMeter相关依赖 3.暴雷Endpoint端点,可通过http方式访问
  • 通知告警模块: 对接办公平台,实现通告告警功能,默认实现钉钉、企微,可通过内部提供的SPI接口扩展其他实现,通知告警类型如下 1.线程池参数变更通知 2.阻塞队列容量达到设置阈值告警 3.线程池活性达到设置阈值告警 4.触发拒绝策略告警

美团动态线程池实践思路,开源了

使用

  • *maven依赖

  • apollo应用用接入用此依赖


        io.github.lyh200
        dynamic-tp-spring-boot-starter-apollo
        1.0.1

  1. spring-cloud场景下的nacos应用接入用此依赖

        io.github.lyh200
        dynamic-tp-spring-cloud-starter-nacos
        1.0.1

  1. 非spring-cloud场景下的nacos应用接入用此依赖

        io.github.lyh200
        dynamic-tp-spring-boot-starter-nacos
        1.0.1

  • 线程池配置
spring:
  dynamic:
    tp:
      enabled: true
      enabledBanner: true        # 是否开启banner打印,默认true
      enabledCollect: false      # 是否开启监控指标采集,默认false
      collectorType: logging     # 监控数据采集器类型(JsonLog | MicroMeter),默认logging
      logPath: /home/logs        # 监控日志数据路径,默认${user.home}/logs
      monitorInterval: 5         # 监控时间间隔(报警判断、指标采集),默认5s
      nacos:                     # nacos配置,不配置有默认值(规则name-dev.yml这样)
        dataId: dynamic-tp-demo-dev.yml
        group: DEFAULT_GROUP
      apollo:                    # apollo配置,不配置默认拿apollo配置第一个namespace
        namespace: dynamic-tp-demo-dev.yml
      configType: yml            # 配置文件类型
      platforms:                 # 通知报警平台配置
        - platform: wechat
          urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替换
          receivers: test1,test2                   # 接受人企微名称
        - platform: ding
          urlKey: f80dad441fcd655438f4a08dcd6a     # 替换
          secret: SECb5441fa6f375d5b9d21           # 替换,非sign模式可以没有此值
          receivers: 15810119805                   # 钉钉账号手机号
      tomcatTp:                                    # tomcat web server线程池配置
          minSpare: 100
          max: 400
      jettyTp:                                     # jetty web server线程池配置
          min: 100
          max: 400
      undertowTp:                                  # undertow web server线程池配置
          ioThreads: 100
          workerThreads: 400
      executors:                                   # 动态线程池配置
        - threadPoolName: dynamic-tp-test-1
          corePoolSize: 6
          maximumPoolSize: 8
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # 任务队列,查看源码QueueTypeEnum枚举类
          rejectedHandlerType: CallerRunsPolicy    # 拒绝策略,查看RejectedTypeEnum枚举类
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: test           # 线程名前缀
          notifyItems:                     # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
            - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类
              enabled: true
              threshold: 80                # 报警阈值
              platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台
              interval: 120                # 报警间隔(单位:s)
            - type: change
              enabled: true
            - type: liveness
              enabled: true
              threshold: 80
            - type: reject
              enabled: true
              threshold: 1
  • 代码方式生成,服务启动会自动注册
@Configuration
public class DtpConfig {

   @Bean
   public DtpExecutor demo1Executor() {
       return DtpCreator.createDynamicFast("demo1-executor");
  }

   @Bean
   public ThreadPoolExecutor demo2Executor() {
       return ThreadPoolBuilder.newBuilder()
              .threadPoolName("demo2-executor")
              .corePoolSize(8)
              .maximumPoolSize(16)
              .keepAliveTime(50)
              .allowCoreThreadTimeOut(true)
              .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
              .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
              .buildDynamic();
  }
}
  • 代码调用,根据线程池名称获取
public static void main(String[] args) {
       DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
       dtpExecutor.execute(() -> System.out.println("test"));
}

注意事项

  1. 配置文件配置的参数会覆盖通过代码生成方式配置的参数
  2. 阻塞队列只有VariableLinkedBlockingQueue类型可以修改capacity,该类型功能和LinkedBlockingQueue相似,只是capacity不是final类型,可以修改,
    VariableLinkedBlockingQueue参考RabbitMq的实现
  3. 启动看到如下日志输出证明接入成功

|  __ \                            (_) |__   __|
| |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __
| |  | | | | | '_ \ / _ | '_  _ | |/ __| | '_ \
| |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
|_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/
         __/ |                              | |
        |___/                               |_|
 :: Dynamic Thread Pool ::

DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
  1. 配置变更会推送通知消息,且会高亮变更的字段

DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]

通知报警

触发报警阈值会推送相应报警消息,且会高亮显示相关字段,活性告警、容量告警、拒绝告警

美团动态线程池实践思路,开源了

配置变更会推送通知消息,且会高亮变更的字段

美团动态线程池实践思路,开源了

监控日志

通过主配置文件collectType属性配置指标采集类型,默认值:logging

  • micrometer方式:通过引入micrometer相关依赖采集到相应的平台
    (如Prometheus,InfluxDb…)
  • logging:指标数据以json格式输出日志到磁盘,地址${logPath}/ dynamictp/${appName}.monitor.log
2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
  • 暴露EndPoint端点(dynamic-tp),可以通过http方式请求
[
    {
        "dtp_name": "remoting-call",
        "core_pool_size": 8,
        "maximum_pool_size": 16,
        "queue_type": "SynchronousQueue",
        "queue_capacity": 0,
        "queue_size": 0,
        "fair": false,
        "queue_remaining_capacity": 0,
        "active_count": 2,
        "task_count": 2760,
        "completed_task_count": 2760,
        "largest_pool_size": 16,
        "pool_size": 8,
        "wait_task_count": 0,
        "reject_count": 12462,
        "reject_handler_name": "CallerRunsPolicy"
    },
    {
        "max_memory": "220 MB",
        "total_memory": "140 MB",
        "free_memory": "44 MB",
        "usable_memory": "125 MB"
    }
]

项目地址

gitee地址https://gitee.com/yanhom/dynamic-tp

github地址https://github.com/lyh200/dynamic-tp

联系我

对项目有什么想法或者建议,可以加我微信交流,或者创建issues,一起完善项目

公众号:CodeFox

微信:yanhom1314

Original: https://www.cnblogs.com/yanhom/p/15895335.html
Author: yanhom1314
Title: 美团动态线程池实践思路,开源了

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/610596/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Java se

    一些需要注意的点 ConcurrentHashMap CAS是乐观锁 CopyOnWriteArrayList的add方法 Lamda表达式 lambda 表达式只能引用标记了 f…

    Java 2023年6月9日
    0148
  • Monitor(管程/监视器)详解

    说明 Monitor,直译为”监视器”,而操作系统领域一般翻译为”管程”。管程是指管理共享变量以及对共享变量操作的过程,让它们支持并…

    Java 2023年6月16日
    072
  • Spring中的设计模式

    Spring中的设计模式 设计模式:表示面向对象软件开发中最好的计算机编程实践。 1、 工厂模式(ioc是一种思想,但DI注入是ioc思想的结果行为,是工厂设计模式) (1)通过B…

    Java 2023年6月5日
    084
  • 浅析js前端如何将word文档转为html:docx是怎样存储图片的、Java使用POI如何实现、前端使用Mammoth.js如何实现

    先聊聊需求背景:公司运营需求,说在富文本编辑器中发布包含图片的 Word 文档时,图片和文本内容不能一起复制,每次她们都得分开处理,对于包含较多图片的 Word 时,她们处理起来很…

    Java 2023年5月29日
    079
  • Java开发学习(十一)—-基于注解开发bean作用范围与生命周期管理

    一、注解开发bean作用范围与生命周期管理 前面使用注解已经完成了bean的管理,接下来将通过配置实现的内容都换成对应的注解实现,包含两部分内容: bean作…

    Java 2023年5月29日
    088
  • 域名+端口号 访问minio服务问题

    业务上需要用到分布式文件服务,选择了minio作为文件服务的组件,搭建好服务后使用IP+端口号(http://xx.xx.xx.xx:9001)的形式访问在所有环境下都没有问题。上…

    Java 2023年6月13日
    081
  • 并发编程之:BlockingQueue

    大家好,我是小黑,一个在互联网苟且偷生的农民工。 队列 学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时使用的List,Set这些数据结构相比有点特殊,它…

    Java 2023年6月7日
    086
  • JAVA 异常 基本知识

    异常 异常定义 异常是运行过程中出现的错误 人为错误:填写错误等 随机错误:网络中断、内存耗尽等 一个健壮的程序必须处理各种各样的错误 Java的异常是class Object T…

    Java 2023年6月9日
    071
  • Java核心技术-反射

    Day2 利用反射分析类的能力 在Java.lang.reflect包中有三个类Filed Method Constructor,分别用于描述类的字段,方法和构造器。 这三个类有一…

    Java 2023年6月5日
    0124
  • 阅读笔记一

    这两天又读了一遍《程序员修炼之道——从小工到专家》感觉这次收获挺大的,比第一次读的时候收获大多了(如果第一次那样读算是读的话),先从整体上说说这本书吧,我记着第一次阅读的时候,当时…

    Java 2023年6月7日
    090
  • 线程同步

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Java 2023年6月7日
    065
  • Spring 源码(3)Spring BeanFactory 是怎么创建的?

    Spring创建 BeanFactory 的方式 按照 Bean的配置方式手动创建可以分为两种: 使用 XMl配置的 Bean 这种方式使用 xml配置文件配置 Bean的信息并且…

    Java 2023年6月14日
    092
  • Java 设计模式 – Observer 观察者模式

    说明都在注释: package ObserverModel; package ObserverModel; <span class="hljs-keyword&qu…

    Java 2023年6月9日
    057
  • 多线程并发如何高效实现生产者/消费者?

    前言 无需引入第三方消息队列组件,我们如何利用内置C#语法高效实现生产者/消费者对数据进行处理呢?在.NET Core共享框架(Share Framework)引入了通道(Chan…

    Java 2023年5月30日
    087
  • Flink SQL 批模式下 ClickHouse 批量写入

    Flink SQL 批模式下 ClickHouse 批量写入 内置使用 JdbcBatchingOutputFormat 批量处理类 pom依赖 ru.yandex.clickho…

    Java 2023年6月16日
    086
  • 书写高质量sql的一些建议

    It’s better to light a candle than to curse the darkness 老生常谈的不要使用 select * 如果硬要使用se…

    Java 2023年6月13日
    085
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球