diff --git a/package-lock.json b/package-lock.json index 5ad9f33f..b18a0e24 100644 --- a/package-lock.json +++ b/package-lock.json @@ -55,6 +55,7 @@ "qrcode": "^1.5.4", "qrcode-terminal": "^0.12.0", "redis": "^4.7.0", + "rxjs": "^7.8.2", "sharp": "^0.34.2", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", @@ -10424,6 +10425,15 @@ "queue-microtask": "^1.2.2" } }, + "node_modules/rxjs": { + "version": "7.8.2", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.2.tgz", + "integrity": "sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA==", + "license": "Apache-2.0", + "dependencies": { + "tslib": "^2.1.0" + } + }, "node_modules/safe-array-concat": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/safe-array-concat/-/safe-array-concat-1.1.3.tgz", diff --git a/package.json b/package.json index 630bd316..67ed5422 100644 --- a/package.json +++ b/package.json @@ -95,6 +95,7 @@ "qrcode": "^1.5.4", "qrcode-terminal": "^0.12.0", "redis": "^4.7.0", + "rxjs": "^7.8.2", "sharp": "^0.34.2", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", diff --git a/src/api/integrations/channel/whatsapp/baileysMessage.processor.ts b/src/api/integrations/channel/whatsapp/baileysMessage.processor.ts new file mode 100644 index 00000000..79fedd93 --- /dev/null +++ b/src/api/integrations/channel/whatsapp/baileysMessage.processor.ts @@ -0,0 +1,59 @@ +import { Logger } from '@config/logger.config'; +import { BaileysEventMap, MessageUpsertType, proto } from 'baileys'; +import { catchError, concatMap, delay, EMPTY, from, retryWhen, Subject, Subscription, take, tap } from 'rxjs'; + +type MessageUpsertPayload = BaileysEventMap['messages.upsert']; +type MountProps = { + onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise; +}; + +export class BaileysMessageProcessor { + private processorLogs = new Logger('BaileysMessageProcessor'); + private subscription?: Subscription; + + protected messageSubject = new Subject<{ + messages: proto.IWebMessageInfo[]; + type: MessageUpsertType; + requestId?: string; + settings: any; + }>(); + + mount({ onMessageReceive }: MountProps) { + this.subscription = this.messageSubject + .pipe( + tap(({ messages }) => { + this.processorLogs.log(`Processing batch of ${messages.length} messages`); + }), + concatMap(({ messages, type, requestId, settings }) => + from(onMessageReceive({ messages, type, requestId }, settings)).pipe( + retryWhen((errors) => + errors.pipe( + tap((error) => this.processorLogs.warn(`Retrying message batch due to error: ${error.message}`)), + delay(1000), // 1 segundo de delay + take(3), // Máximo 3 tentativas + ), + ), + ), + ), + catchError((error) => { + this.processorLogs.error(`Error processing message batch: ${error}`); + return EMPTY; + }), + ) + .subscribe({ + error: (error) => { + this.processorLogs.error(`Message stream error: ${error}`); + }, + }); + } + + processMessage(payload: MessageUpsertPayload, settings: any) { + const { messages, type, requestId } = payload; + this.messageSubject.next({ messages, type, requestId, settings }); + } + + onDestroy() { + this.subscription?.unsubscribe(); + this.messageSubject.complete(); + } +} diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 7729918a..71657c2f 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -148,6 +148,7 @@ import sharp from 'sharp'; import { PassThrough, Readable } from 'stream'; import { v4 } from 'uuid'; +import { BaileysMessageProcessor } from './baileysMessage.processor'; import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys'; const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine()); @@ -213,6 +214,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise