Merge pull request #2442 from alexandrereyes/feat/history-sync-event

feat(history-sync): emit messaging-history.set event on sync completion and fix race condition
This commit is contained in:
Davidson Gomes
2026-02-24 12:16:36 -03:00
committed by GitHub
4 changed files with 56 additions and 7 deletions
@@ -253,6 +253,12 @@ export class BaileysStartupService extends ChannelStartupService {
private logBaileys = this.configService.get<Log>('LOG').BAILEYS;
private eventProcessingQueue: Promise<void> = Promise.resolve();
// Cumulative history sync counters (reset on new sync or completion)
private historySyncMessageCount = 0;
private historySyncChatCount = 0;
private historySyncContactCount = 0;
private historySyncLastProgress = -1;
// Cache TTL constants (in seconds)
private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing
private readonly UPDATE_CACHE_TTL_SECONDS = 30 * 60; // 30 minutes - avoid duplicate status updates
@@ -1013,6 +1019,14 @@ export class BaileysStartupService extends ChannelStartupService {
syncType?: proto.HistorySync.HistorySyncType;
}) => {
try {
// Reset counters when a new sync starts (progress resets or decreases)
if (progress <= this.historySyncLastProgress) {
this.historySyncMessageCount = 0;
this.historySyncChatCount = 0;
this.historySyncContactCount = 0;
}
this.historySyncLastProgress = progress ?? -1;
if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) {
console.log('received on-demand history sync, messages=', messages);
}
@@ -1098,8 +1112,6 @@ export class BaileysStartupService extends ChannelStartupService {
chatsRaw.push({ remoteJid, remoteLid, instanceId: this.instanceId, name: chat.name });
}
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
const chatsToCreateMany = JSON.parse(JSON.stringify(chatsRaw)).map((chat) => {
delete chat.remoteLid;
@@ -1109,6 +1121,10 @@ export class BaileysStartupService extends ChannelStartupService {
await this.prismaRepository.chat.createMany({ data: chatsToCreateMany, skipDuplicates: true });
}
this.historySyncChatCount += chatsRaw.length;
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
const messagesRaw: any[] = [];
const messagesRepository: Set<string> = new Set(
@@ -1160,15 +1176,17 @@ export class BaileysStartupService extends ChannelStartupService {
messagesRaw.push(this.prepareMessage(m));
}
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw], true, undefined, {
isLatest,
progress,
});
this.historySyncMessageCount += messagesRaw.length;
if (this.configService.get<Database>('DATABASE').SAVE_DATA.HISTORIC) {
await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true });
}
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw], true, undefined, {
isLatest,
progress,
});
if (
this.configService.get<Chatwoot>('CHATWOOT').ENABLED &&
this.localChatwoot?.enabled &&
@@ -1181,10 +1199,26 @@ export class BaileysStartupService extends ChannelStartupService {
);
}
const filteredContacts = contacts.filter((c) => !!c.notify || !!c.name);
this.historySyncContactCount += filteredContacts.length;
await this.contactHandle['contacts.upsert'](
contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })),
filteredContacts.map((c) => ({ id: c.id, name: c.name ?? c.notify })),
);
if (progress === 100) {
this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
messageCount: this.historySyncMessageCount,
chatCount: this.historySyncChatCount,
contactCount: this.historySyncContactCount,
});
this.historySyncMessageCount = 0;
this.historySyncChatCount = 0;
this.historySyncContactCount = 0;
this.historySyncLastProgress = -1;
}
contacts = undefined;
messages = undefined;
chats = undefined;
@@ -162,6 +162,7 @@ export class EventController {
'CALL',
'TYPEBOT_START',
'TYPEBOT_CHANGE_STATUS',
'MESSAGING_HISTORY_SET',
'REMOVE_INSTANCE',
'LOGOUT_INSTANCE',
'INSTANCE_CREATE',
+10
View File
@@ -91,6 +91,7 @@ export type EventsRabbitmq = {
CALL: boolean;
TYPEBOT_START: boolean;
TYPEBOT_CHANGE_STATUS: boolean;
MESSAGING_HISTORY_SET: boolean;
};
export type Rabbitmq = {
@@ -150,6 +151,7 @@ export type Sqs = {
SEND_MESSAGE: boolean;
TYPEBOT_CHANGE_STATUS: boolean;
TYPEBOT_START: boolean;
MESSAGING_HISTORY_SET: boolean;
};
};
@@ -223,6 +225,7 @@ export type EventsWebhook = {
CALL: boolean;
TYPEBOT_START: boolean;
TYPEBOT_CHANGE_STATUS: boolean;
MESSAGING_HISTORY_SET: boolean;
ERRORS: boolean;
ERRORS_WEBHOOK: string;
};
@@ -256,6 +259,7 @@ export type EventsPusher = {
CALL: boolean;
TYPEBOT_START: boolean;
TYPEBOT_CHANGE_STATUS: boolean;
MESSAGING_HISTORY_SET: boolean;
};
export type ApiKey = { KEY: string };
@@ -539,6 +543,7 @@ export class ConfigService {
CALL: process.env?.RABBITMQ_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.RABBITMQ_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
MESSAGING_HISTORY_SET: process.env?.RABBITMQ_EVENTS_MESSAGING_HISTORY_SET === 'true',
},
},
NATS: {
@@ -576,6 +581,7 @@ export class ConfigService {
CALL: process.env?.NATS_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.NATS_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.NATS_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
MESSAGING_HISTORY_SET: process.env?.NATS_EVENTS_MESSAGING_HISTORY_SET === 'true',
},
},
SQS: {
@@ -616,6 +622,7 @@ export class ConfigService {
SEND_MESSAGE: process.env?.SQS_GLOBAL_SEND_MESSAGE === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.SQS_GLOBAL_TYPEBOT_CHANGE_STATUS === 'true',
TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true',
MESSAGING_HISTORY_SET: process.env?.SQS_GLOBAL_MESSAGING_HISTORY_SET === 'true',
},
},
KAFKA: {
@@ -659,6 +666,7 @@ export class ConfigService {
CALL: process.env?.KAFKA_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
MESSAGING_HISTORY_SET: process.env?.KAFKA_EVENTS_MESSAGING_HISTORY_SET === 'true',
},
SASL:
process.env?.KAFKA_SASL_ENABLED === 'true'
@@ -724,6 +732,7 @@ export class ConfigService {
CALL: process.env?.PUSHER_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.PUSHER_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
MESSAGING_HISTORY_SET: process.env?.PUSHER_EVENTS_MESSAGING_HISTORY_SET === 'true',
},
},
WA_BUSINESS: {
@@ -781,6 +790,7 @@ export class ConfigService {
CALL: process.env?.WEBHOOK_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.WEBHOOK_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.WEBHOOK_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
MESSAGING_HISTORY_SET: process.env?.WEBHOOK_EVENTS_MESSAGING_HISTORY_SET === 'true',
ERRORS: process.env?.WEBHOOK_EVENTS_ERRORS === 'true',
ERRORS_WEBHOOK: process.env?.WEBHOOK_EVENTS_ERRORS_WEBHOOK || '',
},
+4
View File
@@ -86,6 +86,7 @@ export const instanceSchema: JSONSchema7 = {
'CALL',
'TYPEBOT_START',
'TYPEBOT_CHANGE_STATUS',
'MESSAGING_HISTORY_SET',
],
},
},
@@ -123,6 +124,7 @@ export const instanceSchema: JSONSchema7 = {
'CALL',
'TYPEBOT_START',
'TYPEBOT_CHANGE_STATUS',
'MESSAGING_HISTORY_SET',
],
},
},
@@ -160,6 +162,7 @@ export const instanceSchema: JSONSchema7 = {
'CALL',
'TYPEBOT_START',
'TYPEBOT_CHANGE_STATUS',
'MESSAGING_HISTORY_SET',
],
},
},
@@ -197,6 +200,7 @@ export const instanceSchema: JSONSchema7 = {
'CALL',
'TYPEBOT_START',
'TYPEBOT_CHANGE_STATUS',
'MESSAGING_HISTORY_SET',
],
},
},