From ec7999b04f8717cf5dd8bbf42459a1ff161a70f0 Mon Sep 17 00:00:00 2001 From: Alexandre Reyes Martins Date: Mon, 23 Feb 2026 21:31:20 +0000 Subject: [PATCH] feat(history-sync): emit messaging-history.set event on sync completion and fix race condition Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database persistence, fixing a race condition where consumers received the event before data was queryable. Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%, allowing consumers to know exactly when history sync is complete and messages are available in the database. Register the new event across all transport types (Webhook, WebSocket, RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas. --- .../whatsapp/whatsapp.baileys.service.ts | 20 +++++++++++++------ .../integrations/event/event.controller.ts | 1 + src/config/env.config.ts | 10 ++++++++++ src/validate/instance.schema.ts | 4 ++++ 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 60e857fc..9f4a900a 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -989,12 +989,12 @@ export class BaileysStartupService extends ChannelStartupService { chatsRaw.push({ remoteJid: chat.id, instanceId: this.instanceId, name: chat.name }); } - this.sendDataWebhook(Events.CHATS_SET, chatsRaw); - if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { await this.prismaRepository.chat.createMany({ data: chatsRaw, skipDuplicates: true }); } + this.sendDataWebhook(Events.CHATS_SET, chatsRaw); + const messagesRaw: any[] = []; const messagesRepository: Set = new Set( @@ -1046,15 +1046,15 @@ export class BaileysStartupService extends ChannelStartupService { messagesRaw.push(this.prepareMessage(m)); } + if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { + await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true }); + } + this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw], true, undefined, { isLatest, progress, }); - if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { - await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true }); - } - if ( this.configService.get('CHATWOOT').ENABLED && this.localChatwoot?.enabled && @@ -1071,6 +1071,14 @@ export class BaileysStartupService extends ChannelStartupService { contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })), ); + if (progress === 100) { + this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, { + messageCount: messagesRaw.length, + chatCount: chatsRaw.length, + contactCount: contacts?.length ?? 0, + }); + } + contacts = undefined; messages = undefined; chats = undefined; diff --git a/src/api/integrations/event/event.controller.ts b/src/api/integrations/event/event.controller.ts index 39b52184..63061ea1 100644 --- a/src/api/integrations/event/event.controller.ts +++ b/src/api/integrations/event/event.controller.ts @@ -162,6 +162,7 @@ export class EventController { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', 'REMOVE_INSTANCE', 'LOGOUT_INSTANCE', 'INSTANCE_CREATE', diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 7c4e382e..772ae927 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -91,6 +91,7 @@ export type EventsRabbitmq = { CALL: boolean; TYPEBOT_START: boolean; TYPEBOT_CHANGE_STATUS: boolean; + MESSAGING_HISTORY_SET: boolean; }; export type Rabbitmq = { @@ -150,6 +151,7 @@ export type Sqs = { SEND_MESSAGE: boolean; TYPEBOT_CHANGE_STATUS: boolean; TYPEBOT_START: boolean; + MESSAGING_HISTORY_SET: boolean; }; }; @@ -223,6 +225,7 @@ export type EventsWebhook = { CALL: boolean; TYPEBOT_START: boolean; TYPEBOT_CHANGE_STATUS: boolean; + MESSAGING_HISTORY_SET: boolean; ERRORS: boolean; ERRORS_WEBHOOK: string; }; @@ -256,6 +259,7 @@ export type EventsPusher = { CALL: boolean; TYPEBOT_START: boolean; TYPEBOT_CHANGE_STATUS: boolean; + MESSAGING_HISTORY_SET: boolean; }; export type ApiKey = { KEY: string }; @@ -537,6 +541,7 @@ export class ConfigService { CALL: process.env?.RABBITMQ_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.RABBITMQ_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.RABBITMQ_EVENTS_MESSAGING_HISTORY_SET === 'true', }, }, NATS: { @@ -574,6 +579,7 @@ export class ConfigService { CALL: process.env?.NATS_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.NATS_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.NATS_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.NATS_EVENTS_MESSAGING_HISTORY_SET === 'true', }, }, SQS: { @@ -614,6 +620,7 @@ export class ConfigService { SEND_MESSAGE: process.env?.SQS_GLOBAL_SEND_MESSAGE === 'true', TYPEBOT_CHANGE_STATUS: process.env?.SQS_GLOBAL_TYPEBOT_CHANGE_STATUS === 'true', TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true', + MESSAGING_HISTORY_SET: process.env?.SQS_GLOBAL_MESSAGING_HISTORY_SET === 'true', }, }, KAFKA: { @@ -657,6 +664,7 @@ export class ConfigService { CALL: process.env?.KAFKA_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.KAFKA_EVENTS_MESSAGING_HISTORY_SET === 'true', }, SASL: process.env?.KAFKA_SASL_ENABLED === 'true' @@ -722,6 +730,7 @@ export class ConfigService { CALL: process.env?.PUSHER_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.PUSHER_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.PUSHER_EVENTS_MESSAGING_HISTORY_SET === 'true', }, }, WA_BUSINESS: { @@ -779,6 +788,7 @@ export class ConfigService { CALL: process.env?.WEBHOOK_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.WEBHOOK_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.WEBHOOK_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.WEBHOOK_EVENTS_MESSAGING_HISTORY_SET === 'true', ERRORS: process.env?.WEBHOOK_EVENTS_ERRORS === 'true', ERRORS_WEBHOOK: process.env?.WEBHOOK_EVENTS_ERRORS_WEBHOOK || '', }, diff --git a/src/validate/instance.schema.ts b/src/validate/instance.schema.ts index a0553b66..16fd4fe8 100644 --- a/src/validate/instance.schema.ts +++ b/src/validate/instance.schema.ts @@ -86,6 +86,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, @@ -123,6 +124,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, @@ -160,6 +162,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, @@ -197,6 +200,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, },