mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-07-25 01:48:39 -06:00
Merge pull request #1726 from Santosl2/feat/add-async-lock
Some checks are pending
Build Docker image / Build and Deploy (push) Waiting to run
Some checks are pending
Build Docker image / Build and Deploy (push) Waiting to run
feat: add BaileysMessageProcessor for improved message handling
This commit is contained in:
commit
9fd40a411a
10
package-lock.json
generated
10
package-lock.json
generated
@ -55,6 +55,7 @@
|
|||||||
"qrcode": "^1.5.4",
|
"qrcode": "^1.5.4",
|
||||||
"qrcode-terminal": "^0.12.0",
|
"qrcode-terminal": "^0.12.0",
|
||||||
"redis": "^4.7.0",
|
"redis": "^4.7.0",
|
||||||
|
"rxjs": "^7.8.2",
|
||||||
"sharp": "^0.34.2",
|
"sharp": "^0.34.2",
|
||||||
"socket.io": "^4.8.1",
|
"socket.io": "^4.8.1",
|
||||||
"socket.io-client": "^4.8.1",
|
"socket.io-client": "^4.8.1",
|
||||||
@ -10424,6 +10425,15 @@
|
|||||||
"queue-microtask": "^1.2.2"
|
"queue-microtask": "^1.2.2"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/rxjs": {
|
||||||
|
"version": "7.8.2",
|
||||||
|
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.2.tgz",
|
||||||
|
"integrity": "sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA==",
|
||||||
|
"license": "Apache-2.0",
|
||||||
|
"dependencies": {
|
||||||
|
"tslib": "^2.1.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/safe-array-concat": {
|
"node_modules/safe-array-concat": {
|
||||||
"version": "1.1.3",
|
"version": "1.1.3",
|
||||||
"resolved": "https://registry.npmjs.org/safe-array-concat/-/safe-array-concat-1.1.3.tgz",
|
"resolved": "https://registry.npmjs.org/safe-array-concat/-/safe-array-concat-1.1.3.tgz",
|
||||||
|
@ -95,6 +95,7 @@
|
|||||||
"qrcode": "^1.5.4",
|
"qrcode": "^1.5.4",
|
||||||
"qrcode-terminal": "^0.12.0",
|
"qrcode-terminal": "^0.12.0",
|
||||||
"redis": "^4.7.0",
|
"redis": "^4.7.0",
|
||||||
|
"rxjs": "^7.8.2",
|
||||||
"sharp": "^0.34.2",
|
"sharp": "^0.34.2",
|
||||||
"socket.io": "^4.8.1",
|
"socket.io": "^4.8.1",
|
||||||
"socket.io-client": "^4.8.1",
|
"socket.io-client": "^4.8.1",
|
||||||
|
@ -0,0 +1,59 @@
|
|||||||
|
import { Logger } from '@config/logger.config';
|
||||||
|
import { BaileysEventMap, MessageUpsertType, proto } from 'baileys';
|
||||||
|
import { catchError, concatMap, delay, EMPTY, from, retryWhen, Subject, Subscription, take, tap } from 'rxjs';
|
||||||
|
|
||||||
|
type MessageUpsertPayload = BaileysEventMap['messages.upsert'];
|
||||||
|
type MountProps = {
|
||||||
|
onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>;
|
||||||
|
};
|
||||||
|
|
||||||
|
export class BaileysMessageProcessor {
|
||||||
|
private processorLogs = new Logger('BaileysMessageProcessor');
|
||||||
|
private subscription?: Subscription;
|
||||||
|
|
||||||
|
protected messageSubject = new Subject<{
|
||||||
|
messages: proto.IWebMessageInfo[];
|
||||||
|
type: MessageUpsertType;
|
||||||
|
requestId?: string;
|
||||||
|
settings: any;
|
||||||
|
}>();
|
||||||
|
|
||||||
|
mount({ onMessageReceive }: MountProps) {
|
||||||
|
this.subscription = this.messageSubject
|
||||||
|
.pipe(
|
||||||
|
tap(({ messages }) => {
|
||||||
|
this.processorLogs.log(`Processing batch of ${messages.length} messages`);
|
||||||
|
}),
|
||||||
|
concatMap(({ messages, type, requestId, settings }) =>
|
||||||
|
from(onMessageReceive({ messages, type, requestId }, settings)).pipe(
|
||||||
|
retryWhen((errors) =>
|
||||||
|
errors.pipe(
|
||||||
|
tap((error) => this.processorLogs.warn(`Retrying message batch due to error: ${error.message}`)),
|
||||||
|
delay(1000), // 1 segundo de delay
|
||||||
|
take(3), // Máximo 3 tentativas
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
catchError((error) => {
|
||||||
|
this.processorLogs.error(`Error processing message batch: ${error}`);
|
||||||
|
return EMPTY;
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.subscribe({
|
||||||
|
error: (error) => {
|
||||||
|
this.processorLogs.error(`Message stream error: ${error}`);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
processMessage(payload: MessageUpsertPayload, settings: any) {
|
||||||
|
const { messages, type, requestId } = payload;
|
||||||
|
this.messageSubject.next({ messages, type, requestId, settings });
|
||||||
|
}
|
||||||
|
|
||||||
|
onDestroy() {
|
||||||
|
this.subscription?.unsubscribe();
|
||||||
|
this.messageSubject.complete();
|
||||||
|
}
|
||||||
|
}
|
@ -148,6 +148,7 @@ import sharp from 'sharp';
|
|||||||
import { PassThrough, Readable } from 'stream';
|
import { PassThrough, Readable } from 'stream';
|
||||||
import { v4 } from 'uuid';
|
import { v4 } from 'uuid';
|
||||||
|
|
||||||
|
import { BaileysMessageProcessor } from './baileysMessage.processor';
|
||||||
import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';
|
import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';
|
||||||
|
|
||||||
const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());
|
const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());
|
||||||
@ -213,6 +214,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise<numb
|
|||||||
}
|
}
|
||||||
|
|
||||||
export class BaileysStartupService extends ChannelStartupService {
|
export class BaileysStartupService extends ChannelStartupService {
|
||||||
|
private messageProcessor = new BaileysMessageProcessor();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
public readonly configService: ConfigService,
|
public readonly configService: ConfigService,
|
||||||
public readonly eventEmitter: EventEmitter2,
|
public readonly eventEmitter: EventEmitter2,
|
||||||
@ -224,6 +227,9 @@ export class BaileysStartupService extends ChannelStartupService {
|
|||||||
) {
|
) {
|
||||||
super(configService, eventEmitter, prismaRepository, chatwootCache);
|
super(configService, eventEmitter, prismaRepository, chatwootCache);
|
||||||
this.instance.qrcode = { count: 0 };
|
this.instance.qrcode = { count: 0 };
|
||||||
|
this.messageProcessor.mount({
|
||||||
|
onMessageReceive: this.messageHandle['messages.upsert'].bind(this), // Bind the method to the current context
|
||||||
|
});
|
||||||
|
|
||||||
this.authStateProvider = new AuthStateProvider(this.providerFiles);
|
this.authStateProvider = new AuthStateProvider(this.providerFiles);
|
||||||
}
|
}
|
||||||
@ -243,6 +249,7 @@ export class BaileysStartupService extends ChannelStartupService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async logoutInstance() {
|
public async logoutInstance() {
|
||||||
|
this.messageProcessor.onDestroy();
|
||||||
await this.client?.logout('Log out instance: ' + this.instanceName);
|
await this.client?.logout('Log out instance: ' + this.instanceName);
|
||||||
|
|
||||||
this.client?.ws?.close();
|
this.client?.ws?.close();
|
||||||
@ -1653,7 +1660,9 @@ export class BaileysStartupService extends ChannelStartupService {
|
|||||||
|
|
||||||
if (events['messages.upsert']) {
|
if (events['messages.upsert']) {
|
||||||
const payload = events['messages.upsert'];
|
const payload = events['messages.upsert'];
|
||||||
this.messageHandle['messages.upsert'](payload, settings);
|
|
||||||
|
this.messageProcessor.processMessage(payload, settings);
|
||||||
|
// this.messageHandle['messages.upsert'](payload, settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (events['messages.update']) {
|
if (events['messages.update']) {
|
||||||
|
Loading…
Reference in New Issue
Block a user