Navbatlar
Navbatlar keng tarqalgan ilova masshtablash va unumdorlik muammolarini hal qilishga yordam beradigan kuchli dizayn patternidir. Navbatlar hal qilishi mumkin bo'lgan muammolarga mis
Navbatlar keng tarqalgan ilova masshtablash va unumdorlik muammolarini hal qilishga yordam beradigan kuchli dizayn patternidir. Navbatlar hal qilishi mumkin bo'lgan muammolarga misollar:
- Qayta ishlashdagi piklarni tekislash. Masalan, foydalanuvchilar resurs talabchan vazifalarni istalgan vaqtda boshlashi mumkin bo'lsa, bu vazifalarni sinxron bajarish o'rniga navbatga qo'shishingiz mumkin. Keyin ishchi jarayonlar vazifalarni navbatdan boshqariladigan tartibda oladi. Ilova masshtablashgan sari back-end vazifalarini bajarishni kengaytirish uchun yangi navbat iste'molchilarini osongina qo'shishingiz mumkin.
- Node.js event loop ini to'sib qo'yishi mumkin bo'lgan monolit vazifalarni bo'laklash. Masalan, foydalanuvchi so'rovi audio transkodlash kabi CPU talabchan ishni talab qilsa, bu vazifani boshqa jarayonlarga delegatsiya qilishingiz mumkin, shunda foydalanuvchiga xizmat ko'rsatuvchi jarayonlar javobchan bo'lib qoladi.
- Turli xizmatlar orasida ishonchli aloqa kanalini taqdim etish. Masalan, bir jarayon yoki xizmatda vazifalarni (job) navbatga qo'yib, ularni boshqa jarayon yoki xizmatda iste'mol qilishingiz mumkin. Istalgan jarayon yoki xizmatdan job hayotiy siklidagi yakunlanish, xato yoki boshqa holat o'zgarishlari bo'yicha (holat hodisalarini tinglab) xabardor bo'lishingiz mumkin. Navbat ishlab chiqaruvchilari yoki iste'molchilari ishlamay qolsa, ularning holati saqlanadi va tugunlar qayta ishga tushirilganda vazifalarni qayta ishlash avtomatik tiklanadi.
Nest BullMQ integratsiyasi uchun @nestjs/bullmq paketini va Bull integratsiyasi uchun @nestjs/bull paketini taqdim etadi. Ikkala paket ham bir xil jamoa tomonidan ishlab chiqilgan tegishli kutubxonalar ustidagi abstraksiyalar/o'rab qo'yuvchilar hisoblanadi. Bull hozirda maintenance rejimida, jamoa xatolarni tuzatishga e'tibor qaratmoqda, BullMQ esa faol rivojlanmoqda, zamonaviy TypeScript implementatsiyasi va boshqa xususiyatlar to'plamiga ega. Agar Bull sizning talablaringizga mos bo'lsa, u hali ham ishonchli va amalda sinalgan tanlov. Nest paketlari BullMQ yoki Bull navbatlarini Nest ilovangizga qulay tarzda integratsiya qilishni osonlashtiradi.
BullMQ ham, Bull ham job ma'lumotlarini saqlash uchun Redis dan foydalanadi, shuning uchun sizda Redis o'rnatilgan bo'lishi kerak. Ular Redisga asoslanganligi sababli, navbat arxitekturasi to'liq taqsimlangan va platformadan mustaqil bo'lishi mumkin. Masalan, Nestda bir (yoki bir nechta) tugunda ishlayotgan ba'zi navbat prodyuserlari va iste'molchilari hamda listenerlari bo'lishi, boshqa prodyuserlar, iste'molchilar va listenerlar esa boshqa tarmoq tugunlarida boshqa Node.js platformalarida ishlashi mumkin.
Ushbu bob @nestjs/bullmq va @nestjs/bull paketlarini qamrab oladi. Shuningdek, ko'proq fon va aniq implementatsiya tafsilotlari uchun BullMQ va Bull hujjatlarini o'qishni tavsiya qilamiz.
BullMQ o'rnatish
BullMQ dan foydalanishni boshlash uchun avval kerakli bog'liqliklarni o'rnatamiz.
1$ npm install --save @nestjs/bullmq bullmqO'rnatish jarayoni tugagach, BullModule ni root AppModule ga import qilamiz.
1import { Module } from '@nestjs/common';
2import { BullModule } from '@nestjs/bullmq';
3
4@Module({
5 imports: [
6 BullModule.forRoot({
7 connection: {
8 host: 'localhost',
9 port: 6379,
10 },
11 }),
12 ],
13})
14export class AppModule {}forRoot() metodi ilovada ro'yxatdan o'tadigan barcha navbatlar uchun (aks holda alohida ko'rsatilmagan bo'lsa) ishlatiladigan bullmq paket konfiguratsiya obyektini ro'yxatdan o'tkazish uchun ishlatiladi. Ma'lumot uchun, konfiguratsiya obyektidagi ba'zi xossalar quyidagilar:
connection: ConnectionOptions- Redis ulanishini sozlash opsiyalari. Batafsil Connections. Ixtiyoriy.prefix: string- Barcha navbat kalitlari uchun prefiks. Ixtiyoriy.defaultJobOptions: JobOpts- Yangi joblar uchun default sozlamalarni boshqarish opsiyalari. Batafsil JobOpts. Ixtiyoriy.settings: AdvancedSettings- Navbat konfiguratsiyasining ilg'or sozlamalari. Odatda bularni o'zgartirmaslik kerak. Batafsil AdvancedSettings. Ixtiyoriy.extraOptions- Modulni init qilish uchun qo'shimcha opsiyalar. Batafsil Manual Registration
Barcha opsiyalar ixtiyoriy bo'lib, navbat xatti-harakatini batafsil boshqarishni ta'minlaydi. Ular to'g'ridan-to'g'ri BullMQ Queue konstruktoriga uzatiladi. Ushbu opsiyalar va boshqa opsiyalar haqida batafsil bu yerda o'qing.
Navbatni ro'yxatdan o'tkazish uchun BullModule.registerQueue() dinamik modulini import qiling, quyidagicha:
1BullModule.registerQueue({
2 name: 'audio',
3});registerQueue() metodiga vergul bilan ajratilgan bir nechta konfiguratsiya obyektlarini uzatib, bir nechta navbat yarating.
registerQueue() metodi navbatlarni instansiyalash va/yoki ro'yxatdan o'tkazish uchun ishlatiladi. Navbatlar bir xil credentiallarga ega bo'lgan bir xil Redis bazasiga ulangan modullar va jarayonlar o'rtasida bo'lishiladi. Har bir navbat o'z name xossasi bilan noyobdir. Navbat nomi ham injection token sifatida (controller/providerlarga navbatni inject qilish uchun), ham consumer klasslari va listenerlarni navbatlarga bog'lash uchun dekoratorlarga argument sifatida ishlatiladi.
Shuningdek, muayyan navbat uchun oldindan sozlangan ba'zi opsiyalarni quyidagicha override qilishingiz mumkin:
1BullModule.registerQueue({
2 name: 'audio',
3 connection: {
4 port: 6380,
5 },
6});BullMQ joblar orasidagi parent - child munosabatlarini ham qo'llab-quvvatlaydi. Bu imkoniyat joblarni ixtiyoriy chuqurlikdagi daraxt tugunlari sifatida tashkil qilishga imkon beradi. Batafsil bu yerda o'qing.
Flow qo'shish uchun quyidagicha qilishingiz mumkin:
1BullModule.registerFlowProducer({
2 name: 'flowProducerName',
3});Joblar Redisda saqlanadi, shuning uchun har safar aniq nomlangan navbat instansiyalanganda (masalan, ilova ishga tushganda/qayta ishga tushganda), u oldingi tugallanmagan sessiyadan qolgan eski joblarni qayta ishlashga urinadi.
Har bir navbatda bir yoki bir nechta prodyuserlar, iste'molchilar va listenerlar bo'lishi mumkin. Iste'molchilar joblarni navbatdan muayyan tartibda oladi: FIFO (default), LIFO yoki prioritetlarga ko'ra. Navbatni qayta ishlash tartibini boshqarish bu yerda muhokama qilinadi.
Nomlangan konfiguratsiyalar
Agar navbatlaringiz bir nechta turli Redis instansiyalariga ulanadigan bo'lsa, nomlangan konfiguratsiyalar deb ataladigan usuldan foydalanishingiz mumkin. Bu funksiya sizga bir nechta konfiguratsiyalarni belgilangan kalitlar ostida ro'yxatdan o'tkazish imkonini beradi, keyin navbat opsiyalarida ularga murojaat qilasiz.
Masalan, ilovangizda bir nechta navbatlar ishlatadigan qo'shimcha Redis instansiyasi (defaultdan tashqari) bo'lsa, uning konfiguratsiyasini quyidagicha ro'yxatdan o'tkazishingiz mumkin:
1BullModule.forRoot('alternative-config', {
2 connection: {
3 port: 6381,
4 },
5});Yuqoridagi misolda 'alternative-config' shunchaki konfiguratsiya kaliti (ixtiyoriy satr bo'lishi mumkin).
Shundan so'ng, registerQueue() opsiyalar obyektida bu konfiguratsiyani ko'rsatishingiz mumkin:
1BullModule.registerQueue({
2 configKey: 'alternative-config',
3 name: 'video',
4});Prodyuserlar
Job prodyuserlari joblarni navbatlarga qo'shadi. Prodyuserlar odatda ilova servislaridir (Nest providers). Navbatga job qo'shish uchun, avval servisga navbatni inject qiling:
1import { Injectable } from '@nestjs/common';
2import { Queue } from 'bullmq';
3import { InjectQueue } from '@nestjs/bullmq';
4
5@Injectable()
6export class AudioService {
7 constructor(@InjectQueue('audio') private audioQueue: Queue) {}
8}@InjectQueue() dekoratori registerQueue() metodi chaqiruvida berilgan nom orqali navbatni aniqlaydi (masalan, 'audio').
Endi navbatning add() metodini chaqirib, foydalanuvchi belgilagan job obyektini uzatib job qo'shing. Joblar serializatsiya qilinadigan JavaScript obyektlari sifatida ifodalanadi (chunki ular Redis bazasida shunday saqlanadi). Uzatayotgan jobning shakli ixtiyoriy; undan job obyektining semantikasini ifodalash uchun foydalaning. Shuningdek, unga nom berishingiz kerak. Bu faqat berilgan nomdagi joblarni qayta ishlaydigan maxsus iste'molchilar yaratish imkonini beradi.
1const job = await this.audioQueue.add('transcode', {
2 foo: 'bar',
3});Job opsiyalari
Joblar bilan qo'shimcha opsiyalar bog'lanishi mumkin. Queue.add() metodida job argumentidan keyin opsiyalar obyektini uzating. Job opsiyalarining ba'zi xossalari:
priority:number- Ixtiyoriy prioritet qiymati. 1 (eng yuqori prioritet) dan MAX_INT (eng past prioritet) gacha. Prioritetlardan foydalanish unumdorlikka ozgina ta'sir qiladi, shuning uchun ehtiyotkorlik bilan ishlating.delay:number- Ushbu job qayta ishlanishi mumkin bo'lishidan oldin kutish vaqti (millisekundlarda). Aniq delaylar uchun server va klientlar soatlari sinxron bo'lishi kerak.attempts:number- Job yakunlanguncha sinab ko'rishlar soni.repeat:RepeatOpts- Cron spetsifikatsiyasiga ko'ra jobni takrorlash. Batafsil RepeatOpts.backoff:number | BackoffOpts- Job muvaffaqiyatsiz bo'lsa avtomatik qayta urinishlar uchun backoff sozlamasi. Batafsil BackoffOpts.lifo:boolean-truebo'lsa, jobni navbatning chap oxiri o'rniga o'ng oxiriga qo'shadi (default false).jobId:number|string- Job ID ni override qiladi - default holatda job ID noyob butun son bo'ladi, ammo bu sozlama bilan job ID ni o'zgartirishingiz mumkin. Agar bu opsiyadan foydalansangiz, jobId noyob bo'lishini ta'minlash sizning vazifangiz. Agar mavjud bo'lgan ID bilan job qo'shishga urinsangiz, u qo'shilmaydi.removeOnComplete:boolean | number-truebo'lsa, job muvaffaqiyatli yakunlanganda o'chiriladi. Son berilsa, saqlab qolinadigan joblar sonini bildiradi. Default xatti-harakat - jobni completed to'plamida saqlash.removeOnFail:boolean | number-truebo'lsa, job barcha urinishlardan keyin muvaffaqiyatsiz bo'lganda o'chiriladi. Son berilsa, saqlab qolinadigan joblar sonini bildiradi. Default xatti-harakat - jobni failed to'plamida saqlash.stackTraceLimit:number- stacktrace da yozib olinadigan qatorlar sonini cheklaydi.
Quyida job opsiyalari bilan joblarni sozlashning bir nechta misollari keltirilgan.
Job boshlanishini kechiktirish uchun delay konfiguratsiya xossasidan foydalaning.
1const job = await this.audioQueue.add(
2 'transcode',
3 {
4 foo: 'bar',
5 },
6 { delay: 3000 }, // 3 seconds delayed
7);Jobni navbatning o'ng oxiriga qo'shish uchun (jobni LIFO (Last In First Out) tarzida qayta ishlash), konfiguratsiya obyektidagi lifo xossasini true ga o'rnating.
1const job = await this.audioQueue.add(
2 'transcode',
3 {
4 foo: 'bar',
5 },
6 { lifo: true },
7);Jobni prioritet bilan ishlash uchun priority xossasidan foydalaning.
1const job = await this.audioQueue.add(
2 'transcode',
3 {
4 foo: 'bar',
5 },
6 { priority: 2 },
7);Opsiyalarning to'liq ro'yxati uchun API hujjatlarini bu yerda va bu yerda ko'ring.
Iste'molchilar
Iste'molchi - bu navbatga qo'shilgan joblarni qayta ishlaydigan yoki navbatdagi hodisalarni tinglaydigan, yoki ikkalasini ham qiladigan metodlarni belgilovchi klass. Iste'molchi klassni quyidagicha @Processor() dekoratori yordamida e'lon qiling:
1import { Processor } from '@nestjs/bullmq';
2
3@Processor('audio')
4export class AudioConsumer {}Iste'molchilar @nestjs/bullmq paketi ularni topishi uchun providers sifatida ro'yxatdan o'tkazilishi kerak.
Bu yerda dekoratorning string argumenti (masalan, 'audio') klass metodlari bog'lanadigan navbat nomidir.
1import { Processor, WorkerHost } from '@nestjs/bullmq';
2import { Job } from 'bullmq';
3
4@Processor('audio')
5export class AudioConsumer extends WorkerHost {
6 async process(job: Job<any, any, string>): Promise<any> {
7 let progress = 0;
8 for (let i = 0; i < 100; i++) {
9 await doSomething(job.data);
10 progress += 1;
11 await job.updateProgress(progress);
12 }
13 return {};
14 }
15}Process metodi worker bo'sh bo'lganida va navbatda qayta ishlash uchun joblar bo'lganda chaqiriladi. Bu handler metodi yagona argument sifatida job obyektini oladi. Handler metodidan qaytgan qiymat job obyektida saqlanadi va keyinroq, masalan completed hodisasi listenerida ishlatilishi mumkin.
Job obyektlarida holat bilan ishlash imkonini beradigan bir nechta metodlar mavjud. Masalan, yuqoridagi kod job progressini yangilash uchun updateProgress() metodidan foydalanadi. Job obyektining to'liq API ma'lumotnomasi uchun bu yerga qarang.
Eski versiyada, Bull da, job handler metodi faqat muayyan turdagi joblarni (ma'lum name ga ega joblarni) qayta ishlashini @Process() dekoratoriga shu name ni berib belgilashingiz mumkin edi.
Bu BullMQ bilan ishlamaydi, o'qishda davom eting.
1@Process('transcode')
2async transcode(job: Job<unknown>) { ... }Bu xatti-harakat BullMQda u keltirib chiqaradigan chalkashliklar sabab qo'llab-quvvatlanmaydi. Buning o'rniga, har bir job nomi uchun turli servislar yoki mantiqni chaqirish uchun switch case lar kerak bo'ladi:
1import { Processor, WorkerHost } from '@nestjs/bullmq';
2import { Job } from 'bullmq';
3
4@Processor('audio')
5export class AudioConsumer extends WorkerHost {
6 async process(job: Job<any, any, string>): Promise<any> {
7 switch (job.name) {
8 case 'transcode': {
9 let progress = 0;
10 for (i = 0; i < 100; i++) {
11 await doSomething(job.data);
12 progress += 1;
13 await job.progress(progress);
14 }
15 return {};
16 }
17 case 'concatenate': {
18 await doSomeLogic2();
19 break;
20 }
21 }
22 }
23}Bu BullMQ hujjatlaridagi named processor bo'limida yoritilgan.
Request-scoped iste'molchilar
Iste'molchi request-scoped qilib belgilansa (injection scope lar haqida batafsil bu yerda), har bir job uchun klassning alohida instansiyasi yaratiladi. Job tugagach instansiya garbage-collector tomonidan tozalanadi.
1@Processor({
2 name: 'audio',
3 scope: Scope.REQUEST,
4})Request-scoped iste'molchi klasslari dinamik tarzda instansiyalanib, bitta jobga scoped bo'lgani uchun, konstruktor orqali standart yondashuv bilan JOB_REF ni inject qilishingiz mumkin.
1constructor(@Inject(JOB_REF) jobRef: Job) {
2 console.log(jobRef);
3}JOB_REF tokeni @nestjs/bullmq paketidan import qilinadi.
Hodisa listenerlari
BullMQ navbat va/yoki job holati o'zgarishida foydali hodisalar to'plamini generatsiya qiladi. Bu hodisalarga Worker darajasida @OnWorkerEvent(event) dekoratori bilan yoki Queue darajasida maxsus listener klassi va @OnQueueEvent(event) dekoratori bilan obuna bo'lish mumkin.
Worker hodisalari iste'molchi klass ichida (ya'ni @Processor() dekoratori bilan bezatilgan klassda) e'lon qilinishi kerak. Hodisani tinglash uchun @OnWorkerEvent(event) dekoratoridan foydalaning va ishlov beriladigan hodisani bering. Masalan, audio navbatida job active holatga kirganda chiqariladigan hodisani tinglash uchun quyidagi konstruktsiyadan foydalaning:
1import { Processor, Process, OnWorkerEvent } from '@nestjs/bullmq';
2import { Job } from 'bullmq';
3
4@Processor('audio')
5export class AudioConsumer {
6 @OnWorkerEvent('active')
7 onActive(job: Job) {
8 console.log(
9 `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
10 );
11 }
12
13 // ...
14}Hodisalar va ularning argumentlarining to'liq ro'yxatini WorkerListener xossalari sifatida bu yerda ko'rishingiz mumkin.
QueueEvent listenerlar @QueueEventsListener(queue) dekoratoridan foydalanishi va @nestjs/bullmq taqdim etgan QueueEventsHost klassini kengaytirishi kerak. Hodisani tinglash uchun @OnQueueEvent(event) dekoratoridan foydalaning va ishlov beriladigan hodisani bering. Masalan, audio navbatida job active holatga kirganda chiqariladigan hodisani tinglash uchun quyidagi konstruktsiyadan foydalaning:
1import {
2 QueueEventsHost,
3 QueueEventsListener,
4 OnQueueEvent,
5} from '@nestjs/bullmq';
6
7@QueueEventsListener('audio')
8export class AudioEventsListener extends QueueEventsHost {
9 @OnQueueEvent('active')
10 onActive(job: { jobId: string; prev?: string }) {
11 console.log(`Processing job ${job.jobId}...`);
12 }
13
14 // ...
15}QueueEvent listenerlar @nestjs/bullmq paketi ularni topishi uchun providers sifatida ro'yxatdan o'tkazilishi kerak.
Hodisalar va ularning argumentlarining to'liq ro'yxatini QueueEventsListener xossalari sifatida bu yerda ko'rishingiz mumkin.
Navbatni boshqarish
Navbatlarda pauza berish yoki davom ettirish, turli holatlardagi joblar sonini olish va yana bir qancha boshqaruv funksiyalarini bajarishga imkon beradigan API mavjud. Navbat API ning to'liq ro'yxatini bu yerda topasiz. Bu metodlarning istalganini Queue obyektida to'g'ridan-to'g'ri chaqiring, quyida pause/resume misollarida ko'rsatilgandek.
pause() metodini chaqirib navbatni pauza qiling. Pauza qilingan navbat resume qilinmaguncha yangi joblarni qayta ishlamaydi, ammo hozir qayta ishlanayotgan joblar yakunlanguncha davom etadi.
1await audioQueue.pause();Pauzadan chiqarish uchun resume() metodidan foydalaning:
1await audioQueue.resume();Alohida jarayonlar
Job handlerlar alohida (fork qilingan) jarayonda ham ishlashi mumkin (manba). Bu bir qancha afzalliklarga ega:
- Jarayon sandbox qilingan bo'ladi, shuning uchun u crash bo'lsa workerga ta'sir qilmaydi.
- Navbatga ta'sir qilmasdan bloklovchi kodni ishga tushirishingiz mumkin (joblar to'xtab qolmaydi).
- Ko'p yadroli CPUlardan ancha yaxshi foydalanish.
- Redisga kamroq ulanishlar.
1import { Module } from '@nestjs/common';
2import { BullModule } from '@nestjs/bullmq';
3import { join } from 'node:path';
4
5@Module({
6 imports: [
7 BullModule.registerQueue({
8 name: 'audio',
9 processors: [join(__dirname, 'processor.js')],
10 }),
11 ],
12})
13export class AppModule {}Eslatma: funksiyangiz fork qilingan jarayonda bajarilayotganligi sababli, Dependency Injection (va IoC container) mavjud bo'lmaydi. Bu shuni anglatadiki, processor funksiyangizga kerak bo'lgan tashqi bog'liqliklarning barcha instansiyalarini ichida saqlashi (yoki yaratishi) kerak bo'ladi.
Async sozlash
bullmq opsiyalarini statik emas, asinxron tarzda uzatishni xohlashingiz mumkin. Bunday holatda, asinxron konfiguratsiya bilan ishlashning bir nechta usullarini taqdim etadigan forRootAsync() metodidan foydalaning. Xuddi shuningdek, navbat opsiyalarini asinxron tarzda uzatish uchun registerQueueAsync() metodidan foydalaning.
Usullardan biri - factory funksiyasidan foydalanish:
1BullModule.forRootAsync({
2 useFactory: () => ({
3 connection: {
4 host: 'localhost',
5 port: 6379,
6 },
7 }),
8});Factory funksiyamiz har qanday asinxron provider kabi ishlaydi (masalan, async bo'lishi mumkin va inject orqali bog'liqliklarni qabul qiladi).
1BullModule.forRootAsync({
2 imports: [ConfigModule],
3 useFactory: async (configService: ConfigService) => ({
4 connection: {
5 host: configService.get('QUEUE_HOST'),
6 port: configService.get('QUEUE_PORT'),
7 },
8 }),
9 inject: [ConfigService],
10});Muqobil ravishda, useClass sintaksisidan foydalanishingiz mumkin:
1BullModule.forRootAsync({
2 useClass: BullConfigService,
3});Yuqoridagi konstruktsiya BullConfigService ni BullModule ichida instansiyalaydi va createSharedConfiguration() metodini chaqirib opsiyalar obyektini taqdim etish uchun foydalanadi. Bu BullConfigService SharedBullConfigurationFactory interfeysini implementatsiya qilishi kerakligini anglatadi, quyida ko'rsatilgandek:
1@Injectable()
2class BullConfigService implements SharedBullConfigurationFactory {
3 createSharedConfiguration(): BullModuleOptions {
4 return {
5 connection: {
6 host: 'localhost',
7 port: 6379,
8 },
9 };
10 }
11}BullModule ichida BullConfigService ni yaratmasdan, boshqa moduldan import qilingan providerni ishlatish uchun useExisting sintaksisidan foydalaning.
1BullModule.forRootAsync({
2 imports: [ConfigModule],
3 useExisting: ConfigService,
4});Bu konstruktsiya useClass bilan bir xil ishlaydi, lekin bitta muhim farqi bor - BullModule yangi ConfigService instansiyasini yaratish o'rniga import qilingan modullardan mavjud ConfigService ni qayta ishlatish uchun qidiradi.
Xuddi shuningdek, navbat opsiyalarini asinxron tarzda uzatish uchun registerQueueAsync() metodidan foydalaning, shunchaki name atributini factory funksiyasidan tashqarida ko'rsatishni unutmang.
1BullModule.registerQueueAsync({
2 name: 'audio',
3 useFactory: () => ({
4 redis: {
5 host: 'localhost',
6 port: 6379,
7 },
8 }),
9});Qo'lda ro'yxatdan o'tkazish
Default holatda, BullModule onModuleInit lifecycle funksiyasida BullMQ komponentlarini (navbatlar, processorlar va hodisa listener servislarini) avtomatik ro'yxatdan o'tkazadi. Biroq, ayrim holatlarda bu xatti-harakat ideal bo'lmasligi mumkin. Avtomatik ro'yxatdan o'tkazishni to'xtatish uchun BullModule da manualRegistration ni yoqing:
1BullModule.forRoot({
2 extraOptions: {
3 manualRegistration: true,
4 },
5});Bu komponentlarni qo'lda ro'yxatdan o'tkazish uchun BullRegistrar ni inject qiling va register funksiyasini, ideal holatda OnModuleInit yoki OnApplicationBootstrap ichida chaqiring.
1import { Injectable, OnModuleInit } from '@nestjs/common';
2import { BullRegistrar } from '@nestjs/bullmq';
3
4@Injectable()
5export class AudioService implements OnModuleInit {
6 constructor(private bullRegistrar: BullRegistrar) {}
7
8 onModuleInit() {
9 if (yourConditionHere) {
10 this.bullRegistrar.register();
11 }
12 }
13}BullRegistrar#register funksiyasini chaqirmasangiz, BullMQ komponentlari ishlamaydi - ya'ni joblar qayta ishlanmaydi.
Bull o'rnatish
Agar BullMQ ni tanlagan bo'lsangiz, bu bo'lim va keyingi boblarni o'tkazib yuboring.
Bull dan foydalanishni boshlash uchun avval kerakli bog'liqliklarni o'rnatamiz.
1$ npm install --save @nestjs/bull bullO'rnatish jarayoni tugagach, BullModule ni root AppModule ga import qilamiz.
1import { Module } from '@nestjs/common';
2import { BullModule } from '@nestjs/bull';
3
4@Module({
5 imports: [
6 BullModule.forRoot({
7 redis: {
8 host: 'localhost',
9 port: 6379,
10 },
11 }),
12 ],
13})
14export class AppModule {}forRoot() metodi ilovada ro'yxatdan o'tadigan barcha navbatlar uchun (aks holda alohida ko'rsatilmagan bo'lsa) ishlatiladigan bull paket konfiguratsiya obyektini ro'yxatdan o'tkazish uchun ishlatiladi. Konfiguratsiya obyektining quyidagi xossalari mavjud:
limiter: RateLimiter- Navbat joblari qayta ishlanadigan tezlikni boshqarish opsiyalari. Batafsil RateLimiter. Ixtiyoriy.redis: RedisOpts- Redis ulanishini sozlash opsiyalari. Batafsil RedisOpts. Ixtiyoriy.prefix: string- Barcha navbat kalitlari uchun prefiks. Ixtiyoriy.defaultJobOptions: JobOpts- Yangi joblar uchun default sozlamalarni boshqarish opsiyalari. Batafsil JobOpts. Ixtiyoriy. Eslatma: FlowProducer orqali joblarni rejalashtirsangiz, ular kuchga kirmaydi. Izoh uchun bullmq#1034 ga qarang.settings: AdvancedSettings- Navbat konfiguratsiyasining ilg'or sozlamalari. Odatda bularni o'zgartirmaslik kerak. Batafsil AdvancedSettings. Ixtiyoriy.
Barcha opsiyalar ixtiyoriy bo'lib, navbat xatti-harakatini batafsil boshqarishni ta'minlaydi. Ular to'g'ridan-to'g'ri Bull Queue konstruktoriga uzatiladi. Ushbu opsiyalar haqida batafsil bu yerda o'qing.
Navbatni ro'yxatdan o'tkazish uchun BullModule.registerQueue() dinamik modulini import qiling, quyidagicha:
1BullModule.registerQueue({
2 name: 'audio',
3});registerQueue() metodiga vergul bilan ajratilgan bir nechta konfiguratsiya obyektlarini uzatib, bir nechta navbat yarating.
registerQueue() metodi navbatlarni instansiyalash va/yoki ro'yxatdan o'tkazish uchun ishlatiladi. Navbatlar bir xil credentiallarga ega bo'lgan bir xil Redis bazasiga ulangan modullar va jarayonlar o'rtasida bo'lishiladi. Har bir navbat o'z name xossasi bilan noyobdir. Navbat nomi ham injection token sifatida (controller/providerlarga navbatni inject qilish uchun), ham consumer klasslari va listenerlarni navbatlarga bog'lash uchun dekoratorlarga argument sifatida ishlatiladi.
Shuningdek, muayyan navbat uchun oldindan sozlangan ba'zi opsiyalarni quyidagicha override qilishingiz mumkin:
1BullModule.registerQueue({
2 name: 'audio',
3 redis: {
4 port: 6380,
5 },
6});Joblar Redisda saqlanadi, shuning uchun har safar aniq nomlangan navbat instansiyalanganda (masalan, ilova ishga tushganda/qayta ishga tushganda), u oldingi tugallanmagan sessiyadan qolgan eski joblarni qayta ishlashga urinadi.
Har bir navbatda bir yoki bir nechta prodyuserlar, iste'molchilar va listenerlar bo'lishi mumkin. Iste'molchilar joblarni navbatdan muayyan tartibda oladi: FIFO (default), LIFO yoki prioritetlarga ko'ra. Navbatni qayta ishlash tartibini boshqarish bu yerda muhokama qilinadi.
Nomlangan konfiguratsiyalar
Agar navbatlaringiz bir nechta Redis instansiyalariga ulanadigan bo'lsa, nomlangan konfiguratsiyalar deb ataladigan usuldan foydalanishingiz mumkin. Bu funksiya sizga bir nechta konfiguratsiyalarni belgilangan kalitlar ostida ro'yxatdan o'tkazish imkonini beradi, keyin navbat opsiyalarida ularga murojaat qilasiz.
Masalan, ilovangizda bir nechta navbatlar ishlatadigan qo'shimcha Redis instansiyasi (defaultdan tashqari) bo'lsa, uning konfiguratsiyasini quyidagicha ro'yxatdan o'tkazishingiz mumkin:
1BullModule.forRoot('alternative-config', {
2 redis: {
3 port: 6381,
4 },
5});Yuqoridagi misolda 'alternative-config' shunchaki konfiguratsiya kaliti (ixtiyoriy satr bo'lishi mumkin).
Shundan so'ng, registerQueue() opsiyalar obyektida bu konfiguratsiyani ko'rsatishingiz mumkin:
1BullModule.registerQueue({
2 configKey: 'alternative-config',
3 name: 'video',
4});Prodyuserlar
Job prodyuserlari joblarni navbatlarga qo'shadi. Prodyuserlar odatda ilova servislaridir (Nest providers). Navbatga job qo'shish uchun, avval servisga navbatni inject qiling:
1import { Injectable } from '@nestjs/common';
2import { Queue } from 'bull';
3import { InjectQueue } from '@nestjs/bull';
4
5@Injectable()
6export class AudioService {
7 constructor(@InjectQueue('audio') private audioQueue: Queue) {}
8}@InjectQueue() dekoratori registerQueue() metodi chaqiruvida berilgan nom orqali navbatni aniqlaydi (masalan, 'audio').
Endi navbatning add() metodini chaqirib, foydalanuvchi belgilagan job obyektini uzatib job qo'shing. Joblar serializatsiya qilinadigan JavaScript obyektlari sifatida ifodalanadi (chunki ular Redis bazasida shunday saqlanadi). Uzatayotgan jobning shakli ixtiyoriy; undan job obyektining semantikasini ifodalash uchun foydalaning.
1const job = await this.audioQueue.add({
2 foo: 'bar',
3});Nomlangan joblar
Joblar noyob nomlarga ega bo'lishi mumkin. Bu faqat berilgan nomdagi joblarni qayta ishlaydigan maxsus iste'molchilar yaratish imkonini beradi.
1const job = await this.audioQueue.add('transcode', {
2 foo: 'bar',
3});Nomlangan joblardan foydalanganda, navbatga qo'shilgan har bir noyob nom uchun processor yaratishingiz kerak, aks holda navbat ushbu job uchun processor yetishmayotganini bildiradi. Nomlangan joblarni iste'mol qilish bo'yicha batafsil bu yerga qarang.
Job opsiyalari
Joblar bilan qo'shimcha opsiyalar bog'lanishi mumkin. Queue.add() metodida job argumentidan keyin opsiyalar obyektini uzating. Job opsiyalari xossalari:
priority:number- Ixtiyoriy prioritet qiymati. 1 (eng yuqori prioritet) dan MAX_INT (eng past prioritet) gacha. Prioritetlardan foydalanish unumdorlikka ozgina ta'sir qiladi, shuning uchun ehtiyotkorlik bilan ishlating.delay:number- Ushbu job qayta ishlanishi mumkin bo'lishidan oldin kutish vaqti (millisekundlarda). Aniq delaylar uchun server va klientlar soatlari sinxron bo'lishi kerak.attempts:number- Job yakunlanguncha sinab ko'rishlar soni.repeat:RepeatOpts- Cron spetsifikatsiyasiga ko'ra jobni takrorlash. Batafsil RepeatOpts.backoff:number | BackoffOpts- Job muvaffaqiyatsiz bo'lsa avtomatik qayta urinishlar uchun backoff sozlamasi. Batafsil BackoffOpts.lifo:boolean-truebo'lsa, jobni navbatning chap oxiri o'rniga o'ng oxiriga qo'shadi (default false).timeout:number- Job timeout xatosi bilan muvaffaqiyatsiz bo'lishidan oldin o'tadigan millisekundlar soni.jobId:number|string- Job ID ni override qiladi - default holatda job ID noyob butun son bo'ladi, ammo bu sozlama bilan job ID ni o'zgartirishingiz mumkin. Agar bu opsiyadan foydalansangiz, jobId noyob bo'lishini ta'minlash sizning vazifangiz. Agar mavjud bo'lgan ID bilan job qo'shishga urinsangiz, u qo'shilmaydi.removeOnComplete:boolean | number-truebo'lsa, job muvaffaqiyatli yakunlanganda o'chiriladi. Son berilsa, saqlab qolinadigan joblar sonini bildiradi. Default xatti-harakat - jobni completed to'plamida saqlash.removeOnFail:boolean | number-truebo'lsa, job barcha urinishlardan keyin muvaffaqiyatsiz bo'lganda o'chiriladi. Son berilsa, saqlab qolinadigan joblar sonini bildiradi. Default xatti-harakat - jobni failed to'plamida saqlash.stackTraceLimit:number- stacktrace da yozib olinadigan qatorlar sonini cheklaydi.
Quyida job opsiyalari bilan joblarni sozlashning bir nechta misollari keltirilgan.
Job boshlanishini kechiktirish uchun delay konfiguratsiya xossasidan foydalaning.
1const job = await this.audioQueue.add(
2 {
3 foo: 'bar',
4 },
5 { delay: 3000 }, // 3 seconds delayed
6);Jobni navbatning o'ng oxiriga qo'shish uchun (jobni LIFO (Last In First Out) tarzida qayta ishlash), konfiguratsiya obyektidagi lifo xossasini true ga o'rnating.
1const job = await this.audioQueue.add(
2 {
3 foo: 'bar',
4 },
5 { lifo: true },
6);Jobni prioritet bilan ishlash uchun priority xossasidan foydalaning.
1const job = await this.audioQueue.add(
2 {
3 foo: 'bar',
4 },
5 { priority: 2 },
6);Iste'molchilar
Iste'molchi - bu navbatga qo'shilgan joblarni qayta ishlaydigan yoki navbatdagi hodisalarni tinglaydigan, yoki ikkalasini ham qiladigan metodlarni belgilovchi klass. Iste'molchi klassni quyidagicha @Processor() dekoratori yordamida e'lon qiling:
1import { Processor } from '@nestjs/bull';
2
3@Processor('audio')
4export class AudioConsumer {}Iste'molchilar @nestjs/bull paketi ularni topishi uchun providers sifatida ro'yxatdan o'tkazilishi kerak.
Bu yerda dekoratorning string argumenti (masalan, 'audio') klass metodlari bog'lanadigan navbat nomidir.
Iste'molchi klassi ichida @Process() dekoratori bilan handler metodlarini bezab, job handlerlarini e'lon qiling.
1import { Processor, Process } from '@nestjs/bull';
2import { Job } from 'bull';
3
4@Processor('audio')
5export class AudioConsumer {
6 @Process()
7 async transcode(job: Job<unknown>) {
8 let progress = 0;
9 for (let i = 0; i < 100; i++) {
10 await doSomething(job.data);
11 progress += 1;
12 await job.progress(progress);
13 }
14 return {};
15 }
16}Dekorator qo'llangan metod (masalan, transcode()) worker bo'sh bo'lganida va navbatda qayta ishlash uchun joblar bo'lganda chaqiriladi. Bu handler metodi yagona argument sifatida job obyektini oladi. Handler metodidan qaytgan qiymat job obyektida saqlanadi va keyinroq, masalan completed hodisasi listenerida ishlatilishi mumkin.
Job obyektlarida holat bilan ishlash imkonini beradigan bir nechta metodlar mavjud. Masalan, yuqoridagi kod job progressini yangilash uchun progress() metodidan foydalanadi. Job obyektining to'liq API ma'lumotnomasi uchun bu yerga qarang.
Job handler metodi faqat muayyan turdagi joblarni (ma'lum name ga ega joblarni) qayta ishlashini @Process() dekoratoriga shu name ni berib belgilashingiz mumkin. Siz bir iste'molchi klassida har bir job turi (name) uchun alohida @Process() handlerlarga ega bo'lishingiz mumkin. Nomlangan joblardan foydalanganda, har bir nom uchun mos handler bo'lishini ta'minlang.
1@Process('transcode')
2async transcode(job: Job<unknown>) { ... }Bir xil navbat uchun bir nechta iste'molchi aniqlaganda, @Process({{ '{' }} concurrency: 1 {{ '}' }}) ichidagi concurrency opsiyasi ishlamaydi. Minimal concurrency qiymati aniqlangan iste'molchilar soniga teng bo'ladi. Bu, @Process() handlerlar nomlangan joblarni qayta ishlash uchun boshqa name dan foydalansa ham qo'llanadi.
Request-scoped iste'molchilar
Iste'molchi request-scoped qilib belgilansa (injection scope lar haqida batafsil bu yerda), har bir job uchun klassning alohida instansiyasi yaratiladi. Job tugagach instansiya garbage-collector tomonidan tozalanadi.
1@Processor({
2 name: 'audio',
3 scope: Scope.REQUEST,
4})Request-scoped iste'molchi klasslari dinamik tarzda instansiyalanib, bitta jobga scoped bo'lgani uchun, konstruktor orqali standart yondashuv bilan JOB_REF ni inject qilishingiz mumkin.
1constructor(@Inject(JOB_REF) jobRef: Job) {
2 console.log(jobRef);
3}JOB_REF tokeni @nestjs/bull paketidan import qilinadi.
Hodisa listenerlari
Bull navbat va/yoki job holati o'zgarishida foydali hodisalar to'plamini generatsiya qiladi. Nest core standart hodisalarga obuna bo'lishga imkon beradigan dekoratorlar to'plamini taqdim etadi. Ular @nestjs/bull paketidan eksport qilinadi.
Hodisa listenerlari iste'molchi klass ichida (ya'ni @Processor() dekoratori bilan bezatilgan klassda) e'lon qilinishi kerak. Hodisani tinglash uchun quyidagi jadvaldagi dekoratorlardan birini ishlatib handler e'lon qiling. Masalan, audio navbatida job active holatga kirganda chiqariladigan hodisani tinglash uchun quyidagi konstruktsiyadan foydalaning:
1import { Processor, Process, OnQueueActive } from '@nestjs/bull';
2import { Job } from 'bull';
3
4@Processor('audio')
5export class AudioConsumer {
6
7 @OnQueueActive()
8 onActive(job: Job) {
9 console.log(
10 `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
11 );
12 }
13 ...Bull taqsimlangan (multi-node) muhitda ishlagani uchun hodisalarning joylashuvini (locality) tushunchasini belgilaydi. Bu tushuncha hodisalar butunlay bitta jarayonda yoki turli jarayonlardagi umumiy navbatlarda yuz berishi mumkinligini tan oladi. Local hodisa - lokal jarayondagi navbatda harakat yoki holat o'zgarishi sodir bo'lganda yuzaga keladigan hodisa. Boshqacha qilib aytganda, hodisa prodyuserlari va iste'molchilari bitta jarayonda bo'lsa, navbatlarda sodir bo'ladigan barcha hodisalar local hisoblanadi.
Navbat bir nechta jarayonlar o'rtasida bo'linganda, global hodisalar paydo bo'lishi mumkin. Bir jarayondagi listener boshqa jarayonda ishga tushgan hodisa haqida xabar olish uchun global hodisa sifatida ro'yxatdan o'tishi kerak.
Hodisa handlerlari tegishli hodisa chiqarilganda chaqiriladi. Handler quyidagi jadvalda ko'rsatilgan signatura bilan chaqiriladi va hodisaga oid ma'lumotlarga kirish imkonini beradi. Local va global hodisa handlerlari signaturalari o'rtasidagi bitta muhim farqni quyida muhokama qilamiz.
| Local event listeners | Global event listeners | Handler method signature / When fired |
|---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - Xato yuz berdi. error ishga tushiruvchi xatoni o'z ichiga oladi. |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - Job worker bo'sh bo'lganda darhol qayta ishlanishi uchun kutyapti. jobId bu holatga kirgan job ID sini o'z ichiga oladi. |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job) - Job job ishga tushdi. |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job) - Job job to'xtab qolgan deb belgilandi. Bu event loopni to'xtatib qo'yadigan job workerlarni debug qilishda foydali. |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number) - Job job progressi progress qiymatiga yangilandi. |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) Job job result natija bilan muvaffaqiyatli yakunlandi. |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error) Job job err sabab bilan muvaffaqiyatsiz bo'ldi. |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler() Navbat pauzaga o'tdi. |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job) Navbat qayta davom ettirildi. |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) Eski joblar navbatdan tozalandi. jobs - tozalangan joblar massivi, type - tozalangan job turi. |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler() Navbat kutayotgan joblarning barchasini qayta ishlaganida (hatto qayta ishlanmagan kechiktirilgan joblar bo'lsa ham) chiqariladi. |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job) Job job muvaffaqiyatli o'chirildi. |
Global hodisalarni tinglaganda metod signaturalari local variantidan biroz farq qiladi. Xususan, local versiyada job obyektlarini qabul qiladigan metod signaturalari global versiyada jobId (number) ni qabul qiladi. Bunday holatda haqiqiy job obyektiga havola olish uchun Queue#getJob metodidan foydalaning. Bu chaqiruvni await qilish kerak, shuning uchun handler async deb e'lon qilinishi lozim. Masalan:
1@OnGlobalQueueCompleted()
2async onGlobalCompleted(jobId: number, result: any) {
3 const job = await this.immediateQueue.getJob(jobId);
4 console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
5}Queue obyektiga kirish uchun (ya'ni getJob() chaqiruvini qilish uchun) uni albatta inject qilishingiz kerak. Shuningdek, Queue siz inject qilayotgan modulda ro'yxatdan o'tgan bo'lishi kerak.
Muayyan hodisa listener dekoratorlaridan tashqari, @OnQueueEvent() dekoratorini BullQueueEvents yoki BullQueueGlobalEvents enumlari bilan birga ham ishlatishingiz mumkin. Hodisalar haqida batafsil bu yerda o'qing.
Navbatni boshqarish
Navbatlarda pauza berish yoki davom ettirish, turli holatlardagi joblar sonini olish va yana bir qancha boshqaruv funksiyalarini bajarishga imkon beradigan API mavjud. Navbat API ning to'liq ro'yxatini bu yerda topasiz. Bu metodlarning istalganini Queue obyektida to'g'ridan-to'g'ri chaqiring, quyida pause/resume misollarida ko'rsatilgandek.
pause() metodini chaqirib navbatni pauza qiling. Pauza qilingan navbat resume qilinmaguncha yangi joblarni qayta ishlamaydi, ammo hozir qayta ishlanayotgan joblar yakunlanguncha davom etadi.
1await audioQueue.pause();Pauzadan chiqarish uchun resume() metodidan foydalaning:
1await audioQueue.resume();Alohida jarayonlar
Job handlerlar alohida (fork qilingan) jarayonda ham ishlashi mumkin (manba). Bu bir qancha afzalliklarga ega:
- Jarayon sandbox qilingan bo'ladi, shuning uchun u crash bo'lsa workerga ta'sir qilmaydi.
- Navbatga ta'sir qilmasdan bloklovchi kodni ishga tushirishingiz mumkin (joblar to'xtab qolmaydi).
- Ko'p yadroli CPUlardan ancha yaxshi foydalanish.
- Redisga kamroq ulanishlar.
1import { Module } from '@nestjs/common';
2import { BullModule } from '@nestjs/bull';
3import { join } from 'path';
4
5@Module({
6 imports: [
7 BullModule.registerQueue({
8 name: 'audio',
9 processors: [join(__dirname, 'processor.js')],
10 }),
11 ],
12})
13export class AppModule {}Funksiyangiz fork qilingan jarayonda bajarilayotganligi sababli, Dependency Injection (va IoC container) mavjud bo'lmaydi. Bu shuni anglatadiki, processor funksiyangizga kerak bo'lgan tashqi bog'liqliklarning barcha instansiyalarini ichida saqlashi (yoki yaratishi) kerak bo'ladi.
1import { Job, DoneCallback } from 'bull';
2
3export default function (job: Job, cb: DoneCallback) {
4 console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
5 cb(null, 'It works');
6}Async sozlash
bull opsiyalarini statik emas, asinxron tarzda uzatishni xohlashingiz mumkin. Bunday holatda, asinxron konfiguratsiya bilan ishlashning bir nechta usullarini taqdim etadigan forRootAsync() metodidan foydalaning.
Usullardan biri - factory funksiyasidan foydalanish:
1BullModule.forRootAsync({
2 useFactory: () => ({
3 redis: {
4 host: 'localhost',
5 port: 6379,
6 },
7 }),
8});Factory funksiyamiz har qanday asinxron provider kabi ishlaydi (masalan, async bo'lishi mumkin va inject orqali bog'liqliklarni qabul qiladi).
1BullModule.forRootAsync({
2 imports: [ConfigModule],
3 useFactory: async (configService: ConfigService) => ({
4 redis: {
5 host: configService.get('QUEUE_HOST'),
6 port: configService.get('QUEUE_PORT'),
7 },
8 }),
9 inject: [ConfigService],
10});Muqobil ravishda, useClass sintaksisidan foydalanishingiz mumkin:
1BullModule.forRootAsync({
2 useClass: BullConfigService,
3});Yuqoridagi konstruktsiya BullConfigService ni BullModule ichida instansiyalaydi va createSharedConfiguration() metodini chaqirib opsiyalar obyektini taqdim etish uchun foydalanadi. Bu BullConfigService SharedBullConfigurationFactory interfeysini implementatsiya qilishi kerakligini anglatadi, quyida ko'rsatilgandek:
1@Injectable()
2class BullConfigService implements SharedBullConfigurationFactory {
3 createSharedConfiguration(): BullModuleOptions {
4 return {
5 redis: {
6 host: 'localhost',
7 port: 6379,
8 },
9 };
10 }
11}BullModule ichida BullConfigService ni yaratmasdan, boshqa moduldan import qilingan providerni ishlatish uchun useExisting sintaksisidan foydalaning.
1BullModule.forRootAsync({
2 imports: [ConfigModule],
3 useExisting: ConfigService,
4});Bu konstruktsiya useClass bilan bir xil ishlaydi, lekin bitta muhim farqi bor - BullModule yangi ConfigService instansiyasini yaratish o'rniga import qilingan modullardan mavjud ConfigService ni qayta ishlatish uchun qidiradi.
Xuddi shuningdek, navbat opsiyalarini asinxron tarzda uzatish uchun registerQueueAsync() metodidan foydalaning, shunchaki name atributini factory funksiyasidan tashqarida ko'rsatishni unutmang.
1BullModule.registerQueueAsync({
2 name: 'audio',
3 useFactory: () => ({
4 redis: {
5 host: 'localhost',
6 port: 6379,
7 },
8 }),
9});Misol
Ishlaydigan misol bu yerda mavjud.