Retseptlar13 min read

CQRS

Oddiy CRUD (Create, Read, Update va Delete) ilovalari oqimi quyidagicha tasvirlanishi mumkin:

Oddiy CRUD (Create, Read, Update va Delete) ilovalari oqimi quyidagicha tasvirlanishi mumkin:

  1. Controllerlar qatlami HTTP so'rovlarni qabul qiladi va vazifalarni servislar qatlamiga delegatsiya qiladi.
  2. Servislar qatlami biznes mantiqning asosiy qismi joylashgan qismdir.
  3. Servislar entitetlarni o'zgartirish / saqlash uchun repository/DAOlardan foydalanadi.
  4. Entitetlar qiymatlar uchun konteyner bo'lib, setter va getterlarga ega.

Bu pattern odatda kichik va o'rta hajmdagi ilovalar uchun yetarli bo'lsa-da, yirik, murakkab ilovalar uchun eng yaxshi tanlov bo'lmasligi mumkin. Bunday hollarda CQRS (Command and Query Responsibility Segregation) modeli ko'proq mos va masshtablanuvchi bo'lishi mumkin (ilova talablariga qarab). Ushbu modelning afzalliklariga quyidagilar kiradi:

  • Mas'uliyatni ajratish. Model o'qish va yozish operatsiyalarini alohida modellarga ajratadi.
  • Masshtablanuvchanlik. O'qish va yozish operatsiyalarini mustaqil ravishda masshtablash mumkin.
  • Moslashuvchanlik. Model o'qish va yozish operatsiyalari uchun turli ma'lumotlar omborlaridan foydalanishga imkon beradi.
  • Ishlash. Model o'qish va yozish operatsiyalari uchun optimallashtirilgan turli ma'lumotlar omborlarini ishlatishga imkon beradi.

Bu modelni qo'llab-quvvatlash uchun Nest yengil CQRS modulini taqdim etadi. Ushbu bob uni qanday ishlatishni tushuntiradi.

O'rnatish

Avval kerakli paketni o'rnating:

Terminal
1$ npm install --save @nestjs/cqrs

O'rnatish tugagach, ilovangizning ildiz moduliga (odatda AppModule) o'ting va CqrsModule.forRoot() ni import qiling:

TypeScript
1import { Module } from '@nestjs/common';
2import { CqrsModule } from '@nestjs/cqrs';
3
4@Module({
5  imports: [CqrsModule.forRoot()],
6})
7export class AppModule {}

Ushbu modul ixtiyoriy konfiguratsiya obyektini qabul qiladi. Quyidagi parametrlar mavjud:

AttributeDescriptionDefault
commandPublisherThe publisher responsible for dispatching commands to the system.DefaultCommandPubSub
eventPublisherThe publisher used to publish events, allowing them to be broadcasted or processed.DefaultPubSub
queryPublisherThe publisher used for publishing queries, which can trigger data retrieval operations.DefaultQueryPubSub
unhandledExceptionPublisherPublisher responsible for handling unhandled exceptions, ensuring they are tracked and reported.DefaultUnhandledExceptionPubSub
eventIdProviderService that provides unique event IDs by generating or retrieving them from event instances.DefaultEventIdProvider
rethrowUnhandledDetermines whether unhandled exceptions should be rethrown after being processed, useful for debugging and error management.false

Commandlar

Commandlar ilova holatini o'zgartirish uchun ishlatiladi. Ular ma'lumotga emas, vazifaga yo'naltirilgan bo'lishi kerak. Command yuborilganda, u tegishli Command Handler tomonidan ko'rib chiqiladi. Handler ilova holatini yangilash uchun mas'ul.

TypeScript
heroes-game.service
1@Injectable()
2export class HeroesGameService {
3  constructor(private commandBus: CommandBus) {}
4
5  async killDragon(heroId: string, killDragonDto: KillDragonDto) {
6    return this.commandBus.execute(
7      new KillDragonCommand(heroId, killDragonDto.dragonId)
8    );
9  }
10}

Yuqoridagi kod parchasi KillDragonCommand klassini yaratib, uni CommandBus ning execute() metodiga uzatadi. Ko'rsatilgan command klassi quyidagicha:

TypeScript
kill-dragon.command
1export class KillDragonCommand extends Command<{
2  actionId: string // This type represents the command execution result
3}> {
4  constructor(
5    public readonly heroId: string,
6    public readonly dragonId: string,
7  ) {
8    super();
9  }
10}

Ko'rib turganingizdek, KillDragonCommand Command klassini kengaytiradi. Command klassi @nestjs/cqrs paketidan eksport qilinadigan sodda utiliti bo'lib, commandning qaytarish turini belgilash imkonini beradi. Bu yerda qaytish turi actionId xususiyatiga ega obyekt. Endi KillDragonCommand yuborilganda, CommandBus#execute() metodining qaytish turi Promise<{{ '{' }} actionId: string {{ '}' }}> deb aniqlanadi. Bu command handlerdan ma'lumot qaytarmoqchi bo'lganingizda foydali.

Hint

Command klassidan meros olish ixtiyoriy. U faqat command qaytish turini belgilamoqchi bo'lsangiz kerak bo'ladi.

CommandBus commandlar oqimi ni ifodalaydi. U commandlarni tegishli handlerlarga yuborish uchun mas'ul. execute() metodi handler qaytargan qiymatga yechiladigan promise qaytaradi.

Keling, KillDragonCommand command uchun handler yarataylik.

TypeScript
kill-dragon.handler
1@CommandHandler(KillDragonCommand)
2export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
3  constructor(private repository: HeroesRepository) {}
4
5  async execute(command: KillDragonCommand) {
6    const { heroId, dragonId } = command;
7    const hero = this.repository.findOneById(+heroId);
8
9    hero.killEnemy(dragonId);
10    await this.repository.persist(hero);
11
12    // "ICommandHandler<KillDragonCommand>" forces you to return a value that matches the command's return type
13    return {
14      actionId: crypto.randomUUID(), // This value will be returned to the caller
15    }
16  }
17}

Bu handler repository dan Hero entitetini olib, killEnemy() metodini chaqiradi va keyin o'zgarishlarni saqlaydi. KillDragonHandler klassi ICommandHandler interfeysini implementatsiya qiladi, u execute() metodini implementatsiya qilishni talab qiladi. execute() metodi argument sifatida command obyektini oladi.

ICommandHandler<KillDragonCommand> command qaytish turiga mos qiymat qaytarishni majbur qiladi. Bu holatda qaytish turi actionId xususiyatiga ega obyekt. Bu faqat Command klassidan meros olgan commandlar uchun amal qiladi. Aks holda, xohlagan narsani qaytarishingiz mumkin.

Va nihoyat, KillDragonHandler ni modulda provider sifatida ro'yxatdan o'tkazganingizga ishonch hosil qiling:

TypeScript
1providers: [KillDragonHandler];

Querylar

Querylar ilova holatidan ma'lumot olish uchun ishlatiladi. Ular vazifaga emas, ma'lumotga yo'naltirilgan bo'lishi kerak. Query yuborilganda, u tegishli Query Handler tomonidan ko'rib chiqiladi. Handler ma'lumotni olish uchun mas'ul.

QueryBus CommandBus bilan bir xil patternni kuzatadi. Query handlerlar IQueryHandler interfeysini implementatsiya qilishi va @QueryHandler() dekoratori bilan belgilanishi kerak. Quyidagi misolni ko'rib chiqing:

TypeScript
1export class GetHeroQuery extends Query<Hero> {
2  constructor(public readonly heroId: string) {}
3}

Command klassiga o'xshab, Query klassi ham @nestjs/cqrs paketidan eksport qilinadigan sodda utiliti bo'lib, query qaytish turini belgilash imkonini beradi. Bu yerda qaytish turi Hero obyektidir. Endi GetHeroQuery yuborilganda, QueryBus#execute() metodining qaytish turi Promise<Hero> sifatida aniqlanadi.

Hero ni olish uchun query handler yaratishimiz kerak:

TypeScript
get-hero.handler
1@QueryHandler(GetHeroQuery)
2export class GetHeroHandler implements IQueryHandler<GetHeroQuery> {
3  constructor(private repository: HeroesRepository) {}
4
5  async execute(query: GetHeroQuery) {
6    return this.repository.findOneById(query.heroId);
7  }
8}

GetHeroHandler klassi IQueryHandler interfeysini implementatsiya qiladi, u execute() metodini implementatsiya qilishni talab qiladi. execute() metodi argument sifatida query obyektini oladi va query qaytish turiga mos ma'lumotni (bu yerda Hero obyekti) qaytarishi kerak.

Va nihoyat, GetHeroHandler ni modulda provider sifatida ro'yxatdan o'tkazganingizga ishonch hosil qiling:

TypeScript
1providers: [GetHeroHandler];

Endi query yuborish uchun QueryBus dan foydalaning:

TypeScript
1const hero = await this.queryBus.execute(new GetHeroQuery(heroId)); // "hero" will be auto-inferred as "Hero" type

Eventlar

Eventlar ilova holatidagi o'zgarishlar haqida ilovaning boshqa qismlarini xabardor qilish uchun ishlatiladi. Ular modellalar tomonidan yoki to'g'ridan-to'g'ri EventBus orqali yuboriladi. Event yuborilganda, u tegishli Event Handlerlar tomonidan ko'rib chiqiladi. Handlerlar, masalan, read modelni yangilashi mumkin.

Namoyish uchun event klassini yarataylik:

TypeScript
hero-killed-dragon.event
1export class HeroKilledDragonEvent {
2  constructor(
3    public readonly heroId: string,
4    public readonly dragonId: string,
5  ) {}
6}

Eventlarni EventBus.publish() metodi orqali to'g'ridan-to'g'ri yuborish mumkin bo'lsa-da, ularni modeldan ham yuborishimiz mumkin. Hero modelini killEnemy() metodi chaqirilganda HeroKilledDragonEvent ni yuboradigan qilib yangilaylik.

TypeScript
hero.model
1export class Hero extends AggregateRoot {
2  constructor(private id: string) {
3    super();
4  }
5
6  killEnemy(enemyId: string) {
7    // Business logic
8    this.apply(new HeroKilledDragonEvent(this.id, enemyId));
9  }
10}

apply() metodi eventlarni yuborish uchun ishlatiladi. U argument sifatida event obyektini qabul qiladi. Biroq modelimiz EventBus dan bexabar bo'lganligi sababli, uni modelga bog'lashimiz kerak. Buni EventPublisher klassi yordamida amalga oshirishimiz mumkin.

TypeScript
kill-dragon.handler
1@CommandHandler(KillDragonCommand)
2export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
3  constructor(
4    private repository: HeroesRepository,
5    private publisher: EventPublisher,
6  ) {}
7
8  async execute(command: KillDragonCommand) {
9    const { heroId, dragonId } = command;
10    const hero = this.publisher.mergeObjectContext(
11      await this.repository.findOneById(+heroId),
12    );
13    hero.killEnemy(dragonId);
14    hero.commit();
15  }
16}

EventPublisher#mergeObjectContext metodi event publisherni taqdim etilgan obyektga birlashtiradi, ya'ni endi obyekt eventlar oqimiga event yubora oladi.

Ushbu misolda modelda commit() metodini ham chaqirayotganimizga e'tibor bering. Bu metod bajarilmagan eventlarni yuborish uchun ishlatiladi. Eventlarni avtomatik yuborish uchun autoCommit xususiyatini true ga o'rnatishimiz mumkin:

TypeScript
1export class Hero extends AggregateRoot {
2  constructor(private id: string) {
3    super();
4    this.autoCommit = true;
5  }
6}

Agar event publisherni mavjud bo'lmagan obyektda emas, balki klassga birlashtirmoqchi bo'lsak, EventPublisher#mergeClassContext metodidan foydalanishimiz mumkin:

TypeScript
1const HeroModel = this.publisher.mergeClassContext(Hero);
2const hero = new HeroModel('id'); // <-- HeroModel is a class

Endi HeroModel klassining har bir nusxasi mergeObjectContext() metodidan foydalanmasdan event yubora oladi.

Bundan tashqari, EventBus yordamida eventlarni qo'lda yuborishimiz mumkin:

TypeScript
1this.eventBus.publish(new HeroKilledDragonEvent());
Hint

EventBus inject qilinadigan klassdir.

Har bir event bir nechta Event Handler ga ega bo'lishi mumkin.

TypeScript
hero-killed-dragon.handler
1@EventsHandler(HeroKilledDragonEvent)
2export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
3  constructor(private repository: HeroesRepository) {}
4
5  handle(event: HeroKilledDragonEvent) {
6    // Business logic
7  }
8}
Hint

Event handlerlardan foydalanishni boshlaganingizda an'anaviy HTTP web kontekstidan tashqariga chiqishingizni yodda tuting.

  • CommandHandlers dagi xatolarni hamon o'rnatilgan Exception filterlar ushlashi mumkin.
  • EventHandlers dagi xatolar Exception filterlar tomonidan ushlanmaydi: ularni qo'lda hal qilishingiz kerak bo'ladi. Oddiy try/catch, Sagas orqali kompensatsion event qo'zg'atish yoki boshqa usullar.
  • CommandHandlers dagi HTTP javoblarini mijozga yuborish mumkin.
  • EventHandlers dagi HTTP javoblarini yuborib bo'lmaydi. Mijozga ma'lumot yubormoqchi bo'lsangiz WebSocket, SSE yoki boshqa yechimlardan foydalanishingiz mumkin.

Command va querylar bilan bo'lgani kabi, HeroKilledDragonHandler ni modulda provider sifatida ro'yxatdan o'tkazganingizga ishonch hosil qiling:

TypeScript
1providers: [HeroKilledDragonHandler];

Sagalari

Saga eventlarni tinglaydigan va yangi commandlarni qo'zg'atishi mumkin bo'lgan uzoq davom etuvchi jarayon. U odatda ilovadagi murakkab ish oqimlarini boshqarish uchun ishlatiladi. Masalan, foydalanuvchi ro'yxatdan o'tganda, saga UserRegisteredEvent ni tinglab, foydalanuvchiga xush kelibsiz xatini yuborishi mumkin.

Saga juda kuchli imkoniyatdir. Bitta saga 1..* eventlarni tinglashi mumkin. RxJS kutubxonasi yordamida biz event oqimlarini filtrlay, map qila, fork va merge qila olamiz va murakkab ish jarayonlarini yaratamiz. Har bir saga Observable qaytaradi, u command nusxasini yaratadi. Bu command keyin CommandBus tomonidan asinxron yuboriladi.

HeroKilledDragonEvent ni tinglab, DropAncientItemCommand commandni yuboradigan sagani yarataylik.

TypeScript
heroes-game.saga
1@Injectable()
2export class HeroesGameSagas {
3  @Saga()
4  dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
5    return events$.pipe(
6      ofType(HeroKilledDragonEvent),
7      map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
8    );
9  }
10}
Hint

ofType operatori va @Saga() dekoratori @nestjs/cqrs paketidan eksport qilinadi.

@Saga() dekoratori metodni saga sifatida belgilaydi. events$ argumenti barcha event'larning Observable stream'i hisoblanadi. ofType operatori stream'ni ko'rsatilgan event turi bo'yicha filter qiladi. map operatori esa event'ni yangi command instance'iga o'giradi.

Bu misolda HeroKilledDragonEvent ni DropAncientItemCommand command'iga map qilyapmiz. Keyin DropAncientItemCommand command'i CommandBus tomonidan avtomatik dispatch qilinadi.

Query, command va event handler'larda bo'lgani kabi, HeroesGameSagas ni modul ichida provider sifatida ro'yxatdan o'tkazishni unutmang:

TypeScript
1providers: [HeroesGameSagas];

Handled qilinmagan exception'lar

Event handler'lar asynchronous bajariladi, shu sabab ilova nomuvofiq holatga tushib qolmasligi uchun exception'larni doim to'g'ri handle qilish kerak. Agar exception handle qilinmasa, EventBus UnhandledExceptionInfo obyektini yaratadi va uni UnhandledExceptionBus stream'iga yuboradi. Bu stream Observable bo'lib, handle qilinmagan exception'larni qayta ishlash uchun ishlatiladi.

TypeScript
1private destroy$ = new Subject<void>();
2
3constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
4  this.unhandledExceptionsBus
5    .pipe(takeUntil(this.destroy$))
6    .subscribe((exceptionInfo) => {
7      // Handle exception here
8      // e.g. send it to external service, terminate process, or publish a new event
9    });
10}
11
12onModuleDestroy() {
13  this.destroy$.next();
14  this.destroy$.complete();
15}

Exception'larni filter qilish uchun ofType operatoridan quyidagicha foydalanish mumkin:

TypeScript
1this.unhandledExceptionsBus
2  .pipe(
3    takeUntil(this.destroy$),
4    UnhandledExceptionBus.ofType(TransactionNotAllowedException),
5  )
6  .subscribe((exceptionInfo) => {
7    // Handle exception here
8  });

Bu yerda TransactionNotAllowedException filter qilmoqchi bo'lgan exception'imiz hisoblanadi.

UnhandledExceptionInfo obyektida quyidagi property'lar mavjud:

TypeScript
1export interface UnhandledExceptionInfo<
2  Cause = IEvent | ICommand,
3  Exception = any,
4> {
5  /**
6   * The exception that was thrown.
7   */
8  exception: Exception;
9  /**
10   * The cause of the exception (event or command reference).
11   */
12  cause: Cause;
13}

Barcha event'larga subscribe bo'lish

CommandBus, QueryBus va EventBus'ning barchasi Observable hisoblanadi. Bu shuni anglatadiki, butun stream'ga subscribe bo'lib, masalan, barcha event'larni qayta ishlashimiz mumkin. Masalan, barcha event'larni console'ga log qilish yoki event store'ga saqlash mumkin.

TypeScript
1private destroy$ = new Subject<void>();
2
3constructor(private eventBus: EventBus) {
4  this.eventBus
5    .pipe(takeUntil(this.destroy$))
6    .subscribe((event) => {
7      // Save events to database
8    });
9}
10
11onModuleDestroy() {
12  this.destroy$.next();
13  this.destroy$.complete();
14}

Request scope

Boshqa dasturlash tillari fonidan kelganlar uchun Nest'da ko'p narsalar kiruvchi request'lar orasida ulashilishini bilish g'alati tuyulishi mumkin. Bunga database connection pool, global holatga ega singleton service'lar va boshqalar kiradi. Esda tuting, Node.js har bir request alohida thread'da qayta ishlanadigan request/response multi-threaded stateless model'ga amal qilmaydi. Shu sabab singleton instance'lardan foydalanish ilovalarimiz uchun xavfsiz hisoblanadi.

Ammo ayrim edge case'larda handler uchun request-based lifecycle kerak bo'lishi mumkin. Masalan, GraphQL ilovalarida har bir request uchun alohida cache, request tracking yoki multi-tenancy holatlari. Scope'larni qanday boshqarish haqida ko'proq ma'lumotni bu yerda topasiz.

CQRS bilan birga request-scoped provider'lardan foydalanish murakkab bo'lishi mumkin, chunki CommandBus, QueryBus va EventBus singleton hisoblanadi. Yaxshiyamki, @nestjs/cqrs package'i buni soddalashtiradi va har bir qayta ishlanayotgan command, query yoki event uchun request-scoped handler'ning yangi instance'ini avtomatik yaratadi.

Handler'ni request-scoped qilish uchun quyidagilardan birini qilishingiz mumkin:

  1. Request-scoped provider'ga bog'laning.
  2. Quyidagidek @CommandHandler, @QueryHandler yoki @EventsHandler dekoratori orqali uning scope'ini aniq REQUEST qilib belgilang:
TypeScript
1@CommandHandler(KillDragonCommand, {
2  scope: Scope.REQUEST,
3})
4export class KillDragonHandler {
5  // Implementation here
6}

Request payload'ini istalgan request-scoped provider ichiga inject qilish uchun @Inject(REQUEST) dekoratoridan foydalaniladi. Ammo CQRS ichida request payload tabiati context'ga bog'liq bo'ladi: u HTTP request, scheduled job yoki command'ni ishga tushiradigan boshqa har qanday operatsiya bo'lishi mumkin.

Payload AsyncContext ni kengaytiradigan class'ning instance'i bo'lishi kerak (@nestjs/cqrs tomonidan taqdim etiladi). U request context sifatida ishlaydi va request lifecycle davomida foydalaniladigan ma'lumotlarni saqlaydi.

TypeScript
1import { AsyncContext } from '@nestjs/cqrs';
2
3export class MyRequest extends AsyncContext {
4  constructor(public readonly user: User) {
5    super();
6  }
7}

Command bajarilganda custom request context'ni CommandBus#execute metodiga ikkinchi argument sifatida uzating:

TypeScript
1const myRequest = new MyRequest(user);
2await this.commandBus.execute(
3  new KillDragonCommand(heroId, killDragonDto.dragonId),
4  myRequest,
5);

Shunda MyRequest instance'i mos handler ichida REQUEST provider sifatida mavjud bo'ladi:

TypeScript
1@CommandHandler(KillDragonCommand, {
2  scope: Scope.REQUEST,
3})
4export class KillDragonHandler {
5  constructor(
6    @Inject(REQUEST) private request: MyRequest, // Inject the request context
7  ) {}
8
9  // Handler implementation here
10}

Query'lar uchun ham xuddi shu yondashuvdan foydalanish mumkin:

TypeScript
1const myRequest = new MyRequest(user);
2const hero = await this.queryBus.execute(new GetHeroQuery(heroId), myRequest);

Query handler ichida esa:

TypeScript
1@QueryHandler(GetHeroQuery, {
2  scope: Scope.REQUEST,
3})
4export class GetHeroHandler {
5  constructor(
6    @Inject(REQUEST) private request: MyRequest, // Inject the request context
7  ) {}
8
9  // Handler implementation here
10}

For events, while you can pass the request provider to EventBus#publish, this is less common. Instead, use EventPublisher to merge the request provider into a model:

TypeScript
1const hero = this.publisher.mergeObjectContext(
2  await this.repository.findOneById(+heroId),
3  this.request, // Inject the request context here
4);

Request-scoped event handlers subscribing to these events will have access to the request provider.

Sagas are always singleton instances because they manage long-running processes. However, you can retrieve the request provider from event objects:

TypeScript
1@Saga()
2dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
3  return events$.pipe(
4    ofType(HeroKilledDragonEvent),
5    map((event) => {
6      const request = AsyncContext.of(event); // Retrieve the request context
7      const command = new DropAncientItemCommand(event.heroId, fakeItemID);
8
9      AsyncContext.merge(request, command); // Merge the request context into the command
10      return command;
11    }),
12  );
13}

Alternatively, use the request.attachTo(command) method to tie the request context to the command.

Example

A working example is available here.