阅读 84

实力大于名气的Rxjs

Rxjs作为一个不是特别流行的库可能被不少前端工程师所忽略,关于Rxjs的博客与学习视频也少之又少,但笔者最近恰巧接触到Angular项目,Angular中高度集成了Rxjs,则有机会接触到强大的Rxjs,特出此博客记录一下学习过程

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

Rxjs中文文档

Rxjs在线测试

Rxjs中的基本概念

  • Observable (可观察对象):  表示一个概念,这个概念是一个可调用的未来值或事件的集合。

  • Observer (观察者):  一个回调函数的集合,它知道如何去监听由 Observable 提供的值。

  • Subscription (订阅):  表示 Observable 的执行,主要用于取消 Observable 的执行。

  • Operators (操作符):  采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。

  • Subject (主体):  相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。

  • Schedulers (调度器):  用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

在日常开发时,我们常用Rxjs来完成一种数据发布订阅者模式的响应式处理,这就牵扯到Rxjs中众多的基本概念,下面以一个简单的例子来展示。

import { Observable } from 'rxjs'; const observable = new Observable(subscriber => {   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   setTimeout(() => {     subscriber.next(4);     subscriber.complete();   }, 1000); }); console.log('just before subscribe'); observable.subscribe({   next(x) { console.log('got value ' + x); },   error(err) { console.error('something wrong occurred: ' + err); },   complete() { console.log('done'); } }); console.log('just after subscribe'); //控制台输出 just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4 done 复制代码

在上述例子中,我们通过new Observable来创建了一个可观察的对象,然后通过observable.subscribe(observer(回调函数))去监听订阅由Observable提供的值。然后运行回调函数去处理数据。

image.png 当然Rxjs不止于此,订阅者也可不止一个,更有非常多的API可供选择。

import { of } from 'rxjs'; import { map } from 'rxjs/operators'; of(1, 2, 3)   .pipe(map((x) => x * x))   .subscribe((v) => console.log(`value: ${v}`)); // Logs: // value: 1 // value: 4 // value: 9 复制代码

Observable与Subject

  • observable 其实可以理解为一个拟定的行为,而一旦被observer订阅,就执行一遍,仅此而已。这个拟定的行为里,可以包含同步代码也可以包含异步代码,但是从订阅到触发,这个过程就一定是同步触发的。其实新建信号很少用这个observable,而一般会在进一步封装别人提供的信号时使用。

  • subject,简单理解成一个信号发射器,你想要发信号,那就用subject吧!subject实际继承自observable,但相比observable而言,subject内部还维护了一个订阅者队列,每当subject有信号发出,就会遍历队列,推送信号。那么,显然,subject其实就是中间人设计模式。

区别:代码演示

import { BehaviorSubject, Observable,Subject } from 'rxjs' const observable = new Observable(subscriber => {subscriber.next(Math.random()*20)}); observable.subscribe((v)=>{console.log('ObservableA:' + v)}) observable.subscribe((v)=>{console.log('ObservableB:' + v)}) var subject = new Subject(); subject.subscribe({   next: (v) => console.log('subjectA: ' + v) }); subject.subscribe({   next: (v) => console.log('subjectB: ' + v) }); subject.next(Math.random()*20) //控制台输出 ObservableA:11.299099927979421 ObservableB:7.6997538111151975 subjectA:2.7308313413849827 subjectB:2.7308313413849827 复制代码

从以上代码我们可以看出,ObservableA与ObservableB的数据不同,而subjectA与subjectB的数据相同。
其原因是Observable是单播而Subject可广播。
每次订阅Observable就会触发Observable中数据的产生,获得一个随机值,则每次订阅获得的数据不同。当订阅Subject则会被添入到一个订阅者队列中,每当Subject有信号发出,就会遍历队列,推送一样的信号。
实现多播的 Observables “多播 Observable” 通过 Subject 来发送通知

import { Observable,Subject } from 'rxjs' const observable = new Observable((observer) => {   observer.next(Math.random()); }); const subject = new Subject(); // 订阅者 1 subject.subscribe((data) => {   console.log(data);  }); // 订阅者 2 subject.subscribe((data) => {   console.log(data);  }); observable.subscribe(subject); //控制台输出 0.5630103286085317 0.5630103286085317 复制代码

这样利用Subject作为中间人进行数据广播就能让订阅者获取到相同的数据。

Subject的变体

BehaviorSubject

BehaviorSubject 是 Subject 的变体之一。BehaviorSubject 的特性就是它会存储“当前”的值。这意味着你始终可以直接拿到 BehaviorSubject 最后一次发出的值。

import * as Rx from "rxjs"; const subject = new Rx.BehaviorSubject(Math.random()); // 订阅者 A subject.subscribe((data) => {   console.log('Subscriber A:', data); }); subject.next(Math.random()); // 订阅者 B subject.subscribe((data) => {   console.log('Subscriber B:', data); }); subject.next(Math.random()); console.log(subject.value) // 输出 // Subscriber A: 0.24957144215097515 // Subscriber A: 0.8751123892486292 // Subscriber B: 0.8751123892486292 // Subscriber A: 0.1901322109907977 // Subscriber B: 0.1901322109907977 // 0.1901322109907977 复制代码

ReplaySubject

相比 BehaviorSubject 而言,ReplaySubject 是可以给新订阅者发送“旧”数据的。另外,ReplaySubject 还有一个额外的特性就是它可以记录一部分的 observable execution,从而存储一些旧的数据用来“重播”给新来的订阅者。

import * as Rx from "rxjs"; const subject = new Rx.ReplaySubject(2); // 订阅者 A subject.subscribe((data) => {   console.log('Subscriber A:', data); }); subject.next(Math.random()) subject.next(Math.random()) subject.next(Math.random()) // 订阅者 B subject.subscribe((data) => {   console.log('Subscriber B:', data); }); subject.next(Math.random()); // Subscriber A: 0.3541746356538569 // Subscriber A: 0.12137498878080955 // Subscriber A: 0.531935186034298 // Subscriber B: 0.12137498878080955  subject 会将上两个值“重播”给订阅者B。 // Subscriber B: 0.531935186034298 // Subscriber A: 0.6664809293975393 // Subscriber B: 0.6664809293975393 复制代码

AsyncSubject

BehaviorSubject 和 ReplaySubject 都可以用来存储一些数据,而 AsyncSubject 就不一样了。AsyncSubject 只会在 Observable execution 完成后,将其最终值发给订阅者。请看代码:

import * as Rx from "rxjs"; const subject = new Rx.AsyncSubject(); // 订阅者A subject.subscribe((data) => {   console.log('Subscriber A:', data); }); subject.next(Math.random()) subject.next(Math.random()) subject.next(Math.random()) // 订阅者B subject.subscribe((data) => {   console.log('Subscriber B:', data); }); subject.next(Math.random()); subject.complete();//完成后才把最终值发给订阅者 // Subscriber A: 0.4447275989704571 // Subscriber B: 0.4447275989704571 复制代码

Rxjs有哪些好处

解决回调地狱

在远古回调函数时代,若用回调函数去处理复杂的业务逻辑则可能出现以下情况

fs.readFile('a.txt', 'utf-8', function(err, data) {     fs.readFile('b.txt', 'utf-8', function(err, data1) {         fs.readFile('c.txt', 'utf-8', function(err, data2) {             // ......         })     }) }) 复制代码

因为下一次的调用方法需要上一次方法的数据支持,则会导致回调函数一层套一层,出现回调地狱的情况,随着业务逻辑的增加也让代码变得难以维护。
当然在ES6中也出现了回调地狱的解决方法Promise,Promise采用链式调用让代码可以一节接一节而不是一层套一层,让代码更加简洁明了。

 function readData(filePath) {     return new Promise((resolve, reject) => {         fs.readFile(filePath, 'utf-8', (err, data) => {             if (err) reject(err);             resolve(data);         })     }); } readData('a.txt').then(res => {     return readData('b.txt'); }).then(res => {     return readData('c.txt'); }).then(res => {     return readData('d.txt'); }).catch(err => {     console.log(err); }) 复制代码

Rxjs在使用方式上与Promise很像

function readData(filePath) {     return new Observable((observer) => {         fs.readFile(filePath, 'utf-8', (err, data) => {             if (err) observer.error(err);             observer.next(data);         })     }); } Rx.Observable .forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt')) .subscribe(data => console.log(data)); 复制代码

这样一来Rxjs与Promise都能优雅地解决回调地狱,但Rxjs比Promise更加强大,究竟强大在哪呢,我们稍后对比。

实现响应式编程

在基础概念中也提及了响应式编程的概念,响应式编程的思想广泛用于实际的项目中,下面就以实际项目中的代码片段演示:

//业务场景,在初始化加载中请求到表单数据 getLists() {     const params: GroupListParams = {       page: this.page     };     setTimeout(() => this.loading.show(), 100);     this.groupService.getGroupList(params).subscribe(       data => {         this.groupLists = data.list;         setTimeout(() => this.loading.hide(), 100);         this.paginationConfig$.next(data.pages);       },       error => {         setTimeout(() => this.loading.hide(), 100);         throw new ServerError(error.message);       }     );   } getGroupList(params: GroupListParams): Observable<GroupListRes> {     const sendParams = this.generateHttpParams(params);     return this.http       .get(API.GET_GROUP_LIST, { params: sendParams })       .pipe(map((resp: UglyResponse) => resp.data || {}));   } 复制代码

在以上的代码中,getGroupList()产生一个Observable对象,并可对产生的数据通过pipe来处理数据,在getLists()中可以订阅getGroupList()中产生的数据,接收到data。这样就可以实现一个简单的响应式编程,每当调用一次getList()就可以产生新数据,订阅者可以接收到新产生的数据并在视图上及时更新。

实现数据流处理模式

我们有时面对复杂的业务常见,需要对数据进行层层筛选,如下图所示,Rxjs中所提供的API能让这些对数据的操作变得易如反掌。

image.png

//例子 import { of } from 'rxjs'; import { map } from 'rxjs/operators'; import { filter } from 'rxjs/operators'; of(1, 2, 3)   .pipe(map((x) => x * x))   .pipe(filter((x)=>x<5))   .subscribe((v) => console.log(`value: ${v}`)); //控制台输出 value: 1 value: 4 复制代码

Rxjs优势的冰山一角

Rxjs作为一个非常强大的第三方库,相对于其他解决方案的优势也是很多,在这里笔者就列举一些自己所遇到的地方。

Rxjs/operator API

控制最大并发数

Promise解决方案

class PromisePool {     constructor(max, fn) {       this.max = max; // 最大并发数       this.fn = fn;   // 自定义的请求函数       this.pool = []; // 并发池       this.urls = []; // 剩余的请求地址     }        start(urls) {       this.urls = urls;       // 先循环把并发池塞满       while (this.pool.length < this.max) {         let url = this.urls.shift();         this.setTask(url);       }       // 利用Promise.race 方法来获得并发池中某任务完成的信号       let race = Promise.race(this.pool);       return this.run(race);     }        run(race) {       race         .then(res => {           // 每当并发池跑完一个任务,就再塞入一个任务           let url = this.urls.shift();           this.setTask(url);           return this.run(Promise.race(this.pool));         });     }     setTask(url) {       if (!url) return;       let task = this.fn(url);       this.pool.push(task); // 将该任务推入pool并发池中       console.log(`\x1B[43m ${url} 开始,当前并发数:${this.pool.length}`);       task.then(res => {         // 请求结束后将该Promise任务从并发池中移除         this.pool.splice(this.pool.indexOf(task), 1);         console.log(`\x1B[43m ${url} 结束,当前并发数:${this.pool.length}`);       });     }   }      // test   const URLs = [     'bytedance.com',     'tencent.com',     'alibaba.com',     'microsoft.com',     'apple.com',     'hulu.com',     'amazon.com'   ];   let dur = 0;   // 自定义请求函数   var requestFn = url => {     return new Promise(resolve => {       setTimeout(_ => {         resolve(`任务 ${url} 完成`);       }, 1000*dur++)     }).then(res => {       console.log('外部逻辑 ', res);     })   }      const pool = new PromisePool(3, requestFn); // 并发数为3   pool.start(URLs); 复制代码

Rxjs解决方案(mergeMap)

相比于Promise的需要自己造轮子,Rxjs则提供了现成的API可供使用

// 假设这是你的http请求函数 function httpGet(url) {   return new Promise(resolve => setTimeout(() => resolve(`Result: ${url}`), 2000)); } const array = [   'https://httpbin.org/ip',    'https://httpbin.org/user-agent',   'https://httpbin.org/delay/3', ]; // mergeMap是专门用来处理并发处理的rxjs操作符 // mergeMap第二个参数2的意思是,from(array)每次并发量是2,只有promise执行结束才接着取array里面的数据 // mergeMap第一个参数httpGet的意思是每次并发,从from(array)中取的数据如何包装,这里是作为httpGet的参数 const source = from(array).pipe(mergeMap(httpGet, 2)).subscribe(val => console.log(val)); 复制代码

在线代码预览:控制并发

retry重试

Promise解决方案

//随机产生一个1-20随机数,小于10则成功,大于10则失败 function getData(){     let p = new Promise(function(resolve, reject){         setTimeout(function(){             var num = Math.ceil(Math.random()*20); //生成1-20随机数             console.log('随机数生成的值:',num)             if(num<=10){                 console.log('符合条件,值为'+num)                 resolve(num);             }             else{      reject('数字大于10了执行失败');             }         }, 2000);     })     return p     } //封装myGetData添加重试功能 function myGetData(getData, times, delay) {     return new Promise(function(resolve, reject) {        function attempt () {         getData().then(resolve).catch(function(erro) {         console.log(`还有 ${times} 次尝试`)           if (0 == times) {             reject(erro)           } else {             times--             setTimeout(attempt(), delay)           }         })       }        attempt()     })   } // 执行函数,五次重试,每隔一秒执行一次 myGetData(getData,5,1000) //控制台输出 随机数生成的值: 13 还有 5 次尝试 随机数生成的值: 11 还有 4 次尝试 随机数生成的值: 4 符合条件,值为4 复制代码

Rxjs解决方案

// RxJS v6+ import { interval, of, throwError } from 'rxjs'; import { mergeMap, retry } from 'rxjs/operators'; // 每1秒发出值 const source = interval(1000); const example = source.pipe(   mergeMap(val => {     // 抛出错误以进行演示     if (val > 5) {       return throwError('Error!');     }     return of(val);   }),   // 出错的话可以重试2次   retry(2) ); /*   输出:   0..1..2..3..4..5..   0..1..2..3..4..5..   0..1..2..3..4..5..   "Error!: Retried 2 times then quit!" */ const subscribe = example.subscribe({   next: val => console.log(val),   error: val => console.log(`${val}: Retried 2 times then quit!`) }); 复制代码

总结

Rxjs是一个非常强大的第三方库,相较于Promise,Rxjs封装了更多的API,也避免了使用Promise时重复造轮子的现象出现,可以满足更加多变的业务场景。
Rxjs实现数据的响应式编程与数据流处理也很强大,在Angular的项目中应用也十分广泛。

本博客介绍的只是Rxjs的冰山一角,如有兴趣可深入学习。


作者:XQ_dayday_up
链接:https://juejin.cn/post/7000605558159966215


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐