Android 第三方框架之RxJava
一:介绍
在GitHub上的介绍:一个在Java VM上应用可观测的序列来组成异步的,基于事件的程序库
总结:RxJava 是一个基于事件流,实现异步操作的库
特点:异步,简洁
异步操作很要害的一点是程序的简洁性,因为在调度过程比较复杂的状况下,异步代码常常会既难写也难被读懂。 Android 发明的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的劣势也是简洁,但它的简洁的不同凡响之处在于,随着程序逻辑变得越来越简单,它仍然可能放弃简洁
二:原理
RxJava原理基于一种扩大的观察者模式
被观察者(Observable)通过订阅(Subscribe)按程序发送事件(Event)给观察者(Observer),观察者(Observer)按程序接管事件以及做出对应的响应动作
.被观察者(Observable)产生事件
.观察者(Observer)接管事件,并给出响应动作
.订阅(Subscribe)连贯 被观察者&观察者
.事件(Event)被观察者&观察者连贯的桥梁
观察者模式是一种一对多的对象依赖关系
例如:铃声,学生,老师,上课
铃声(Ring)被观察者(Observable)
学生,老师(Student,Teacher)观察者(Observer)
//被观察者的抽象类 abstract class Subject { //定义一个汇合存储观察者对象 protected List<Observer> observers = new ArrayList<Observer>(); //减少观察者,通过add办法订阅观察者 public void add(Observer observer) { observers.add(observer); } //删除观察者 public void remove(Observer observer) { observers.remove(observer); } //告诉观察者办法 public abstract void notifyObserve1(); public abstract void notifyObserve2(); } /** * 铃声(被观察者)具体实现 */ public class Ring extends Subject { @Override public void notifyObserve1() { System.out.println("上课铃响了"); for (Object object : observers) { ((Observer) object).response1(); } System.out.println("------------"); } @Override public void notifyObserve2() { System.out.println("下课铃响了"); for (Object object : observers) { ((Observer) object).response2(); } System.out.println("------------"); } }
观察者
/** * 形象观察者 */ public interface Observer { //观察者反馈 public abstract void response1();//上课反馈 public abstract void response2();//下课反馈 } /** * 老师(观察者)具体老师的实现 */ public class Teacher implements Observer { @Override public void response1() { System.out.println("老师筹备认真讲课"); } @Override public void response2() { System.out.println("老师筹备下课劳动"); } } //应用: Subject subject = new Ring();//创立被观察者 Observer teacher = new Teacher();//创立观察者 subject.add(teacher);//订阅观察者 subject.notifyObserve1();//发送事件1 subject.notifyObserve2();//发送事件2
RxJava个别的事件流的应用
增加依赖:
//rxjava2 implementation 'io.reactivex.rxjava2:rxjava:2.2.20' implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
RxJava观察者的事件回调办法除了一般事件onNext(),(相当于onClick()/onEvent())之外,还定义了两个非凡的事件:onCompleted()和onError()
.onCompleted():事件队列完结。RxJava不仅把每个事件独自解决,还会把他们看作一个队列。RxJava规定,当不会再有新的onNext()收回时,须要触发onCompleted()办法作为标记
.onError(): 事件队列异样。在事件处理过程中出异样时,onError()会被触发,同时队列主动终止,不容许再有事件收回
.在一个正确运行的事件序列中,onCompleted()和onError()有且只有一个,并且是事件序列中的最初一个。须要留神的是,onCompleted()和onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个
//1.创立被观察者Observable对象,应用Observable.create创立 Observable.create(new ObservableOnSubscribe<Integer>() { //2.在复写的subscribe()里定义须要发送的事件 @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception { //通过ObservableEmitter类对象产生事件并告诉观察者 // ObservableEmitter类介绍 // a. 定义:事件发射器 // b. 作用:定义须要发送的事件 & 向观察者发送事件 emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete();//事件实现,能够抉择持续发送事件 } }).subscribe(new Observer<Integer>() {//订阅观察者 // 通过通过订阅(subscribe)连贯观察者和被观察者 // 创立观察者 & 定义响应事件的行为 @Override public void onSubscribe(@NonNull Disposable d) { Log.d("TAG", "开始采纳subscribe连贯"); } @Override public void onNext(@NonNull Integer value) { Log.d("TAG", "处理事件"+ value ); } @Override public void onError(@NonNull Throwable e) { Log.d("TAG", "解决Error事件,不再接管事件"); } @Override public void onComplete() { Log.d("TAG", "解决Complete事件,不再接管事件"); } });
应用来说,每一个网络申请接口相当于一个被观察者
public interface ApiService { // 获取工作 @POST("task/apply") Observable<BaseBean<GetTaskBean>> getTask(@Body RequestTaskBean requestTaskBean);//这相当于被观察者 须要去订阅观察者 }
应用
HttpObservable.getObservable(RetrofitFactory.getApiService().getTask(bean)) .subscribe(GET_TASK_BEAN_HTTPOBSERVER); //HttpObservable.getObservable(RetrofitFactory.getApiService().getTask(bean)) 这个是失去被观察者 .subscribe()订阅 GET_TASK_BEAN_HTTPOBSERVER :是观察者 HttpObserver<GetTaskBean> GET_TASK_BEAN_HTTPOBSERVER = new HttpObserver<GetTaskBean>(RequestTagConfig.TASK_APPLY) { //这是一个观察者的实现 }
三:RxJava的操作符
转换类操作符(map flatMap concatMap flatMapIterable switchMap scan groupBy…);
过滤类操作符(fileter take takeLast takeUntil distinct distinctUntilChanged skip skipLast …);
组合类操作符(merge zip join combineLatest and/when/then switch startSwitch…)。
转换操作符Map
map()函数承受一个Func1类型的参数,而后把这个Func1利用到每一个由Observable发射的值上,将发射的值转换为咱们冀望的值
假如咱们须要将一组数字转换成字符串,咱们能够通过map这样实现:
Observable.just(1, 2, 3, 4, 5) .map(new Func1<Integer, String>() { @Override public String call(Integer i) { return "This is " + i; } }).subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });
过滤操作符Filter
filter(Func1)用来过滤观测序列中咱们不想要的值,只返回满足条件的值,咱们看下原理图
还是拿后面文章中的小区Community[] communities来举例,假如我须要赛选出所有房源数大于10个的小区,咱们能够这样实现:
Observable.from(communities) .filter(new Func1<Community, Boolean>() { @Override public Boolean call(Community community) { return community.houses.size()>10; } }).subscribe(new Action1<Community>() { @Override public void call(Community community) { System.out.println(community.name); } });
组合操作符Merge
merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你能够简略的将它了解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。
咱们看上面的例子,一共有两个Observable:一个用来发送字母,另一个用来发送数字;当初咱们须要两连个Observable发射的数据合并。
String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"}; Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS) .map(new Func1<Long, String>() { @Override public String call(Long position) { return letters[position.intValue()]; } }).take(letters.length); Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5); Observable.merge(letterSequence, numberSequence) .subscribe(new Observer<Serializable>() { @Override public void onCompleted() { System.exit(0); } @Override public void onError(Throwable e) { System.out.println("Error:" + e.getMessage()); } @Override public void onNext(Serializable serializable) { System.out.print(serializable.toString()+" "); } });
程序输入:A 0 B C 1 D E 2 F 3 G H 4
RxJava线程切换
总所周知 RxJava 在切换线程时用到了两个办法 subscribeOn() 和 observeOn() 上面来别离解释一下这两个办法
subscribeOn() : 影响的是最开始的被观察者所在的线程。当应用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
observeOn() : 影响的是跟在前面的操作(指定观察者运行的线程)。所以如果想要屡次扭转线程,能够屡次应用 observeOn;
*/ public static <T> Observable<T> getObservable(Observable<T> apiObservable) { // showLog(request); Observable<T> observable = apiObservable .onErrorResumeNext(new HttpResultFunction<>()) .subscribeOn(Schedulers.io())//线程切换,影响最开始的被观察者的线程 .observeOn(AndroidSchedulers.mainThread());//切换主线程影响的是观察者的线程 return observable; }
因为subscribeOn(Schedulers.io())它指定了最开始的被观察者所在的线程所以前面的操作都是依据最开始的被观察者制订的线程运行的,又因为 .observeOn(AndroidSchedulers.mainThread()) 它指定了都面的操作符应用主线程运行。
RxJava线程调度Schedulers
类型 | 含意 | 利用场景 |
---|---|---|
Schedulers.immediate() | 以后线程 = 不指定线程 | 默认 |
AndroidSchedulers.mainThread() | Android主线程 | 操作UI |
Schedulers.newThread() | 惯例新线程 | 网络申请、读写文件等io密集型操作 |
Schedulers.io() | io操作线程 | 网络申请、读写文件等io密集型操作 |
Schedulers.computation() | CPU计算操作线程 | 大量计算操作 |
//被观察者 (Observable) 在 子线程 中生产事件(如实现耗时操作等等) //观察者(Observer)在 主线程 接管 & 响应事件(即实现UI操作) Observable.just("head.png", "icon.png") .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //网络获取 Log.i(TAG, "apply:----------->1 " + Thread.currentThread().getName()); return Bitmap.createBitmap(100, 100, Bitmap.Config.ARGB_8888); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) // .subscribeOn(Schedulers.newThread()) .subscribe(new Consumer<Bitmap>() { @Override public void accept(Bitmap bitmap) throws Exception { // 子线程 Log.i(TAG, "apply:----------->2 " + Thread.currentThread().getName()); // imageView.setIamgeView(bitmap) } });
三:RxJava背压策略
观察者和被观察者之间存在2种订阅关系,同步和异步
对于异步订阅关系,存在被观察者发送事件速度与观察者接管事件速度不匹配的状况
1.发送 & 接管事件速度 = 单位工夫内 发送&接管事件的数量
2.大多数状况,次要是 被观察者发送事件速度 > 观察者接管事件速度
问题:
被观察者 发送事件速度太快,而观察者 来不及接管所有事件,从而导致观察者无奈及时响应 / 解决所有发送过去事件的问题,最终导致缓存区溢出、事件失落 & OOM
例如:
被观察者发送事件的速度=10ms/个
观察者的接管速度=5s/个
即呈现发送 & 接管事件重大不匹配的问题
Observable.create(new ObservableOnSubscribe<Integer>() { // 1. 创立被观察者 & 生产事件 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { Log.d(TAG, "发送了事件"+ i ); Thread.sleep(10); // 发送事件速度:10ms / 个 emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行 .observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行 .subscribe(new Observer<Integer>() { // 2. 通过通过订阅(subscribe)连贯观察者和被观察者 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采纳subscribe连贯"); } @Override public void onNext(Integer value) { try { // 接管事件速度:5s / 个 Thread.sleep(5000); Log.d(TAG, "接管到了事件"+ value ); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
解决方案:采纳背压策略
在异步订阅关系中,管制事件发送&接管速度
背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中
背压策略的具体实现:Flowable
在 RxJava2.0中,采纳 Flowable 实现 背压策略,对应的观察者Subscriber
Flowable应用
/** * 步骤1:创立被观察者 = Flowable */ Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }, BackpressureStrategy.ERROR); // 须要传入背压参数BackpressureStrategy,上面会具体解说 /** * 步骤2:创立观察者 = Subscriber */ Subscriber<Integer> downstream = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { // 比照Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription // 相同点:Subscription具备Disposable参数的作用,即Disposable.dispose()切断连贯, 同样的调用Subscription.cancel()切断连贯 // 不同点:Subscription减少了void request(long n) Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE);//响应式拉取,须要多少要多少 // 对于request()上面会持续具体阐明 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; /** * 步骤3:建设订阅关系 */ upstream.subscribe(downstream);
相干的背压博客参考:https://www.gaodaima.com/p/ceb…
四:RxJva其余罕用操作符
1.from 接管一个汇合作为输出,而后每次输入一个元素给subscriber
Observable.from(T[] params)Observable.from(new Integer[]{1, 2, 3, 4, 5}) .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.i(TAG, "number:" + number); } });
2.just 接管一个可变参数作为输出,最终也是生成数组,和调from()区别
Observable.just(T... params) //params的个数为1 ~ 10Observable.just(1, 2, 3, 4, 5) .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.i(TAG, "number:" + number); } });
3.timer 能够做定时操作,换句话,就是提早执行。事件间隔由timer管制。
//提早2s执行Log 打印Hello World(一次) Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.i(TAG, "Hello World!"); } });
4.interval 定时的周期性操作,与timer 的区别就在于它能够反复操作。事件间隔由interval管制
//定时2s反复打印Log Hello World Observable.interval(2, TimeUnit.SECONDS) .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.i(TAG, "Hello World!"); } });
END:春天播下种子,才有秋的播种;已经艰辛拼搏,才有苦涩生存。