Appearance
rxjs
RxJS(Reactive Extensions for JavaScript)是一个用于构建异步和事件驱动程序的库,基于可观察序列的概念。RxJS 提供了强大的功能来处理异步数据流,使得编写复杂的异步操作变得更加简单和高效。
核心概念
1. Observable (可观察对象):
可观察对象是 RxJS 的核心,用于表示一个可以被观察的数据流。可以从各种来源创建 Observable,比如事件、HTTP 请求、定时器等。
示例:
JavaScriptimport { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next('Hello'); subscriber.next('World'); subscriber.complete(); }); observable.subscribe(value => console.log(value));
1
2
3
4
5
6
7
8
9
10
Observer (观察者):
观察者是一个回调对象,用于响应 Observable 发出的数据、错误或完成通知。观察者包含
next
、error
和complete
三个方法。示例:
JavaScriptconst observer = { next: value => console.log(value), error: err => console.error(err), complete: () => console.log('Completed') }; observable.subscribe(observer);
1
2
3
4
5
6
7
8
Subscription (订阅):
- 订阅表示对 Observable 的订阅过程,订阅后,Observable 会开始发出数据。可以通过调用
unsubscribe
方法取消订阅。 - 示例:JavaScript
const subscription = observable.subscribe(observer); subscription.unsubscribe();
1
2
- 订阅表示对 Observable 的订阅过程,订阅后,Observable 会开始发出数据。可以通过调用
Operators (操作符):
操作符是用于转换、过滤和组合 Observable 的函数。RxJS 提供了丰富的操作符来处理数据流,如
map
、filter
、merge
、concat
、switchMap
等。示例:
JavaScriptimport { from } from 'rxjs'; import { map, filter } from 'rxjs/operators'; const numbers = from([1, 2, 3, 4, 5]); const squareOdd = numbers.pipe( filter(n => n % 2 !== 0), map(n => n * n) ); squareOdd.subscribe(value => console.log(value));
1
2
3
4
5
6
7
8
9
10
11
Subject (主题):
Subject 是一种特殊的 Observable,它既是 Observable 也是 Observer。可以用来多播值给多个订阅者。
示例:
typescriptimport { Subject } from 'rxjs'; const subject = new Subject(); subject.subscribe(value => console.log(`Observer A: ${value}`)); subject.subscribe(value => console.log(`Observer B: ${value}`)); subject.next('Hello'); subject.next('World');
1
2
3
4
5
6
7
8
9
10
常用操作符
创建操作符:
of
: 创建一个发出指定值的 Observable。from
: 从数组、Promise、迭代器或 Observable-like 对象创建 Observable。interval
: 创建一个发出递增数字的 Observable,使用定时器。
转换操作符:
map
: 将 Observable 发出的每个值应用一个函数,返回新的值。switchMap
: 映射每个值为一个新的 Observable,并取消前一个内部 Observable 的订阅。
过滤操作符:
filter
: 过滤出符合条件的值。take
: 只取前 n 个值。
组合操作符:
merge
: 将多个 Observable 合并成一个。concat
: 按顺序串联多个 Observable。
错误处理操作符:
catchError
: 捕获错误并返回一个新的 Observable 或抛出错误。retry
: 在发生错误时重新订阅源 Observable。
代码示例
以下是一个综合使用 RxJS 核心概念和操作符的示例:
typescript
import { of, from, interval, Subject } from 'rxjs';
import { map, filter, switchMap, take, catchError } from 'rxjs/operators';
// 创建一个简单的 Observable
const observable = of(1, 2, 3, 4, 5);
observable
.pipe(
filter(x => x % 2 === 0),
map(x => x * x)
)
.subscribe(value => console.log(`Filtered and squared: ${value}`));
// 从数组创建 Observable
const arrayObservable = from([10, 20, 30, 40, 50]);
arrayObservable
.pipe(
map(x => x / 10),
take(3)
)
.subscribe(value => console.log(`Mapped and taken: ${value}`));
// 使用 interval 创建定时 Observable
const intervalObservable = interval(1000);
intervalObservable
.pipe(
take(5),
switchMap(val => of(`Interval value: ${val}`))
)
.subscribe(value => console.log(value));
// 使用 Subject 进行多播
const subject = new Subject();
subject.subscribe(value => console.log(`Observer A: ${value}`));
subject.subscribe(value => console.log(`Observer B: ${value}`));
subject.next('Hello');
subject.next('World');
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
通过 RxJS,开发者可以更容易地处理复杂的异步操作和事件驱动的编程模式,提高代码的可读性和可维护性。