阅读 179

响应式编程RxJava (一)

1.什么是RxJava?

1.1什么是响应式编程?

是一种基于异步数据流概念的编程模式(异步数据流编程) 数据流 ->河流(被观测、被过滤、被操作)

1.2响应式编程的设计原则是:

保持数据的不变性 没有共享 阻塞是有害的

1.3在我们的Java里面提供了解决方案 - RxJava?\

RxJava:Reactive Extensions Java(Java响应式编程)
响应式编程最初诞生.Net里面
iOS开发中也有响应式编程(block)


        // 传统写法:加载文件 //      new Thread() { //          @Override //          public void run() { //              super.run(); //              for (File folder : folders) { //                  File[] files = folder.listFiles(); //                  for (File file : files) { //                      if (file.getName().endsWith(".png")) { //                          final Bitmap bitmap = getBitmapFromFile(file); //                          // 更新UI线程 //                          runOnUiThread(new Runnable() { //                              @Override //                              public void run() { //                                  imageCollectorView.addImage(bitmap); //                              } //                          }); //                      } //                  } //              } //          } //      }.start(); 复制代码

RxJava写法


        File[] folders = new File[10];         Observable.from(folders)         //便利         .flatMap(new Func1<File, Observable<File>>() {             @Override             public Observable<File> call(File file) {                 return Observable.from(file.listFiles());             }         })         //过滤         .filter(new Func1<File, Boolean>() {             @Override             public Boolean call(File file) {                     //条件                 return file.getName().endsWith(".png");             }         })         //加载图片         .map(new Func1<File, Bitmap>() {             @Override             public Bitmap call(File file) {                 return getBitmapFromFile(file);             }         })         .subscribeOn(Schedulers.io())         .observeOn(AndroidSchedulers.mainThread())         //更新UI         .subscribe(new Action1<Bitmap>() {             @Override             public void call(Bitmap bitmap) {                 imageCollectorView.addImage(bitmap);             }         }); 复制代码

文件数组
flatMap:相当于我们手动的起嵌套循环
队列数据结构
你会发现以下这个简单的案例有哪些优势
第一点:你不需要考虑线程问题
第二点:你不要关心如何更新UI线程,如何调用

2.RxJava整体架构设计?


 整体架构设计 -> 主要观察者模式  同时里面还采用其他的设计模式 代理模式、迭代器模式、Builder设计模式(构建者模式)  整体RxJava框架,角色划分:        Observable   :被观察者        Observer      : 观察者        Subscrible    : 订阅        Subjects       : 科目               Observable 和 Subjects 是两个“生产“实体,Observer和Subscrible是两个“消费”实体           热Observables 和冷Observables   从发射物的角度来看,有两种不同的Observables:热的和冷的。一个“热”的Observable典型的只要一创建完就开始发射数据。因此所有后续订阅它的观察者可能从序列中间得某个位置开始接收数据(有一些数据错过了)。一个“冷”的Observable会一直等待,知道由观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。    热和冷  热:主动         场景:容器中目前只有一个观察者,向所有的观察者发送3条数据,因为热Observables一旦创建就立马发送消息,假设我现在发送到了第二条数据,突然之后增加了一个观察者,这个时候,第二个观察者就收不到之前的消息。 冷:被动        场景:容器中目前只有1个观察者,因为冷Observables一旦创建就会等待观察者订阅,一定有观察者订阅了,我立马将所有的消息发送给这个观察者(订阅人) 复制代码

3.RxJava基本API?

第一个案例:如何创建Observables?

subscribe 相关源码:


    public final Subscription subscribe(final Observer<? super T> observer) {         if (observer instanceof Subscriber) {             return subscribe((Subscriber<? super T>)observer);         }         if (observer == null) {             throw new NullPointerException("observer is null");         }         return subscribe(new ObserverSubscriber<T>(observer));     }     static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {      // validate and proceed         if (subscriber == null) {             throw new IllegalArgumentException("subscriber can not be null");         }         if (observable.onSubscribe == null) {             throw new IllegalStateException("onSubscribe function can not be null.");             /*              * the subscribe function can also be overridden but generally that's not the appropriate approach              * so I won't mention that in the exception              */         }         // new Subscriber so onStart it         subscriber.onStart();         /*          * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls          * to user code from within an Observer"          */         // if not already wrapped         if (!(subscriber instanceof SafeSubscriber)) {             // assign to `observer` so we return the protected version             subscriber = new SafeSubscriber<T>(subscriber);         }         // The code below is exactly the same an unsafeSubscribe but not used because it would         // add a significant depth to already huge call stacks.         try {             // allow the hook to intercept and/or decorate             RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);             return RxJavaHooks.onObservableReturn(subscriber);         } catch (Throwable e) {             // special handling for certain Throwable/Error/Exception types             Exceptions.throwIfFatal(e);             // in case the subscriber can't listen to exceptions anymore             if (subscriber.isUnsubscribed()) {                 RxJavaHooks.onError(RxJavaHooks.onObservableError(e));             } else {                 // if an unhandled error occurs executing the onSubscribe we will propagate it                 try {                     subscriber.onError(RxJavaHooks.onObservableError(e));                 } catch (Throwable e2) {                     Exceptions.throwIfFatal(e2);                     // if this happens it means the onError itself failed (perhaps an invalid function implementation)                     // so we are unable to propagate the error correctly and will just throw                     RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);                     // TODO could the hook be the cause of the error in the on error handling.                     RxJavaHooks.onObservableError(r);                     // TODO why aren't we throwing the hook's return value.                     throw r; // NOPMD                 }             }             return Subscriptions.unsubscribed();         }     } public class SafeSubscriber<T> extends Subscriber<T> {     private final Subscriber<? super T> actual;     boolean done;     public SafeSubscriber(Subscriber<? super T> actual) {         super(actual);         this.actual = actual;     }     /**      * Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.      * <p>      * The {@code Observable} will not call this method if it calls {@link #onError}.      */     @Override     public void onCompleted() {         if (!done) {             done = true;             try {                 actual.onCompleted();             } catch (Throwable e) {                 // we handle here instead of another method so we don't add stacks to the frame                 // which can prevent it from being able to handle StackOverflow                 Exceptions.throwIfFatal(e);                 RxJavaHooks.onError(e);                 throw new OnCompletedFailedException(e.getMessage(), e);             } finally { // NOPMD                 try {                     // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken                     // and we throw an UnsubscribeFailureException.                     unsubscribe();                 } catch (Throwable e) {                     RxJavaHooks.onError(e);                     throw new UnsubscribeFailedException(e.getMessage(), e);                 }             }         }     }     /**      * Notifies the Subscriber that the {@code Observable} has experienced an error condition.      * <p>      * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or      * {@link #onCompleted}.      *      * @param e      *          the exception encountered by the Observable      */     @Override     public void onError(Throwable e) {         // we handle here instead of another method so we don't add stacks to the frame         // which can prevent it from being able to handle StackOverflow         Exceptions.throwIfFatal(e);         if (!done) {             done = true;             _onError(e);         }     }     /**      * Provides the Subscriber with a new item to observe.      * <p>      * The {@code Observable} may call this method 0 or more times.      * <p>      * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or      * {@link #onError}.      *      * @param t      *          the item emitted by the Observable      */     @Override     public void onNext(T t) {         try {             if (!done) {                 actual.onNext(t);             }         } catch (Throwable e) {             // we handle here instead of another method so we don't add stacks to the frame             // which can prevent it from being able to handle StackOverflow             Exceptions.throwOrReport(e, this);         }     }     /**      * The logic for {@code onError} without the {@code isFinished} check so it can be called from within      * {@code onCompleted}.      *      * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>      */     @SuppressWarnings("deprecation")     protected void _onError(Throwable e) { // NOPMD         RxJavaPlugins.getInstance().getErrorHandler().handleError(e);         try {             actual.onError(e);         } catch (OnErrorNotImplementedException e2) { // NOPMD             /*              * onError isn't implemented so throw              *              * https://github.com/ReactiveX/RxJava/issues/198              *              * Rx Design Guidelines 5.2              *              * "when calling the Subscribe method that only has an onNext argument, the OnError behavior              * will be to rethrow the exception on the thread that the message comes out from the observable              * sequence. The OnCompleted behavior in this case is to do nothing."              */             try {                 unsubscribe();             } catch (Throwable unsubscribeException) {                 RxJavaHooks.onError(unsubscribeException);                 throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD             }             throw e2;         } catch (Throwable e2) {             /*              * throw since the Rx contract is broken if onError failed              *              * https://github.com/ReactiveX/RxJava/issues/198              */             RxJavaHooks.onError(e2);             try {                 unsubscribe();             } catch (Throwable unsubscribeException) {                 RxJavaHooks.onError(unsubscribeException);                 throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));             }             throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));         }         // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch         try {             unsubscribe();         } catch (Throwable unsubscribeException) {             RxJavaHooks.onError(unsubscribeException);             throw new OnErrorFailedException(unsubscribeException);         }     }     /**      * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.      *      * @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}      */     public Subscriber<? super T> getActual() {         return actual;     } } 复制代码

subscriber 实际上就是Observer

RxJava基本使用 源码分析

Observable创建原理分析: 第一步:调用Observable.create()方法 第二步:添加观察者订阅监听Observable.OnSubscrible 第三步:在Observable.create方法中创建被观察者new Observable(hook.onCreate(f)); 第四步:在Observable类构造方法中保存了观察者订阅监听

订阅观察者原理分析: 第一步:注册观察者监听observable.subscribe(new Observer()) 第二步:在Observable类中调用了

public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber<T>(observer)); } 方法中注册观察者 复制代码

第三步:在Observable类中调用了

public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } 复制代码

第四步:调用了Observable.subscribe(subscriber, this);方法
第五步:在 Observable.subscribe方法中调用了监听观察者订阅的回调接口
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);


    private Observable<String> observableString;     @Override     protected void onCreate(Bundle savedInstanceState) {         super.onCreate(savedInstanceState);         setContentView(R.layout.activity_simple2);         // 创建一个被观察者         // 配置回调接口---OnSubscribe         // 为什么要配置?         // 监听观察者订阅,一旦有观察者订阅了,立马回调改接口         observableString = Observable                 .create(new Observable.OnSubscribe<String>() {                     @Override                     public void call(Subscriber<? super String> observer) {                         Log.i("main", "回到了");                         //访问请求                         // 所以在这个方法里面我们可以干一些事情                         // 进行数据通信(说白了就是通知观察者)                         for (int i = 0; i < 5; i++) {                             observer.onNext("第" + i + "个数据");                         }                         //访问完成                         // 当我们的数据传递完成                         observer.onCompleted();                     }                 });     } 复制代码


    public void click(View v) {         // 观察者订阅         // 回调原理:         // 核心代码:         // hook.onSubscribeStart(observable,         // observable.onSubscribe).call(subscriber);         observableString.subscribe(new Observer<String>() {             @Override             public void onCompleted() {                 Log.i("main", "---onCompleted---");             }             @Override             public void onError(Throwable e) {                 System.out.println("Oh,no! Something wrong happened!");             }             @Override             public void onNext(String item) {                 // 接受数据                 Log.i("main", "观察者接收到了数据: " + item);             }         });     } 复制代码

结果输出

08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 回到了 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第0个数据 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第1个数据 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第2个数据 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第3个数据 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第4个数据 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: ---onCompleted--- 复制代码

observableString.subscribe 中 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 调用call方法

另一种方式自动发送


 private Observable<String> observableString;     @Override     protected void onCreate(Bundle savedInstanceState) {         super.onCreate(savedInstanceState);         setContentView(R.layout.activity_simple2);         List<String> items = new ArrayList<String>();         items.add("Kpioneer");         items.add("Xpioneer");         items.add("haocai");         items.add("Huhu");         // 框架本身提供了这样的API         // from: 一旦当你有观察者注册,立马发送消息序列         // 框架内部实现         // 框架内部调用create方法         // 迭代器模式         // OnSubscribeFromIterable类专门用于遍历集合         // OnSubscribeFromArray类专门用于遍历数组         observableString = Observable.from(items);     } 复制代码


    public void click(View v) {         observableString.subscribe(new Observer<String>() {             @Override             public void onCompleted() {                 Log.i("main", "---onCompleted---");             }             @Override             public void onError(Throwable e) {                 System.out.println("Oh,no! Something wrong happened!");             }             @Override             public void onNext(String item) {                 // 接受数据                 Log.i("main", "观察者接收到了数据: " + item);             }         });     } 复制代码

结果输出

08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: Kpioneer 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: Xpioneer 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: haocai 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: Huhu 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: ---onCompleted--- 复制代码


/**  * Copyright 2014 Netflix, Inc.  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */ package rx.internal.operators; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import rx.*; import rx.Observable.OnSubscribe; import rx.exceptions.Exceptions; /**  * Converts an {@code Iterable} sequence into an {@code Observable}.  * <p>  * ![](http://upload-images.jianshu.io/upload_images/1824809-fa9342290145e00e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)  * <p>  * You can convert any object that supports the Iterable interface into an Observable that emits each item in  * the object, with the {@code toObservable} operation.  * @param <T> the value type of the items  */ public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {     final Iterable<? extends T> is;     public OnSubscribeFromIterable(Iterable<? extends T> iterable) {         if (iterable == null) {             throw new NullPointerException("iterable must not be null");         }         this.is = iterable;     }     @Override     public void call(final Subscriber<? super T> o) {         Iterator<? extends T> it;         boolean b;         try {             it = is.iterator();             b = it.hasNext();         } catch (Throwable ex) {             Exceptions.throwOrReport(ex, o);             return;         }         if (!o.isUnsubscribed()) {             if (!b) {                 o.onCompleted();             } else {                 o.setProducer(new IterableProducer<T>(o, it));             }         }     }     static final class IterableProducer<T> extends AtomicLong implements Producer {         /** */         private static final long serialVersionUID = -8730475647105475802L;         // 具体的观察者         private final Subscriber<? super T> o;        // 具体的数据         private final Iterator<? extends T> it;         IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {             this.o = o;             this.it = it;         }         @Override         public void request(long n) {             if (get() == Long.MAX_VALUE) {                 // already started with fast-path                 return;             }             if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {                 fastPath();             } else             if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {                 slowPath(n);             }         }         void slowPath(long n) {             // backpressure is requested             final Subscriber<? super T> o = this.o;             final Iterator<? extends T> it = this.it;             long r = n;             long e = 0;             for (;;) {                 while (e != r) {                     if (o.isUnsubscribed()) {                         return;                     }                     T value;                     try {                         value = it.next();                     } catch (Throwable ex) {                         Exceptions.throwOrReport(ex, o);                         return;                     }                     o.onNext(value);                     if (o.isUnsubscribed()) {                         return;                     }                     boolean b;                     try {                         b = it.hasNext();                     } catch (Throwable ex) {                         Exceptions.throwOrReport(ex, o);                         return;                     }                     if (!b) {                         if (!o.isUnsubscribed()) {                             o.onCompleted();                         }                         return;                     }                     e++;                 }                 r = get();                 if (e == r) {                     r = BackpressureUtils.produced(this, e);                     if (r == 0L) {                         break;                     }                     e = 0L;                 }             }         }         void fastPath() {             // fast-path without backpressure             final Subscriber<? super T> o = this.o;             final Iterator<? extends T> it = this.it;             for (;;) {                 if (o.isUnsubscribed()) {                     return;                 }                 T value;                 try {                     value = it.next();                 } catch (Throwable ex) {                     Exceptions.throwOrReport(ex, o);                     return;                 }                 o.onNext(value);                 if (o.isUnsubscribed()) {                     return;                 }                 boolean b;                 try {                     b  = it.hasNext();                 } catch (Throwable ex) {                     Exceptions.throwOrReport(ex, o);                     return;                 }                 if (!b) {                     if (!o.isUnsubscribed()) {                         o.onCompleted();                     }                     return;                 }             }         }     } }


作者:冬日毛毛雨
链接:https://juejin.cn/post/7028812791867768845


文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐