Mikroxizmatlar12 min read

Kafka

Kafka open source, taqsimlangan streaming platforma bo'lib, uchta asosiy imkoniyatga ega:

Kafka open source, taqsimlangan streaming platforma bo'lib, uchta asosiy imkoniyatga ega:

  • Yozuvlar oqimini publish va subscribe qilish, message queue yoki enterprise messaging tizimiga o'xshash.
  • Yozuvlar oqimini xatolarga chidamli, bardoshli tarzda saqlash.
  • Yozuvlar oqimini paydo bo'lishi bilan qayta ishlash.

Kafka loyihasi real vaqt ma'lumot oqimlarini boshqarish uchun yagona, yuqori o'tkazuvchanlik va past kechikishli platformani taqdim etishni maqsad qiladi. U real vaqt streaming data tahlili uchun Apache Storm va Spark bilan juda yaxshi integratsiyalanadi.

O'rnatish

Kafka asosidagi microservice'larni qurishni boshlash uchun avval kerakli paketni o'rnating:

Terminal
1$ npm i --save kafkajs

Umumiy ko'rinish

Boshqa Nest microservice transport qatlam implementatsiyalari kabi, Kafka transportyer mexanizmini createMicroservice() metodiga uzatiladigan options obyektining transport propertysi orqali, ixtiyoriy options propertysi bilan birga, quyida ko'rsatilgandek tanlaysiz:

TypeScript
main
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2  transport: Transport.KAFKA,
3  options: {
4    client: {
5      brokers: ['localhost:9092'],
6    }
7  }
8});
Hint

Transport enum'i @nestjs/microservices paketidan import qilinadi.

Opsiyalar

options propertysi tanlangan transportyorga xos. Kafka transportyori quyida tasvirlangan propertylarni ochadi.

clientClient konfiguratsiya options'lari (batafsil here)
consumerConsumer konfiguratsiya options'lari (batafsil here)
runRun konfiguratsiya options'lari (batafsil here)
subscribeSubscribe konfiguratsiya options'lari (batafsil here)
producerProducer konfiguratsiya options'lari (batafsil here)
sendSend konfiguratsiya options'lari (batafsil here)
producerOnlyModeConsumer group ro'yxatdan o'tishini o'tkazib yuborib, faqat producer sifatida ishlash uchun feature flag (boolean)
postfixIdclientId qiymatining suffiksini o'zgartirish (string)

Mijoz

Kafka boshqa microservice transportyorlariga qaraganda kichik farqqa ega. ClientProxy klassi o'rniga ClientKafkaProxy klassidan foydalanamiz.

Boshqa microservice transportyorlari kabi, ClientKafkaProxy instansiyasini yaratish uchun bir nechta variant bor.

Instansiya yaratishning bir usuli - ClientsModuledan foydalanish. ClientsModule orqali client instansiyasini yaratish uchun uni import qiling va register() metodiga createMicroservice() metodida yuqorida ko'rsatilgan xuddi shu propertylarga ega options obyektini, shuningdek injection token sifatida ishlatiladigan name propertysini uzating. ClientsModule haqida batafsil hereda o'qing.

TypeScript
1@Module({
2  imports: [
3    ClientsModule.register([
4      {
5        name: 'HERO_SERVICE',
6        transport: Transport.KAFKA,
7        options: {
8          client: {
9            clientId: 'hero',
10            brokers: ['localhost:9092'],
11          },
12          consumer: {
13            groupId: 'hero-consumer'
14          }
15        }
16      },
17    ]),
18  ]
19  ...
20})

Client yaratishning boshqa usullari (ClientProxyFactory yoki @Client()) ham qo'llanishi mumkin. Ular haqida hereda o'qishingiz mumkin.

@Client() dekoratoridan quyidagicha foydalaning:

TypeScript
1@Client({
2  transport: Transport.KAFKA,
3  options: {
4    client: {
5      clientId: 'hero',
6      brokers: ['localhost:9092'],
7    },
8    consumer: {
9      groupId: 'hero-consumer'
10    }
11  }
12})
13client: ClientKafkaProxy;

Xabar patterni

Kafka microservice message patterni so'rov va javob kanallari uchun ikki topicdan foydalanadi. ClientKafkaProxy.send() metodi return address bilan xabarlarni, correlation id, reply topic va reply partitionni so'rov xabariga bog'lab yuboradi. Bu xabar yuborishdan oldin ClientKafkaProxy instansiyasi reply topicga subscribe qilingan va kamida bitta partitionga biriktirilgan bo'lishini talab qiladi.

Keyin, ishlayotgan har bir Nest ilovasi uchun kamida bitta reply topic partition bo'lishi kerak. Masalan, agar siz 4 ta Nest ilovasini ishlatayotgan bo'lsangiz, lekin reply topicda faqat 3 ta partition bo'lsa, Nest ilovalaridan 1 tasi xabar yuborishga uringanda xatoga uchraydi.

Yangi ClientKafkaProxy instansiyalari ishga tushganda ular consumer groupga qo'shiladi va tegishli topiclarga subscribe qiladi. Bu jarayon consumer group iste'molchilariga biriktirilgan topic partitionlarining qayta balanslanishini keltirib chiqaradi.

Odatda topic partitionlari round robin partitioner yordamida biriktiriladi, u topic partitionlarini application ishga tushganda tasodifiy o'rnatiladigan consumer nomlari bo'yicha saralangan consumerlar kolleksiyasiga taqsimlaydi. Biroq, yangi consumer consumer groupga qo'shilganda, u consumerlar kolleksiyasining istalgan joyida joylashishi mumkin. Bu, yangi consumer mavjud consumerdan oldin joylashganda, mavjud consumerlarga boshqa partitionlar berilishiga olib keladi. Natijada, partitioni o'zgargan consumerlar rebalance'dan oldin yuborilgan so'rovlarning response xabarlarini yo'qotadi.

ClientKafkaProxy consumerlari response xabarlarini yo'qotmasligi uchun Nest'ga xos built-in custom partitioner ishlatiladi. Bu custom partitioner partitionlarni application ishga tushganda o'rnatilgan yuqori aniqlikdagi timestamp (process.hrtime()) bo'yicha saralangan consumerlar kolleksiyasiga taqsimlaydi.

Javobga obuna bo'lish

Note

Bu bo'lim faqat request-response message uslubidan foydalansangiz ( @MessagePattern dekoratori va ClientKafkaProxy.send metodi bilan) tegishli. event-based muloqot (@EventPattern dekoratori va ClientKafkaProxy.emit metodi) uchun response topicga subscribe bo'lish shart emas.

ClientKafkaProxy klassi subscribeToResponseOf() metodini taqdim etadi. subscribeToResponseOf() metodi so'rov topic nomini argument sifatida oladi va hosil qilingan reply topic nomini reply topiclar kolleksiyasiga qo'shadi. Bu metod message patternni implement qilishda talab etiladi.

TypeScript
heroes.controller
1onModuleInit() {
2  this.client.subscribeToResponseOf('hero.kill.dragon');
3}

Agar ClientKafkaProxy instansiyasi asinxron yaratilsa, subscribeToResponseOf() metodi connect() metodini chaqirishdan oldin chaqirilishi kerak.

TypeScript
heroes.controller
1async onModuleInit() {
2  this.client.subscribeToResponseOf('hero.kill.dragon');
3  await this.client.connect();
4}

Kiruvchi

Nest kiruvchi Kafka xabarlarini key, value va headers propertylariga ega obyekt sifatida qabul qiladi; ularning qiymatlari Buffer tipida bo'ladi. So'ng Nest bu qiymatlarni bufferlarni stringga aylantirish orqali parse qiladi. Agar string "object like" bo'lsa, Nest uni JSON sifatida parse qilishga urinadi. Shundan so'ng value o'ziga mos handlerga uzatiladi.

Chiquvchi

Nest outgoing Kafka xabarlarini event publish qilish yoki xabar yuborishda serializatsiya jarayonidan so'ng yuboradi. Bu ClientKafkaProxyning emit() va send() metodlariga uzatilgan argumentlarda yoki @MessagePattern metodidan qaytgan qiymatlarda sodir bo'ladi. Bu serializatsiya string bo'lmagan yoki buffer bo'lmagan obyektlarni JSON.stringify() yoki toString() prototip metodi yordamida "stringify" qiladi.

TypeScript
heroes.controller
1@Controller()
2export class HeroesController {
3  @MessagePattern('hero.kill.dragon')
4  killDragon(@Payload() message: KillDragonMessage): any {
5    const dragonId = message.dragonId;
6    const items = [
7      { id: 1, name: 'Mythical Sword' },
8      { id: 2, name: 'Key to Dungeon' },
9    ];
10    return items;
11  }
12}
Hint

@Payload()@nestjs/microservices paketidan import qilinadi.

Outgoing xabarlar key va value propertylariga ega obyekt uzatish orqali key'lanishi ham mumkin. Xabarlarni key'lash co-partitioning requirement talabini bajarish uchun muhim.

TypeScript
heroes.controller
1@Controller()
2export class HeroesController {
3  @MessagePattern('hero.kill.dragon')
4  killDragon(@Payload() message: KillDragonMessage): any {
5    const realm = 'Nest';
6    const heroId = message.heroId;
7    const dragonId = message.dragonId;
8
9    const items = [
10      { id: 1, name: 'Mythical Sword' },
11      { id: 2, name: 'Key to Dungeon' },
12    ];
13
14    return {
15      headers: {
16        realm
17      },
18      key: heroId,
19      value: items
20    }
21  }
22}

Bundan tashqari, bu formatda uzatilgan xabarlar headers hash propertysida o'rnatilgan custom headerlarni ham o'z ichiga olishi mumkin. Header hash property qiymatlari string yoki Buffer tipida bo'lishi kerak.

TypeScript
heroes.controller
1@Controller()
2export class HeroesController {
3  @MessagePattern('hero.kill.dragon')
4  killDragon(@Payload() message: KillDragonMessage): any {
5    const realm = 'Nest';
6    const heroId = message.heroId;
7    const dragonId = message.dragonId;
8
9    const items = [
10      { id: 1, name: 'Mythical Sword' },
11      { id: 2, name: 'Key to Dungeon' },
12    ];
13
14    return {
15      headers: {
16        kafka_nestRealm: realm
17      },
18      key: heroId,
19      value: items
20    }
21  }
22}

Eventga asoslangan

Request-response usuli servislar o'rtasida xabar almashish uchun ideal bo'lsa-da, xabar uslubi event-based bo'lganda (bu Kafka uchun ideal), ya'ni javob kutmasdan event publish qilmoqchi bo'lsangiz, u kamroq mos. Bunday holatda request-response uchun ikki topicni ushlab turish ortiqcha bo'ladi.

Batafsil ma'lumot uchun ushbu ikki bo'limga qarang: Umumiy ko'rinish: Eventga asoslangan va Umumiy ko'rinish: Eventlarni publish qilish.

Kontekst

Murakkabroq ssenariylarda kiruvchi so'rov haqida qo'shimcha ma'lumotlarga kirish kerak bo'lishi mumkin. Kafka transportyoridan foydalanganda KafkaContext obyektiga kirishingiz mumkin.

TypeScript
1@MessagePattern('hero.kill.dragon')
2killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
3  console.log(`Topic: ${context.getTopic()}`);
4}
Hint

@Payload(), @Ctx() va KafkaContext@nestjs/microservices paketidan import qilinadi.

Original Kafka IncomingMessage obyektiga kirish uchun KafkaContext obyektining getMessage() metodidan quyidagicha foydalaning:

TypeScript
1@MessagePattern('hero.kill.dragon')
2killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
3  const originalMessage = context.getMessage();
4  const partition = context.getPartition();
5  const { headers, timestamp } = originalMessage;
6}

Bu yerda IncomingMessage quyidagi interfeysga mos keladi:

TypeScript
1interface IncomingMessage {
2  topic: string;
3  partition: number;
4  timestamp: string;
5  size: number;
6  attributes: number;
7  offset: string;
8  key: any;
9  value: any;
10  headers: Record<string, any>;
11}

Agar handleringiz har bir qabul qilingan xabar uchun sekin ishlov berishni talab qilsa, heartbeat callbackdan foydalanishni ko'rib chiqing. heartbeat funksiyasini olish uchun KafkaContextning getHeartbeat() metodidan quyidagicha foydalaning:

TypeScript
1@MessagePattern('hero.kill.dragon')
2async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
3  const heartbeat = context.getHeartbeat();
4
5  // Do some slow processing
6  await doWorkPart1();
7
8  // Send heartbeat to not exceed the sessionTimeout
9  await heartbeat();
10
11  // Do some slow processing again
12  await doWorkPart2();
13}

Nomlash konventsiyalari

Kafka microservice komponentlari Nest microservice client va server komponentlari o'rtasida kolliziyalarni oldini olish uchun client.clientId va consumer.groupId opsiyalariga o'z rollarining tavsifini qo'shib boradi. Standart bo'yicha ClientKafkaProxy komponentlari -client, ServerKafka komponentlari esa -server suffiksini har ikki optionga ham qo'shadi. Quyida berilgan qiymatlar qanday o'zgartirilganini ko'ring (izohlarda ko'rsatilgan).

TypeScript
main
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2  transport: Transport.KAFKA,
3  options: {
4    client: {
5      clientId: 'hero', // hero-server
6      brokers: ['localhost:9092'],
7    },
8    consumer: {
9      groupId: 'hero-consumer' // hero-consumer-server
10    },
11  }
12});

Va client uchun:

TypeScript
heroes.controller
1@Client({
2  transport: Transport.KAFKA,
3  options: {
4    client: {
5      clientId: 'hero', // hero-client
6      brokers: ['localhost:9092'],
7    },
8    consumer: {
9      groupId: 'hero-consumer' // hero-consumer-client
10    }
11  }
12})
13client: ClientKafkaProxy;
Hint

Kafka client va consumer naming conventions'ni o'zingizning custom provideringizda ClientKafkaProxy va KafkaServerni kengaytirib, konstruktorni override qilish orqali sozlash mumkin.

Kafka microservice message patterni so'rov va javob kanallari uchun ikki topicdan foydalanar ekan, reply pattern so'rov topicidan hosil qilinishi kerak. Standart bo'yicha reply topic nomi so'rov topic nomiga .reply qo'shilgan ko'rinish bo'ladi.

TypeScript
heroes.controller
1onModuleInit() {
2  this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
3}
Hint

Kafka reply topic naming conventions'ni o'zingizning custom provideringizda ClientKafkaProxyni kengaytirib, getResponsePatternName metodini override qilish orqali sozlash mumkin.

Qayta uriniladigan exceptionlar

Boshqa transportyorlar kabi, barcha qayta ishlanmagan exceptionlar avtomatik tarzda RpcExceptionga o'raladi va "user-friendly" formatga aylantiriladi. Biroq, ba'zi chekka holatlarda bu mexanizmdan chetlab o'tib, exceptionlarni kafkajs driveri tomonidan iste'mol qilinishini xohlashingiz mumkin. Xabarni qayta ishlash paytida exception tashlash kafkajsga uni retry qilishni (qayta yetkazishni) buyuradi, ya'ni message (yoki event) handler ishga tushgan bo'lsa ham, offset Kafka'ga commit qilinmaydi.

Warning

Event handlerlar (event-based muloqot) uchun barcha qayta ishlanmagan exceptionlar default bo'yicha retriable exceptions hisoblanadi.

Buning uchun KafkaRetriableException deb nomlangan maxsus klassdan quyidagicha foydalanishingiz mumkin:

TypeScript
1throw new KafkaRetriableException('...');
Hint

KafkaRetriableException klassi @nestjs/microservices paketidan eksport qilinadi.

Custom exceptionlarni boshqarish

Standart error handling mexanizmlariga qo'shimcha ravishda, Kafka eventlari uchun retry logikasini boshqarish maqsadida custom Exception Filter yaratishingiz mumkin. Masalan, quyidagi misolda muammoli eventni ma'lum miqdordagi retrylardan so'ng o'tkazib yuborish ko'rsatilgan:

TypeScript
1import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
2import { BaseExceptionFilter } from '@nestjs/core';
3import { KafkaContext } from '../ctx-host';
4
5@Catch()
6export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
7  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);
8
9  constructor(
10    private readonly maxRetries: number,
11    // Optional custom function executed when max retries are exceeded
12    private readonly skipHandler?: (message: any) => Promise<void>,
13  ) {
14    super();
15  }
16
17  async catch(exception: unknown, host: ArgumentsHost) {
18    const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
19    const message = kafkaContext.getMessage();
20    const currentRetryCount = this.getRetryCountFromContext(kafkaContext);
21
22    if (currentRetryCount >= this.maxRetries) {
23      this.logger.warn(
24        `Max retries (${
25          this.maxRetries
26        }) exceeded for message: ${JSON.stringify(message)}`,
27      );
28
29      if (this.skipHandler) {
30        try {
31          await this.skipHandler(message);
32        } catch (err) {
33          this.logger.error('Error in skipHandler:', err);
34        }
35      }
36
37      try {
38        await this.commitOffset(kafkaContext);
39      } catch (commitError) {
40        this.logger.error('Failed to commit offset:', commitError);
41      }
42      return; // Stop propagating the exception
43    }
44
45    // If retry count is below the maximum, proceed with the default Exception Filter logic
46    super.catch(exception, host);
47  }
48
49  private getRetryCountFromContext(context: KafkaContext): number {
50    const headers = context.getMessage().headers || {};
51    const retryHeader = headers['retryCount'] || headers['retry-count'];
52    return retryHeader ? Number(retryHeader) : 0;
53  }
54
55  private async commitOffset(context: KafkaContext): Promise<void> {
56    const consumer = context.getConsumer && context.getConsumer();
57    if (!consumer) {
58      throw new Error('Consumer instance is not available from KafkaContext.');
59    }
60
61    const topic = context.getTopic && context.getTopic();
62    const partition = context.getPartition && context.getPartition();
63    const message = context.getMessage();
64    const offset = message.offset;
65
66    if (!topic || partition === undefined || offset === undefined) {
67      throw new Error(
68        'Incomplete Kafka message context for committing offset.',
69      );
70    }
71
72    await consumer.commitOffsets([
73      {
74        topic,
75        partition,
76        // When committing an offset, commit the next number (i.e., current offset + 1)
77        offset: (Number(offset) + 1).toString(),
78      },
79    ]);
80  }
81}

Bu filter Kafka eventini configurable miqdorda qayta ishlashga urinish imkonini beradi. Maksimal retrylar soniga yetilgach, u custom skipHandlerni (agar berilgan bo'lsa) ishga tushiradi va offsetni commit qiladi, ya'ni muammoli eventni o'tkazib yuboradi. Bu keyingi eventlarning to'xtamasdan qayta ishlanishiga imkon beradi.

Ushbu filterni event handlerlaringizga qo'shish orqali integratsiya qilishingiz mumkin:

TypeScript
1@UseFilters(new KafkaMaxRetryExceptionFilter(5))
2export class MyEventHandler {
3  @EventPattern('your-topic')
4  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
5    // Your event processing logic...
6  }
7}

Offsetlarni commit qilish

Kafka bilan ishlaganda offsetlarni commit qilish muhim. Standart bo'yicha xabarlar ma'lum vaqt o'tgach avtomatik commit qilinadi. Batafsil ma'lumot uchun KafkaJS docsga tashrif buyuring. KafkaContext manual commit qilish uchun faol consumerga kirish imkonini beradi. Consumer KafkaJS consumer bo'lib, native KafkaJS implementation kabi ishlaydi.

TypeScript
1@EventPattern('user.created')
2async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
3  // business logic
4
5  const { offset } = context.getMessage();
6  const partition = context.getPartition();
7  const topic = context.getTopic();
8  const consumer = context.getConsumer();
9  await consumer.commitOffsets([{ topic, partition, offset }])
10}

Xabarlarni auto-commit qilishni o'chirish uchun run konfiguratsiyasida autoCommit: false ni quyidagicha o'rnating:

TypeScript
main
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2  transport: Transport.KAFKA,
3  options: {
4    client: {
5      brokers: ['localhost:9092'],
6    },
7    run: {
8      autoCommit: false
9    }
10  }
11});

Instansiya holati yangilanishlari

Ulanish va asosiy driver instansiyasi holati haqida real vaqt yangilanishlarini olish uchun status streamiga subscribe bo'lishingiz mumkin. Bu stream tanlangan driverga xos holat yangilanishlarini beradi. Kafka driveri uchun status streami connected, disconnected, rebalancing, crashed va stopped eventlarini emit qiladi.

TypeScript
1this.client.status.subscribe((status: KafkaStatus) => {
2  console.log(status);
3});
Hint

KafkaStatus tipi @nestjs/microservices paketidan import qilinadi.

Xuddi shuningdek, serverning status streamiga subscribe bo'lib, server holati haqida xabarlarni olishingiz mumkin.

TypeScript
1const server = app.connectMicroservice<MicroserviceOptions>(...);
2server.status.subscribe((status: KafkaStatus) => {
3  console.log(status);
4});

Asosiy producer va consumer

Murakkabroq use-case'larda asosiy producer va consumer instansiyalariga kirish kerak bo'lishi mumkin. Bu ulanishni qo'lda yopish yoki driverga xos metodlardan foydalanish kabi ssenariylar uchun foydali. Biroq, ko'p hollarda driverga to'g'ridan-to'g'ri kirish kerak emasligini yodda tuting.

Buning uchun ClientKafkaProxy instansiyasi taqdim etadigan producer va consumer getterlaridan foydalanishingiz mumkin.

TypeScript
1const producer = this.client.producer;
2const consumer = this.client.consumer;