Java设计模式——观察者模式的灵活应用

灵感来源于一个猪队友给我的题目

Java设计模式——观察者模式的灵活应用

看到这个,我抓住的关键字是: 任何子任务失败,要通知所有子任务执行取消逻辑。

这不就是消息广播吗?观察者模式!

干活

首先是收听者

package com.example.broadcast;

/**
 * 每个节点即是广播者,也是收听者
 */
public interface Listener {

    /**
     * 设置调度中心
     */
    void setCenter(DispatchCenter center);

    /**
     * 主动通知其它收听者
     */
    void notice(String msg);

    /**
     * 自己收到通知的处理逻辑
     * @param msg
     */
    void whenReceived(String msg);

    /**
     * 收听者标志:唯一
     * @return
     */
    String identify();

}

然后是调度中心

package com.example.broadcast;

/**
 * 调度中心
 */
public interface DispatchCenter {

    /**
     * 广播
     * @param own 广播的时候,要排除自己
     * @param msg 广播消息
     */
    void broadcast(String own, String msg);

    /**
     * 添加收听者
     * @param listener
     */
    void addListener(Listener listener);

}

调度中心实现

package com.example.broadcast;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DispatchCenterImpl implements DispatchCenter {

    private static final Map MAP = new ConcurrentHashMap<>();

    @Override
    public void broadcast(String own, String msg) {
        MAP.forEach((k,v) -> {
            // 不用给自己发通知
            if (!k.equals(own)){
                v.whenReceived(msg);
            }
        });
    }

    @Override
    public void addListener(Listener listener) {
        listener.setCenter(this);
        MAP.put(listener.identify(), listener);
    }
}

剩下三个收听者

package com.example.broadcast;

import java.util.UUID;

public class ListenerA implements Listener {

    private DispatchCenter center;
    private String identify;

    public ListenerA() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "收到消息:" + msg);
    }

    @Override
    public String identify() {
        return identify;
    }
}

B和C除了类名不一样,其他都一样,不再赘述。目录如下

Java设计模式——观察者模式的灵活应用

测试

package com.example.broadcast;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        DispatchCenter center = new DispatchCenterImpl();
        ListenerA listenerA = new ListenerA();
        ListenerB listenerB = new ListenerB();
        ListenerC listenerC = new ListenerC();
        center.addListener(listenerA);
        center.addListener(listenerB);
        center.addListener(listenerC);

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // A触发1条事件
        executorService.submit(() -> {
            int i = 1;
            while (i > 0){
                listenerA.notice(listenerA.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                i--;
            }
        });
        // B触发2条事件
        executorService.submit(() -> {
            int i = 2;
            while (i > 0){
                listenerB.notice(listenerB.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                i--;
            }
        });
        // C触发3条事件
        executorService.submit(() -> {
            int i = 3;
            while (i > 0){
                listenerC.notice(listenerC.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                i--;
            }
        });

        executorService.shutdown();

    }

}

输出:

Java设计模式——观察者模式的灵活应用

流程图

Java设计模式——观察者模式的灵活应用

当其中的B节点,发生了错误,除了把自己处理好之外
1. 向调度中心发送广播请求,并携带需要的消息
2. 调度中心遍历收听者,挨个通知(执行)每一个收听者接受消息的逻辑

关于停止任务

因为题目要求,【快速取消】所有子任务
关于线程停止的方法也有很多:
1. 优雅退出run方法
2. 暴力stop
3. run方法抛出异常

如果说要求,A异常了,B和C收到消息之后,线程立即停止,不能有一点迟疑,说实话我还没想到该怎么做。因为你要知道,实际上的任务的run方法内部,不太可能是个while循环,人家可能就是个顺序执行,所以停止标志位的方式,并不适用。
我先写个按照标志位停止的”玩具”。

修改三个收听者代码和测试类

package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.Random;
import java.util.UUID;

public class ListenerA implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;

    public ListenerA() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "收到消息:" + msg);
    }

    @Override
    public String identify() {
        return identify;
    }

    @SneakyThrows
    @Override
    public void run() {
        // 5秒之后,模拟发生异常
        Thread.sleep(5000);
        notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
        System.out.println(this.getClass().getName() + "程序异常,并已经传播了消息...");
    }
}
package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.UUID;

public class ListenerB implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;
    private volatile Boolean stopFlag = false;

    public ListenerB() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
        // 停止当前线程
        stopFlag = true;
    }

    @Override
    public String identify() {
        return identify;
    }

    @SneakyThrows
    @Override
    public void run() {
        while (!stopFlag){
            Thread.sleep(1000);
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
        }
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead");
    }
}
package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.UUID;

public class ListenerC implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;
    private volatile Boolean stopFlag = false;

    public ListenerC() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
        // 停止当前线程
        stopFlag = true;
    }

    @Override
    public String identify() {
        return identify;
    }

    @SneakyThrows
    @Override
    public void run() {
        while (!stopFlag){
            Thread.sleep(1000);
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
        }
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead");
    }
}

测试

package com.example.broadcast;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        DispatchCenter center = new DispatchCenterImpl();
        ListenerA listenerA = new ListenerA();
        ListenerB listenerB = new ListenerB();
        ListenerC listenerC = new ListenerC();
        center.addListener(listenerA);
        center.addListener(listenerB);
        center.addListener(listenerC);

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // A
        executorService.submit(listenerA);
        // B
        executorService.submit(listenerB);
        // C
        executorService.submit(listenerC);

        executorService.shutdown();

    }

}

Java设计模式——观察者模式的灵活应用

再想一想

这个问题想想并不简单:
1.这不是单一线程处理异常的情况(如果只是单一线程,自己的异常自己捕获并处理即可)
2. A出现了异常,B收到了取消任务通知,问题在于, 1)不知道B目前执行到哪里了,没办法让B停下手中的工作。2)如果杀死B线程,那么执行一半的任务,会不会导致什么程序异常,或者脏数据之类的?
3. 稳妥一点的方法就是,A出现了异常,B收到了通知之后, 照常执行任务,只是当收到了异常通知的时候,会在正常逻辑的后面调用一个 任务回退方法;而所有任务正常工作,则不会调用这个回退方法。
4. 这个思路让我想到了分布式事务,是不是有内味了?

改动收听者接口,增加回滚方法

package com.example.broadcast;

/**
 * 每个节点即是广播者,也是收听者
 */
public interface Listener {

    /**
     * 设置调度中心
     */
    void setCenter(DispatchCenter center);

    /**
     * 主动通知其它收听者
     */
    void notice(String msg);

    /**
     * 自己收到通知的处理逻辑
     * @param msg
     */
    void whenReceived(String msg);

    /**
     * 收听者标志:唯一
     * @return
     */
    String identify();

    /**
     * 发生异常时,任务回退方法
     */
    void rollback();

}

A

package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.Random;
import java.util.UUID;

public class ListenerA implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;

    public ListenerA() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "收到消息:" + msg);
    }

    @Override
    public String identify() {
        return identify;
    }

    @Override
    public void rollback() {
        System.out.println(this.getClass().getName() + "任务回退!!!");
    }

    @SneakyThrows
    @Override
    public void run() {
        // 5秒之后,模拟发生异常
        Thread.sleep(5000);
        notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
        // A异常,回滚
        rollback();
    }
}

B

package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.UUID;

public class ListenerB implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;
    private volatile Boolean rollbackFlag = false;

    public ListenerB() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
        // 任务需要回滚
        rollbackFlag = true;
    }

    @Override
    public String identify() {
        return identify;
    }

    @Override
    public void rollback() {
        System.out.println(this.getClass().getName() + "任务回退!!!");
    }

    @SneakyThrows
    @Override
    public void run() {
        // 模拟任务耗时,执行6秒的任务
        for (int i = 0; i < 3; i++){
            Thread.sleep(2000);
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
        }
        if (rollbackFlag){
            rollback();
        }
    }
}

C

package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.UUID;

public class ListenerC implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;
    private volatile Boolean rollbackFlag = false;

    public ListenerC() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
        // 任务需要回滚
        rollbackFlag = true;
    }

    @Override
    public String identify() {
        return identify;
    }

    @Override
    public void rollback() {
        System.out.println(this.getClass().getName() + "任务回退!!!");
    }

    @SneakyThrows
    @Override
    public void run() {
        // 模拟任务耗时,执行9秒的任务
        for (int i = 0; i < 3; i++){
            Thread.sleep(3000);
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
        }
        if (rollbackFlag){
            rollback();
        }
    }
}

测试Main不变,执行

Java设计模式——观察者模式的灵活应用

Original: https://www.cnblogs.com/LUA123/p/14042974.html
Author: 露娜妹
Title: Java设计模式——观察者模式的灵活应用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/538375/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 从源码中理解Spring Boot自动装配原理

    SpringBoot 定义了一套接口规范,这套规范规定: SpringBoot在启动时会扫描外部引用jar包中的 META-INF/spring.factories文件,将文件中配…

    Java 2023年6月16日
    099
  • Java集合原理分析和知识点大杂烩(多图初学者必备!!)

    一、数据结构 ​ 数据结构就是计算机存储、组织数据的方式。 ​ 在计算机科学中,算法的时间复杂度是一个函数,它定性描述了该算法的运行时间,常用O符号来表述。​ 时间复杂度是同一问题…

    Java 2023年6月7日
    091
  • java生成机器码

    java根据系统参数生成每个计算机的唯一标识。 获取CPU序列号 /** * 获取CPU序列号 * @return * @throws IOException */ public …

    Java 2023年6月16日
    089
  • 浅谈限流组件的应用和设计原则

    做业务的同学都知道,在现实情况中,往往会出现流量暴增的情况。这些流量可能来自于黑客的爬虫,也可能来自于节日大促,或者其他一些渠道。当然业界都有对策,比如反爬、熔断、降级、限流等等不…

    Java 2023年6月16日
    090
  • 戏说领域驱动设计(廿六)——再谈事务

    有关事务的内容,在前面我们已经不只谈过一次,没办法,这是一个绕不开的话题。你敢说你在开发中不用到它?最起码聚合进行序列化的时候得启动一个本地事务吧。当然了,如果你用的是NoSQL,…

    Java 2023年6月7日
    082
  • Java 线程的五种状态 与 创建线程

    Java 线程的 5 种状态 线程状态图: 线程共包含以下五种状态:1、 新建状态(New): 线程对象被创建后,就进入了新建状态,例如,Thread thread = new T…

    Java 2023年6月5日
    096
  • mysql面试题整理

    1 myisam 和 innodb 引擎的区别 innodb 支持事务,外键,myisam 不支持 innodb 支持 mvcc ,myisam 不支持 innodb 支持表锁、行…

    Java 2023年6月5日
    0143
  • 【java基础】枚举

    public enum Color { Red,Blue,Green; } Color red = Color.Red;//&#x679A;&#x4E3E;&amp…

    Java 2023年6月9日
    0128
  • js简单实现拦截访问指定网页

    最近闲的无事,写个脚本玩玩,实现拦截访问指定网址 要想实现这个功能,就要自定义一个浏览器插件 最简单的浏览器插件有两个文件,分别是 manifest.json和 **.js。首先新…

    Java 2023年6月8日
    0110
  • 擦肩而过

    你在路上擦肩而过的一个人,都有可能是别人朝思暮想还见不到的。 有没有一种可能,我上周去的肯德基,那个座位她曾经坐过? 有没有一种可能,今晚梦见她的时候,她也会梦见我? 可能都只是我…

    Java 2023年6月14日
    075
  • Spring注解是如何生效的?

    现在大部分开发已经采用Spring Boot了,少了很多配置文件,方便了许多。以前在使用注解,比如@Autowired、@Resource 或者事务相关的一些注解时,我们会首先在配…

    Java 2023年5月30日
    083
  • Spring Cloud Hystrix Dashboard熔断器-Turbine集群监控(六)

    绿色计数: 表示成功的请求数 蓝色计数: 表示断路器打开后,直接被短路的请求数 黄色计数: 表示请求超时数 紫色计数: 表示因为线程池满而被拒绝的请求数 红色计数: 表示因为异常而…

    Java 2023年5月30日
    098
  • 设计模式 21 状态模式

    状态模式(State Pattern)属于 行为型模式 在标准大气压下, 水在 0 ~ 100 度之间时,会呈现 液态;在 0 度以下会变成 固态;100 度以上会变成气态。 物质…

    Java 2023年6月6日
    091
  • 设计模式 — Template Method(模板方法)

    直接上代码、先按原来开发步骤、在重构到模式、即在现成代码间寻找变化点、在使用对应的设计模式! 按流程执行代码 import org.junit.Test; // 程序库开发人员 c…

    Java 2023年6月16日
    083
  • Spring Cloud Sleuth+ZipKin+ELK服务链路追踪(七)

    序言 sleuth是spring cloud的分布式跟踪工具,主要记录链路调用数据,本身只支持内存存储,在业务量大的场景下,为拉提升系统性能也可通过http传输数据,也可换做rab…

    Java 2023年5月30日
    086
  • C#线程调度AutoResetEvent和ManualResetEvent区别

    共同点: 均继承 EventWaitHandle 接口,因此,均具有以下功能: Reset() //红灯 Set() //绿灯 WaitOne() // 等待信号 本质都是阻塞信号…

    Java 2023年5月29日
    071
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球