实力大于名气的Rxjs
Rxjs作为一个不是特别流行的库可能被不少前端工程师所忽略,关于Rxjs的博客与学习视频也少之又少,但笔者最近恰巧接触到Angular项目,Angular中高度集成了Rxjs,则有机会接触到强大的Rxjs,特出此博客记录一下学习过程
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
Rxjs中文文档
Rxjs在线测试
Rxjs中的基本概念
Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像
map
、filter
、concat
、flatMap
等这样的操作符来处理集合。Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout
或requestAnimationFrame
或其他。
在日常开发时,我们常用Rxjs来完成一种数据发布订阅者模式的响应式处理,这就牵扯到Rxjs中众多的基本概念,下面以一个简单的例子来展示。
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); setTimeout(() => { subscriber.next(4); subscriber.complete(); }, 1000); }); console.log('just before subscribe'); observable.subscribe({ next(x) { console.log('got value ' + x); }, error(err) { console.error('something wrong occurred: ' + err); }, complete() { console.log('done'); } }); console.log('just after subscribe'); //控制台输出 just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4 done 复制代码
在上述例子中,我们通过new Observable
来创建了一个可观察的对象,然后通过observable.subscribe(observer(回调函数))
去监听订阅由Observable提供的值。然后运行回调函数去处理数据。
当然Rxjs不止于此,订阅者也可不止一个,更有非常多的API可供选择。
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; of(1, 2, 3) .pipe(map((x) => x * x)) .subscribe((v) => console.log(`value: ${v}`)); // Logs: // value: 1 // value: 4 // value: 9 复制代码
Observable与Subject
observable 其实可以理解为一个拟定的行为,而一旦被observer订阅,就执行一遍,仅此而已。这个拟定的行为里,可以包含同步代码也可以包含异步代码,但是从订阅到触发,这个过程就一定是同步触发的。其实新建信号很少用这个observable,而一般会在进一步封装别人提供的信号时使用。
subject,简单理解成一个信号发射器,你想要发信号,那就用subject吧!subject实际继承自observable,但相比observable而言,subject内部还维护了一个订阅者队列,每当subject有信号发出,就会遍历队列,推送信号。那么,显然,subject其实就是中间人设计模式。
区别:代码演示
import { BehaviorSubject, Observable,Subject } from 'rxjs' const observable = new Observable(subscriber => {subscriber.next(Math.random()*20)}); observable.subscribe((v)=>{console.log('ObservableA:' + v)}) observable.subscribe((v)=>{console.log('ObservableB:' + v)}) var subject = new Subject(); subject.subscribe({ next: (v) => console.log('subjectA: ' + v) }); subject.subscribe({ next: (v) => console.log('subjectB: ' + v) }); subject.next(Math.random()*20) //控制台输出 ObservableA:11.299099927979421 ObservableB:7.6997538111151975 subjectA:2.7308313413849827 subjectB:2.7308313413849827 复制代码
从以上代码我们可以看出,ObservableA与ObservableB的数据不同,而subjectA与subjectB的数据相同。
其原因是Observable是单播而Subject可广播。
每次订阅Observable就会触发Observable中数据的产生,获得一个随机值,则每次订阅获得的数据不同。当订阅Subject则会被添入到一个订阅者队列中,每当Subject有信号发出,就会遍历队列,推送一样的信号。
实现多播的 Observables “多播 Observable” 通过 Subject 来发送通知
import { Observable,Subject } from 'rxjs' const observable = new Observable((observer) => { observer.next(Math.random()); }); const subject = new Subject(); // 订阅者 1 subject.subscribe((data) => { console.log(data); }); // 订阅者 2 subject.subscribe((data) => { console.log(data); }); observable.subscribe(subject); //控制台输出 0.5630103286085317 0.5630103286085317 复制代码
这样利用Subject作为中间人进行数据广播就能让订阅者获取到相同的数据。
Subject的变体
BehaviorSubject
BehaviorSubject 是 Subject 的变体之一。BehaviorSubject 的特性就是它会存储“当前”的值。这意味着你始终可以直接拿到 BehaviorSubject 最后一次发出的值。
import * as Rx from "rxjs"; const subject = new Rx.BehaviorSubject(Math.random()); // 订阅者 A subject.subscribe((data) => { console.log('Subscriber A:', data); }); subject.next(Math.random()); // 订阅者 B subject.subscribe((data) => { console.log('Subscriber B:', data); }); subject.next(Math.random()); console.log(subject.value) // 输出 // Subscriber A: 0.24957144215097515 // Subscriber A: 0.8751123892486292 // Subscriber B: 0.8751123892486292 // Subscriber A: 0.1901322109907977 // Subscriber B: 0.1901322109907977 // 0.1901322109907977 复制代码
ReplaySubject
相比 BehaviorSubject 而言,ReplaySubject 是可以给新订阅者发送“旧”数据的。另外,ReplaySubject 还有一个额外的特性就是它可以记录一部分的 observable execution,从而存储一些旧的数据用来“重播”给新来的订阅者。
import * as Rx from "rxjs"; const subject = new Rx.ReplaySubject(2); // 订阅者 A subject.subscribe((data) => { console.log('Subscriber A:', data); }); subject.next(Math.random()) subject.next(Math.random()) subject.next(Math.random()) // 订阅者 B subject.subscribe((data) => { console.log('Subscriber B:', data); }); subject.next(Math.random()); // Subscriber A: 0.3541746356538569 // Subscriber A: 0.12137498878080955 // Subscriber A: 0.531935186034298 // Subscriber B: 0.12137498878080955 subject 会将上两个值“重播”给订阅者B。 // Subscriber B: 0.531935186034298 // Subscriber A: 0.6664809293975393 // Subscriber B: 0.6664809293975393 复制代码
AsyncSubject
BehaviorSubject 和 ReplaySubject 都可以用来存储一些数据,而 AsyncSubject 就不一样了。AsyncSubject 只会在 Observable execution 完成后,将其最终值发给订阅者。请看代码:
import * as Rx from "rxjs"; const subject = new Rx.AsyncSubject(); // 订阅者A subject.subscribe((data) => { console.log('Subscriber A:', data); }); subject.next(Math.random()) subject.next(Math.random()) subject.next(Math.random()) // 订阅者B subject.subscribe((data) => { console.log('Subscriber B:', data); }); subject.next(Math.random()); subject.complete();//完成后才把最终值发给订阅者 // Subscriber A: 0.4447275989704571 // Subscriber B: 0.4447275989704571 复制代码
Rxjs有哪些好处
解决回调地狱
在远古回调函数时代,若用回调函数去处理复杂的业务逻辑则可能出现以下情况
fs.readFile('a.txt', 'utf-8', function(err, data) { fs.readFile('b.txt', 'utf-8', function(err, data1) { fs.readFile('c.txt', 'utf-8', function(err, data2) { // ...... }) }) }) 复制代码
因为下一次的调用方法需要上一次方法的数据支持,则会导致回调函数一层套一层,出现回调地狱的情况,随着业务逻辑的增加也让代码变得难以维护。
当然在ES6中也出现了回调地狱的解决方法Promise,Promise采用链式调用让代码可以一节接一节而不是一层套一层,让代码更加简洁明了。
function readData(filePath) { return new Promise((resolve, reject) => { fs.readFile(filePath, 'utf-8', (err, data) => { if (err) reject(err); resolve(data); }) }); } readData('a.txt').then(res => { return readData('b.txt'); }).then(res => { return readData('c.txt'); }).then(res => { return readData('d.txt'); }).catch(err => { console.log(err); }) 复制代码
Rxjs在使用方式上与Promise很像
function readData(filePath) { return new Observable((observer) => { fs.readFile(filePath, 'utf-8', (err, data) => { if (err) observer.error(err); observer.next(data); }) }); } Rx.Observable .forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt')) .subscribe(data => console.log(data)); 复制代码
这样一来Rxjs与Promise都能优雅地解决回调地狱,但Rxjs比Promise更加强大,究竟强大在哪呢,我们稍后对比。
实现响应式编程
在基础概念中也提及了响应式编程的概念,响应式编程的思想广泛用于实际的项目中,下面就以实际项目中的代码片段演示:
//业务场景,在初始化加载中请求到表单数据 getLists() { const params: GroupListParams = { page: this.page }; setTimeout(() => this.loading.show(), 100); this.groupService.getGroupList(params).subscribe( data => { this.groupLists = data.list; setTimeout(() => this.loading.hide(), 100); this.paginationConfig$.next(data.pages); }, error => { setTimeout(() => this.loading.hide(), 100); throw new ServerError(error.message); } ); } getGroupList(params: GroupListParams): Observable<GroupListRes> { const sendParams = this.generateHttpParams(params); return this.http .get(API.GET_GROUP_LIST, { params: sendParams }) .pipe(map((resp: UglyResponse) => resp.data || {})); } 复制代码
在以上的代码中,getGroupList()产生一个Observable对象,并可对产生的数据通过pipe来处理数据,在getLists()中可以订阅getGroupList()中产生的数据,接收到data。这样就可以实现一个简单的响应式编程,每当调用一次getList()就可以产生新数据,订阅者可以接收到新产生的数据并在视图上及时更新。
实现数据流处理模式
我们有时面对复杂的业务常见,需要对数据进行层层筛选,如下图所示,Rxjs中所提供的API能让这些对数据的操作变得易如反掌。
//例子 import { of } from 'rxjs'; import { map } from 'rxjs/operators'; import { filter } from 'rxjs/operators'; of(1, 2, 3) .pipe(map((x) => x * x)) .pipe(filter((x)=>x<5)) .subscribe((v) => console.log(`value: ${v}`)); //控制台输出 value: 1 value: 4 复制代码
Rxjs优势的冰山一角
Rxjs作为一个非常强大的第三方库,相对于其他解决方案的优势也是很多,在这里笔者就列举一些自己所遇到的地方。
Rxjs/operator API
控制最大并发数
Promise解决方案
class PromisePool { constructor(max, fn) { this.max = max; // 最大并发数 this.fn = fn; // 自定义的请求函数 this.pool = []; // 并发池 this.urls = []; // 剩余的请求地址 } start(urls) { this.urls = urls; // 先循环把并发池塞满 while (this.pool.length < this.max) { let url = this.urls.shift(); this.setTask(url); } // 利用Promise.race 方法来获得并发池中某任务完成的信号 let race = Promise.race(this.pool); return this.run(race); } run(race) { race .then(res => { // 每当并发池跑完一个任务,就再塞入一个任务 let url = this.urls.shift(); this.setTask(url); return this.run(Promise.race(this.pool)); }); } setTask(url) { if (!url) return; let task = this.fn(url); this.pool.push(task); // 将该任务推入pool并发池中 console.log(`\x1B[43m ${url} 开始,当前并发数:${this.pool.length}`); task.then(res => { // 请求结束后将该Promise任务从并发池中移除 this.pool.splice(this.pool.indexOf(task), 1); console.log(`\x1B[43m ${url} 结束,当前并发数:${this.pool.length}`); }); } } // test const URLs = [ 'bytedance.com', 'tencent.com', 'alibaba.com', 'microsoft.com', 'apple.com', 'hulu.com', 'amazon.com' ]; let dur = 0; // 自定义请求函数 var requestFn = url => { return new Promise(resolve => { setTimeout(_ => { resolve(`任务 ${url} 完成`); }, 1000*dur++) }).then(res => { console.log('外部逻辑 ', res); }) } const pool = new PromisePool(3, requestFn); // 并发数为3 pool.start(URLs); 复制代码
Rxjs解决方案(mergeMap)
相比于Promise的需要自己造轮子,Rxjs则提供了现成的API可供使用
// 假设这是你的http请求函数 function httpGet(url) { return new Promise(resolve => setTimeout(() => resolve(`Result: ${url}`), 2000)); } const array = [ 'https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', ]; // mergeMap是专门用来处理并发处理的rxjs操作符 // mergeMap第二个参数2的意思是,from(array)每次并发量是2,只有promise执行结束才接着取array里面的数据 // mergeMap第一个参数httpGet的意思是每次并发,从from(array)中取的数据如何包装,这里是作为httpGet的参数 const source = from(array).pipe(mergeMap(httpGet, 2)).subscribe(val => console.log(val)); 复制代码
在线代码预览:控制并发
retry重试
Promise解决方案
//随机产生一个1-20随机数,小于10则成功,大于10则失败 function getData(){ let p = new Promise(function(resolve, reject){ setTimeout(function(){ var num = Math.ceil(Math.random()*20); //生成1-20随机数 console.log('随机数生成的值:',num) if(num<=10){ console.log('符合条件,值为'+num) resolve(num); } else{ reject('数字大于10了执行失败'); } }, 2000); }) return p } //封装myGetData添加重试功能 function myGetData(getData, times, delay) { return new Promise(function(resolve, reject) { function attempt () { getData().then(resolve).catch(function(erro) { console.log(`还有 ${times} 次尝试`) if (0 == times) { reject(erro) } else { times-- setTimeout(attempt(), delay) } }) } attempt() }) } // 执行函数,五次重试,每隔一秒执行一次 myGetData(getData,5,1000) //控制台输出 随机数生成的值: 13 还有 5 次尝试 随机数生成的值: 11 还有 4 次尝试 随机数生成的值: 4 符合条件,值为4 复制代码
Rxjs解决方案
// RxJS v6+ import { interval, of, throwError } from 'rxjs'; import { mergeMap, retry } from 'rxjs/operators'; // 每1秒发出值 const source = interval(1000); const example = source.pipe( mergeMap(val => { // 抛出错误以进行演示 if (val > 5) { return throwError('Error!'); } return of(val); }), // 出错的话可以重试2次 retry(2) ); /* 输出: 0..1..2..3..4..5.. 0..1..2..3..4..5.. 0..1..2..3..4..5.. "Error!: Retried 2 times then quit!" */ const subscribe = example.subscribe({ next: val => console.log(val), error: val => console.log(`${val}: Retried 2 times then quit!`) }); 复制代码
总结
Rxjs是一个非常强大的第三方库,相较于Promise,Rxjs封装了更多的API,也避免了使用Promise时重复造轮子的现象出现,可以满足更加多变的业务场景。
Rxjs实现数据的响应式编程与数据流处理也很强大,在Angular的项目中应用也十分广泛。
本博客介绍的只是Rxjs的冰山一角,如有兴趣可深入学习。
作者:XQ_dayday_up
链接:https://juejin.cn/post/7000605558159966215