RxJava简析

rxjava文档地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 这个是中文版的

android studio 添加依赖 implementation ‘io.reactivex.rxjava3:rxjava:3.0.4’

首先,打印helloworld:

public void hello(String args){
    Flowable.fromArray(args).subscribe(s -> System.out.println("hello " + s + "!"));
}

跟以前其他语言不大一样,看上去很麻烦,我们一步步来看

Flowable.fromArray(args)

这个方法最重要的就是里面的最后一句

new FlowableFromArray<>(items)

果然FlowableFromArray是Flowable的子类,所以真正的实现在子类里面

Flowable.fromArray(args).subscribe

subscribe进到里面的是

public final Disposable subscribe(@NonNull Consumersuper T> onNext, @NonNull Consumersuper Throwable> onError,
        @NonNull Action onComplete) {
    Objects.requireNonNull(onNext, "onNext is null");
    Objects.requireNonNull(onError, "onError is null");
    Objects.requireNonNull(onComplete, "onComplete is null");

    LambdaSubscriber ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);

    subscribe(ls);

    return ls;
}

看上去最重要的就是这两句了

LambdaSubscriber ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);

subscribe(ls);

先进到subscribe(ls)中,发现这句

subscribeActual(flowableSubscriber)

跳进去发现是个抽象方法,那么实现肯定在子类啦,进到子类FlowableFromArray

@Override
public void subscribeActual(Subscribersuper T> s) {
    if (s instanceof ConditionalSubscriber) {
        s.onSubscribe(new ArrayConditionalSubscription<>(
                (ConditionalSubscribersuper T>)s, array));
    } else {
        s.onSubscribe(new ArraySubscription<>(s, array));
    }
}

跳进去又发现onSubscribe是个抽象方法,那么实现方法在哪呢,对啦,就是之前看到的LambdaSubscriber

public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.setOnce(this, s)) {
        try {
            onSubscribe.accept(this);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            s.cancel();
            onError(ex);
        }
    }
}

这个onSubscribe.accept(this)跳过去就是接口Consumer的accept方法了

所以一开始的helloworld代码也可以改成

FlowableFromArray flowableFromArray = new FlowableFromArray(new String[]{args});
flowableFromArray.subscribe(new Consumer() {
    @Override
    public void accept(String s) throws Throwable {
        System.out.println("hello " + s + "!");
    }
});

是不是很麻烦,饶了一大圈,没关系,我们继续往下看

这里给出一些名词的翻译

  • Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式
  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
  • Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现
  • emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射
  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项

下面是常用的操作符列表:

  1. 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 错误处理 Catch和Retry
  6. 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 转换操作 To
  10. 连接操作 Connect, Publish, RefCount, Replay
  11. 反压操作,用于增加特殊的流程控制策略的操作符

下面我们来看第一个操作符:Create

Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscribersuper Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

我们一起来看源码

首先是Observable的create方法

public final static  Observable create(OnSubscribe f) {
    return new Observable(hook.onCreate(f));
}

这里没什么,就是返回创建一个Observable对象,但是要注意里面的参数OnSubscribe

public interface OnSubscribe extends Action1super T>> {
    // cover for generics insanity
}

public interface Action1 extends Action {
    void call(T t);
}

这个参数是一个接口,它的父类里有个抽象待实现的方法call,而且call方法被传了Subscriber进去

我们来看Subscriber这个类,原来是个接口,而且它的父类Observer有三个很重要的方法

public interface Observer {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

第一个create方法算是完成了,我们可以拆分来看

Observable integerObservable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscribersuper Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 0; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
});

第二个方法subscribe,它的参数也是Subscriber,即intergerObservable.subscribe(Subscriber)

所以我们就看出来了,Observable这个被观察者先是通过call增加一系列的监听,然后通过subscribe订阅监听。这样,当call里的内容开始执行后,触发监听回调

下面我要放大招了,我把源码简化了一下

public interface MyOnSubscribe {
    void call(MySubscriber subscriber);
}

public interface MySubscriber {
    void onNext();

    void onCompleted();

    void onError();
}

public class MyObservable {

    MyOnSubscribe onSubscribe;

    public MyObservable(MyOnSubscribe onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public final static MyObservable create(MyOnSubscribe onSubscribe) {
        return new MyObservable(onSubscribe);
    }

    public final void subscribe(MySubscriber subscriber) {
        onSubscribe.call(subscriber);
    }
}

测试代码

public void hello() {
        MyObservable.create(new MyOnSubscribe() {
            @Override
            public void call(MySubscriber subscriber) {
                try {
                    for (int i = 0; i < 5; i++) {
                        subscriber.onNext();
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError();
                }
            }
        }).subscribe(new MySubscriber() {
            @Override
            public void onNext() {
                System.out.println(1);
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError() {
                System.out.println("onError");
            }
        });
    }

得到的结果是一样的。所以说,代码万变不离其中,只要灵活运用接口,接口就是用来监听的

第二个操作符from

Integer[] items = {0, 1, 2, 3, 4, 5};
Observable.from(items).subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        System.out.println(integer);
    }
});

先看Observable的from方法

public final static  Observable from(T[] array) {
    return from(Arrays.asList(array));
}

其实就是把数组转成list,但是再点from进去就很重要

public final static  Observable from(Iterableextends T> iterable) {
    return create(new OnSubscribeFromIterable(iterable));
}
public OnSubscribeFromIterable(Iterableextends T> iterable) {
    if (iterable == null) {
        throw new NullPointerException("iterable must not be null");
    }
    this.is = iterable;
}

OnSubscribeFromIterable是继承自OnSubscribe的,所以后面调的call方法,实际上是调的OnSubscribeFromIterable里的call方法,我们来看一下

@Override
public void call(final Subscribersuper T> o) {
    final Iteratorextends T> it = is.iterator();
    if (!it.hasNext() && !o.isUnsubscribed())
        o.onCompleted();
    else
        o.setProducer(new IterableProducer(o, it));
}

真相大白了,在这里做了迭代。还有一个操作符just,其实底层里面调的就是from,只不过还限制了参数个数,而且参数类型必须相同,感觉用处不大

第三个操作符repeat

Observable.just(1, 2).repeat(4).subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        System.out.println(integer);
    }
});

repeat点进去是OnSubcribRedo.repeat,紧追着count这个参数,会看到一个RedoFinite类

public static final class RedoFinite implements Func1extends Notification>, Observable> {
    private final long count;

    public RedoFinite(long count) {
        this.count = count;
    }

    @Override
    public Observable call(Observableextends Notification> ts) {
        return ts.map(new Func1, Notification>() {

            int num=0;

            @Override
            public Notification call(Notification terminalNotification) {
                if(count == 0) {
                    return terminalNotification;
                }

                num++;
                if(num  count) {
                    return Notification.createOnNext(num);
                } else {
                    return terminalNotification;
                }
            }

        }).dematerialize();
    }
}

这里就看到了,有个num++和num

Original: https://www.cnblogs.com/anni-qianqian/p/13959959.html
Author: 嘉禾世兴
Title: RxJava简析

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/538945/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 两万字!多线程50问!

    前言 大家好,我是 捡田螺的小男孩。金九银十快要来了,整理了50道多线程并发面试题,大家可以点赞、收藏起来,慢慢品!~ github地址,麻烦给个star鼓励一下,感谢感谢 公众号…

    Java 2023年6月5日
    096
  • Django基础学习之Cookie 和 Sessions 应用

    在Django里面,使用Cookie和Session看起来好像是一样的,使用的方式都是request.COOKIES[XXX]和request.session[XXX],其中XXX…

    Java 2023年5月29日
    051
  • java- 接口笔记

    接口 interface 接口就是一组规范(就像我们法律一样),所有实现类都要遵守。 面向对象的精髓,最能体现这一点的就是接口。为什么我们讨论设计模式都只针对具备了抽象能力的语言(…

    Java 2023年6月15日
    0106
  • 切入点表达式

    按类匹配和按方法匹配详解 1.按类匹配:匹配到的Java类中全部方法作为目标连接点,使用within关键字。 匹配到某个类 <aop:pointcut expression=…

    Java 2023年6月9日
    087
  • MySql主要性能指标说明

    在项目当中数据库一般都会成为主要的性能与负载瓶颈,那么针对数据库各项性能指标的监控与对应的优化是开发与运维人员需要面对的主要工作,而且这部分的工作会贯穿项目从开发到运行的整个周期里…

    Java 2023年6月9日
    073
  • redis命令操作list 和redis命令操作set&sortedset

    redis命令操作list 列表类型 list:可以添加一个元素列表的头部(左边)或者尾部(右边) 添加: lpush key value:将元素加入列表左边 rpush key …

    Java 2023年6月6日
    0107
  • X86寄存器

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Java 2023年6月9日
    089
  • springboot2.3.x whitelabel error page 404

    一般资料都提3个问题 controller 位置不对 未添加依赖 注释 我的问题是 properties 要用 thymeleaf-spring5.version Original…

    Java 2023年5月30日
    074
  • windows 2003 不同网段 无法 文件共享 VSS设置

    网上搜索了很多解答,比如设置VPN, 修改ETC表, 自己也设置了VPN,确实可以,设置的过程需要关闭WINDOWS自带的防火墙,让我觉得不太习惯。 就研究了下防火墙的例外项 点击…

    Java 2023年6月14日
    082
  • 实现cookie跨域访问

    通过HTML SCRIPT标签跨域写cookie: 由于html的script标签的src属性,可以支持访问任何域的资源,通过script发起一个get类型的网络请求,获取资源。 …

    Java 2023年6月7日
    071
  • npm 和 maven 使用 Nexus3 私服 | 前后端一起学

    前文《Docker 搭建 Nexus3 私服 》介绍了在 docker 环境下安装 nexus3 以及 nexus3 的基本操作和管理,本文分别介绍 npm(前端)和 maven(…

    Java 2023年6月16日
    069
  • 设计模式之单例模式

    1、什么是单例模式 ​ 单例模式是指保证 某个类在整个软件系统中只有一个对象实例,并且该类仅提供一个返回其对象实例的方法(通常为静态方法) 2、单例模式的种类 ​ 经典的单例模式实…

    Java 2023年6月8日
    084
  • JAVA_AesCBC例子

    java;gutter:true; import javax.crypto.Cipher; import javax.crypto.spec.IvParameterSpec; im…

    Java 2023年5月29日
    077
  • MYSQL进阶

    之前写了篇文章, 记录了MySQL的一些常用命令, 现在看来仍然有遗漏的地方, 所以补充一下 组合查询 即 UNION 使用 UNION来组合两个查询,如果第一个查询返回 M 行,…

    Java 2023年6月7日
    0108
  • Java连载151-JUnit简介以及HashMap初步分析

    一、配置JUnit环境 JUnit是一个集成测试单元框架,我们先下载软件包,来配置环境 <span class="hljs-keyword">pac…

    Java 2023年6月13日
    096
  • Redisson报错

    org.redisson.client.RedisResponseTimeoutException: Redis server response timeout (3000 ms)…

    Java 2023年6月6日
    092
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球