Расширенные операторы RxJs, которые вы знаете, но недостаточно хорошо
Поскольку RxJS все еще остается важной частью Angular и обладает огромным количеством операторов, я решил написать эту статьи, чтобы выделить некоторые операторы, их комбинации и практические случаи использования, где они могут быть применены.
В этой статье мы рассмотрим следующие операторы:
- forkJoin() против combineLatest()
- auditTime() против debounceTime()
- pairwise()
- raceWith()
- iif()
- defer()
RxJS: forkJoin() против combineLatest()
Оба этих оператора возвращают последнее значение, сгенерированное несколькими Observable.
Оператор combineLatest эмиттирует массив самых последних значений из всех Observable, только когда каждый из Observable уже сгенерировал хотя бы одно значение. Он комбинирует последние эмиттированные значения из нескольких Observable всякий раз, когда любой из Observable эмиттирует новое значение. Имейте в виду, что иногда мы можем подписаться на холодный Observable, который уже сгенерировал значения, и наш combineLatest()
никогда не эмиттирует.
combineLatest([
this.stockPrice$, // emits stock prices
this.exchangeRate$ // emits exchange rates
]).subscribe(([stockPrice, exchangeRate]) => {
// will be logged every time any of the above observables emits
console.log(`Price: ${stockPrice}, Rate: ${exchangeRate}`);
});
Оператор forkJoin ожидает завершения всех Observable, а затем эмиттирует единый массив, содержащий последнее эмиттированное значение от каждого Observable. Если хотя бы один Observable выдает ошибку или возвращает EMPTY
(завершается без значения), forkJoin()
также вызовет ошибку или вернет EMPTY
. Вы, возможно, слышали, что forkJoin
очень похож на работу Promise.all()
, так как оба эмиттируют только один раз, когда все операции завершаются
forkJoin({
userProfile: this.api.getUserProfile(),
userSettings: this.api.getUserSettings(),
userPreferences: this.api.getUserPreferences()
}).subscribe(({ userProfile, userSettings, userPreferences }) => {
// will be logged only once, when all of the observables emits
console.log(userProfile, userSettings, userPreferences);
});
Одной из ошибок, которая иногда возникает, является использование соединения WebSocket внутри оператора forkJoin
. Это следует избегать, так как forkJoin
ожидает завершения всех своих Observable перед тем, как эмиттировать значение. Однако Observable на основе WebSocket, как правило, предназначены для непрерывной эмитации значений (горячие Observable) и никогда не завершаются, если не было явно сделано отписки.
forkJoin({
// API call (completes after fetching data)
apiData: this.http.get('/api/data'),
// WebSocket connection (never completes)
websocketData: this.websocketService.getUpdates()
}).subscribe(result => {
// will NEVER be logged
console.log('Result:', result);
});
RxJS: auditTime() против debounceTime()
Для меня эти два оператора всегда вызывали путаницу, потому что они похожи, но тонкие отличия имеют ключевое значение.
При использовании debounceTime, он задерживает передачу значения из исходного наблюдаемого объекта до тех пор, пока не наступит "пауза" в эмиссиях на заданный период времени. Используйте его, когда хотите дождаться, пока пользователь или событие "остепенятся" перед тем, как предпринять действие.
С другой стороны, auditTime регулярно выбирает значения из исходного наблюдаемого объекта и передает наиболее актуальное значение в конце каждого интервала. Используйте его, когда нужны периодические обновления во время выполнения какого-либо действия.
С высокой вероятностью вы уже привыкли использовать debounceTime
в полях ввода, однако auditTime
может быть более полезным при отслеживании изменения размера окна или поведения прокрутки. Вот пример, демонстрирующий разницу в поведении между этими двумя операторами при отслеживании изменения размера окна. Обратите внимание, что auditTime
передает значения во время изменения размера окна, в то время как debounceTime
передает значения только тогда, когда пользователь останавливает действие.
// this will emit periodically
fromEvent(window, 'resize')
.pipe(
auditTime(500),
map(() => [window.innerWidth, window.innerHeight])
).subscribe((dimensions) => {
console.log(`AUDIT TIME:`, dimensions);
});
// this emits only when use stops the resizing
fromEvent(window, 'resize')
.pipe(
debounceTime(500),
map(() => [window.innerWidth, window.innerHeight])
).subscribe((dimensions) => {
console.log(`DEBOUNCE TIME:`, dimensions);
});
ПРИМЕЧАНИЕ: Полезное утилитарное средство, которое вы можете создать с использованием замыканий (инъекционный токен), — это функция, которая возвращает сигнал для отслеживания событий изменения размера окна:
export const WINDOW_RESIZE_LISTENER =
new InjectionToken('Window resize listener', {
factory: () => {
const windowRef = inject(WINDOW);
return toSignal(
fromEvent(windowRef, 'resize').pipe(
auditTime(300),
map(() => windowRef.innerWidth),
startWith(windowRef.innerWidth),
takeUntilDestroyed(),
), { initialValue: windowRef.innerWidth });
},
});
И вы можете использовать этот инъекционный токен внутри компонента следующим образом:
windowResize = inject(WINDOW_RESIZE_LISTENER);
// ^^ this is a signal
Другие примеры использования auditTime()
включают прослушивание игровых вводов или потоков живых данных, событий, которые никогда не прекращаются, где требуется периодическое выполнение логики.
RxJS: pairwise()
Оператор pairwise в RxJS — это оператор трансформации, который эмитирует предыдущие и текущие значения из наблюдаемого объекта в виде пары [previous, current]
. Это полезно, когда необходимо сравнить последовательные значения, эмитируемые исходным наблюдаемым объектом.
Полезным примером может быть отслеживание изменений роутера для улучшения навигации.
import { Router, NavigationEnd } from '@angular/router';
import { filter, pairwise } from 'rxjs/operators';
this.router.events.pipe(
// filter for NavigationEnd events
filter(event => event instanceof NavigationEnd),
// pair consecutive route navigation events
pairwise()
).subscribe(([previous, current]: [NavigationEnd, NavigationEnd]) => {
console.log('Previous URL:', previous.url);
console.log('Current URL:', current.url);
});
Еще одним распространенным примером является отслеживание изменений в полях структуры формы.
@Component({
imports: [ReactiveFormsModule],
})
export class FormTrackerComponent {
myForm = inject(FormBuilder).nonNullable.group({
name: '',
email: '',
});
constructor() {
// track changes in the form
this.myForm.valueChanges
.pipe(
// start with the initial form state
startWith(this.myForm.value),
// get the previous and current form values
pairwise(),
// get changed fields
map(([prev, curr]) => this.getChangedFields(prev, curr)),
// filter only distinct field keys
scan((acc, curr) =>
[...new Set([...acc, ...curr])], [] as string[]
)
)
.subscribe((fieldChange) => {
console.log('Changed fields:', fieldChange);
});
}
/**
* identify which fields have changed between two states.
* @returns - name of the field (name, email, age)
*/
private getChangedFields(previous: any, current: any): string[] {
return Object.keys(current).filter(
(key) => previous[key] !== current[key]
);
}
}
RxJS: race()
Оператор race()
подписывается на несколько наблюдаемых объектов и выдает значения от того наблюдаемого объекта, который срабатывает первым (и продолжает прослушивание), отменяя все остальные подписки.
Лично я не так часто использовал оператор race()
, но недавно столкнулся со сценарием, когда его использование могло бы быть уместным. Допустим, вы делаете запрос к API на проблемный эндпоинт, что означает, что запрос может застрять в состоянии ожидания и никогда не завершиться (с ошибкой или успешным ответом). В идеале вы хотите подождать определенный период, и если запрос все еще остается в состоянии ожидания, отменить его и показать сообщение об ошибке. Существует множество различных решений для этой задачи, но использование оператора race()
— одно из тех, что пришло мне в голову. Вот пример:
@Component({ /* ... */ })
export class App {
#userAPIService = inject(UserAPIService);
displayItems = toSignal(race(
this.#userAPIService.getUsers().pipe(
map((data) => ({ status: 'loaded' as const, data}))
),
of({ status: 'failed' as const,}).pipe(delay(2000))
// ^^ emit failed status if no response after 2s
).pipe(startWith({ status: 'loading' as const})),
{ initialValue: { status: 'loading' } }
);
eff = effect(() => console.log(this.displayItemsSignal()));
}
Сигнал displayItem
имеет немедленное значение {status: 'loading'}
, что позволяет отобразить состояние загрузки в пользовательском интерфейсе. Затем, либо вызов API getUsers()
завершается, либо, если вызов остается в состоянии ожидания более 2 секунд, будет сгенерировано значение {status: 'error'}
, и вызов API будет отменен.
Возможно, оператор race()
является слишком сложным для этого случая, и вы хотите рассмотреть возможность использования оператора timeout()
для того же примера. В результате вы получите:
displayItemsSignal = toSignal(
this.userAPIService.getUsers().pipe(
map((data) => ({
status: 'loaded' as const,
data,
})),
startWith({
status: 'loading' as const,
}),
timeout({
each: 1000,
with: () => of({ status: 'failed' as const }),
})
), { initialValue: { status: 'loading' } });
RxJS: defer()
Оператор defer()
позволяет создавать новый observable по запросу. Логика observable выполняется только при подписке на него, что делает его оценку ленивой. Лично я не видел, чтобы его использовали часто, однако это может быть хорошим дополнением к вашему проекту, если вы активно используете Promises.
Например, предположим, у вас есть сервис, который выполняет API-запросы, но вместо использования httpClient
и возврата Observable он возвращает Promise.
@Injectable({ providedIn: 'root' })
export class UserAPIService {
#data = [{ name: 'user1' }, { name: 'user2' }, /*...*/];
getUsersPromise(): Promise<DataItem[]> {
return new Promise((res) =>
setTimeout(() => {
res(this.#data);
}, 200)
);
}
}
Теперь вы хотите отобразить чекбокс, и как только чекбокс будет нажат, вы хотите загрузить пользователей (выполнить API-запрос). Вы погуглили, как преобразовать Promises в Observables, и знаете, что для этого нужно использовать оператор from
, поэтому у вас получается код, подобный следующему:
@Component({
imports: [ReactiveFormsModule, AsyncPipe],
template: `
<label for="checkBox">check me</label>
<input type="checkbox" name="checkBox" [formControl]="control" />
@if(control.value){
@for(item of displayItems$ | async; track item.name){
{{ item.name }}
}
}
`,
})
export class App {
control = new FormControl<boolean>(false, { nonNullable: true });
displayItems$ = from(this.inject(UserAPIService).getUsersPromise());
}
Я не нашёл этой информации в документации по rxjs from()
, однако, когда вы используете from()
, он немедленно преобразует Promises в Observables, что означает, что getUsersPromise()
выполняется, даже до того, как чекбокс будет нажат.
Это поведение может не быть серьёзным недостатком, так как вы хотите загрузить данные пользователей в любом случае. Всё зависит от конкретного случая использования, является ли загрузка желаемым поведением. Если вы хотите дождаться подписки (когда чекбокс будет нажат), вы можете использовать defer
, например:
displayItems$ = defer(() => from(this.userAPIService.getUsersPromise()))
Используя defer()
, выполнение getUsersPromise()
(вызов API) откладывается до момента, когда происходит подписка.
RxJS: iif()
Существует множество сценариев, когда мы слушаем сгенерированные значения Observable и используем switchMap
(или другой высший по порядку observable) с условием для определения того, что возвращать. Вот один из примеров:
displayItemsSignal = toSignal(
this.checkboxControl.valueChanges.pipe(
switchMap((isChecked) =>
isChecked
? this.userAPIService.getUsers()
: this.groupAPIService.getGroups()
)
), { initialValue: [] });
Хотя этот пример работает нормально, вы можете использовать оператор iif()
для упрощения синтаксиса, как getUsers()
, так и getGroups()
возвращают Observable.
displayItemsSignal = toSignal(
this.checkboxControl.valueChanges.pipe(
switchMap((isChecked) =>
iif(
() => isChecked,
this.userAPIService.getUsers(),
this.groupAPIService.getGroups()
// ^^ both return an Observable of items
)
)
), { initialValue: [] });
Однако есть один нюанс. Предположим, что вместо Observables сервис использует Promises для получения данных:
@Injectable({ providedIn: 'root' })
export class UserAPIService {
#data = [{ name: 'user1' }, { name: 'user2' }, /*...*/];
getUsersPromise(): Promise<DataItem[]> {
return new Promise((res) =>
setTimeout(() => {
console.log('UserAPIService resolved');
res(this.data);
}, 200)
);
}
}
@Injectable({ providedIn: 'root' })
export class GroupAPIService {
#data = [{ name: 'group1' }, { name: 'group2' }, /*...*/];
getGroupPromise(): Promise<DataItem[]> {
return new Promise((res) =>
setTimeout(() => {
console.log('GroupAPIService resolved');
res(this.data);
}, 200)
);
}
}
если вы используете оператор iif()
с Promises, как показано ниже
displayItemsSignal = toSignal(
this.checkboxControl.valueChanges.pipe(
switchMap((isChecked) =>
iif(
() => isChecked,
this.userAPIService.getUsersPromise(),
this.groupAPIService.getGroupPromise()
// ^^ both return a Promise of items
)
)
), { initialValue: [] });
Что произойдет, так это то, что независимо от того, отмечен ли флажок или нет, оба метода, getUsersPromise()
и getGroupPromise()
, будут выполнены. Это определенно не то, чего мы хотели.
Вы можете решить эту проблему двумя способами. Первый — вернуться к тернарному оператору, например, вот так:
displayItemsSignal = toSignal(
this.checkboxControl.valueChanges.pipe(
switchMap((isChecked) =>
isChecked
? this.userAPIService.getUsersPromise()
: this.groupAPIService.getGroupPromise()
)
), { initialValue: [] });
Это решает проблему, когда вы работаете с Promises; однако, если вы хотите использовать оператор iif()
, то вам также следует использовать оператор defer()
, чтобы избежать немедленного выполнения, например:
displayItemsSignal = toSignal(
this.checkboxControl.valueChanges.pipe(
switchMap((isChecked) =>
iif(
() => isChecked,
defer(() => this.userAPIService.getUsersPromise()),
defer(() => this.groupAPIService.getGroupPromise())
// ^^ delay the promise execution only if subscription happens
)
)
), { initialValue: [] });
Подводя итоги, вы можете свободно использовать условный оператор iif()
при работе с Observable. Однако, когда вы работаете с Promises, используйте либо условный (тернарный) оператор, либо комбинируйте iif()
с оператором defer()
.