From 89d4d341f6203845fe60b30cc43e2d95d63255c2 Mon Sep 17 00:00:00 2001 From: Santosl2 Date: Tue, 15 Jul 2025 21:35:25 -0300 Subject: [PATCH] feat: add BaileysMessageProcessor for improved message handling and integrate rxjs for asynchronous processing --- package-lock.json | 10 ++++ package.json | 1 + .../whatsapp/baileysMessage.processor.ts | 51 +++++++++++++++++++ .../whatsapp/whatsapp.baileys.service.ts | 11 +++- 4 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 src/api/integrations/channel/whatsapp/baileysMessage.processor.ts diff --git a/package-lock.json b/package-lock.json index f07b4c91..263a96a4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -55,6 +55,7 @@ "qrcode": "^1.5.4", "qrcode-terminal": "^0.12.0", "redis": "^4.7.0", + "rxjs": "^7.8.2", "sharp": "^0.32.6", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", @@ -10372,6 +10373,15 @@ "queue-microtask": "^1.2.2" } }, + "node_modules/rxjs": { + "version": "7.8.2", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.2.tgz", + "integrity": "sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA==", + "license": "Apache-2.0", + "dependencies": { + "tslib": "^2.1.0" + } + }, "node_modules/safe-array-concat": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/safe-array-concat/-/safe-array-concat-1.1.3.tgz", diff --git a/package.json b/package.json index a6eb2470..1296eb78 100644 --- a/package.json +++ b/package.json @@ -95,6 +95,7 @@ "qrcode": "^1.5.4", "qrcode-terminal": "^0.12.0", "redis": "^4.7.0", + "rxjs": "^7.8.2", "sharp": "^0.32.6", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", diff --git a/src/api/integrations/channel/whatsapp/baileysMessage.processor.ts b/src/api/integrations/channel/whatsapp/baileysMessage.processor.ts new file mode 100644 index 00000000..21f913d8 --- /dev/null +++ b/src/api/integrations/channel/whatsapp/baileysMessage.processor.ts @@ -0,0 +1,51 @@ +import { Logger } from '@config/logger.config'; +import { BaileysEventMap, MessageUpsertType, proto } from 'baileys'; +import { catchError, concatMap, EMPTY, from, Subject, Subscription, tap } from 'rxjs'; + +type MessageUpsertPayload = BaileysEventMap['messages.upsert']; +type MountProps = { + onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise; +}; + +export class BaileysMessageProcessor { + private processorLogs = new Logger('BaileysMessageProcessor'); + private subscription?: Subscription; + + protected messageSubject = new Subject<{ + messages: proto.IWebMessageInfo[]; + type: MessageUpsertType; + requestId?: string; + settings: any; + }>(); + + mount({ onMessageReceive }: MountProps) { + this.subscription = this.messageSubject + .pipe( + tap(({ messages }) => { + this.processorLogs.log(`Processing batch of ${messages.length} messages`); + }), + concatMap(({ messages, type, requestId, settings }) => + from(onMessageReceive({ messages, type, requestId }, settings)), + ), + catchError((error) => { + this.processorLogs.error(`Error processing message batch: ${error}`); + return EMPTY; + }), + ) + .subscribe({ + error: (error) => { + this.processorLogs.error(`Message stream error: ${error}`); + }, + }); + } + + processMessage(payload: MessageUpsertPayload, settings: any) { + const { messages, type, requestId } = payload; + this.messageSubject.next({ messages, type, requestId, settings }); + } + + onDestroy() { + this.subscription?.unsubscribe(); + this.messageSubject.complete(); + } +} diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index de31b503..957a7754 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -147,6 +147,7 @@ import sharp from 'sharp'; import { PassThrough, Readable } from 'stream'; import { v4 } from 'uuid'; +import { BaileysMessageProcessor } from './baileysMessage.processor'; import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys'; const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine()); @@ -212,6 +213,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise