Skip to content

13 RxJS

13.1 Overview

13.1.1 What is RxJS?

RXIS是一个用于处理异步编程的 Javascript 库,目标是使编写异步和基于回调的代码更容易。

13.1.2 为什么要学习 RxJS?

就像 Angular 深度集成 TypeScript 一样,Angular 也深度集成了 RXJS。

服务、表单、事件、全局状态管理、异步请求⋯

13.1.3 快速入门

  1. 可观察对象(Observable): 类比 Promise 对象,内部可以用于执行异步代码,通过调用内部提供的方法将异步代码执行的结果传递到可观察对象外部。

  2. 观察者(Observer): 类比then 方法中的回调函数,用于接收可观察对象中传递出来数据。

  3. 订阅(subscribe): 类比 then 方法,通过订阅将可观察对象和观察者连接起来,当可观察对象发出数据时,订阅者可以接收到数据。

Screenshot 2025-01-26 at 23.27.08

https://www.bilibili.com/video/BV1WP4y187Tu/?spm_id_from=333.337.search-card.all.click&vd_source=73e7d2c4251a7c9000b22d21b70f5635

import { Observable } from "rxjs"
const observable = new Observable( function(observer) {
    setTimeout(function(){
        observer.next({
            name:"zhangsan"
        })
    }, 2000)
})

const observer = {
    next: function(value) {
        console.log(value)
    }
}
observable.subscribe(observer)

src/app/components/rxjs/rxjs-observable/rxjs-observable.component.ts:

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码
    const observable = new Observable(function (observer) {
      setTimeout(function () {
        observer.next({
          name: "zhangsan"
        })
      }, 2000)
    })

    const observer = {
      next: function (value: any) {
        console.log(value)
      }
    }
    observable.subscribe(observer)
  }

  // 解释下上述代码
  // observable 是一个可观察对象, 你可以理解为一个事件源
  // observer 是一个观察者, 你可以理解为一个事件处理函数
  // observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
  // observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
  // setTimeout 是一个定时器, 2秒后执行 observer.next 函数
  // observer.next 函数的参数是一个对象, 你可以理解为事件的参数
  // 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
  // 你可以在浏览器的控制台中, 查看输出的结果

  // 一定要写在ngOnInit里面吗?
  // 不是的, 你可以写在任何地方
  // 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
  // 组件的生命周期钩子函数, 是在组件初始化的时候调用的
  // 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数


}

Screenshot 2025-02-06 at 13.02.39

Screenshot 2025-02-06 at 13.02.58

13.2 Observable

13.2.1 Observable

  1. 在 Observable 对象内部可以多次调用 next 方法向外发送数据。
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码

    const observable = new Observable(function (observer) {
      let index = 0;
      setInterval(function () {
        observer.next(index++)
      }, 1000)
      // setTimeout(function () {
      //   observer.next({
      //     name: "zhangsan"
      //   })
      // }, 2000)
    })

    const observer = {
      next: function (value: any) {
        console.log(value)
      }
    }
    observable.subscribe(observer)
  }

  // 解释下上述代码
  // observable 是一个可观察对象, 你可以理解为一个事件源
  // observer 是一个观察者, 你可以理解为一个事件处理函数
  // observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
  // observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
  // setTimeout 是一个定时器, 2秒后执行 observer.next 函数
  // observer.next 函数的参数是一个对象, 你可以理解为事件的参数
  // 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
  // 你可以在浏览器的控制台中, 查看输出的结果

  // 一定要写在ngOnInit里面吗?
  // 不是的, 你可以写在任何地方
  // 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
  // 组件的生命周期钩子函数, 是在组件初始化的时候调用的
  // 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数


}

Screenshot 2025-02-06 at 13.18.47

  1. 当所有数据发送完成以后,可以调用complete 方法终止数据发送。
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码

    const observable = new Observable(function (observer) {
      let index = 0;
      let timer = setInterval(function () {
        observer.next(index++);
        if (index > 3) {
          observer.complete();
          clearInterval(timer);
        }
      }, 1000)
      // setTimeout(function () {
      //   observer.next({
      //     name: "zhangsan"
      //   })
      // }, 2000)

    })

    const observer = {
      next: function (value: any) {
        console.log(value)
      },
      complete: function () {
        console.log("complete")
      }
    }
    observable.subscribe(observer)
  }

  // 解释下上述代码
  // observable 是一个可观察对象, 你可以理解为一个事件源
  // observer 是一个观察者, 你可以理解为一个事件处理函数
  // observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
  // observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
  // setTimeout 是一个定时器, 2秒后执行 observer.next 函数
  // observer.next 函数的参数是一个对象, 你可以理解为事件的参数
  // 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
  // 你可以在浏览器的控制台中, 查看输出的结果

  // 一定要写在ngOnInit里面吗?
  // 不是的, 你可以写在任何地方
  // 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
  // 组件的生命周期钩子函数, 是在组件初始化的时候调用的
  // 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数


}

Screenshot 2025-02-06 at 13.20.37

  1. 当 Observable 内部逻辑发生错误时,可以调用 error 方法将失败信息发送给订阅者,Observable终止。调用next不会有效
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码

    const observable = new Observable(function (observer) {
      let index = 0;
      let timer = setInterval(function () {
        observer.next(index++);
        if (index > 3) {
          // observer.complete();
          observer.error("error"); // 触发错误
          clearInterval(timer);
        }
      }, 1000)
      // setTimeout(function () {
      //   observer.next({
      //     name: "zhangsan"
      //   })
      // }, 2000)

    })

    const observer = {
      next: function (value: any) {
        console.log(value)
      },
      complete: function () {
        console.log("complete")
      },
      error: function (error: any) {
        console.log(error)
      }
    }
    observable.subscribe(observer)
  }

  // 解释下上述代码
  // observable 是一个可观察对象, 你可以理解为一个事件源
  // observer 是一个观察者, 你可以理解为一个事件处理函数
  // observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
  // observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
  // setTimeout 是一个定时器, 2秒后执行 observer.next 函数
  // observer.next 函数的参数是一个对象, 你可以理解为事件的参数
  // 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
  // 你可以在浏览器的控制台中, 查看输出的结果

  // 一定要写在ngOnInit里面吗?
  // 不是的, 你可以写在任何地方
  // 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
  // 组件的生命周期钩子函数, 是在组件初始化的时候调用的
  // 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数


}

Screenshot 2025-02-06 at 13.22.27

  1. 可观察对象是惰性的,只有被订阅后才会执行
const observable = new Observable (function () {
 console.1og ("Hello IRxJS" )
 } )
 // observable. subscribe ()
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码
    const observable = new Observable(function (observer) {
      console.log("Hello RxJS");
    });
  }


}

Screenshot 2025-02-06 at 13.25.32

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码
    const observable = new Observable(function (observer) {
      console.log("Hello RxJS");
    });
    observable.subscribe();
  }
}

Screenshot 2025-02-06 at 13.26.20

  1. 可观察对象可以有n多订阅者,每次被订阅时都会得到执行

Screenshot 2025-02-06 at 13.26.55

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-rxjs-observable',
  templateUrl: './rxjs-observable.component.html',
  styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {

  constructor() { }


  ngOnInit(): void { // 组件初始化的时候调用
    // 你可以在这里写你的代码
    const observable = new Observable(function (observer) {
      console.log("Hello RxJS");
    });
    observable.subscribe();
    observable.subscribe();
    observable.subscribe();
    observable.subscribe();
  }
}

Screenshot 2025-02-06 at 13.27.34

13.2.2 Subject

  1. 用于创建空的可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用

src/app/components/rxjs/rxjs-subject/rxjs-subject.component.ts:

import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';

@Component({
  selector: 'app-rxjs-subject',
  templateUrl: './rxjs-subject.component.html',
  styleUrls: ['./rxjs-subject.component.css']
})
export class RxjsSubjectComponent implements OnInit {

  demoSubject: Subject<String> = new Subject<String>();
  // 不能在 class 里直接使用 const,而应该用 this.demoSubject 来定义类属性。
  // 把 demoSubject 作为类的属性,这样可以在类的 ngOnInit() 生命周期内访问


  constructor() { }

  ngOnInit(): void {
    // 在 ngOnInit() 里订阅 Subject 并发送值,这样确保在组件初始化时运行。
    // 订阅 Subject
    this.demoSubject.subscribe({
      next: function (value) {
        console.log(value);
      }
    });

    this.demoSubject.subscribe({
      next: function (value) {
        console.log(value);
      }
    });
    //    // 2 秒后发射值
    setTimeout(() => {
      this.demoSubject.next('Hello');
    }, 2000);
    // setTimeout 放在类体外:setTimeout 需要在 ngOnInit 或者其他方法中调用,而不是直接写在类体内。
    // 箭头函数 => 替换 function,避免 this 上下文丢失问题
  }


}

Screenshot 2025-02-09 at 17.42.57

广播的时候.

不同组件的数据共享, 在不同的组件中引入这个可观察对象并且订阅, 可以向服务端发送请求获取数据.

13.2.3 BehaviorSubject

拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,观察者订阅后可以直接拿到默认值。

src/app/components/rxjs/rxjs-behavior-subject/rxjs-behavior-subject.component.ts:

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

@Component({
  selector: 'app-rxjs-behavior-subject',
  templateUrl: './rxjs-behavior-subject.component.html',
  styleUrls: ['./rxjs-behavior-subject.component.css']
})
export class RxjsBehaviorSubjectComponent implements OnInit {
  demoBehaviorSubject = new BehaviorSubject<String>('Initial Value');
  // BehaviorSubject 必须有初始值

  constructor() { }

  ngOnInit(): void {
    // 订阅 BehaviorSubject
    this.demoBehaviorSubject.subscribe({
      next: function (value) { // next是接收值的回调函数
        console.log(value);
      }
    });

    this.demoBehaviorSubject.next('Hello');
    // next 是发送值的方法
    // 这行代码会触发订阅者的回调函数,输出 'Hello'
    // 这是Observale 给订阅者发送值的方式
  }

}
// BehaviorSubject 是一种特殊的 Subject,它在创建时需要一个初始值,并且总是会返回当前的值。
// 它的主要特点是:
// 1. 必须有初始值:BehaviorSubject 在创建时必须提供一个初始值。
// 2. 当前值:BehaviorSubject 总是会返回最近发送的值,即使在订阅之后。
// 3. 缓存值:BehaviorSubject 会缓存最近发送的值,新的订阅者会立即收到这个值。
// 4. 订阅者:BehaviorSubject 可以有多个订阅者,所有订阅者都会收到相同的值。
// 5. 完成和错误:BehaviorSubject 也可以完成或发送错误,完成后无法再发送新值。
// BehaviorSubject 的使用场景:
// 1. 状态管理:BehaviorSubject 非常适合用于状态管理,因为它总是能提供最新的状态。
// 2. 表单处理:在表单处理中,可以使用 BehaviorSubject 来管理表单的状态和验证。
// 3. 数据流:BehaviorSubject 可以用于处理数据流,例如在组件之间传递数据。
// 4. 缓存数据:BehaviorSubject 可以用于缓存数据,例如从服务器获取的数据。
// 例子:
// 1. 创建 BehaviorSubject
// 2. 订阅 BehaviorSubject
// 3. 发送新值

Screenshot 2025-02-09 at 17.46.58

13.2.4 ReplySubject

功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而ReplaySubject 会广播所有历史结果。

src/app/components/rxjs/rxjs-replay-subject/rxjs-replay-subject.component.ts:

import { Component, OnInit } from '@angular/core';
import { ReplaySubject } from 'rxjs';

@Component({
  selector: 'app-rxjs-replay-subject',
  templateUrl: './rxjs-replay-subject.component.html',
  styleUrls: ['./rxjs-replay-subject.component.css']
})
export class RxjsReplaySubjectComponent implements OnInit {
  rSubject = new ReplaySubject();


  constructor() { }

  ngOnInit(): void {
    this.rSubject.subscribe(value => {
      console.log(value);
    });

    this.rSubject.next('Hello 1');
    this.rSubject.next('Hello 2');

    setTimeout(() => {
      this.rSubject.subscribe({
        next: function (value) {
          console.log(value);
        }
      });
    }, 2000);

  }

}

13.3 辅助方法

13.3.1 range

range(start, length), 调用方法后返回observable对象, 被订阅后会发出指定范围的数值.

Screenshot 2025-02-09 at 21.43.55

src/app/components/rxjs/helper-method/helper-method.component.ts:

import { Component, OnInit } from '@angular/core';
import { range } from 'rxjs';
@Component({
  selector: 'app-helper-method',
  templateUrl: './helper-method.component.html',
  styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    // rxjs 的辅助方法
    // 如下代码是在组件初始化时执行的
    // 会在控制台打印出 1-9
    range(1, 10).subscribe(function (value) {
      console.log(value);
    });
    // 要写在方法体内,不能写在类体内
    // 为什么?
    // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
  }



}

方法内部并不是一次发出length个数值, 而是发送了length次, 每次发送一个数值, 就是说内部调用了length次next方法.

Screenshot 2025-02-09 at 21.53.43

13.3.2 of

将参数列表作为数据流返回

Screenshot 2025-02-10 at 11.57.27

  // ----------------
  // of
  ngOnInit(): void {
    of("a", 1, true, {}, []).subscribe(console.log);
  }

Screenshot 2025-02-10 at 12.02.36

13.3.3 from

将Array, Promise, Iterator转换为observable对象

Screenshot 2025-02-09 at 22.19.33

src/app/components/rxjs/helper-method/helper-method.component.ts:

import { Component, OnInit } from '@angular/core';
import { from, range } from 'rxjs';
@Component({
  selector: 'app-helper-method',
  templateUrl: './helper-method.component.html',
  styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {

  constructor() { }
  // from
  ngOnInit(): void {
    from(["a", "b", "c"]).subscribe(console.log);
    from(this.p()).subscribe(console.log);
  }


  p() {
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({ a: 1 });
      }, 2000);
    });
  }


}

Screenshot 2025-02-09 at 22.25.39

13.3.4 interval, timer

interval: 每隔一段时间发出一个数值, 数值递增

Screenshot 2025-02-10 at 10.24.55

src/app/components/rxjs/helper-method/helper-method.component.ts:

import { Component, OnInit } from '@angular/core';
import { forkJoin, from, fromEvent, interval, map, range } from 'rxjs';
import axios from 'axios';
@Component({
  selector: 'app-helper-method',
  templateUrl: './helper-method.component.html',
  styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {

  constructor() { }
  // --------
  // interval
  ngOnInit(): void {
    interval(1000).subscribe(console.log);
  }

}

Screenshot 2025-02-10 at 10.30.22

timer: 间隔时间过去以后发出数值, 行为终止, 或间隔时间发出数值后, 继续按第二个参数的时间间隔继续发出值.

Screenshot 2025-02-10 at 10.25.05

13.3.5 concat

13.3.6 merge

13.3.7 combineLatest

13.3.8 zip

13.3.9 forkJoin

forkjoin是Rx版本的Promise.all(), 即表示等到所有的Observable都完成后, 才一次性返回值.

Screenshot 2025-02-09 at 22.40.18

import { Component, OnInit } from '@angular/core';
import { forkJoin, from, range } from 'rxjs';
import axios from 'axios';
@Component({
  selector: 'app-helper-method',
  templateUrl: './helper-method.component.html',
  styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {

  constructor() { }
  // range
  // ngOnInit(): void {
  //   // rxjs 的辅助方法
  //   // 如下代码是在组件初始化时执行的
  //   // 会在控制台打印出 1-9
  //   range(1, 10).subscribe(function (value) {
  //     console.log(value);
  //   });
  //   // 要写在方法体内,不能写在类体内
  //   // 为什么?
  //   // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
  // }


  // from
  // ngOnInit(): void {
  //   from(["a", "b", "c"]).subscribe(console.log);
  //   from(this.p()).subscribe(console.log);
  // }


  // p() {
  //   return new Promise((resolve) => {
  //     setTimeout(() => {
  //       resolve({ a: 1 });
  //     }, 2000);
  //   });
  // }

  // -----------
  //forkJoin
  // example:
  // ngOnInit(): void {
  //   forkJoin({
  //     goods: from(axios.get('https://api.example.com/goods')), // 假设这是获取商品的 API
  //     category: from(axios.get('https://api.example.com/category')), // 假设这是获取分类的 API
  //   }).subscribe(console.log);
  // }
  ngOnInit(): void {
    const mockGoods = [
      { id: 1, name: 'Apple', price: 1.5 },
      { id: 2, name: 'Banana', price: 0.9 },
      { id: 3, name: 'Cherry', price: 2.0 }
    ];

    const mockCategory = [
      { id: 1, name: 'Fruits' },
      { id: 2, name: 'Vegetables' }
    ];

    forkJoin({
      goods: from(Promise.resolve({ data: mockGoods })),
      category: from(Promise.resolve({ data: mockCategory }))
    }).subscribe(console.log);
  }




}

Screenshot 2025-02-09 at 23.13.21

import { Component, OnInit } from '@angular/core';
import { forkJoin, from, range } from 'rxjs';
import axios from 'axios';
@Component({
  selector: 'app-helper-method',
  templateUrl: './helper-method.component.html',
  styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {

  constructor() { }
  // range
  // ngOnInit(): void {
  //   // rxjs 的辅助方法
  //   // 如下代码是在组件初始化时执行的
  //   // 会在控制台打印出 1-9
  //   range(1, 10).subscribe(function (value) {
  //     console.log(value);
  //   });
  //   // 要写在方法体内,不能写在类体内
  //   // 为什么?
  //   // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
  // }


  // from
  // ngOnInit(): void {
  //   from(["a", "b", "c"]).subscribe(console.log);
  //   from(this.p()).subscribe(console.log);
  // }


  // p() {
  //   return new Promise((resolve) => {
  //     setTimeout(() => {
  //       resolve({ a: 1 });
  //     }, 2000);
  //   });
  // }

  // -----------
  //forkJoin
  // example:
  // ngOnInit(): void {
  //   forkJoin({
  //     goods: from(axios.get('https://api.example.com/goods')), // 假设这是获取商品的 API
  //     category: from(axios.get('https://api.example.com/category')), // 假设这是获取分类的 API
  //   }).subscribe(console.log);
  // }
  ngOnInit(): void {
    const mockGoods = [
      { id: 1, name: 'Apple', price: 1.5 },
      { id: 2, name: 'Banana', price: 0.9 },
      { id: 3, name: 'Cherry', price: 2.0 }
    ];

    const mockCategory = [
      { id: 1, name: 'Fruits' },
      { id: 2, name: 'Vegetables' }
    ];

    // forkJoin({
    //   goods: from(Promise.resolve({ data: mockGoods })),
    //   category: from(Promise.resolve({ data: mockCategory }))
    // }).subscribe(console.log);
    // 只想要用data里的数据
    forkJoin({
      goods: from(Promise.resolve({ data: mockGoods })),
      category: from(Promise.resolve({ data: mockCategory }))
    }).subscribe(response => console.log(response.goods.data, response.category.data));

  }

Screenshot 2025-02-09 at 23.16.26

13.3.10 throwError

13.3.11 retry

13.3.12 race

13.3.13 fromEvent

将事件转换为Observable.

import { Component, OnInit } from '@angular/core';
import { forkJoin, from, fromEvent, map, range } from 'rxjs';
import axios from 'axios';
@Component({
  selector: 'app-helper-method',
  templateUrl: './helper-method.component.html',
  styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    const button = document.getElementById('myButton');

    if (button) {
      fromEvent(button, 'click').subscribe(console.log);
    }

    if (button) {
      fromEvent(button, 'click')
        .pipe(
          map(event => event.target)
        )
        .subscribe(console.log);
    }

  }

Screenshot 2025-02-09 at 23.44.01

13.4 操作符

  1. 数据流:从可观察对象内部输出的数据就是数据流,可观察对象内部可以向外部源源不断的输出数据。
  2. 操作符:用于操作数据流,可以对象数据流进行转换,过滤等等操作。

13.4.1 map, mapTo

map:对数据流进行转换,基于原有值进行转换。

Screenshot 2025-02-09 at 21.54.26

src/app/components/rxjs/operators/operators.component.ts:

import { Component, OnInit } from '@angular/core';
import { map, range } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    // rxjs 的辅助方法
    // 如下代码是在组件初始化时执行的
    // 会在控制台打印出 1-9
    range(1, 10)
      .pipe(map(n => n * 10))
      .subscribe(function (value) {
        console.log(value);
      });
    // 要写在方法体内,不能写在类体内
    // 为什么?
    // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
  }

}

Screenshot 2025-02-09 at 21.57.18

mapTo:对数据流进行转换,不关心原有值,可以直接传入要转换后的值。

13.4.2 filter

13.4.3 pluck

获取数据流对象中的属性值

Screenshot 2025-02-09 at 23.46.22

src/app/components/rxjs/operators/operators.component.ts:

import { Component, OnInit } from '@angular/core';
import { fromEvent, map, pluck, range } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }

  // pipeable operators
  // ngOnInit(): void {
  //   // rxjs 的辅助方法
  //   // 如下代码是在组件初始化时执行的
  //   // 会在控制台打印出 1-9
  //   range(1, 10)
  //     .pipe(map(n => n * 10))
  //     .subscribe(function (value) {
  //       console.log(value);
  //     });
  //   // 要写在方法体内,不能写在类体内
  //   // 为什么?
  //   // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
  // }


  // plugin operators
  ngOnInit(): void {
    const button = document.getElementById('button');
    if (button) {
      fromEvent(button, 'click')
        .pipe(pluck("target"))
        .subscribe(console.log);
    }
  }


}

Screenshot 2025-02-10 at 10.10.36

13.4.4 first

13.4.5 startWith

13.4.6 every

13.4.7 delay, delayWhen

13.4.8 take, takeWhile, takeUtil

take: 获取数据流中的前几个

Screenshot 2025-02-10 at 10.50.33

import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    range(1, 10).subscribe(console.log);
  }


}

Screenshot 2025-02-10 at 10.52.33

import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap, take } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }
  ngOnInit(): void {
    range(1, 10).pipe(take(5)).subscribe(console.log);
  }


}

Screenshot 2025-02-10 at 10.53.19

takeWhile: 根据条件从数据源前面开始获取.

Screenshot 2025-02-10 at 10.50.47

  ngOnInit(): void {
    range(1, 10).pipe(takeWhile(n => n < 3)).subscribe(console.log);
  }

Screenshot 2025-02-10 at 10.55.06

takeUntil: 接收可观察对象, 当可观察对象发出值时, 终止主数据源

Screenshot 2025-02-10 at 10.55.24

  ngOnInit(): void {
    const button = document.getElementById('button');

    if (button) {
      fromEvent(button, 'mousemove').subscribe(console.log);
    }
  }

Screenshot 2025-02-10 at 11.21.13

  // -----
  // takeUntil
  ngOnInit(): void {
    const button = document.getElementById('button');

    if (button) {
      fromEvent(button, 'mousemove')
        .pipe(
          takeUntil(fromEvent(button, 'click')
          )
        )
        .subscribe(console.log);
    }
  }

Screenshot 2025-02-10 at 11.23.55

13.4.9 skip, skipWhile, skipUtil

13.4.10 last

13.4.11 concatAll, concatMap

13.4.13 reduce, scan

13.4.14 mergeAll, mergeMap

13.4.15 throttleTime

节流,可观察对象高频次向外部发出数据流,通过 throttleTime 限制在规定时间内每次只向订阅者传递一次数据流。

Screenshot 2025-02-10 at 11.25.18

import { Component, OnInit } from '@angular/core';
import { from, fromEvent, interval, map, pluck, range, switchMap, take, takeUntil, takeWhile, throttle, throttleTime } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }

  // ----
  // throttleTime
  ngOnInit(): void {
    fromEvent(document, 'click').pipe(
      throttleTime(2000) // 2 秒内只取第一个点击事件
    ).subscribe(console.log);
  }

}

Screenshot 2025-02-10 at 11.31.31

13.4.16 debounceTime

防抖,触发高频事件,只响应最后一次。

Screenshot 2025-02-10 at 11.31.52

  // --- 
  // debounceTime
  ngOnInit(): void {
    fromEvent(document, 'click').pipe(
      debounceTime(2000) // 2 秒内只取最后一个点击事件
    ).subscribe(console.log);
  }

Screenshot 2025-02-10 at 11.33.21

13.4.17 distinctUntilChanged

检测数据源当前发出的数据流是否和上次发出的相同, 如相同, 跳过, 不相同, 发出.

Screenshot 2025-02-10 at 12.03.26

  // ---
  // distinctUntilChanged
  ngOnInit(): void {
    of(1, 1, 2, 2, 3, 4).pipe(
      distinctUntilChanged()
    ).subscribe(console.log);
  }

Screenshot 2025-02-10 at 12.06.00

13.4.18 groupBy

对数据流进行分组

13.4.19 withLatestFrom

13.4.20 switchMap

切换可观察对象

Screenshot 2025-02-10 at 10.33.26

import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }

  // switchMap
  ngOnInit(): void {
    const button = document.getElementById('button');
    if (button) {
      fromEvent(button, 'click')
        // .pipe(
        //   switchMap(event => interval(1000))
        // )
        .subscribe(console.log);
    }
  }


}

Screenshot 2025-02-10 at 10.37.08

import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap } from 'rxjs';

@Component({
  selector: 'app-operators',
  templateUrl: './operators.component.html',
  styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {

  constructor() { }


  // switchMap
  ngOnInit(): void {
    const button = document.getElementById('button');
    if (button) {
      fromEvent(button, 'click')
        .pipe(
          switchMap(event => interval(1000))
        )
        .subscribe(console.log);
    }
  }


}

Screenshot 2025-02-10 at 10.37.47

Screenshot 2025-02-10 at 10.38.06

13.5 Practice

13.5.1 Element Drag and Drop

src/app/components/rxjs/practices/element-drag-and-drop/element-drag-and-drop.component.html:

<p>element-drag-and-drop works!</p>
<style>
    #box {
        width: 200px;
        height: 200px;
        background: skyblue;
        position: absolute;
        left: 0;
        top: 10;
    }
</style>
<div id="box">box</div>

原生:

src/app/components/rxjs/practices/element-drag-and-drop/element-drag-and-drop.component.ts:

import { Component, OnInit } from '@angular/core';

@Component({
  selector: 'app-element-drag-and-drop',
  templateUrl: './element-drag-and-drop.component.html',
  styleUrls: ['./element-drag-and-drop.component.css']
})
export class ElementDragAndDropComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    const box = document.getElementById('box');

    if (box) {
      box.onmousedown = function (event) {
        const target = event.target as HTMLElement;
        if (target) {
          let distanceX = event.clientX - target.offsetLeft;
          let distanceY = event.clientY - target.offsetTop;
          document.onmousemove = function (event) {
            let left = event.clientX - distanceX;
            let top = event.clientY - distanceY;
            box.style.left = left + 'px';
            box.style.top = top + 'px';
          }
        }
      }
    }

    if (box) {
      box.onmouseup = function () {
        document.onmousemove = null;
      }
    }

  }
}

使用rxjs

import { Component, OnInit } from '@angular/core';
import { from, fromEvent, map, switchMap, take, takeUntil } from 'rxjs';

@Component({
  selector: 'app-element-drag-and-drop',
  templateUrl: './element-drag-and-drop.component.html',
  styleUrls: ['./element-drag-and-drop.component.css']
})
export class ElementDragAndDropComponent implements OnInit {

  constructor() { }

  // ngOnInit(): void {
  //   const box = document.getElementById('box');

  //   if (box) {
  //     box.onmousedown = function (event) {
  //       const target = event.target as HTMLElement;
  //       if (target) {
  //         let distanceX = event.clientX - target.offsetLeft;
  //         let distanceY = event.clientY - target.offsetTop;
  //         document.onmousemove = function (event) {
  //           let left = event.clientX - distanceX;
  //           let top = event.clientY - distanceY;
  //           box.style.left = left + 'px';
  //           box.style.top = top + 'px';
  //         }
  //       }
  //     }
  //   }

  //   if (box) {
  //     box.onmouseup = function () {
  //       document.onmousemove = null;
  //     }
  //   }

  // }

  ngOnInit(): void {
    const box = document.getElementById('box');
    if (box) {
      fromEvent<MouseEvent>(box, "mousedown").pipe(
        map(event => ({ distanceX: event.clientX - box.offsetLeft, distanceY: event.clientY - box.offsetTop })),
        switchMap(({ distanceX, distanceY }) => fromEvent<MouseEvent>(document, "mousemove").pipe(
          map(event => ({ left: event.clientX - distanceX, top: event.clientY - distanceY })),
          takeUntil(fromEvent(document, "mouseup"))
        )),

      ).subscribe(({ left, top }) => {
        box.style.left = left + 'px';
        box.style.top = top + 'px';
      });
    }
  }
}

Screenshot 2025-02-10 at 13.56.06

src/app/components/rxjs/practices/search/search.component.html:

<p>search works!</p>
<input id="search" type="text" placeholder="Search..." />

src/app/components/rxjs/practices/search/search.component.ts:

import { Component, OnInit } from '@angular/core';
import axios from 'axios';
import { debounceTime, distinct, distinctUntilChanged, from, fromEvent, map, switchMap } from 'rxjs';

@Component({
  selector: 'app-search',
  templateUrl: './search.component.html',
  styleUrls: ['./search.component.css']
})
export class SearchComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    const search = document.getElementById('search');
    if (search) {
      fromEvent(search, 'keyup').pipe(
        debounceTime(500),
        map(event => {
          const target = event.target as HTMLInputElement;
          return target ? target.value : '';
        }),
        distinctUntilChanged(),
        switchMap(keyword => from(axios.get(`https://api.github.com/search/users?q=${keyword}`)))
      ).subscribe(console.log);
    }
  }

}

Screenshot 2025-02-10 at 14.11.36

只想要data

import { Component, OnInit } from '@angular/core';
import axios from 'axios';
import { debounceTime, distinct, distinctUntilChanged, from, fromEvent, map, pluck, switchMap } from 'rxjs';

@Component({
  selector: 'app-search',
  templateUrl: './search.component.html',
  styleUrls: ['./search.component.css']
})
export class SearchComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    const search = document.getElementById('search');
    if (search) {
      fromEvent(search, 'keyup').pipe(
        debounceTime(500),
        map(event => {
          const target = event.target as HTMLInputElement;
          return target ? target.value : '';
        }),
        distinctUntilChanged(),
        switchMap(keyword => from(axios.get(`https://api.github.com/search/users?q=${keyword}`)).pipe(
          pluck('data')
        ))
      ).subscribe(console.log);
    }
  }

}

Screenshot 2025-02-10 at 14.10.55

13.5.3 Chained Requests

先获取token, 再根据token获取用户信息

src/app/components/rxjs/practices/chained-requests/chained-requests.component.html:

<p>chained-requests works!</p>
<button id="button">Get user information</button>

src/app/components/rxjs/practices/chained-requests/chained-requests.component.ts:

import { Component, OnInit } from '@angular/core';
import axios from 'axios';
import { concatMap, from, fromEvent } from 'rxjs';

@Component({
  selector: 'app-chained-requests',
  templateUrl: './chained-requests.component.html',
  styleUrls: ['./chained-requests.component.css']
})
export class ChainedRequestsComponent implements OnInit {

  token: string | null = null;
  constructor() { }
  ngOnInit(): void {
    const button = document.getElementById('button');
    if (button) {
      fromEvent(button, 'click').pipe(
        concatMap(() => from(axios.get('http://localhost:3000/token')))
      ).subscribe(response => {
        this.token = response.data.access_token;
        console.log('Token:', this.token);
      });
    }
  }




}

Screenshot 2025-02-10 at 15.01.37

src/app/db.json:

{
    "token": {
        "access_token": "mocked_token_123456"
    }
}

第一步:安装 json-server 作为 Mock Server

在你的 Angular 项目根目录下(和 package.json 在同一级),打开终端并运行:

npm install -g json-server

作用json-server 允许你快速搭建一个 REST API 模拟后端返回数据。

第二步:创建 db.json 文件

在你的 Angular 项目根目录(和 src/ 平级)新建一个文件 db.json

这个文件充当数据库,json-server 读取它并返回相应的 API 数据。

第三步:启动 json-server , 在db.json所在路径下

json-server --watch db.json --port 3000

如果成功,你会看到:

\{^_^}/ hi!

  Loading db.json
  Done

  Resources
  http://localhost:3000/token

  Home
  http://localhost:3000