RabbitMQ

RabbitMQ is an open-source, lightweight message broker that supports multiple messaging protocols. It can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. It is also the most widely deployed message broker, used worldwide by everyone from small startups to large enterprises.

Installation

To start building RabbitMQ-based microservices, first install the required packages:

Terminal
$ npm i --save amqplib amqp-connection-manager

Overview

To use the RabbitMQ transporter, pass the following options object to the createMicroservice() method:

TypeScript (main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
});

Hint

The Transport enum is imported from the @nestjs/microservices package.

Options

The options property is specific to the chosen transporter. The RabbitMQ transporter exposes the properties described below.

urls: An array of connection URLs to try in order
queue: The name of the queue your server will listen to
prefetchCount: Sets the prefetch count for the channel
isGlobalPrefetchCount: Enables per-channel prefetching
noAck: If false, manual acknowledgment mode is enabled
consumerTag: Consumer tag identifier: a name the server uses to distinguish message deliveries for the consumer; it must not already be in use on the channel. It is usually easier to omit it, in which case the server generates a random name and returns it in the reply (read more here)
queueOptions: Additional queue options (read more here)
socketOptions: Additional socket options (read more here)
headers: Headers to be sent along with every message
replyQueue: Reply queue for the producer. Defaults to amq.rabbitmq.reply-to
persistent: If truthy, the message survives broker restarts, provided it is in a queue that also survives restarts
noAssert: If true, the queue is not asserted before consuming
wildcards: Set to true only if you want to use a topic exchange for routing messages to queues. Enabling it lets you use wildcards (*, #) as message and event patterns
exchange: Name of the exchange. Defaults to the queue name when "wildcards" is set to true
exchangeType: Type of the exchange. Defaults to topic. Valid values are direct, fanout, topic, and headers
routingKey: Additional routing key for the topic exchange
maxConnectionAttempts: Maximum number of connection attempts. Applies only to the consumer configuration. -1 means infinite
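
As a rough illustration of how several of these properties fit together, the fragment below combines manual acknowledgements, a prefetch limit, and persistent messages on a durable queue. The specific values (a prefetch count of 10, the cats_queue name) are assumptions chosen for the example, not recommendations.

```typescript
import { RmqOptions, Transport } from '@nestjs/microservices';

// Illustrative values only; tune them for your own workload.
const rmqOptions: RmqOptions = {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    // Hand at most 10 unacknowledged messages to this consumer at a time.
    prefetchCount: 10,
    // Manual acknowledgement mode: handlers must ack messages themselves.
    noAck: false,
    // Persistent messages only survive a broker restart in a durable queue.
    persistent: true,
    queueOptions: { durable: true },
  },
};
```

An object like this can be passed as the second argument to createMicroservice().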

Client

Like other microservice transporters, you have several options for creating a RabbitMQ ClientProxy instance.

One way to create an instance is to use the ClientsModule. To create a client instance with the ClientsModule, import it and use the register() method to pass an options object with the same properties shown above for createMicroservice(), plus a name property to be used as the injection token. Read more about ClientsModule here.

TypeScript
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
  ...
})

Other ways of creating a client (ClientProxyFactory or @Client()) can be used as well. You can read about them here.

Context

In more complex scenarios, you may need to access additional information about the incoming request. When using the RabbitMQ transporter, you can access the RmqContext object.

TypeScript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}

Hint

@Payload(), @Ctx() and RmqContext are imported from the @nestjs/microservices package.

To access the original RabbitMQ message (with its properties, fields, and content), use the getMessage() method of the RmqContext object, as follows:

TypeScript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

To retrieve a reference to the RabbitMQ channel, use the getChannelRef() method of the RmqContext object, as follows:

TypeScript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef());
}

Message acknowledgement

To make sure a message is never lost, RabbitMQ supports message acknowledgements. An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it. If a consumer dies (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an ack, RabbitMQ understands that the message was not fully processed and re-queues it.
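
The re-queue behaviour described above can be pictured with a small in-memory model. This is only a toy sketch written for this page: ToyQueue and its methods are hypothetical names, and it does not reflect how RabbitMQ or amqplib are implemented.

```typescript
type Message = { id: number; body: string };

// Toy model of a queue that tracks delivered-but-unacknowledged messages.
class ToyQueue {
  private ready: Message[] = [];
  private unacked: Message[] = [];

  publish(msg: Message): void {
    this.ready.push(msg);
  }

  // Hand the next ready message to a consumer and remember it as unacknowledged.
  deliver(): Message | undefined {
    const msg = this.ready.shift();
    if (msg) {
      this.unacked.push(msg);
    }
    return msg;
  }

  // The consumer confirms processing, so the message can be forgotten.
  ack(id: number): void {
    this.unacked = this.unacked.filter((m) => m.id !== id);
  }

  // The consumer's channel or connection closed: re-queue whatever was not acked.
  consumerDied(): void {
    this.ready = this.unacked.concat(this.ready);
    this.unacked = [];
  }

  readyCount(): number {
    return this.ready.length;
  }
}
```

In this model, a message that was delivered but never acknowledged becomes available again after consumerDied(), while an acknowledged one is gone for good.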

To enable manual acknowledgement mode, set the noAck property to false:

TypeScript
options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
},

When manual consumer acknowledgements are turned on, the worker must send a proper acknowledgement to signal that it is done with a task.

TypeScript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}
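
The handler above always acknowledges. In practice you may want to acknowledge only when processing succeeds and reject the message otherwise. The sketch below shows one possible way to do that. processWithAck and AckChannel are hypothetical names introduced for this example; the ack and nack calls mirror the amqplib channel methods of the same names, and rejecting without re-queueing is an assumption you may want to change.

```typescript
// Minimal structural type covering the two amqplib channel methods used here.
interface AckChannel {
  ack(message: unknown): void;
  nack(message: unknown, allUpTo?: boolean, requeue?: boolean): void;
}

// Hypothetical helper: run `work`, then ack on success or nack on failure.
async function processWithAck<T>(
  channel: AckChannel,
  originalMsg: unknown,
  work: () => Promise<T> | T,
): Promise<T | undefined> {
  try {
    const result = await work();
    channel.ack(originalMsg);
    return result;
  } catch {
    // allUpTo = false: reject only this message.
    // requeue = false: do not put it back on the queue (assumption for this sketch).
    channel.nack(originalMsg, false, false);
    return undefined;
  }
}
```

Inside a handler, this could be called with context.getChannelRef() and context.getMessage() as the first two arguments. Passing requeue = true instead would ask the broker to redeliver the message, which can loop forever if the failure is permanent.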

Record builders

To configure message options, you can use the RmqRecordBuilder class (note: this works for event-based flows as well). For example, to set the headers and priority properties, use the setOptions method, as follows:

TypeScript
const message = ':cat:';
const record = new RmqRecordBuilder(message)
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build();

this.client.send('replace-emoji', record).subscribe(...);

Hint

The RmqRecordBuilder class is exported from the @nestjs/microservices package.

You can read these values on the server side as well, by accessing the RmqContext, as follows:

TypeScript
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

Instance status updates

To get real-time updates on the connection and the state of the underlying driver instance, you can subscribe to the status stream. This stream provides status updates specific to the chosen driver. For the RMQ driver, the status stream emits connected and disconnected events.

TypeScript
this.client.status.subscribe((status: RmqStatus) => {
  console.log(status);
});

Hint

The RmqStatus type is imported from the @nestjs/microservices package.

Similarly, you can subscribe to the server's status stream to receive notifications about the server's status.

TypeScript
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RmqStatus) => {
  console.log(status);
});

Listening to RabbitMQ events

In some cases, you may want to listen to internal events emitted by the microservice. For example, you could listen for the error event to trigger additional operations when an error occurs. To do this, use the on() method, as shown below:

TypeScript
this.client.on('error', (err) => {
  console.error(err);
});

Similarly, you can listen to the server's internal events:

TypeScript
server.on<RmqEvents>('error', (err) => {
  console.error(err);
});

Hint

The RmqEvents type is imported from the @nestjs/microservices package.

Underlying driver access

In more advanced use cases, you may need to access the underlying driver instance. This can be useful for scenarios such as manually closing the connection or using driver-specific methods. Keep in mind, however, that in most cases you should not need to access the driver directly.

To do so, you can use the unwrap() method, which returns the underlying driver instance. The generic type parameter should specify the type of driver instance you expect.

TypeScript
const managerRef =
  this.client.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();

Similarly, you can access the server's underlying driver instance:

TypeScript
const managerRef =
  server.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();

Wildcards

RabbitMQ supports wildcards in routing keys, which allows for flexible message routing. The # wildcard matches zero or more words, while the * wildcard matches exactly one word.

For example, the routing key cats.# matches cats, cats.meow, and cats.meow.purr. The routing key cats.* matches cats.meow but not cats.meow.purr.
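
To make the matching rules concrete, here is a small standalone matcher that follows the two rules stated above. It is an illustrative sketch only: matchesTopic is a hypothetical helper, not part of NestJS, amqplib, or RabbitMQ, and the broker performs this matching itself.

```typescript
// Toy matcher for topic patterns: '*' = exactly one word, '#' = zero or more words.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: it matches only if the key is exhausted too.
    if (i === p.length) {
      return j === k.length;
    }
    if (p[i] === '#') {
      // '#' either consumes nothing (move past it) or consumes one word and stays.
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    // Any other pattern segment needs a word to compare against.
    if (j === k.length) {
      return false;
    }
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(matchesTopic('cats.#', 'cats'));           // true
console.log(matchesTopic('cats.#', 'cats.meow.purr')); // true
console.log(matchesTopic('cats.*', 'cats.meow'));      // true
console.log(matchesTopic('cats.*', 'cats.meow.purr')); // false
```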

To enable wildcards in your RabbitMQ microservice, set the wildcards configuration option to true in the options object:

TypeScript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'cats_queue',
      wildcards: true,
    },
  },
);

With this configuration, you can use wildcards in your routing keys when subscribing to events and messages. For example, to listen for messages with the routing key cats.#, use the following code:

TypeScript
@MessagePattern('cats.#')
getCats(@Payload() data: { message: string }, @Ctx() context: RmqContext) {
  console.log(`Received message with routing key: ${context.getPattern()}`);

  return {
    message: 'Hello from the cats service!',
  };
}

To send a message with a specific routing key, you can use the send() method of the ClientProxy instance:

TypeScript
this.client.send('cats.meow', { message: 'Meow!' }).subscribe((response) => {
  console.log(response);
});