MQTT
MQTT (Message Queuing Telemetry Transport) open source, yengil messaging protokoli bo'lib, past kechikish (low latency) uchun optimallashtirilgan. Bu protokol publish/subscribe mod
MQTT (Message Queuing Telemetry Transport) open source, yengil messaging protokoli bo'lib, past kechikish (low latency) uchun optimallashtirilgan. Bu protokol publish/subscribe modelidan foydalanib qurilmalarni ulashning masshtablanuvchi va tejamkor usulini beradi. MQTT asosida qurilgan aloqa tizimi publish qiluvchi server, broker va bir yoki bir nechta clientdan iborat. U resurslari cheklangan qurilmalar hamda past-bandwidth, yuqori kechikishli yoki ishonchsiz tarmoqlar uchun mo'ljallangan.
O'rnatish
MQTT asosidagi microservice'larni qurishni boshlash uchun avval kerakli paketni o'rnating:
1$ npm i --save mqttUmumiy ko'rinish
MQTT transportyoridan foydalanish uchun createMicroservice() metodiga quyidagi options obyektini uzating:
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2 transport: Transport.MQTT,
3 options: {
4 url: 'mqtt://localhost:1883',
5 },
6});Transport enum'i @nestjs/microservices paketidan import qilinadi.
Opsiyalar
options obyekti tanlangan transportyorga xos. MQTT transportyori here da tasvirlangan propertylarni ochadi.
Mijoz
Boshqa microservice transportyorlari kabi, MQTT ClientProxy instansiyasini yaratish uchun bir nechta variant bor.
Instansiya yaratishning bir usuli - ClientsModuledan foydalanish. ClientsModule orqali client instansiyasini yaratish uchun uni import qiling va register() metodiga createMicroservice() metodida yuqorida ko'rsatilgan xuddi shu propertylarga ega options obyektini, shuningdek injection token sifatida ishlatiladigan name propertysini uzating. ClientsModule haqida batafsil hereda o'qing.
1@Module({
2 imports: [
3 ClientsModule.register([
4 {
5 name: 'MATH_SERVICE',
6 transport: Transport.MQTT,
7 options: {
8 url: 'mqtt://localhost:1883',
9 }
10 },
11 ]),
12 ]
13 ...
14})Client yaratishning boshqa usullari (ClientProxyFactory yoki @Client()) ham qo'llanishi mumkin. Ular haqida hereda o'qishingiz mumkin.
Kontekst
Murakkabroq ssenariylarda kiruvchi so'rov haqida qo'shimcha ma'lumotlarga kirish kerak bo'lishi mumkin. MQTT transportyoridan foydalanganda MqttContext obyektiga kirishingiz mumkin.
1@MessagePattern('notifications')
2getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
3 console.log(`Topic: ${context.getTopic()}`);
4}@Payload(), @Ctx() va MqttContext@nestjs/microservices paketidan import qilinadi.
Original mqtt packet obyektiga kirish uchun MqttContext obyektining getPacket() metodidan quyidagicha foydalaning:
1@MessagePattern('notifications')
2getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
3 console.log(context.getPacket());
4}Wildcardlar
Obuna aniq topicga bo'lishi mumkin yoki wildcardlarni o'z ichiga olishi mumkin. Ikkita wildcard mavjud: + va #. + - bir darajali wildcard, # esa ko'p darajali wildcard bo'lib, ko'plab topic darajalarini qamrab oladi.
1@MessagePattern('sensors/+/temperature/+')
2getTemperature(@Ctx() context: MqttContext) {
3 console.log(`Topic: ${context.getTopic()}`);
4}Xizmat sifati (QoS)
@MessagePattern yoki @EventPattern dekoratorlari bilan yaratilgan har qanday obuna QoS 0 bilan subscribe qiladi. Agar yuqori QoS kerak bo'lsa, ulanishni o'rnatishda subscribeOptions blokidan foydalanib global tarzda quyidagicha o'rnatish mumkin:
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2 transport: Transport.MQTT,
3 options: {
4 url: 'mqtt://localhost:1883',
5 subscribeOptions: {
6 qos: 2
7 },
8 },
9});Agar topicga xos QoS talab qilinsa, Custom transporter yaratishni ko'rib chiqing.
Record builderlari
Xabar options'larini sozlash (QoS darajasini o'zgartirish, Retain yoki DUP flaglarini o'rnatish yoki payloadga qo'shimcha propertylar qo'shish) uchun MqttRecordBuilder klassidan foydalanishingiz mumkin. Masalan, QoSni 2 ga o'rnatish uchun setQoS metodidan quyidagicha foydalaning:
1const userProperties = { 'x-version': '1.0.0' };
2const record = new MqttRecordBuilder(':cat:')
3 .setProperties({ userProperties })
4 .setQoS(1)
5 .build();
6client.send('replace-emoji', record).subscribe(...);MqttRecordBuilder klassi @nestjs/microservices paketidan eksport qilinadi.
Bu options'larni server tomonida ham MqttContextga kirish orqali o'qishingiz mumkin.
1@MessagePattern('replace-emoji')
2replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
3 const { properties: { userProperties } } = context.getPacket();
4 return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
5}Ba'zi holatlarda bir nechta so'rovlar uchun user propertiesni sozlashni xohlashingiz mumkin, bu holda ushbu options'larni ClientProxyFactoryga uzatishingiz mumkin.
1import { Module } from '@nestjs/common';
2import { ClientProxyFactory, Transport } from '@nestjs/microservices';
3
4@Module({
5 providers: [
6 {
7 provide: 'API_v1',
8 useFactory: () =>
9 ClientProxyFactory.create({
10 transport: Transport.MQTT,
11 options: {
12 url: 'mqtt://localhost:1833',
13 userProperties: { 'x-version': '1.0.0' },
14 },
15 }),
16 },
17 ],
18})
19export class ApiModule {}Instansiya holati yangilanishlari
Ulanish va asosiy driver instansiyasi holati haqida real vaqt yangilanishlarini olish uchun status streamiga subscribe bo'lishingiz mumkin. Bu stream tanlangan driverga xos holat yangilanishlarini beradi. MQTT driveri uchun status streami connected, disconnected, reconnecting va closed eventlarini emit qiladi.
1this.client.status.subscribe((status: MqttStatus) => {
2 console.log(status);
3});MqttStatus tipi @nestjs/microservices paketidan import qilinadi.
Xuddi shuningdek, serverning status streamiga subscribe bo'lib, server holati haqida xabarlarni olishingiz mumkin.
1const server = app.connectMicroservice<MicroserviceOptions>(...);
2server.status.subscribe((status: MqttStatus) => {
3 console.log(status);
4});MQTT eventlarini tinglash
Ba'zi holatlarda microservice tomonidan emit qilinadigan ichki eventlarni tinglashni xohlashingiz mumkin. Masalan, xato yuz berganda qo'shimcha operatsiyalarni ishga tushirish uchun error eventini tinglashingiz mumkin. Buning uchun quyida ko'rsatilgandek on() metodidan foydalaning:
1this.client.on('error', (err) => {
2 console.error(err);
3});Xuddi shuningdek, serverning ichki eventlarini tinglashingiz mumkin:
1server.on<MqttEvents>('error', (err) => {
2 console.error(err);
3});MqttEvents tipi @nestjs/microservices paketidan import qilinadi.
Asosiy driverga kirish
Murakkabroq use-case'larda asosiy driver instansiyasiga kirish kerak bo'lishi mumkin. Bu ulanishni qo'lda yopish yoki driverga xos metodlardan foydalanish kabi ssenariylar uchun foydali. Biroq, ko'p hollarda driverga to'g'ridan-to'g'ri kirish kerak emasligini yodda tuting.
Buning uchun unwrap() metodidan foydalanishingiz mumkin, u asosiy driver instansiyasini qaytaradi. Generic type parametri kutilayotgan driver instansiyasi turini ko'rsatishi kerak.
1const mqttClient = this.client.unwrap<import('mqtt').MqttClient>();Xuddi shuningdek, serverning asosiy driver instansiyasiga kirishingiz mumkin:
1const mqttClient = server.unwrap<import('mqtt').MqttClient>();