Skip to content
On this page

rxjs

RxJS(Reactive Extensions for JavaScript)是一个用于构建异步和事件驱动程序的库,基于可观察序列的概念。RxJS 提供了强大的功能来处理异步数据流,使得编写复杂的异步操作变得更加简单和高效。

核心概念

1. Observable (可观察对象)

  • 可观察对象是 RxJS 的核心,用于表示一个可以被观察的数据流。可以从各种来源创建 Observable,比如事件、HTTP 请求、定时器等。

  • 示例:

    JavaScript
    import { Observable } from 'rxjs';
    
    const observable = new Observable(subscriber => {
      subscriber.next('Hello');
      subscriber.next('World');
      subscriber.complete();
    });
    
    observable.subscribe(value => console.log(value));
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
  1. Observer (观察者)

    • 观察者是一个回调对象,用于响应 Observable 发出的数据、错误或完成通知。观察者包含 nexterrorcomplete 三个方法。

    • 示例:

      JavaScript
      const observer = {
        next: value => console.log(value),
        error: err => console.error(err),
        complete: () => console.log('Completed')
      };
      
      observable.subscribe(observer);
      
      
      1
      2
      3
      4
      5
      6
      7
      8
  2. Subscription (订阅)

    • 订阅表示对 Observable 的订阅过程,订阅后,Observable 会开始发出数据。可以通过调用 unsubscribe 方法取消订阅。
    • 示例:
      JavaScript
      const subscription = observable.subscribe(observer);
      subscription.unsubscribe();
      
      1
      2
  3. Operators (操作符)

    • 操作符是用于转换、过滤和组合 Observable 的函数。RxJS 提供了丰富的操作符来处理数据流,如 mapfiltermergeconcatswitchMap 等。

    • 示例:

      JavaScript
      import { from } from 'rxjs';
      import { map, filter } from 'rxjs/operators';
      
      const numbers = from([1, 2, 3, 4, 5]);
      const squareOdd = numbers.pipe(
        filter(n => n % 2 !== 0),
        map(n => n * n)
      );
      
      squareOdd.subscribe(value => console.log(value));
      
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
  4. Subject (主题)

    • Subject 是一种特殊的 Observable,它既是 Observable 也是 Observer。可以用来多播值给多个订阅者。

    • 示例:

      typescript
      import { Subject } from 'rxjs';
      
      const subject = new Subject();
      
      subject.subscribe(value => console.log(`Observer A: ${value}`));
      subject.subscribe(value => console.log(`Observer B: ${value}`));
      
      subject.next('Hello');
      subject.next('World');
      
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10

常用操作符

  1. 创建操作符

    • of: 创建一个发出指定值的 Observable。
    • from: 从数组、Promise、迭代器或 Observable-like 对象创建 Observable。
    • interval: 创建一个发出递增数字的 Observable,使用定时器。
  2. 转换操作符

    • map: 将 Observable 发出的每个值应用一个函数,返回新的值。
    • switchMap: 映射每个值为一个新的 Observable,并取消前一个内部 Observable 的订阅。
  3. 过滤操作符

    • filter: 过滤出符合条件的值。
    • take: 只取前 n 个值。
  4. 组合操作符

    • merge: 将多个 Observable 合并成一个。
    • concat: 按顺序串联多个 Observable。
  5. 错误处理操作符

    • catchError: 捕获错误并返回一个新的 Observable 或抛出错误。
    • retry: 在发生错误时重新订阅源 Observable。

代码示例

以下是一个综合使用 RxJS 核心概念和操作符的示例:

typescript
import { of, from, interval, Subject } from 'rxjs';
import { map, filter, switchMap, take, catchError } from 'rxjs/operators';

// 创建一个简单的 Observable
const observable = of(1, 2, 3, 4, 5);

observable
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * x)
  )
  .subscribe(value => console.log(`Filtered and squared: ${value}`));

// 从数组创建 Observable
const arrayObservable = from([10, 20, 30, 40, 50]);

arrayObservable
  .pipe(
    map(x => x / 10),
    take(3)
  )
  .subscribe(value => console.log(`Mapped and taken: ${value}`));

// 使用 interval 创建定时 Observable
const intervalObservable = interval(1000);

intervalObservable
  .pipe(
    take(5),
    switchMap(val => of(`Interval value: ${val}`))
  )
  .subscribe(value => console.log(value));

// 使用 Subject 进行多播
const subject = new Subject();

subject.subscribe(value => console.log(`Observer A: ${value}`));
subject.subscribe(value => console.log(`Observer B: ${value}`));

subject.next('Hello');
subject.next('World');

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

通过 RxJS,开发者可以更容易地处理复杂的异步操作和事件驱动的编程模式,提高代码的可读性和可维护性。

沪ICP备20006251号-1