在日常研发过程中,我们经常面临着需要在线程内,线程间进行消息传递,比如在修改一些开源组件源码的过程中,需要将外部参数透传到内部,如果进行方法参数重载,则涉及到的改动量过大,这样,我们可以依赖ThreadLocal 来进行消息传递。
ThreadLocal 是 存储在线程栈帧中的一块数据存储区域,其可以做到线程与线程之间的读写隔离。
但是在我们的日常场景中,经常会出现 父线程 需要向子线程中传递消息,而 ThreadLocal 仅能在当前线程上进行数据缓存,因此 我们需要使用 InheritableThreadLocal 来实现 父子线程间的消息传递
java;gutter:true;
// 定义消息public class ThreadLocalMessage {</p>
<pre><code>private final InheritableThreadLocal msg;
private ThreadLocalMessage() {
msg = new InheritableThreadLocal<>();
}
public Msg getMsg() {
return this.msg.get();
}
public void setMsg(Msg msg) {
this.msg.set(msg);
}
public void clear() {
msg.remove();
}
private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();
public static ThreadLocalMessage getInstance() {
return threadLocalMessage;
}
/**
* 获取线程中的消息
*
* @return
*/
public static Msg getOrCreateMsg() {
Msg msg = ThreadLocalMessage.getInstance().getMsg();
if (msg == null) {
msg = new Msg();
}
return msg;
}
public static class Msg {
/**
* taskId
*/
private String taskId;
private Map others;
private int retCode;
public Msg() {
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public String toString() {
return "Msg{" +
"taskId='" + taskId + '\'' +
", others=" + others +
", retCode=" + retCode +
'}';
}
}
</code></pre>
<p>}</p>
<pre><code>
;gutter:true;
// 定义线程池@EnableAsync
@Configuration
public class ExecutorConfig {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${executor.corePool:2}")
private Integer corePool;
@Value("${executor.maxPool:10}")
private Integer maxPool;
@Value("${executor.queue:2}")
private Integer queue;
@Bean("cdl-executor")
public Executor executor() {
log.info("start async Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePool);
//配置最大线程数
executor.setMaxPoolSize(maxPool);
//配置队列大小
executor.setQueueCapacity(queue);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-executor-");
// 设置拒绝策略
executor.setRejectedExecutionHandler((r, e) -> {
// …..
});
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor; // 使用TTL 初始化 executor
//return TtlExecutors.getTtlExecutor(executor);
}
}
java;gutter:true;
// 创建子线程进行消息传递并打印public String test() throws Exception{
for (int i = 0 ; i < 20; i++){
ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg();
msg.setTaskId("task_id_"+i);
ThreadLocalMessage.getInstance().setMsg(msg);
myService.testThread(i);
ThreadLocalMessage.getInstance().clear();
}
return "ok";
}</p>
<pre><code>
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:dcc6220a-c2a5-41e8-8ce6-aca8faece79f<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:f4c7d4a5-773f-497c-b696-64094317f979</font>*</details>
![InheritableThreadLocal 在线程池中进行父子线程间消息传递出现消息丢失的解析](https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230602/2888789-20220629150811790-1500648858.png)
经过测试,我们发现 只有10个线程 的消息传递成功了,其余10个线程的消息均丢失了,这是什么原因呢。。。
遇到这个问题,我们首先得弄清楚 InheritableThreadLocal 是如何在父子线程间进行消息传递的
InheritableThreadLocal 在父线程创 **建子线程**的时候,会将父线程中InheritableThreadLocal 中存储的数据 拷贝一份 存储到子线程的 InheritableThreadLocal 中
而我们使用的 线程池,线程池是会反复利用线程的,当线程池没有被创建满,每次都是新创建线程,直到线程池创建满了,再需要使用线程就会从线程池中拿已经创建好的线程。
问题就出在这里,由于后面的线程 是从线程池中去捞已经创建好的线程,不会走创建逻辑,也就无法触发 InheritableThreadLocal 中向子线程 拷贝,这也就是为什么 InheritableThreadLocal 合并线程池 使用时,出现了 消息丢失的原因
如何解决????
阿里巴巴开源的TTL ,用于解决线程池中的父子线程复用,线程数据传递,可以完美解决这个问题
;gutter:true;
com.alibaba
transmittable-thread-local
2.0.0
java;gutter:true;
@EnableAsync
@Configuration
public class ExecutorConfig {</p>
<pre><code>private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${executor.corePool:2}")
private Integer corePool;
@Value("${executor.maxPool:10}")
private Integer maxPool;
@Value("${executor.queue:2}")
private Integer queue;
@Bean("cdl-executor")
public Executor executor() {
log.info("start async Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePool);
//配置最大线程数
executor.setMaxPoolSize(maxPool);
//配置队列大小
executor.setQueueCapacity(queue);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-executor-");
// 设置拒绝策略
executor.setRejectedExecutionHandler((r, e) -> {
// .....
});
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
// 使用TTL 的 executor
return TtlExecutors.getTtlExecutor(executor);
//return executor;
}
</code></pre>
<p>}</p>
<pre><code>
;gutter:true;
public class ThreadLocalMessage {
private final TransmittableThreadLocal msg;
private ThreadLocalMessage() {
msg = new TransmittableThreadLocal<>();
}
public Msg getMsg() {
return this.msg.get();
}
public void setMsg(Msg msg) {
this.msg.set(msg);
}
public void clear() {
msg.remove();
}
private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();
public static ThreadLocalMessage getInstance() {
return threadLocalMessage;
}
/**
* 获取线程中的消息
*
* @return
*/
public static Msg getOrCreateMsg() {
Msg msg = ThreadLocalMessage.getInstance().getMsg();
if (msg == null) {
msg = new Msg();
}
return msg;
}
public static class Msg {
/**
* taskId
*/
private String taskId;
public Msg() {
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public String toString() {
return "Msg{" +
"taskId=’" + taskId + ‘\” +
‘}’;
}
}
}
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:34d855e8-6e2e-422b-bd41-f74d4e8f42b1
[En]
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:9f27e9a6-19af-4da7-80b9-03e501b447a7
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:d8d6fcaa-1473-4504-adc2-4b044dc95cdf
[En]
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:01e60b76-e926-42e9-a56d-7df5601e9aba
Original: https://www.cnblogs.com/hujunhui530/p/16423851.html
Author: applogize
Title: InheritableThreadLocal 在线程池中进行父子线程间消息传递出现消息丢失的解析
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/562887/
转载文章受原作者版权保护。转载请注明原作者出处!