13 RxJS
13.1 Overview
13.1.1 What is RxJS?
RXIS是一个用于处理异步编程的 Javascript 库,目标是使编写异步和基于回调的代码更容易。
13.1.2 为什么要学习 RxJS?
就像 Angular 深度集成 TypeScript 一样,Angular 也深度集成了 RXJS。
服务、表单、事件、全局状态管理、异步请求⋯
13.1.3 快速入门
-
可观察对象(Observable): 类比 Promise 对象,内部可以用于执行异步代码,通过调用内部提供的方法将异步代码执行的结果传递到可观察对象外部。
-
观察者(Observer): 类比then 方法中的回调函数,用于接收可观察对象中传递出来数据。
-
订阅(subscribe): 类比 then 方法,通过订阅将可观察对象和观察者连接起来,当可观察对象发出数据时,订阅者可以接收到数据。
https://www.bilibili.com/video/BV1WP4y187Tu/?spm_id_from=333.337.search-card.all.click&vd_source=73e7d2c4251a7c9000b22d21b70f5635
import { Observable } from "rxjs"
const observable = new Observable( function(observer) {
setTimeout(function(){
observer.next({
name:"zhangsan"
})
}, 2000)
})
const observer = {
next: function(value) {
console.log(value)
}
}
observable.subscribe(observer)
src/app/components/rxjs/rxjs-observable/rxjs-observable.component.ts
:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
setTimeout(function () {
observer.next({
name: "zhangsan"
})
}, 2000)
})
const observer = {
next: function (value: any) {
console.log(value)
}
}
observable.subscribe(observer)
}
// 解释下上述代码
// observable 是一个可观察对象, 你可以理解为一个事件源
// observer 是一个观察者, 你可以理解为一个事件处理函数
// observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
// observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
// setTimeout 是一个定时器, 2秒后执行 observer.next 函数
// observer.next 函数的参数是一个对象, 你可以理解为事件的参数
// 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
// 你可以在浏览器的控制台中, 查看输出的结果
// 一定要写在ngOnInit里面吗?
// 不是的, 你可以写在任何地方
// 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
// 组件的生命周期钩子函数, 是在组件初始化的时候调用的
// 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数
}
13.2 Observable
13.2.1 Observable
- 在 Observable 对象内部可以多次调用 next 方法向外发送数据。
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
let index = 0;
setInterval(function () {
observer.next(index++)
}, 1000)
// setTimeout(function () {
// observer.next({
// name: "zhangsan"
// })
// }, 2000)
})
const observer = {
next: function (value: any) {
console.log(value)
}
}
observable.subscribe(observer)
}
// 解释下上述代码
// observable 是一个可观察对象, 你可以理解为一个事件源
// observer 是一个观察者, 你可以理解为一个事件处理函数
// observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
// observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
// setTimeout 是一个定时器, 2秒后执行 observer.next 函数
// observer.next 函数的参数是一个对象, 你可以理解为事件的参数
// 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
// 你可以在浏览器的控制台中, 查看输出的结果
// 一定要写在ngOnInit里面吗?
// 不是的, 你可以写在任何地方
// 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
// 组件的生命周期钩子函数, 是在组件初始化的时候调用的
// 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数
}
- 当所有数据发送完成以后,可以调用complete 方法终止数据发送。
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
let index = 0;
let timer = setInterval(function () {
observer.next(index++);
if (index > 3) {
observer.complete();
clearInterval(timer);
}
}, 1000)
// setTimeout(function () {
// observer.next({
// name: "zhangsan"
// })
// }, 2000)
})
const observer = {
next: function (value: any) {
console.log(value)
},
complete: function () {
console.log("complete")
}
}
observable.subscribe(observer)
}
// 解释下上述代码
// observable 是一个可观察对象, 你可以理解为一个事件源
// observer 是一个观察者, 你可以理解为一个事件处理函数
// observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
// observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
// setTimeout 是一个定时器, 2秒后执行 observer.next 函数
// observer.next 函数的参数是一个对象, 你可以理解为事件的参数
// 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
// 你可以在浏览器的控制台中, 查看输出的结果
// 一定要写在ngOnInit里面吗?
// 不是的, 你可以写在任何地方
// 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
// 组件的生命周期钩子函数, 是在组件初始化的时候调用的
// 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数
}
- 当 Observable 内部逻辑发生错误时,可以调用 error 方法将失败信息发送给订阅者,Observable终止。调用next不会有效
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
let index = 0;
let timer = setInterval(function () {
observer.next(index++);
if (index > 3) {
// observer.complete();
observer.error("error"); // 触发错误
clearInterval(timer);
}
}, 1000)
// setTimeout(function () {
// observer.next({
// name: "zhangsan"
// })
// }, 2000)
})
const observer = {
next: function (value: any) {
console.log(value)
},
complete: function () {
console.log("complete")
},
error: function (error: any) {
console.log(error)
}
}
observable.subscribe(observer)
}
// 解释下上述代码
// observable 是一个可观察对象, 你可以理解为一个事件源
// observer 是一个观察者, 你可以理解为一个事件处理函数
// observable.subscribe(observer) 是订阅事件, 当事件发生的时候, 会调用 observer 的 next 函数
// observer.next 是事件处理函数, 当事件发生的时候, 会调用这个函数
// setTimeout 是一个定时器, 2秒后执行 observer.next 函数
// observer.next 函数的参数是一个对象, 你可以理解为事件的参数
// 你可以理解为, 当2秒后, 事件发生了, 会调用 observer.next 函数, 并传递一个对象参数
// 你可以在浏览器的控制台中, 查看输出的结果
// 一定要写在ngOnInit里面吗?
// 不是的, 你可以写在任何地方
// 但是要注意, 你写在ngOnInit里面, 是因为ngOnInit是组件的生命周期钩子函数
// 组件的生命周期钩子函数, 是在组件初始化的时候调用的
// 你写在ngOnInit里面, 是因为你想在组件初始化的时候, 执行这个函数
}
- 可观察对象是惰性的,只有被订阅后才会执行
const observable = new Observable (function () {
console.1og ("Hello IRxJS" )
} )
// observable. subscribe ()
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
console.log("Hello RxJS");
});
}
}
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
console.log("Hello RxJS");
});
observable.subscribe();
}
}
- 可观察对象可以有n多订阅者,每次被订阅时都会得到执行
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs-observable',
templateUrl: './rxjs-observable.component.html',
styleUrls: ['./rxjs-observable.component.css']
})
export class RxjsObservableComponent {
constructor() { }
ngOnInit(): void { // 组件初始化的时候调用
// 你可以在这里写你的代码
const observable = new Observable(function (observer) {
console.log("Hello RxJS");
});
observable.subscribe();
observable.subscribe();
observable.subscribe();
observable.subscribe();
}
}
13.2.2 Subject
- 用于创建空的可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用
src/app/components/rxjs/rxjs-subject/rxjs-subject.component.ts
:
import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
@Component({
selector: 'app-rxjs-subject',
templateUrl: './rxjs-subject.component.html',
styleUrls: ['./rxjs-subject.component.css']
})
export class RxjsSubjectComponent implements OnInit {
demoSubject: Subject<String> = new Subject<String>();
// 不能在 class 里直接使用 const,而应该用 this.demoSubject 来定义类属性。
// 把 demoSubject 作为类的属性,这样可以在类的 ngOnInit() 生命周期内访问
constructor() { }
ngOnInit(): void {
// 在 ngOnInit() 里订阅 Subject 并发送值,这样确保在组件初始化时运行。
// 订阅 Subject
this.demoSubject.subscribe({
next: function (value) {
console.log(value);
}
});
this.demoSubject.subscribe({
next: function (value) {
console.log(value);
}
});
// // 2 秒后发射值
setTimeout(() => {
this.demoSubject.next('Hello');
}, 2000);
// setTimeout 放在类体外:setTimeout 需要在 ngOnInit 或者其他方法中调用,而不是直接写在类体内。
// 箭头函数 => 替换 function,避免 this 上下文丢失问题
}
}
广播的时候.
不同组件的数据共享, 在不同的组件中引入这个可观察对象并且订阅, 可以向服务端发送请求获取数据.
13.2.3 BehaviorSubject
拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,观察者订阅后可以直接拿到默认值。
src/app/components/rxjs/rxjs-behavior-subject/rxjs-behavior-subject.component.ts
:
import { Component, OnInit } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
@Component({
selector: 'app-rxjs-behavior-subject',
templateUrl: './rxjs-behavior-subject.component.html',
styleUrls: ['./rxjs-behavior-subject.component.css']
})
export class RxjsBehaviorSubjectComponent implements OnInit {
demoBehaviorSubject = new BehaviorSubject<String>('Initial Value');
// BehaviorSubject 必须有初始值
constructor() { }
ngOnInit(): void {
// 订阅 BehaviorSubject
this.demoBehaviorSubject.subscribe({
next: function (value) { // next是接收值的回调函数
console.log(value);
}
});
this.demoBehaviorSubject.next('Hello');
// next 是发送值的方法
// 这行代码会触发订阅者的回调函数,输出 'Hello'
// 这是Observale 给订阅者发送值的方式
}
}
// BehaviorSubject 是一种特殊的 Subject,它在创建时需要一个初始值,并且总是会返回当前的值。
// 它的主要特点是:
// 1. 必须有初始值:BehaviorSubject 在创建时必须提供一个初始值。
// 2. 当前值:BehaviorSubject 总是会返回最近发送的值,即使在订阅之后。
// 3. 缓存值:BehaviorSubject 会缓存最近发送的值,新的订阅者会立即收到这个值。
// 4. 订阅者:BehaviorSubject 可以有多个订阅者,所有订阅者都会收到相同的值。
// 5. 完成和错误:BehaviorSubject 也可以完成或发送错误,完成后无法再发送新值。
// BehaviorSubject 的使用场景:
// 1. 状态管理:BehaviorSubject 非常适合用于状态管理,因为它总是能提供最新的状态。
// 2. 表单处理:在表单处理中,可以使用 BehaviorSubject 来管理表单的状态和验证。
// 3. 数据流:BehaviorSubject 可以用于处理数据流,例如在组件之间传递数据。
// 4. 缓存数据:BehaviorSubject 可以用于缓存数据,例如从服务器获取的数据。
// 例子:
// 1. 创建 BehaviorSubject
// 2. 订阅 BehaviorSubject
// 3. 发送新值
13.2.4 ReplySubject
功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而ReplaySubject 会广播所有历史结果。
src/app/components/rxjs/rxjs-replay-subject/rxjs-replay-subject.component.ts
:
import { Component, OnInit } from '@angular/core';
import { ReplaySubject } from 'rxjs';
@Component({
selector: 'app-rxjs-replay-subject',
templateUrl: './rxjs-replay-subject.component.html',
styleUrls: ['./rxjs-replay-subject.component.css']
})
export class RxjsReplaySubjectComponent implements OnInit {
rSubject = new ReplaySubject();
constructor() { }
ngOnInit(): void {
this.rSubject.subscribe(value => {
console.log(value);
});
this.rSubject.next('Hello 1');
this.rSubject.next('Hello 2');
setTimeout(() => {
this.rSubject.subscribe({
next: function (value) {
console.log(value);
}
});
}, 2000);
}
}
13.3 辅助方法
13.3.1 range
range(start, length), 调用方法后返回observable对象, 被订阅后会发出指定范围的数值.
src/app/components/rxjs/helper-method/helper-method.component.ts
:
import { Component, OnInit } from '@angular/core';
import { range } from 'rxjs';
@Component({
selector: 'app-helper-method',
templateUrl: './helper-method.component.html',
styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {
constructor() { }
ngOnInit(): void {
// rxjs 的辅助方法
// 如下代码是在组件初始化时执行的
// 会在控制台打印出 1-9
range(1, 10).subscribe(function (value) {
console.log(value);
});
// 要写在方法体内,不能写在类体内
// 为什么?
// 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
}
}
方法内部并不是一次发出length个数值, 而是发送了length次, 每次发送一个数值, 就是说内部调用了length次next方法.
13.3.2 of
将参数列表作为数据流返回
13.3.3 from
将Array, Promise, Iterator转换为observable对象
src/app/components/rxjs/helper-method/helper-method.component.ts
:
import { Component, OnInit } from '@angular/core';
import { from, range } from 'rxjs';
@Component({
selector: 'app-helper-method',
templateUrl: './helper-method.component.html',
styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {
constructor() { }
// from
ngOnInit(): void {
from(["a", "b", "c"]).subscribe(console.log);
from(this.p()).subscribe(console.log);
}
p() {
return new Promise((resolve) => {
setTimeout(() => {
resolve({ a: 1 });
}, 2000);
});
}
}
13.3.4 interval, timer
interval: 每隔一段时间发出一个数值, 数值递增
src/app/components/rxjs/helper-method/helper-method.component.ts
:
import { Component, OnInit } from '@angular/core';
import { forkJoin, from, fromEvent, interval, map, range } from 'rxjs';
import axios from 'axios';
@Component({
selector: 'app-helper-method',
templateUrl: './helper-method.component.html',
styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {
constructor() { }
// --------
// interval
ngOnInit(): void {
interval(1000).subscribe(console.log);
}
}
timer: 间隔时间过去以后发出数值, 行为终止, 或间隔时间发出数值后, 继续按第二个参数的时间间隔继续发出值.
13.3.5 concat
13.3.6 merge
13.3.7 combineLatest
13.3.8 zip
13.3.9 forkJoin
forkjoin是Rx版本的Promise.all(), 即表示等到所有的Observable都完成后, 才一次性返回值.
import { Component, OnInit } from '@angular/core';
import { forkJoin, from, range } from 'rxjs';
import axios from 'axios';
@Component({
selector: 'app-helper-method',
templateUrl: './helper-method.component.html',
styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {
constructor() { }
// range
// ngOnInit(): void {
// // rxjs 的辅助方法
// // 如下代码是在组件初始化时执行的
// // 会在控制台打印出 1-9
// range(1, 10).subscribe(function (value) {
// console.log(value);
// });
// // 要写在方法体内,不能写在类体内
// // 为什么?
// // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
// }
// from
// ngOnInit(): void {
// from(["a", "b", "c"]).subscribe(console.log);
// from(this.p()).subscribe(console.log);
// }
// p() {
// return new Promise((resolve) => {
// setTimeout(() => {
// resolve({ a: 1 });
// }, 2000);
// });
// }
// -----------
//forkJoin
// example:
// ngOnInit(): void {
// forkJoin({
// goods: from(axios.get('https://api.example.com/goods')), // 假设这是获取商品的 API
// category: from(axios.get('https://api.example.com/category')), // 假设这是获取分类的 API
// }).subscribe(console.log);
// }
ngOnInit(): void {
const mockGoods = [
{ id: 1, name: 'Apple', price: 1.5 },
{ id: 2, name: 'Banana', price: 0.9 },
{ id: 3, name: 'Cherry', price: 2.0 }
];
const mockCategory = [
{ id: 1, name: 'Fruits' },
{ id: 2, name: 'Vegetables' }
];
forkJoin({
goods: from(Promise.resolve({ data: mockGoods })),
category: from(Promise.resolve({ data: mockCategory }))
}).subscribe(console.log);
}
}
import { Component, OnInit } from '@angular/core';
import { forkJoin, from, range } from 'rxjs';
import axios from 'axios';
@Component({
selector: 'app-helper-method',
templateUrl: './helper-method.component.html',
styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {
constructor() { }
// range
// ngOnInit(): void {
// // rxjs 的辅助方法
// // 如下代码是在组件初始化时执行的
// // 会在控制台打印出 1-9
// range(1, 10).subscribe(function (value) {
// console.log(value);
// });
// // 要写在方法体内,不能写在类体内
// // 为什么?
// // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
// }
// from
// ngOnInit(): void {
// from(["a", "b", "c"]).subscribe(console.log);
// from(this.p()).subscribe(console.log);
// }
// p() {
// return new Promise((resolve) => {
// setTimeout(() => {
// resolve({ a: 1 });
// }, 2000);
// });
// }
// -----------
//forkJoin
// example:
// ngOnInit(): void {
// forkJoin({
// goods: from(axios.get('https://api.example.com/goods')), // 假设这是获取商品的 API
// category: from(axios.get('https://api.example.com/category')), // 假设这是获取分类的 API
// }).subscribe(console.log);
// }
ngOnInit(): void {
const mockGoods = [
{ id: 1, name: 'Apple', price: 1.5 },
{ id: 2, name: 'Banana', price: 0.9 },
{ id: 3, name: 'Cherry', price: 2.0 }
];
const mockCategory = [
{ id: 1, name: 'Fruits' },
{ id: 2, name: 'Vegetables' }
];
// forkJoin({
// goods: from(Promise.resolve({ data: mockGoods })),
// category: from(Promise.resolve({ data: mockCategory }))
// }).subscribe(console.log);
// 只想要用data里的数据
forkJoin({
goods: from(Promise.resolve({ data: mockGoods })),
category: from(Promise.resolve({ data: mockCategory }))
}).subscribe(response => console.log(response.goods.data, response.category.data));
}
13.3.10 throwError
13.3.11 retry
13.3.12 race
13.3.13 fromEvent
将事件转换为Observable.
import { Component, OnInit } from '@angular/core';
import { forkJoin, from, fromEvent, map, range } from 'rxjs';
import axios from 'axios';
@Component({
selector: 'app-helper-method',
templateUrl: './helper-method.component.html',
styleUrls: ['./helper-method.component.css']
})
export class HelperMethodComponent implements OnInit {
constructor() { }
ngOnInit(): void {
const button = document.getElementById('myButton');
if (button) {
fromEvent(button, 'click').subscribe(console.log);
}
if (button) {
fromEvent(button, 'click')
.pipe(
map(event => event.target)
)
.subscribe(console.log);
}
}
13.4 操作符
- 数据流:从可观察对象内部输出的数据就是数据流,可观察对象内部可以向外部源源不断的输出数据。
- 操作符:用于操作数据流,可以对象数据流进行转换,过滤等等操作。
13.4.1 map, mapTo
map:对数据流进行转换,基于原有值进行转换。
src/app/components/rxjs/operators/operators.component.ts
:
import { Component, OnInit } from '@angular/core';
import { map, range } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
ngOnInit(): void {
// rxjs 的辅助方法
// 如下代码是在组件初始化时执行的
// 会在控制台打印出 1-9
range(1, 10)
.pipe(map(n => n * 10))
.subscribe(function (value) {
console.log(value);
});
// 要写在方法体内,不能写在类体内
// 为什么?
// 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
}
}
mapTo:对数据流进行转换,不关心原有值,可以直接传入要转换后的值。
13.4.2 filter
13.4.3 pluck
获取数据流对象中的属性值
src/app/components/rxjs/operators/operators.component.ts
:
import { Component, OnInit } from '@angular/core';
import { fromEvent, map, pluck, range } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
// pipeable operators
// ngOnInit(): void {
// // rxjs 的辅助方法
// // 如下代码是在组件初始化时执行的
// // 会在控制台打印出 1-9
// range(1, 10)
// .pipe(map(n => n * 10))
// .subscribe(function (value) {
// console.log(value);
// });
// // 要写在方法体内,不能写在类体内
// // 为什么?
// // 因为类体内的代码在类实例化时就会执行,而方法体内的代码只有在调用该方法时才会执行。
// }
// plugin operators
ngOnInit(): void {
const button = document.getElementById('button');
if (button) {
fromEvent(button, 'click')
.pipe(pluck("target"))
.subscribe(console.log);
}
}
}
13.4.4 first
13.4.5 startWith
13.4.6 every
13.4.7 delay, delayWhen
13.4.8 take, takeWhile, takeUtil
take: 获取数据流中的前几个
import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
ngOnInit(): void {
range(1, 10).subscribe(console.log);
}
}
import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap, take } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
ngOnInit(): void {
range(1, 10).pipe(take(5)).subscribe(console.log);
}
}
takeWhile: 根据条件从数据源前面开始获取.
takeUntil: 接收可观察对象, 当可观察对象发出值时, 终止主数据源
ngOnInit(): void {
const button = document.getElementById('button');
if (button) {
fromEvent(button, 'mousemove').subscribe(console.log);
}
}
// -----
// takeUntil
ngOnInit(): void {
const button = document.getElementById('button');
if (button) {
fromEvent(button, 'mousemove')
.pipe(
takeUntil(fromEvent(button, 'click')
)
)
.subscribe(console.log);
}
}
13.4.9 skip, skipWhile, skipUtil
13.4.10 last
13.4.11 concatAll, concatMap
13.4.13 reduce, scan
13.4.14 mergeAll, mergeMap
13.4.15 throttleTime
节流,可观察对象高频次向外部发出数据流,通过 throttleTime 限制在规定时间内每次只向订阅者传递一次数据流。
import { Component, OnInit } from '@angular/core';
import { from, fromEvent, interval, map, pluck, range, switchMap, take, takeUntil, takeWhile, throttle, throttleTime } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
// ----
// throttleTime
ngOnInit(): void {
fromEvent(document, 'click').pipe(
throttleTime(2000) // 2 秒内只取第一个点击事件
).subscribe(console.log);
}
}
13.4.16 debounceTime
防抖,触发高频事件,只响应最后一次。
// ---
// debounceTime
ngOnInit(): void {
fromEvent(document, 'click').pipe(
debounceTime(2000) // 2 秒内只取最后一个点击事件
).subscribe(console.log);
}
13.4.17 distinctUntilChanged
检测数据源当前发出的数据流是否和上次发出的相同, 如相同, 跳过, 不相同, 发出.
// ---
// distinctUntilChanged
ngOnInit(): void {
of(1, 1, 2, 2, 3, 4).pipe(
distinctUntilChanged()
).subscribe(console.log);
}
13.4.18 groupBy
对数据流进行分组
13.4.19 withLatestFrom
13.4.20 switchMap
切换可观察对象
import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
// switchMap
ngOnInit(): void {
const button = document.getElementById('button');
if (button) {
fromEvent(button, 'click')
// .pipe(
// switchMap(event => interval(1000))
// )
.subscribe(console.log);
}
}
}
import { Component, OnInit } from '@angular/core';
import { fromEvent, interval, map, pluck, range, switchMap } from 'rxjs';
@Component({
selector: 'app-operators',
templateUrl: './operators.component.html',
styleUrls: ['./operators.component.css']
})
export class OperatorsComponent implements OnInit {
constructor() { }
// switchMap
ngOnInit(): void {
const button = document.getElementById('button');
if (button) {
fromEvent(button, 'click')
.pipe(
switchMap(event => interval(1000))
)
.subscribe(console.log);
}
}
}
13.5 Practice
13.5.1 Element Drag and Drop
src/app/components/rxjs/practices/element-drag-and-drop/element-drag-and-drop.component.html
:
<p>element-drag-and-drop works!</p>
<style>
#box {
width: 200px;
height: 200px;
background: skyblue;
position: absolute;
left: 0;
top: 10;
}
</style>
<div id="box">box</div>
原生:
src/app/components/rxjs/practices/element-drag-and-drop/element-drag-and-drop.component.ts
:
import { Component, OnInit } from '@angular/core';
@Component({
selector: 'app-element-drag-and-drop',
templateUrl: './element-drag-and-drop.component.html',
styleUrls: ['./element-drag-and-drop.component.css']
})
export class ElementDragAndDropComponent implements OnInit {
constructor() { }
ngOnInit(): void {
const box = document.getElementById('box');
if (box) {
box.onmousedown = function (event) {
const target = event.target as HTMLElement;
if (target) {
let distanceX = event.clientX - target.offsetLeft;
let distanceY = event.clientY - target.offsetTop;
document.onmousemove = function (event) {
let left = event.clientX - distanceX;
let top = event.clientY - distanceY;
box.style.left = left + 'px';
box.style.top = top + 'px';
}
}
}
}
if (box) {
box.onmouseup = function () {
document.onmousemove = null;
}
}
}
}
使用rxjs
import { Component, OnInit } from '@angular/core';
import { from, fromEvent, map, switchMap, take, takeUntil } from 'rxjs';
@Component({
selector: 'app-element-drag-and-drop',
templateUrl: './element-drag-and-drop.component.html',
styleUrls: ['./element-drag-and-drop.component.css']
})
export class ElementDragAndDropComponent implements OnInit {
constructor() { }
// ngOnInit(): void {
// const box = document.getElementById('box');
// if (box) {
// box.onmousedown = function (event) {
// const target = event.target as HTMLElement;
// if (target) {
// let distanceX = event.clientX - target.offsetLeft;
// let distanceY = event.clientY - target.offsetTop;
// document.onmousemove = function (event) {
// let left = event.clientX - distanceX;
// let top = event.clientY - distanceY;
// box.style.left = left + 'px';
// box.style.top = top + 'px';
// }
// }
// }
// }
// if (box) {
// box.onmouseup = function () {
// document.onmousemove = null;
// }
// }
// }
ngOnInit(): void {
const box = document.getElementById('box');
if (box) {
fromEvent<MouseEvent>(box, "mousedown").pipe(
map(event => ({ distanceX: event.clientX - box.offsetLeft, distanceY: event.clientY - box.offsetTop })),
switchMap(({ distanceX, distanceY }) => fromEvent<MouseEvent>(document, "mousemove").pipe(
map(event => ({ left: event.clientX - distanceX, top: event.clientY - distanceY })),
takeUntil(fromEvent(document, "mouseup"))
)),
).subscribe(({ left, top }) => {
box.style.left = left + 'px';
box.style.top = top + 'px';
});
}
}
}
13.5.2 Search
src/app/components/rxjs/practices/search/search.component.html
:
src/app/components/rxjs/practices/search/search.component.ts
:
import { Component, OnInit } from '@angular/core';
import axios from 'axios';
import { debounceTime, distinct, distinctUntilChanged, from, fromEvent, map, switchMap } from 'rxjs';
@Component({
selector: 'app-search',
templateUrl: './search.component.html',
styleUrls: ['./search.component.css']
})
export class SearchComponent implements OnInit {
constructor() { }
ngOnInit(): void {
const search = document.getElementById('search');
if (search) {
fromEvent(search, 'keyup').pipe(
debounceTime(500),
map(event => {
const target = event.target as HTMLInputElement;
return target ? target.value : '';
}),
distinctUntilChanged(),
switchMap(keyword => from(axios.get(`https://api.github.com/search/users?q=${keyword}`)))
).subscribe(console.log);
}
}
}
只想要data
import { Component, OnInit } from '@angular/core';
import axios from 'axios';
import { debounceTime, distinct, distinctUntilChanged, from, fromEvent, map, pluck, switchMap } from 'rxjs';
@Component({
selector: 'app-search',
templateUrl: './search.component.html',
styleUrls: ['./search.component.css']
})
export class SearchComponent implements OnInit {
constructor() { }
ngOnInit(): void {
const search = document.getElementById('search');
if (search) {
fromEvent(search, 'keyup').pipe(
debounceTime(500),
map(event => {
const target = event.target as HTMLInputElement;
return target ? target.value : '';
}),
distinctUntilChanged(),
switchMap(keyword => from(axios.get(`https://api.github.com/search/users?q=${keyword}`)).pipe(
pluck('data')
))
).subscribe(console.log);
}
}
}
13.5.3 Chained Requests
先获取token, 再根据token获取用户信息
src/app/components/rxjs/practices/chained-requests/chained-requests.component.html
:
src/app/components/rxjs/practices/chained-requests/chained-requests.component.ts
:
import { Component, OnInit } from '@angular/core';
import axios from 'axios';
import { concatMap, from, fromEvent } from 'rxjs';
@Component({
selector: 'app-chained-requests',
templateUrl: './chained-requests.component.html',
styleUrls: ['./chained-requests.component.css']
})
export class ChainedRequestsComponent implements OnInit {
token: string | null = null;
constructor() { }
ngOnInit(): void {
const button = document.getElementById('button');
if (button) {
fromEvent(button, 'click').pipe(
concatMap(() => from(axios.get('http://localhost:3000/token')))
).subscribe(response => {
this.token = response.data.access_token;
console.log('Token:', this.token);
});
}
}
}
src/app/db.json
:
第一步:安装
json-server
作为 Mock Server在你的 Angular 项目根目录下(和
package.json
在同一级),打开终端并运行:作用:
json-server
允许你快速搭建一个 REST API 模拟后端返回数据。第二步:创建
db.json
文件在你的 Angular 项目根目录(和
src/
平级)新建一个文件db.json
这个文件充当数据库,
json-server
读取它并返回相应的 API 数据。第三步:启动
json-server
, 在db.json所在路径下
json-server --watch db.json --port 3000
如果成功,你会看到: