MQTT

MQTT (Message Queuing Telemetry Transport) is an open source, lightweight messaging protocol optimized for low latency. It uses a publish/subscribe model to provide a scalable and cost-efficient way to connect devices. A communication system built on MQTT consists of the publishing server, a broker, and one or more clients. The protocol is designed for resource-constrained devices and for low-bandwidth, high-latency, or unreliable networks.
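
To make the publish/subscribe flow concrete, here is a minimal in-memory sketch. The `ToyBroker` class and its method names are hypothetical and exist only for illustration; it is not an MQTT implementation (there is no network, QoS, session state, or wildcard support), it only shows how publishers and subscribers are decoupled through topics.

```typescript
// Toy in-memory broker: illustrates the publish/subscribe flow only.
// It is NOT an MQTT implementation (no network, QoS, sessions, or wildcards).
type Handler = (payload: string) => void;

class ToyBroker {
  private readonly subscriptions = new Map<string, Set<Handler>>();

  // Register a handler for an exact topic and return an unsubscribe function.
  subscribe(topic: string, handler: Handler): () => void {
    const handlers = this.subscriptions.get(topic) ?? new Set<Handler>();
    handlers.add(handler);
    this.subscriptions.set(topic, handlers);
    return () => {
      handlers.delete(handler);
    };
  }

  // Deliver the payload to every handler subscribed to the topic.
  // Returns the number of handlers currently registered for that topic.
  publish(topic: string, payload: string): number {
    const handlers = this.subscriptions.get(topic);
    if (!handlers) return 0;
    handlers.forEach((handler) => handler(payload));
    return handlers.size;
  }
}
```

The publisher never references a subscriber directly; both sides only agree on a topic name, which is the property that lets an MQTT system add or remove clients without changing the publisher.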

Installation

To start building MQTT-based microservices, first install the required package:

Terminal
$ npm i --save mqtt

Overview

To use the MQTT transporter, pass the following options object to the createMicroservice() method:

TypeScript
main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
});
Hint

The Transport enum is imported from the @nestjs/microservices package.

Options

The options object is specific to the chosen transporter. The MQTT transporter exposes the properties described in the mqtt package's client options documentation.

Client

As with other microservice transporters, you have several options for creating an MQTT ClientProxy instance.

One way to create an instance is to use the ClientsModule. To create a client instance with the ClientsModule, import it and call the register() method with an options object that has the same properties shown above for createMicroservice(), plus a name property that is used as the injection token. See the ClientsModule documentation for details.

TypeScript
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
        }
      },
    ]),
  ]
  ...
})

Other ways of creating a client (ClientProxyFactory or @Client()) can be used as well. They are covered in the general microservices client documentation.

Context

In more complex scenarios, you may need access to additional information about the incoming request. When using the MQTT transporter, you can access the MqttContext object.

TypeScript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
Hint

@Payload(), @Ctx() and MqttContext are imported from the @nestjs/microservices package.

To access the original mqtt packet, use the getPacket() method of the MqttContext object, as follows:

TypeScript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(context.getPacket());
}

Wildcards

A subscription may be to an explicit topic, or it may include wildcards. Two wildcards are available: + and #. + is a single-level wildcard, while # is a multi-level wildcard that covers any number of topic levels.

TypeScript
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
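
To see which topics a pattern such as the one above would match, the following sketch implements the two wildcard rules in plain TypeScript. The `topicMatches` function is a hypothetical helper written for illustration, not part of NestJS or the mqtt package, and it deliberately ignores special cases such as topics beginning with `$`.

```typescript
// Returns true when `topic` matches the subscription `filter`.
// '+' matches exactly one level; '#' matches the parent level and any number
// of child levels, and is only accepted as the last level of the filter.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === '#') {
      // Multi-level wildcard: valid only in the final position.
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) {
      return false; // the topic has fewer levels than the filter
    }
    if (level !== '+' && level !== topicLevels[i]) {
      return false; // a literal level differs
    }
  }
  // Without '#', both sides must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}
```

With this helper, `sensors/+/temperature/+` matches `sensors/1/temperature/room` but not `sensors/1/temperature`, while `sensors/#` matches both of them as well as `sensors` itself.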

Quality of Service (QoS)

Any subscription created with the @MessagePattern or @EventPattern decorators subscribes with QoS 0. If a higher QoS is required, it can be set globally through the subscribeOptions block when establishing the connection, as follows:

TypeScript
main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
    subscribeOptions: {
      qos: 2
    },
  },
});

If a topic-specific QoS is required, consider creating a custom transporter.
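
It may help to remember why the subscription QoS matters: as far as the MQTT specification describes it, a broker delivers a message at the lower of the QoS it was published with and the QoS granted to the subscription. The sketch below expresses that rule; `effectiveQos` is a hypothetical helper for illustration only and does not call any NestJS or mqtt API.

```typescript
type QoS = 0 | 1 | 2;

// QoS used for delivery to a subscriber: the lower of the QoS the message was
// published with and the QoS granted for the subscription.
function effectiveQos(publishQos: QoS, subscriptionQos: QoS): QoS {
  return publishQos < subscriptionQos ? publishQos : subscriptionQos;
}
```

Under this rule, a message published with QoS 2 still reaches a default QoS 0 subscription at QoS 0, which is why raising the subscription QoS is needed when stronger delivery guarantees are expected end to end.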

Record builders

To configure message options (adjust the QoS level, set the Retain or DUP flags, or add additional properties to the payload), you can use the MqttRecordBuilder class. For example, to set QoS to 1, use the setQoS method as follows:

TypeScript
const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
  .setProperties({ userProperties })
  .setQoS(1)
  .build();
client.send('replace-emoji', record).subscribe(...);
Hint

The MqttRecordBuilder class is exported from the @nestjs/microservices package.

You can also read these options on the server side by accessing the MqttContext.

TypeScript
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
  const { properties: { userProperties } } = context.getPacket();
  return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
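
User properties are an MQTT 5 feature, so a packet from a client using an older protocol version may carry no properties at all. A defensive read could look like the sketch below. The `PacketLike` interface and the `readUserProperty` helper are hypothetical; the shape assumes that each user property value is either a string or an array of strings (for repeated keys), so verify it against the packet type you actually receive.

```typescript
// Hypothetical, simplified view of the parts of a publish packet used here.
interface PacketLike {
  properties?: {
    userProperties?: Record<string, string | string[]>;
  };
}

// Returns the first value of a user property, or undefined when it is absent.
function readUserProperty(packet: PacketLike, key: string): string | undefined {
  const value = packet.properties?.userProperties?.[key];
  if (value === undefined) return undefined;
  // A repeated key is assumed to arrive as an array; take its first entry.
  return Array.isArray(value) ? value[0] : value;
}
```

In a handler, the result of `context.getPacket()` could be passed to such a helper instead of destructuring `properties` directly, so that a missing property falls through to a default branch rather than throwing.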

In some cases you may want to configure user properties for multiple requests. In that case, you can pass these options to the ClientProxyFactory.

TypeScript
import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1883',
            userProperties: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}

Instance status updates

To get real-time updates on the connection and the state of the underlying driver instance, you can subscribe to the status stream. This stream provides status updates specific to the chosen driver. For the MQTT driver, the status stream emits connected, disconnected, reconnecting, and closed events.

TypeScript
this.client.status.subscribe((status: MqttStatus) => {
  console.log(status);
});
Hint

The MqttStatus type is imported from the @nestjs/microservices package.

Similarly, you can subscribe to the server's status stream to receive notifications about the server's status.

TypeScript
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: MqttStatus) => {
  console.log(status);
});
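
Rather than only logging each status, you might keep the latest one around, for example to back a health check. The sketch below is one possible approach; `StatusTracker` is a hypothetical class, and it assumes the status values arrive as the four lowercase strings listed above.

```typescript
// Assumption: statuses arrive as 'connected', 'disconnected', 'reconnecting', or 'closed'.
class StatusTracker {
  private current: string | undefined;
  private reconnects = 0;

  // Record a new status; intended to be called from a status.subscribe() callback.
  update(status: string): void {
    if (status === 'reconnecting') this.reconnects += 1;
    this.current = status;
  }

  // Only 'connected' is treated as ready to send or receive messages.
  isReady(): boolean {
    return this.current === 'connected';
  }

  // Number of 'reconnecting' statuses seen so far.
  reconnectCount(): number {
    return this.reconnects;
  }
}
```

A tracker like this would be fed from the subscription callback, for instance by calling `tracker.update(status)` where the snippets above call `console.log(status)`.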

Listening to MQTT events

In some cases, you may want to listen to internal events emitted by the microservice. For example, you could listen for the error event to trigger additional operations when an error occurs. To do so, use the on() method, as shown below:

TypeScript
this.client.on('error', (err) => {
  console.error(err);
});

Similarly, you can listen to the server's internal events:

TypeScript
server.on<MqttEvents>('error', (err) => {
  console.error(err);
});
Hint

The MqttEvents type is imported from the @nestjs/microservices package.

Underlying driver access

For more advanced use cases, you may need to access the underlying driver instance. This is useful for scenarios such as manually closing the connection or using driver-specific methods. Keep in mind, however, that in most cases you should not need to access the driver directly.

To do so, use the unwrap() method, which returns the underlying driver instance. The generic type parameter should specify the type of driver instance you expect.

TypeScript
const mqttClient = this.client.unwrap<import('mqtt').MqttClient>();

Similarly, you can access the server's underlying driver instance:

TypeScript
const mqttClient = server.unwrap<import('mqtt').MqttClient>();