1.Rxjs基本概念
从接触Angular2开始,一直在使用Rxjs但没有深究,最近抽空整体了解下Rxjs,略作整理。Rxjs可以理解为一个管理事件或任务序列(异步)的库,它提供了一个核心类型Observable可观察对象,卫星类型(围绕于Observable): Observer、Scheduler、Subject和操作符。这几个概念可以用下图来举例说明(windows画图工具随便画的,粗糙了些,示意还是清晰的):

Observable 可观察对象:是一串事件或消息的集合,在图中列举了一个需要发送一串数字序列的可观察对象。
Observer 观察者:是一组回调函数,用来处理Observable中的内容。在图中它接收了一串数字序列。
Subscription 订阅:Observable的执行,功能好比一个开关,可以订阅或取消订阅。在图中,如果订阅则它下游的Observer将收到数字;取消订阅,下游的Observer将收不到任何消息。一个Observable可以被多个Observer订阅。
Subject 主题:将Observable中的内容广播到多个Observer。在图中相当于一个多分支开关,如果主题被订阅则所有观察者都将收到同样的一串数字序列。
Scheduler 调度者:决定何时启动订阅和何时发送通知。如图所示,调度者不是必须的。在图中数字序列从Observable发出,经过调度,序列的顺序被改变了。
文中示例的Angular及Rxjs版本:
"@angular/core": "~4.0.0",
"rxjs": "5.0.1"
2.Rxjs简单示例
import { Observable} from 'rxjs/Rx';//引入可观察对象
public testRx1(){
//1.产生一个可观察对象,包括即时发送和延时发送
let myobservable = Observable. create(( observer : any)=>{
observer. next( 1); //发送数字1
observer. next( 2);
observer. next( 3);
setTimeout(() =>{ observer. next( 4);}, 1000); //1秒后发送数字4
});
//2.创建一个观察者仅包含next操作
let myobserver = { next:( x : any) => console. log( 'receive data:', x)};
console. log( 'before subscribe');
//3.对可观察对象进行订阅
let mysubscripton = myobservable . subscribe ( myobserver );
//4.可以取消订阅
//mysubscripton.unsubscribe();
console. log( 'after subscribe');
}
运行得到结果

3.Observable可观察对象
可观察对象是一串消息或事件的集合,可以发送多值。在“Rxjs简单示例“中可以看到,使用Observale.create创造了一个可观察对象myobservable,在里面首先发送1,2,3三个值,间隔1s后发送数字4。但是创造一个可观察对象的方法是多种多样的。可以通过操作符创建和事件流产生,如下例所示:
public createObservable(){
let myobserver = { next:( x : any) => console. log( x)};
//通过操作符产生observable
let myobservable1 = Observable. from([ 1, 2, 3, 4]). subscribe( myobserver); //运行结果:1,2,3
let myobservable2 = Observable. of( 'a', 'b', 'c'). subscribe( myobserver); //运行结果:a,b,c
let myobservable3 = Observable. interval( 1000). subscribe( myobserver); //运行结果:0,1,2,3,4,5,6,......
//通过事件流产生observable
let button = document. getElementById( 'mybtn');
let myobservable = Observable. fromEvent( button, 'click')
. throttleTime( 1000) //间隔一秒才允许响应click事件
. scan(( count : number) => count + 1, 0) //初始值为0开始累加
. subscribe(( count : number) => console. log( `clicked ${ count } times`)); //ES6模板字符串
//疯狂点击按钮,运行结果:
//clicked 1 times
//---间隔1秒---
//clicked 2 times
//---间隔1秒---
//clicked 3 times ......
}
4.Obsever观察者
所谓观察者,实质上是一组回调函数,当它订阅可观察对象时,提供三种类型回调函数(next/error/complete)。在“Rxjs简单示例“中可以看到,最简化情况下,仅仅提供一个next类型回调就可以。完整的情况如下例所示:
public createObserver(){
let myobserver = { next:( x : any) => console. log( x),
error:( error : any) => console. log( 'something wrong:', error),
complete:() => console. log( 'finished')
};
let myobservable1 = Observable. from([ 1, 2, 3, 4]). subscribe( myobserver); //运行结果:1,2,3,4,finished
}
也可以将三个回调函数分别作为参数传递给subscribe函数,如果只提供一个函数,则默认为next函数,如下例所示:
let myobservable1 = Observable. from([ 1, 2, 3, 4])
. subscribe(
( x : any) => console. log( x),
( error : any) => console. log( 'error:', error),
() => console. log( 'finished')
);
//运行结果:1,2,3,4,finished
仔细观察“
Rxjs简单示例”可以发现
使用Observale.create创造可观察对象时,里面有个参数observer,该observer就 是一个观察者,拥有next方法。
5.Subject主题
主题既是可观察对象,可以被订阅,例如:Subject.subscribe({next:(x)=>……})。
主题又是观察者对象,拥有next()/error()/complete()方法,调用Subject.next(1),则数据1将会被多播至Subject的观察者们。同时Subject可以观察其他的可观察对象,如下图所示:
import { Subject} from 'rxjs/Rx';
public CreateSub(){
let subject = new Subject();
let obsrver1 = { next:( x : any) => console. log( 'observer1:', x)}
let obsrver2 = { next:( x : any) => console. log( 'observer2:', x)}
subject. subscribe( obsrver1);
subject. subscribe( obsrver2);
subject. next( 1);
subject. next( 'a');
subject. next( 2);
}
运行结果:

Subject的扩展很丰富,例如:BehaviorSubject,ReplaySubject,AsyncSub等,后续章节再详细讲解。
6.Scheduler调度者
调度者控制着何时启动一个订阅和何时发送通知。它由三个组件构成:
1.数据结构:决定可观察对象中的任务或事件根据什么标准排列和存储。
2.执行上下文:决定何时任务被执行或事件发生。
3.虚拟时钟:可观察对象中的任务或事件将仅遵循该时钟表示的时间。
下面的例子使用observeOn操作符指定调度程序,注意与“Rxjs简单示例”的运行结果相比较
public testScheduler(){
//1.产生一个可观察对象,包括即时发送和延时发送
let myobservable = Observable. create(( observer : any) =>{
observer. next( 1); //发送数字1
observer. next( 2);
observer. next( 3);
setTimeout(() =>{ observer. next( 4);}, 1000); //1秒后发送数字4
}). observeOn( Scheduler. async);//调度者
//2.创建一个观察者仅包含next操作
let myobserver = { next:( x : any) => console. log( 'receive data:', x)};
console. log( 'before subscribe');
//3.对可观察对象进行订阅
var mysubscripton = myobservable. subscribe( myobserver);
//4.可以取消订阅
//mysubscripton.unsubscribe();
console. log( 'after subscribe');
}
本例的运行结果

对比下“
Rxjs简单示例”的运行结果

为何时序完全不同呢?前面提到过Observable.create里面有一个观察者,调度者通过observeOn对该观察者进行了调度,
让消息延时发送,才出现数据在“after subscribe“之后才被next出来的现象。
RxJS提供的内置调度器有三种,示例中使用的async调度器是其中之一。可能平时感觉没有使用到调度者,其实所有的操作
符处理并发时都可以选择调度器,例如 from([10, 20, 30], Scheduler.async)。如果不明确指明调度器则系统自动选择最合适的。
| 调度器 | 目的 |
|---|
null | 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。 |
Rx.Scheduler.queue | 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。 |
Rx.Scheduler.asap | 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。 |
Rx.Scheduler.async | 使用 setInterval 的调度。用于基于时间的操作符。 |
7.操作符
Rxjs强大的原因,源自于它丰富的操作符。前面例子中提到的from就是操作符。操作符按调用方法不同分为静态操作符和实例操
作符。示例:
let source = Observable . interval ( 1000 ) //产生从0开始的整数序列,每个数值间隔一秒
. take ( 10 ); //允许发送十个值后停止
let subscribe = source . subscribe (
( x : any ) => { this . count ++ ; console . log ( 'ret x:' , x )}
);

示例代码中,interval()就是典型的静态操作符,定义的Observable类上,需要通过Observable调用。输入一个非可观察对象的参数,
得到一个全新的可观察对象,假设为newObsv。代码中的take()就是典型的实例操作符,通过实例newObsv调用,产生新的可观察对
象,不且不会改变当前的newObsv。