灵感来源于一个猪队友给我的题目
看到这个,我抓住的关键字是: 任何子任务失败,要通知所有子任务执行取消逻辑。
这不就是消息广播吗?观察者模式!
干活
首先是收听者
package com.example.broadcast;
/**
* 每个节点即是广播者,也是收听者
*/
public interface Listener {
/**
* 设置调度中心
*/
void setCenter(DispatchCenter center);
/**
* 主动通知其它收听者
*/
void notice(String msg);
/**
* 自己收到通知的处理逻辑
* @param msg
*/
void whenReceived(String msg);
/**
* 收听者标志:唯一
* @return
*/
String identify();
}
然后是调度中心
package com.example.broadcast;
/**
* 调度中心
*/
public interface DispatchCenter {
/**
* 广播
* @param own 广播的时候,要排除自己
* @param msg 广播消息
*/
void broadcast(String own, String msg);
/**
* 添加收听者
* @param listener
*/
void addListener(Listener listener);
}
调度中心实现
package com.example.broadcast;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class DispatchCenterImpl implements DispatchCenter {
private static final Map MAP = new ConcurrentHashMap<>();
@Override
public void broadcast(String own, String msg) {
MAP.forEach((k,v) -> {
// 不用给自己发通知
if (!k.equals(own)){
v.whenReceived(msg);
}
});
}
@Override
public void addListener(Listener listener) {
listener.setCenter(this);
MAP.put(listener.identify(), listener);
}
}
剩下三个收听者
package com.example.broadcast;
import java.util.UUID;
public class ListenerA implements Listener {
private DispatchCenter center;
private String identify;
public ListenerA() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "收到消息:" + msg);
}
@Override
public String identify() {
return identify;
}
}
B和C除了类名不一样,其他都一样,不再赘述。目录如下
测试
package com.example.broadcast;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
DispatchCenter center = new DispatchCenterImpl();
ListenerA listenerA = new ListenerA();
ListenerB listenerB = new ListenerB();
ListenerC listenerC = new ListenerC();
center.addListener(listenerA);
center.addListener(listenerB);
center.addListener(listenerC);
ExecutorService executorService = Executors.newFixedThreadPool(3);
// A触发1条事件
executorService.submit(() -> {
int i = 1;
while (i > 0){
listenerA.notice(listenerA.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
i--;
}
});
// B触发2条事件
executorService.submit(() -> {
int i = 2;
while (i > 0){
listenerB.notice(listenerB.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
i--;
}
});
// C触发3条事件
executorService.submit(() -> {
int i = 3;
while (i > 0){
listenerC.notice(listenerC.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
i--;
}
});
executorService.shutdown();
}
}
输出:
流程图
当其中的B节点,发生了错误,除了把自己处理好之外
1. 向调度中心发送广播请求,并携带需要的消息
2. 调度中心遍历收听者,挨个通知(执行)每一个收听者接受消息的逻辑
关于停止任务
因为题目要求,【快速取消】所有子任务
关于线程停止的方法也有很多:
1. 优雅退出run方法
2. 暴力stop
3. run方法抛出异常
如果说要求,A异常了,B和C收到消息之后,线程立即停止,不能有一点迟疑,说实话我还没想到该怎么做。因为你要知道,实际上的任务的run方法内部,不太可能是个while循环,人家可能就是个顺序执行,所以停止标志位的方式,并不适用。
我先写个按照标志位停止的”玩具”。
修改三个收听者代码和测试类
package com.example.broadcast;
import lombok.SneakyThrows;
import java.util.Random;
import java.util.UUID;
public class ListenerA implements Listener,Runnable {
private DispatchCenter center;
private String identify;
public ListenerA() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "收到消息:" + msg);
}
@Override
public String identify() {
return identify;
}
@SneakyThrows
@Override
public void run() {
// 5秒之后,模拟发生异常
Thread.sleep(5000);
notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
System.out.println(this.getClass().getName() + "程序异常,并已经传播了消息...");
}
}
package com.example.broadcast;
import lombok.SneakyThrows;
import java.util.UUID;
public class ListenerB implements Listener,Runnable {
private DispatchCenter center;
private String identify;
private volatile Boolean stopFlag = false;
public ListenerB() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
// 停止当前线程
stopFlag = true;
}
@Override
public String identify() {
return identify;
}
@SneakyThrows
@Override
public void run() {
while (!stopFlag){
Thread.sleep(1000);
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
}
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead");
}
}
package com.example.broadcast;
import lombok.SneakyThrows;
import java.util.UUID;
public class ListenerC implements Listener,Runnable {
private DispatchCenter center;
private String identify;
private volatile Boolean stopFlag = false;
public ListenerC() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
// 停止当前线程
stopFlag = true;
}
@Override
public String identify() {
return identify;
}
@SneakyThrows
@Override
public void run() {
while (!stopFlag){
Thread.sleep(1000);
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
}
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead");
}
}
测试
package com.example.broadcast;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
DispatchCenter center = new DispatchCenterImpl();
ListenerA listenerA = new ListenerA();
ListenerB listenerB = new ListenerB();
ListenerC listenerC = new ListenerC();
center.addListener(listenerA);
center.addListener(listenerB);
center.addListener(listenerC);
ExecutorService executorService = Executors.newFixedThreadPool(3);
// A
executorService.submit(listenerA);
// B
executorService.submit(listenerB);
// C
executorService.submit(listenerC);
executorService.shutdown();
}
}
再想一想
这个问题想想并不简单:
1.这不是单一线程处理异常的情况(如果只是单一线程,自己的异常自己捕获并处理即可)
2. A出现了异常,B收到了取消任务通知,问题在于, 1)不知道B目前执行到哪里了,没办法让B停下手中的工作。2)如果杀死B线程,那么执行一半的任务,会不会导致什么程序异常,或者脏数据之类的?
3. 稳妥一点的方法就是,A出现了异常,B收到了通知之后, 照常执行任务,只是当收到了异常通知的时候,会在正常逻辑的后面调用一个 任务回退方法;而所有任务正常工作,则不会调用这个回退方法。
4. 这个思路让我想到了分布式事务,是不是有内味了?
改动收听者接口,增加回滚方法
package com.example.broadcast;
/**
* 每个节点即是广播者,也是收听者
*/
public interface Listener {
/**
* 设置调度中心
*/
void setCenter(DispatchCenter center);
/**
* 主动通知其它收听者
*/
void notice(String msg);
/**
* 自己收到通知的处理逻辑
* @param msg
*/
void whenReceived(String msg);
/**
* 收听者标志:唯一
* @return
*/
String identify();
/**
* 发生异常时,任务回退方法
*/
void rollback();
}
A
package com.example.broadcast;
import lombok.SneakyThrows;
import java.util.Random;
import java.util.UUID;
public class ListenerA implements Listener,Runnable {
private DispatchCenter center;
private String identify;
public ListenerA() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "收到消息:" + msg);
}
@Override
public String identify() {
return identify;
}
@Override
public void rollback() {
System.out.println(this.getClass().getName() + "任务回退!!!");
}
@SneakyThrows
@Override
public void run() {
// 5秒之后,模拟发生异常
Thread.sleep(5000);
notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
// A异常,回滚
rollback();
}
}
B
package com.example.broadcast;
import lombok.SneakyThrows;
import java.util.UUID;
public class ListenerB implements Listener,Runnable {
private DispatchCenter center;
private String identify;
private volatile Boolean rollbackFlag = false;
public ListenerB() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
// 任务需要回滚
rollbackFlag = true;
}
@Override
public String identify() {
return identify;
}
@Override
public void rollback() {
System.out.println(this.getClass().getName() + "任务回退!!!");
}
@SneakyThrows
@Override
public void run() {
// 模拟任务耗时,执行6秒的任务
for (int i = 0; i < 3; i++){
Thread.sleep(2000);
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
}
if (rollbackFlag){
rollback();
}
}
}
C
package com.example.broadcast;
import lombok.SneakyThrows;
import java.util.UUID;
public class ListenerC implements Listener,Runnable {
private DispatchCenter center;
private String identify;
private volatile Boolean rollbackFlag = false;
public ListenerC() {
identify = UUID.randomUUID().toString();
}
@Override
public void setCenter(DispatchCenter center) {
this.center = center;
}
@Override
public void notice(String msg) {
center.broadcast(identify, msg);
}
@Override
public void whenReceived(String msg) {
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
// 任务需要回滚
rollbackFlag = true;
}
@Override
public String identify() {
return identify;
}
@Override
public void rollback() {
System.out.println(this.getClass().getName() + "任务回退!!!");
}
@SneakyThrows
@Override
public void run() {
// 模拟任务耗时,执行9秒的任务
for (int i = 0; i < 3; i++){
Thread.sleep(3000);
System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
}
if (rollbackFlag){
rollback();
}
}
}
测试Main不变,执行
Original: https://www.cnblogs.com/LUA123/p/14042974.html
Author: 露娜妹
Title: Java设计模式——观察者模式的灵活应用
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/538375/
转载文章受原作者版权保护。转载请注明原作者出处!