Kafka
Kafka open source, taqsimlangan streaming platforma bo'lib, uchta asosiy imkoniyatga ega:
Kafka open source, taqsimlangan streaming platforma bo'lib, uchta asosiy imkoniyatga ega:
- Yozuvlar oqimini publish va subscribe qilish, message queue yoki enterprise messaging tizimiga o'xshash.
- Yozuvlar oqimini xatolarga chidamli, bardoshli tarzda saqlash.
- Yozuvlar oqimini paydo bo'lishi bilan qayta ishlash.
Kafka loyihasi real vaqt ma'lumot oqimlarini boshqarish uchun yagona, yuqori o'tkazuvchanlik va past kechikishli platformani taqdim etishni maqsad qiladi. U real vaqt streaming data tahlili uchun Apache Storm va Spark bilan juda yaxshi integratsiyalanadi.
O'rnatish
Kafka asosidagi microservice'larni qurishni boshlash uchun avval kerakli paketni o'rnating:
1$ npm i --save kafkajsUmumiy ko'rinish
Boshqa Nest microservice transport qatlam implementatsiyalari kabi, Kafka transportyer mexanizmini createMicroservice() metodiga uzatiladigan options obyektining transport propertysi orqali, ixtiyoriy options propertysi bilan birga, quyida ko'rsatilgandek tanlaysiz:
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2 transport: Transport.KAFKA,
3 options: {
4 client: {
5 brokers: ['localhost:9092'],
6 }
7 }
8});Transport enum'i @nestjs/microservices paketidan import qilinadi.
Opsiyalar
options propertysi tanlangan transportyorga xos. Kafka transportyori quyida tasvirlangan propertylarni ochadi.
client | Client konfiguratsiya options'lari (batafsil here) |
consumer | Consumer konfiguratsiya options'lari (batafsil here) |
run | Run konfiguratsiya options'lari (batafsil here) |
subscribe | Subscribe konfiguratsiya options'lari (batafsil here) |
producer | Producer konfiguratsiya options'lari (batafsil here) |
send | Send konfiguratsiya options'lari (batafsil here) |
producerOnlyMode | Consumer group ro'yxatdan o'tishini o'tkazib yuborib, faqat producer sifatida ishlash uchun feature flag (boolean) |
postfixId | clientId qiymatining suffiksini o'zgartirish (string) |
Mijoz
Kafka boshqa microservice transportyorlariga qaraganda kichik farqqa ega. ClientProxy klassi o'rniga ClientKafkaProxy klassidan foydalanamiz.
Boshqa microservice transportyorlari kabi, ClientKafkaProxy instansiyasini yaratish uchun bir nechta variant bor.
Instansiya yaratishning bir usuli - ClientsModuledan foydalanish. ClientsModule orqali client instansiyasini yaratish uchun uni import qiling va register() metodiga createMicroservice() metodida yuqorida ko'rsatilgan xuddi shu propertylarga ega options obyektini, shuningdek injection token sifatida ishlatiladigan name propertysini uzating. ClientsModule haqida batafsil hereda o'qing.
1@Module({
2 imports: [
3 ClientsModule.register([
4 {
5 name: 'HERO_SERVICE',
6 transport: Transport.KAFKA,
7 options: {
8 client: {
9 clientId: 'hero',
10 brokers: ['localhost:9092'],
11 },
12 consumer: {
13 groupId: 'hero-consumer'
14 }
15 }
16 },
17 ]),
18 ]
19 ...
20})Client yaratishning boshqa usullari (ClientProxyFactory yoki @Client()) ham qo'llanishi mumkin. Ular haqida hereda o'qishingiz mumkin.
@Client() dekoratoridan quyidagicha foydalaning:
1@Client({
2 transport: Transport.KAFKA,
3 options: {
4 client: {
5 clientId: 'hero',
6 brokers: ['localhost:9092'],
7 },
8 consumer: {
9 groupId: 'hero-consumer'
10 }
11 }
12})
13client: ClientKafkaProxy;Xabar patterni
Kafka microservice message patterni so'rov va javob kanallari uchun ikki topicdan foydalanadi. ClientKafkaProxy.send() metodi return address bilan xabarlarni, correlation id, reply topic va reply partitionni so'rov xabariga bog'lab yuboradi. Bu xabar yuborishdan oldin ClientKafkaProxy instansiyasi reply topicga subscribe qilingan va kamida bitta partitionga biriktirilgan bo'lishini talab qiladi.
Keyin, ishlayotgan har bir Nest ilovasi uchun kamida bitta reply topic partition bo'lishi kerak. Masalan, agar siz 4 ta Nest ilovasini ishlatayotgan bo'lsangiz, lekin reply topicda faqat 3 ta partition bo'lsa, Nest ilovalaridan 1 tasi xabar yuborishga uringanda xatoga uchraydi.
Yangi ClientKafkaProxy instansiyalari ishga tushganda ular consumer groupga qo'shiladi va tegishli topiclarga subscribe qiladi. Bu jarayon consumer group iste'molchilariga biriktirilgan topic partitionlarining qayta balanslanishini keltirib chiqaradi.
Odatda topic partitionlari round robin partitioner yordamida biriktiriladi, u topic partitionlarini application ishga tushganda tasodifiy o'rnatiladigan consumer nomlari bo'yicha saralangan consumerlar kolleksiyasiga taqsimlaydi. Biroq, yangi consumer consumer groupga qo'shilganda, u consumerlar kolleksiyasining istalgan joyida joylashishi mumkin. Bu, yangi consumer mavjud consumerdan oldin joylashganda, mavjud consumerlarga boshqa partitionlar berilishiga olib keladi. Natijada, partitioni o'zgargan consumerlar rebalance'dan oldin yuborilgan so'rovlarning response xabarlarini yo'qotadi.
ClientKafkaProxy consumerlari response xabarlarini yo'qotmasligi uchun Nest'ga xos built-in custom partitioner ishlatiladi. Bu custom partitioner partitionlarni application ishga tushganda o'rnatilgan yuqori aniqlikdagi timestamp (process.hrtime()) bo'yicha saralangan consumerlar kolleksiyasiga taqsimlaydi.
Javobga obuna bo'lish
Bu bo'lim faqat request-response message uslubidan foydalansangiz ( @MessagePattern dekoratori va ClientKafkaProxy.send metodi bilan) tegishli. event-based muloqot (@EventPattern dekoratori va ClientKafkaProxy.emit metodi) uchun response topicga subscribe bo'lish shart emas.
ClientKafkaProxy klassi subscribeToResponseOf() metodini taqdim etadi. subscribeToResponseOf() metodi so'rov topic nomini argument sifatida oladi va hosil qilingan reply topic nomini reply topiclar kolleksiyasiga qo'shadi. Bu metod message patternni implement qilishda talab etiladi.
1onModuleInit() {
2 this.client.subscribeToResponseOf('hero.kill.dragon');
3}Agar ClientKafkaProxy instansiyasi asinxron yaratilsa, subscribeToResponseOf() metodi connect() metodini chaqirishdan oldin chaqirilishi kerak.
1async onModuleInit() {
2 this.client.subscribeToResponseOf('hero.kill.dragon');
3 await this.client.connect();
4}Kiruvchi
Nest kiruvchi Kafka xabarlarini key, value va headers propertylariga ega obyekt sifatida qabul qiladi; ularning qiymatlari Buffer tipida bo'ladi. So'ng Nest bu qiymatlarni bufferlarni stringga aylantirish orqali parse qiladi. Agar string "object like" bo'lsa, Nest uni JSON sifatida parse qilishga urinadi. Shundan so'ng value o'ziga mos handlerga uzatiladi.
Chiquvchi
Nest outgoing Kafka xabarlarini event publish qilish yoki xabar yuborishda serializatsiya jarayonidan so'ng yuboradi. Bu ClientKafkaProxyning emit() va send() metodlariga uzatilgan argumentlarda yoki @MessagePattern metodidan qaytgan qiymatlarda sodir bo'ladi. Bu serializatsiya string bo'lmagan yoki buffer bo'lmagan obyektlarni JSON.stringify() yoki toString() prototip metodi yordamida "stringify" qiladi.
1@Controller()
2export class HeroesController {
3 @MessagePattern('hero.kill.dragon')
4 killDragon(@Payload() message: KillDragonMessage): any {
5 const dragonId = message.dragonId;
6 const items = [
7 { id: 1, name: 'Mythical Sword' },
8 { id: 2, name: 'Key to Dungeon' },
9 ];
10 return items;
11 }
12}@Payload()@nestjs/microservices paketidan import qilinadi.
Outgoing xabarlar key va value propertylariga ega obyekt uzatish orqali key'lanishi ham mumkin. Xabarlarni key'lash co-partitioning requirement talabini bajarish uchun muhim.
1@Controller()
2export class HeroesController {
3 @MessagePattern('hero.kill.dragon')
4 killDragon(@Payload() message: KillDragonMessage): any {
5 const realm = 'Nest';
6 const heroId = message.heroId;
7 const dragonId = message.dragonId;
8
9 const items = [
10 { id: 1, name: 'Mythical Sword' },
11 { id: 2, name: 'Key to Dungeon' },
12 ];
13
14 return {
15 headers: {
16 realm
17 },
18 key: heroId,
19 value: items
20 }
21 }
22}Bundan tashqari, bu formatda uzatilgan xabarlar headers hash propertysida o'rnatilgan custom headerlarni ham o'z ichiga olishi mumkin. Header hash property qiymatlari string yoki Buffer tipida bo'lishi kerak.
1@Controller()
2export class HeroesController {
3 @MessagePattern('hero.kill.dragon')
4 killDragon(@Payload() message: KillDragonMessage): any {
5 const realm = 'Nest';
6 const heroId = message.heroId;
7 const dragonId = message.dragonId;
8
9 const items = [
10 { id: 1, name: 'Mythical Sword' },
11 { id: 2, name: 'Key to Dungeon' },
12 ];
13
14 return {
15 headers: {
16 kafka_nestRealm: realm
17 },
18 key: heroId,
19 value: items
20 }
21 }
22}Eventga asoslangan
Request-response usuli servislar o'rtasida xabar almashish uchun ideal bo'lsa-da, xabar uslubi event-based bo'lganda (bu Kafka uchun ideal), ya'ni javob kutmasdan event publish qilmoqchi bo'lsangiz, u kamroq mos. Bunday holatda request-response uchun ikki topicni ushlab turish ortiqcha bo'ladi.
Batafsil ma'lumot uchun ushbu ikki bo'limga qarang: Umumiy ko'rinish: Eventga asoslangan va Umumiy ko'rinish: Eventlarni publish qilish.
Kontekst
Murakkabroq ssenariylarda kiruvchi so'rov haqida qo'shimcha ma'lumotlarga kirish kerak bo'lishi mumkin. Kafka transportyoridan foydalanganda KafkaContext obyektiga kirishingiz mumkin.
1@MessagePattern('hero.kill.dragon')
2killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
3 console.log(`Topic: ${context.getTopic()}`);
4}@Payload(), @Ctx() va KafkaContext@nestjs/microservices paketidan import qilinadi.
Original Kafka IncomingMessage obyektiga kirish uchun KafkaContext obyektining getMessage() metodidan quyidagicha foydalaning:
1@MessagePattern('hero.kill.dragon')
2killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
3 const originalMessage = context.getMessage();
4 const partition = context.getPartition();
5 const { headers, timestamp } = originalMessage;
6}Bu yerda IncomingMessage quyidagi interfeysga mos keladi:
1interface IncomingMessage {
2 topic: string;
3 partition: number;
4 timestamp: string;
5 size: number;
6 attributes: number;
7 offset: string;
8 key: any;
9 value: any;
10 headers: Record<string, any>;
11}Agar handleringiz har bir qabul qilingan xabar uchun sekin ishlov berishni talab qilsa, heartbeat callbackdan foydalanishni ko'rib chiqing. heartbeat funksiyasini olish uchun KafkaContextning getHeartbeat() metodidan quyidagicha foydalaning:
1@MessagePattern('hero.kill.dragon')
2async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
3 const heartbeat = context.getHeartbeat();
4
5 // Do some slow processing
6 await doWorkPart1();
7
8 // Send heartbeat to not exceed the sessionTimeout
9 await heartbeat();
10
11 // Do some slow processing again
12 await doWorkPart2();
13}Nomlash konventsiyalari
Kafka microservice komponentlari Nest microservice client va server komponentlari o'rtasida kolliziyalarni oldini olish uchun client.clientId va consumer.groupId opsiyalariga o'z rollarining tavsifini qo'shib boradi. Standart bo'yicha ClientKafkaProxy komponentlari -client, ServerKafka komponentlari esa -server suffiksini har ikki optionga ham qo'shadi. Quyida berilgan qiymatlar qanday o'zgartirilganini ko'ring (izohlarda ko'rsatilgan).
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2 transport: Transport.KAFKA,
3 options: {
4 client: {
5 clientId: 'hero', // hero-server
6 brokers: ['localhost:9092'],
7 },
8 consumer: {
9 groupId: 'hero-consumer' // hero-consumer-server
10 },
11 }
12});Va client uchun:
1@Client({
2 transport: Transport.KAFKA,
3 options: {
4 client: {
5 clientId: 'hero', // hero-client
6 brokers: ['localhost:9092'],
7 },
8 consumer: {
9 groupId: 'hero-consumer' // hero-consumer-client
10 }
11 }
12})
13client: ClientKafkaProxy;Kafka client va consumer naming conventions'ni o'zingizning custom provideringizda ClientKafkaProxy va KafkaServerni kengaytirib, konstruktorni override qilish orqali sozlash mumkin.
Kafka microservice message patterni so'rov va javob kanallari uchun ikki topicdan foydalanar ekan, reply pattern so'rov topicidan hosil qilinishi kerak. Standart bo'yicha reply topic nomi so'rov topic nomiga .reply qo'shilgan ko'rinish bo'ladi.
1onModuleInit() {
2 this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
3}Kafka reply topic naming conventions'ni o'zingizning custom provideringizda ClientKafkaProxyni kengaytirib, getResponsePatternName metodini override qilish orqali sozlash mumkin.
Qayta uriniladigan exceptionlar
Boshqa transportyorlar kabi, barcha qayta ishlanmagan exceptionlar avtomatik tarzda RpcExceptionga o'raladi va "user-friendly" formatga aylantiriladi. Biroq, ba'zi chekka holatlarda bu mexanizmdan chetlab o'tib, exceptionlarni kafkajs driveri tomonidan iste'mol qilinishini xohlashingiz mumkin. Xabarni qayta ishlash paytida exception tashlash kafkajsga uni retry qilishni (qayta yetkazishni) buyuradi, ya'ni message (yoki event) handler ishga tushgan bo'lsa ham, offset Kafka'ga commit qilinmaydi.
Event handlerlar (event-based muloqot) uchun barcha qayta ishlanmagan exceptionlar default bo'yicha retriable exceptions hisoblanadi.
Buning uchun KafkaRetriableException deb nomlangan maxsus klassdan quyidagicha foydalanishingiz mumkin:
1throw new KafkaRetriableException('...');KafkaRetriableException klassi @nestjs/microservices paketidan eksport qilinadi.
Custom exceptionlarni boshqarish
Standart error handling mexanizmlariga qo'shimcha ravishda, Kafka eventlari uchun retry logikasini boshqarish maqsadida custom Exception Filter yaratishingiz mumkin. Masalan, quyidagi misolda muammoli eventni ma'lum miqdordagi retrylardan so'ng o'tkazib yuborish ko'rsatilgan:
1import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
2import { BaseExceptionFilter } from '@nestjs/core';
3import { KafkaContext } from '../ctx-host';
4
5@Catch()
6export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
7 private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);
8
9 constructor(
10 private readonly maxRetries: number,
11 // Optional custom function executed when max retries are exceeded
12 private readonly skipHandler?: (message: any) => Promise<void>,
13 ) {
14 super();
15 }
16
17 async catch(exception: unknown, host: ArgumentsHost) {
18 const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
19 const message = kafkaContext.getMessage();
20 const currentRetryCount = this.getRetryCountFromContext(kafkaContext);
21
22 if (currentRetryCount >= this.maxRetries) {
23 this.logger.warn(
24 `Max retries (${
25 this.maxRetries
26 }) exceeded for message: ${JSON.stringify(message)}`,
27 );
28
29 if (this.skipHandler) {
30 try {
31 await this.skipHandler(message);
32 } catch (err) {
33 this.logger.error('Error in skipHandler:', err);
34 }
35 }
36
37 try {
38 await this.commitOffset(kafkaContext);
39 } catch (commitError) {
40 this.logger.error('Failed to commit offset:', commitError);
41 }
42 return; // Stop propagating the exception
43 }
44
45 // If retry count is below the maximum, proceed with the default Exception Filter logic
46 super.catch(exception, host);
47 }
48
49 private getRetryCountFromContext(context: KafkaContext): number {
50 const headers = context.getMessage().headers || {};
51 const retryHeader = headers['retryCount'] || headers['retry-count'];
52 return retryHeader ? Number(retryHeader) : 0;
53 }
54
55 private async commitOffset(context: KafkaContext): Promise<void> {
56 const consumer = context.getConsumer && context.getConsumer();
57 if (!consumer) {
58 throw new Error('Consumer instance is not available from KafkaContext.');
59 }
60
61 const topic = context.getTopic && context.getTopic();
62 const partition = context.getPartition && context.getPartition();
63 const message = context.getMessage();
64 const offset = message.offset;
65
66 if (!topic || partition === undefined || offset === undefined) {
67 throw new Error(
68 'Incomplete Kafka message context for committing offset.',
69 );
70 }
71
72 await consumer.commitOffsets([
73 {
74 topic,
75 partition,
76 // When committing an offset, commit the next number (i.e., current offset + 1)
77 offset: (Number(offset) + 1).toString(),
78 },
79 ]);
80 }
81}Bu filter Kafka eventini configurable miqdorda qayta ishlashga urinish imkonini beradi. Maksimal retrylar soniga yetilgach, u custom skipHandlerni (agar berilgan bo'lsa) ishga tushiradi va offsetni commit qiladi, ya'ni muammoli eventni o'tkazib yuboradi. Bu keyingi eventlarning to'xtamasdan qayta ishlanishiga imkon beradi.
Ushbu filterni event handlerlaringizga qo'shish orqali integratsiya qilishingiz mumkin:
1@UseFilters(new KafkaMaxRetryExceptionFilter(5))
2export class MyEventHandler {
3 @EventPattern('your-topic')
4 async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
5 // Your event processing logic...
6 }
7}Offsetlarni commit qilish
Kafka bilan ishlaganda offsetlarni commit qilish muhim. Standart bo'yicha xabarlar ma'lum vaqt o'tgach avtomatik commit qilinadi. Batafsil ma'lumot uchun KafkaJS docsga tashrif buyuring. KafkaContext manual commit qilish uchun faol consumerga kirish imkonini beradi. Consumer KafkaJS consumer bo'lib, native KafkaJS implementation kabi ishlaydi.
1@EventPattern('user.created')
2async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
3 // business logic
4
5 const { offset } = context.getMessage();
6 const partition = context.getPartition();
7 const topic = context.getTopic();
8 const consumer = context.getConsumer();
9 await consumer.commitOffsets([{ topic, partition, offset }])
10}Xabarlarni auto-commit qilishni o'chirish uchun run konfiguratsiyasida autoCommit: false ni quyidagicha o'rnating:
1const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
2 transport: Transport.KAFKA,
3 options: {
4 client: {
5 brokers: ['localhost:9092'],
6 },
7 run: {
8 autoCommit: false
9 }
10 }
11});Instansiya holati yangilanishlari
Ulanish va asosiy driver instansiyasi holati haqida real vaqt yangilanishlarini olish uchun status streamiga subscribe bo'lishingiz mumkin. Bu stream tanlangan driverga xos holat yangilanishlarini beradi. Kafka driveri uchun status streami connected, disconnected, rebalancing, crashed va stopped eventlarini emit qiladi.
1this.client.status.subscribe((status: KafkaStatus) => {
2 console.log(status);
3});KafkaStatus tipi @nestjs/microservices paketidan import qilinadi.
Xuddi shuningdek, serverning status streamiga subscribe bo'lib, server holati haqida xabarlarni olishingiz mumkin.
1const server = app.connectMicroservice<MicroserviceOptions>(...);
2server.status.subscribe((status: KafkaStatus) => {
3 console.log(status);
4});Asosiy producer va consumer
Murakkabroq use-case'larda asosiy producer va consumer instansiyalariga kirish kerak bo'lishi mumkin. Bu ulanishni qo'lda yopish yoki driverga xos metodlardan foydalanish kabi ssenariylar uchun foydali. Biroq, ko'p hollarda driverga to'g'ridan-to'g'ri kirish kerak emasligini yodda tuting.
Buning uchun ClientKafkaProxy instansiyasi taqdim etadigan producer va consumer getterlaridan foydalanishingiz mumkin.
1const producer = this.client.producer;
2const consumer = this.client.consumer;