From 2408384b0fbe7d38c230d2396d3e2c7eff62af89 Mon Sep 17 00:00:00 2001 From: Vitor Manoel Santos Moura <72520858+Vitordotpy@users.noreply.github.com> Date: Sun, 30 Nov 2025 00:25:17 -0300 Subject: [PATCH] Refactor message handling and polling updates Refactor message handling and polling updates, including decryption logic for poll votes and cache management for message updates. Improved event processing flow and added handling for various message types. --- .../whatsapp/whatsapp.baileys.service.ts | 381 ++++++++++++------ 1 file changed, 260 insertions(+), 121 deletions(-) diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index e055b2bd..a793c4d7 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -90,6 +90,7 @@ import useMultiFileAuthStatePrisma from '@utils/use-multi-file-auth-state-prisma import { AuthStateProvider } from '@utils/use-multi-file-auth-state-provider-files'; import { useMultiFileAuthStateRedisDb } from '@utils/use-multi-file-auth-state-redis-db'; import axios from 'axios'; +import { createHash } from 'crypto'; import makeWASocket, { AnyMessageContent, BufferedEventData, @@ -100,9 +101,11 @@ import makeWASocket, { ConnectionState, Contact, delay, + decryptPollVote, DisconnectReason, downloadContentFromMessage, downloadMediaMessage, + jidNormalizedUser, generateWAMessageFromContent, getAggregateVotesInPollMessage, GetCatalogOptions, @@ -247,6 +250,7 @@ export class BaileysStartupService extends ChannelStartupService { private readonly userDevicesCache: CacheStore = new NodeCache({ stdTTL: 300000, useClones: false }); private endSession = false; private logBaileys = this.configService.get('LOG').BAILEYS; + private eventProcessingQueue: Promise = Promise.resolve(); // Cache TTL constants (in seconds) private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing @@ -1121,6 +1125,11 @@ export class BaileysStartupService extends ChannelStartupService { ); await this.sendDataWebhook(Events.MESSAGES_EDITED, editedMessage); + + if (received.key?.id && editedMessage.key?.id) { + await this.baileysCache.set(`protocol_${received.key.id}`, editedMessage.key.id, 60 * 60 * 24); + } + const oldMessage = await this.getMessage(editedMessage.key, true); if ((oldMessage as any)?.id) { const editedMessageTimestamp = Long.isLong(received?.messageTimestamp) @@ -1188,6 +1197,109 @@ export class BaileysStartupService extends ChannelStartupService { const messageRaw = this.prepareMessage(received); + if (messageRaw.messageType === 'pollUpdateMessage') { + const pollCreationKey = messageRaw.message.pollUpdateMessage.pollCreationMessageKey; + const pollMessage = (await this.getMessage(pollCreationKey, true)) as proto.IWebMessageInfo; + const pollMessageSecret = (await this.getMessage(pollCreationKey)) as any; + + if (pollMessage) { + const pollOptions = + (pollMessage.message as any).pollCreationMessage?.options || + (pollMessage.message as any).pollCreationMessageV3?.options || + []; + const pollVote = messageRaw.message.pollUpdateMessage.vote; + + const voterJid = received.key.fromMe + ? this.instance.wuid + : received.key.participant || received.key.remoteJid; + + let pollEncKey = pollMessageSecret?.messageContextInfo?.messageSecret; + + let successfulVoterJid = voterJid; + + if (typeof pollEncKey === 'string') { + pollEncKey = Buffer.from(pollEncKey, 'base64'); + } else if (pollEncKey?.type === 'Buffer' && Array.isArray(pollEncKey.data)) { + pollEncKey = Buffer.from(pollEncKey.data); + } + + if (Buffer.isBuffer(pollEncKey) && pollEncKey.length === 44) { + pollEncKey = Buffer.from(pollEncKey.toString('utf8'), 'base64'); + } + + if (pollVote.encPayload && pollEncKey) { + const creatorCandidates = [ + this.instance.wuid, + this.client.user?.lid, + pollMessage.key.participant, + (pollMessage.key as any).participantAlt, + pollMessage.key.remoteJid, + ]; + + const key = received.key as any; + const voterCandidates = [ + this.instance.wuid, + this.client.user?.lid, + key.participant, + key.participantAlt, + key.remoteJidAlt, + key.remoteJid, + ]; + + const uniqueCreators = [ + ...new Set(creatorCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))), + ]; + const uniqueVoters = [ + ...new Set(voterCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))), + ]; + + let decryptedVote; + + for (const creator of uniqueCreators) { + for (const voter of uniqueVoters) { + try { + decryptedVote = decryptPollVote(pollVote, { + pollCreatorJid: creator, + pollMsgId: pollMessage.key.id, + pollEncKey, + voterJid: voter, + } as any); + if (decryptedVote) { + successfulVoterJid = voter; + break; + } + } catch (err) { + // Continue trying + } + } + if (decryptedVote) break; + } + + if (decryptedVote) { + Object.assign(pollVote, decryptedVote); + } + } + + const selectedOptions = pollVote?.selectedOptions || []; + + const selectedOptionNames = pollOptions + .filter((option) => { + const hash = createHash('sha256').update(option.optionName).digest(); + return selectedOptions.some((selected) => Buffer.compare(selected, hash) === 0); + }) + .map((option) => option.optionName); + + messageRaw.message.pollUpdateMessage.vote.selectedOptions = selectedOptionNames; + + const pollUpdates = pollOptions.map((option) => ({ + name: option.optionName, + voters: selectedOptionNames.includes(option.optionName) ? [successfulVoterJid] : [], + })); + + messageRaw.pollUpdates = pollUpdates; + } + } + const isMedia = received?.message?.imageMessage || received?.message?.videoMessage || @@ -1237,7 +1349,8 @@ export class BaileysStartupService extends ChannelStartupService { } if (this.configService.get('DATABASE').SAVE_DATA.NEW_MESSAGE) { - const msg = await this.prismaRepository.message.create({ data: messageRaw }); + const { pollUpdates, ...messageData } = messageRaw; + const msg = await this.prismaRepository.message.create({ data: messageData }); const { remoteJid } = received.key; const timestamp = msg.messageTimestamp; @@ -1442,18 +1555,23 @@ export class BaileysStartupService extends ChannelStartupService { continue; } - if (update.message === null && update.status === undefined) continue; - const updateKey = `${this.instance.id}_${key.id}_${update.status}`; const cached = await this.baileysCache.get(updateKey); - if (cached && update.messageTimestamp == cached.messageTimestamp) { + const secondsSinceEpoch = Math.floor(Date.now() / 1000) + console.log('CACHE:', {cached, updateKey, messageTimestamp: update.messageTimestamp, secondsSinceEpoch}); + + if ((update.messageTimestamp && update.messageTimestamp === cached) || (!update.messageTimestamp && secondsSinceEpoch === cached)) { this.logger.info(`Update Message duplicated ignored [avoid deadlock]: ${updateKey}`); continue; } - await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60); + if (update.messageTimestamp) { + await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60); + } else { + await this.baileysCache.set(updateKey, secondsSinceEpoch, 30 * 60); + } if (status[update.status] === 'READ' && key.fromMe) { if (this.configService.get('CHATWOOT').ENABLED && this.localChatwoot?.enabled) { @@ -1489,14 +1607,27 @@ export class BaileysStartupService extends ChannelStartupService { instanceId: this.instanceId, }; + if (update.message) { + message.message = update.message; + } + let findMessage: any; const configDatabaseData = this.configService.get('DATABASE').SAVE_DATA; if (configDatabaseData.HISTORIC || configDatabaseData.NEW_MESSAGE) { // Use raw SQL to avoid JSON path issues + const protocolMapKey = `protocol_${key.id}`; + const originalMessageId = (await this.baileysCache.get(protocolMapKey)) as string; + + if (originalMessageId) { + message.keyId = originalMessageId; + } + + const searchId = originalMessageId || key.id; + const messages = (await this.prismaRepository.$queryRaw` SELECT * FROM "Message" WHERE "instanceId" = ${this.instanceId} - AND "key"->>'id' = ${key.id} + AND "key"->>'id' = ${searchId} LIMIT 1 `) as any[]; findMessage = messages[0] || null; @@ -1509,7 +1640,7 @@ export class BaileysStartupService extends ChannelStartupService { } if (update.message === null && update.status === undefined) { - this.sendDataWebhook(Events.MESSAGES_DELETE, key); + this.sendDataWebhook(Events.MESSAGES_DELETE, { ...key, status: 'DELETED' }); if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) await this.prismaRepository.messageUpdate.create({ data: message }); @@ -1557,8 +1688,10 @@ export class BaileysStartupService extends ChannelStartupService { this.sendDataWebhook(Events.MESSAGES_UPDATE, message); - if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) - await this.prismaRepository.messageUpdate.create({ data: message }); + if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) { + const { message: _msg, ...messageData } = message; + await this.prismaRepository.messageUpdate.create({ data: messageData }); + } const existingChat = await this.prismaRepository.chat.findFirst({ where: { instanceId: this.instanceId, remoteJid: message.remoteJid }, @@ -1727,135 +1860,141 @@ export class BaileysStartupService extends ChannelStartupService { private eventHandler() { this.client.ev.process(async (events) => { - if (!this.endSession) { - const database = this.configService.get('DATABASE'); - const settings = await this.findSettings(); + this.eventProcessingQueue = this.eventProcessingQueue.then(async () => { + try { + if (!this.endSession) { + const database = this.configService.get('DATABASE'); + const settings = await this.findSettings(); - if (events.call) { - const call = events.call[0]; + if (events.call) { + const call = events.call[0]; - if (settings?.rejectCall && call.status == 'offer') { - this.client.rejectCall(call.id, call.from); - } + if (settings?.rejectCall && call.status == 'offer') { + this.client.rejectCall(call.id, call.from); + } - if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') { - if (call.from.endsWith('@lid')) { - call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string); + if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') { + if (call.from.endsWith('@lid')) { + call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string); + } + const msg = await this.client.sendMessage(call.from, { text: settings.msgCall }); + + this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' }); + } + + this.sendDataWebhook(Events.CALL, call); } - const msg = await this.client.sendMessage(call.from, { text: settings.msgCall }); - this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' }); - } + if (events['connection.update']) { + this.connectionUpdate(events['connection.update']); + } - this.sendDataWebhook(Events.CALL, call); - } + if (events['creds.update']) { + this.instance.authState.saveCreds(); + } - if (events['connection.update']) { - this.connectionUpdate(events['connection.update']); - } + if (events['messaging-history.set']) { + const payload = events['messaging-history.set']; + await this.messageHandle['messaging-history.set'](payload); + } - if (events['creds.update']) { - this.instance.authState.saveCreds(); - } + if (events['messages.upsert']) { + const payload = events['messages.upsert']; - if (events['messaging-history.set']) { - const payload = events['messaging-history.set']; - this.messageHandle['messaging-history.set'](payload); - } + // this.messageProcessor.processMessage(payload, settings); + await this.messageHandle['messages.upsert'](payload, settings); + } - if (events['messages.upsert']) { - const payload = events['messages.upsert']; + if (events['messages.update']) { + const payload = events['messages.update']; + await this.messageHandle['messages.update'](payload, settings); + } - this.messageProcessor.processMessage(payload, settings); - // this.messageHandle['messages.upsert'](payload, settings); - } + if (events['message-receipt.update']) { + const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[]; + const remotesJidMap: Record = {}; - if (events['messages.update']) { - const payload = events['messages.update']; - this.messageHandle['messages.update'](payload, settings); - } + for (const event of payload) { + if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') { + remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp; + } + } - if (events['message-receipt.update']) { - const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[]; - const remotesJidMap: Record = {}; + await Promise.all( + Object.keys(remotesJidMap).map(async (remoteJid) => + this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]), + ), + ); + } - for (const event of payload) { - if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') { - remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp; + if (events['presence.update']) { + const payload = events['presence.update']; + + if (settings?.groupsIgnore && payload.id.includes('@g.us')) { + return; + } + + this.sendDataWebhook(Events.PRESENCE_UPDATE, payload); + } + + if (!settings?.groupsIgnore) { + if (events['groups.upsert']) { + const payload = events['groups.upsert']; + this.groupHandler['groups.upsert'](payload); + } + + if (events['groups.update']) { + const payload = events['groups.update']; + this.groupHandler['groups.update'](payload); + } + + if (events['group-participants.update']) { + const payload = events['group-participants.update'] as any; + this.groupHandler['group-participants.update'](payload); + } + } + + if (events['chats.upsert']) { + const payload = events['chats.upsert']; + this.chatHandle['chats.upsert'](payload); + } + + if (events['chats.update']) { + const payload = events['chats.update']; + this.chatHandle['chats.update'](payload); + } + + if (events['chats.delete']) { + const payload = events['chats.delete']; + this.chatHandle['chats.delete'](payload); + } + + if (events['contacts.upsert']) { + const payload = events['contacts.upsert']; + this.contactHandle['contacts.upsert'](payload); + } + + if (events['contacts.update']) { + const payload = events['contacts.update']; + this.contactHandle['contacts.update'](payload); + } + + if (events[Events.LABELS_ASSOCIATION]) { + const payload = events[Events.LABELS_ASSOCIATION]; + this.labelHandle[Events.LABELS_ASSOCIATION](payload, database); + return; + } + + if (events[Events.LABELS_EDIT]) { + const payload = events[Events.LABELS_EDIT]; + this.labelHandle[Events.LABELS_EDIT](payload); + return; } } - - await Promise.all( - Object.keys(remotesJidMap).map(async (remoteJid) => - this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]), - ), - ); + } catch (error) { + this.logger.error(error); } - - if (events['presence.update']) { - const payload = events['presence.update']; - - if (settings?.groupsIgnore && payload.id.includes('@g.us')) { - return; - } - - this.sendDataWebhook(Events.PRESENCE_UPDATE, payload); - } - - if (!settings?.groupsIgnore) { - if (events['groups.upsert']) { - const payload = events['groups.upsert']; - this.groupHandler['groups.upsert'](payload); - } - - if (events['groups.update']) { - const payload = events['groups.update']; - this.groupHandler['groups.update'](payload); - } - - if (events['group-participants.update']) { - const payload = events['group-participants.update'] as any; - this.groupHandler['group-participants.update'](payload); - } - } - - if (events['chats.upsert']) { - const payload = events['chats.upsert']; - this.chatHandle['chats.upsert'](payload); - } - - if (events['chats.update']) { - const payload = events['chats.update']; - this.chatHandle['chats.update'](payload); - } - - if (events['chats.delete']) { - const payload = events['chats.delete']; - this.chatHandle['chats.delete'](payload); - } - - if (events['contacts.upsert']) { - const payload = events['contacts.upsert']; - this.contactHandle['contacts.upsert'](payload); - } - - if (events['contacts.update']) { - const payload = events['contacts.update']; - this.contactHandle['contacts.update'](payload); - } - - if (events[Events.LABELS_ASSOCIATION]) { - const payload = events[Events.LABELS_ASSOCIATION]; - this.labelHandle[Events.LABELS_ASSOCIATION](payload, database); - return; - } - - if (events[Events.LABELS_EDIT]) { - const payload = events[Events.LABELS_EDIT]; - this.labelHandle[Events.LABELS_EDIT](payload); - return; - } - } + }); }); }