feat: add BaileysMessageProcessor for improved message handling and integrate rxjs for asynchronous processing

This commit is contained in:
Santosl2 2025-07-15 21:35:25 -03:00
parent 39606240da
commit 89d4d341f6
No known key found for this signature in database
GPG Key ID: 6D81780D2C0364A0
4 changed files with 72 additions and 1 deletions

10
package-lock.json generated
View File

@ -55,6 +55,7 @@
"qrcode": "^1.5.4",
"qrcode-terminal": "^0.12.0",
"redis": "^4.7.0",
"rxjs": "^7.8.2",
"sharp": "^0.32.6",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
@ -10372,6 +10373,15 @@
"queue-microtask": "^1.2.2"
}
},
"node_modules/rxjs": {
"version": "7.8.2",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.2.tgz",
"integrity": "sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA==",
"license": "Apache-2.0",
"dependencies": {
"tslib": "^2.1.0"
}
},
"node_modules/safe-array-concat": {
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/safe-array-concat/-/safe-array-concat-1.1.3.tgz",

View File

@ -95,6 +95,7 @@
"qrcode": "^1.5.4",
"qrcode-terminal": "^0.12.0",
"redis": "^4.7.0",
"rxjs": "^7.8.2",
"sharp": "^0.32.6",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",

View File

@ -0,0 +1,51 @@
import { Logger } from '@config/logger.config';
import { BaileysEventMap, MessageUpsertType, proto } from 'baileys';
import { catchError, concatMap, EMPTY, from, Subject, Subscription, tap } from 'rxjs';
type MessageUpsertPayload = BaileysEventMap['messages.upsert'];
type MountProps = {
onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>;
};
export class BaileysMessageProcessor {
private processorLogs = new Logger('BaileysMessageProcessor');
private subscription?: Subscription;
protected messageSubject = new Subject<{
messages: proto.IWebMessageInfo[];
type: MessageUpsertType;
requestId?: string;
settings: any;
}>();
mount({ onMessageReceive }: MountProps) {
this.subscription = this.messageSubject
.pipe(
tap(({ messages }) => {
this.processorLogs.log(`Processing batch of ${messages.length} messages`);
}),
concatMap(({ messages, type, requestId, settings }) =>
from(onMessageReceive({ messages, type, requestId }, settings)),
),
catchError((error) => {
this.processorLogs.error(`Error processing message batch: ${error}`);
return EMPTY;
}),
)
.subscribe({
error: (error) => {
this.processorLogs.error(`Message stream error: ${error}`);
},
});
}
processMessage(payload: MessageUpsertPayload, settings: any) {
const { messages, type, requestId } = payload;
this.messageSubject.next({ messages, type, requestId, settings });
}
onDestroy() {
this.subscription?.unsubscribe();
this.messageSubject.complete();
}
}

View File

@ -147,6 +147,7 @@ import sharp from 'sharp';
import { PassThrough, Readable } from 'stream';
import { v4 } from 'uuid';
import { BaileysMessageProcessor } from './baileysMessage.processor';
import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';
const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());
@ -212,6 +213,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise<numb
}
export class BaileysStartupService extends ChannelStartupService {
private messageProcessor = new BaileysMessageProcessor();
constructor(
public readonly configService: ConfigService,
public readonly eventEmitter: EventEmitter2,
@ -223,6 +226,9 @@ export class BaileysStartupService extends ChannelStartupService {
) {
super(configService, eventEmitter, prismaRepository, chatwootCache);
this.instance.qrcode = { count: 0 };
this.messageProcessor.mount({
onMessageReceive: this.messageHandle['messages.upsert'].bind(this), // Bind the method to the current context
});
this.authStateProvider = new AuthStateProvider(this.providerFiles);
}
@ -242,6 +248,7 @@ export class BaileysStartupService extends ChannelStartupService {
}
public async logoutInstance() {
this.messageProcessor.onDestroy();
await this.client?.logout('Log out instance: ' + this.instanceName);
this.client?.ws?.close();
@ -1618,7 +1625,9 @@ export class BaileysStartupService extends ChannelStartupService {
if (events['messages.upsert']) {
const payload = events['messages.upsert'];
this.messageHandle['messages.upsert'](payload, settings);
this.messageProcessor.processMessage(payload, settings);
// this.messageHandle['messages.upsert'](payload, settings);
}
if (events['messages.update']) {