diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index e055b2bd..a793c4d7 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -90,6 +90,7 @@ import useMultiFileAuthStatePrisma from '@utils/use-multi-file-auth-state-prisma import { AuthStateProvider } from '@utils/use-multi-file-auth-state-provider-files'; import { useMultiFileAuthStateRedisDb } from '@utils/use-multi-file-auth-state-redis-db'; import axios from 'axios'; +import { createHash } from 'crypto'; import makeWASocket, { AnyMessageContent, BufferedEventData, @@ -100,9 +101,11 @@ import makeWASocket, { ConnectionState, Contact, delay, + decryptPollVote, DisconnectReason, downloadContentFromMessage, downloadMediaMessage, + jidNormalizedUser, generateWAMessageFromContent, getAggregateVotesInPollMessage, GetCatalogOptions, @@ -247,6 +250,7 @@ export class BaileysStartupService extends ChannelStartupService { private readonly userDevicesCache: CacheStore = new NodeCache({ stdTTL: 300000, useClones: false }); private endSession = false; private logBaileys = this.configService.get('LOG').BAILEYS; + private eventProcessingQueue: Promise = Promise.resolve(); // Cache TTL constants (in seconds) private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing @@ -1121,6 +1125,11 @@ export class BaileysStartupService extends ChannelStartupService { ); await this.sendDataWebhook(Events.MESSAGES_EDITED, editedMessage); + + if (received.key?.id && editedMessage.key?.id) { + await this.baileysCache.set(`protocol_${received.key.id}`, editedMessage.key.id, 60 * 60 * 24); + } + const oldMessage = await this.getMessage(editedMessage.key, true); if ((oldMessage as any)?.id) { const editedMessageTimestamp = Long.isLong(received?.messageTimestamp) @@ -1188,6 +1197,109 @@ export class BaileysStartupService extends ChannelStartupService { const messageRaw = this.prepareMessage(received); + if (messageRaw.messageType === 'pollUpdateMessage') { + const pollCreationKey = messageRaw.message.pollUpdateMessage.pollCreationMessageKey; + const pollMessage = (await this.getMessage(pollCreationKey, true)) as proto.IWebMessageInfo; + const pollMessageSecret = (await this.getMessage(pollCreationKey)) as any; + + if (pollMessage) { + const pollOptions = + (pollMessage.message as any).pollCreationMessage?.options || + (pollMessage.message as any).pollCreationMessageV3?.options || + []; + const pollVote = messageRaw.message.pollUpdateMessage.vote; + + const voterJid = received.key.fromMe + ? this.instance.wuid + : received.key.participant || received.key.remoteJid; + + let pollEncKey = pollMessageSecret?.messageContextInfo?.messageSecret; + + let successfulVoterJid = voterJid; + + if (typeof pollEncKey === 'string') { + pollEncKey = Buffer.from(pollEncKey, 'base64'); + } else if (pollEncKey?.type === 'Buffer' && Array.isArray(pollEncKey.data)) { + pollEncKey = Buffer.from(pollEncKey.data); + } + + if (Buffer.isBuffer(pollEncKey) && pollEncKey.length === 44) { + pollEncKey = Buffer.from(pollEncKey.toString('utf8'), 'base64'); + } + + if (pollVote.encPayload && pollEncKey) { + const creatorCandidates = [ + this.instance.wuid, + this.client.user?.lid, + pollMessage.key.participant, + (pollMessage.key as any).participantAlt, + pollMessage.key.remoteJid, + ]; + + const key = received.key as any; + const voterCandidates = [ + this.instance.wuid, + this.client.user?.lid, + key.participant, + key.participantAlt, + key.remoteJidAlt, + key.remoteJid, + ]; + + const uniqueCreators = [ + ...new Set(creatorCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))), + ]; + const uniqueVoters = [ + ...new Set(voterCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))), + ]; + + let decryptedVote; + + for (const creator of uniqueCreators) { + for (const voter of uniqueVoters) { + try { + decryptedVote = decryptPollVote(pollVote, { + pollCreatorJid: creator, + pollMsgId: pollMessage.key.id, + pollEncKey, + voterJid: voter, + } as any); + if (decryptedVote) { + successfulVoterJid = voter; + break; + } + } catch (err) { + // Continue trying + } + } + if (decryptedVote) break; + } + + if (decryptedVote) { + Object.assign(pollVote, decryptedVote); + } + } + + const selectedOptions = pollVote?.selectedOptions || []; + + const selectedOptionNames = pollOptions + .filter((option) => { + const hash = createHash('sha256').update(option.optionName).digest(); + return selectedOptions.some((selected) => Buffer.compare(selected, hash) === 0); + }) + .map((option) => option.optionName); + + messageRaw.message.pollUpdateMessage.vote.selectedOptions = selectedOptionNames; + + const pollUpdates = pollOptions.map((option) => ({ + name: option.optionName, + voters: selectedOptionNames.includes(option.optionName) ? [successfulVoterJid] : [], + })); + + messageRaw.pollUpdates = pollUpdates; + } + } + const isMedia = received?.message?.imageMessage || received?.message?.videoMessage || @@ -1237,7 +1349,8 @@ export class BaileysStartupService extends ChannelStartupService { } if (this.configService.get('DATABASE').SAVE_DATA.NEW_MESSAGE) { - const msg = await this.prismaRepository.message.create({ data: messageRaw }); + const { pollUpdates, ...messageData } = messageRaw; + const msg = await this.prismaRepository.message.create({ data: messageData }); const { remoteJid } = received.key; const timestamp = msg.messageTimestamp; @@ -1442,18 +1555,23 @@ export class BaileysStartupService extends ChannelStartupService { continue; } - if (update.message === null && update.status === undefined) continue; - const updateKey = `${this.instance.id}_${key.id}_${update.status}`; const cached = await this.baileysCache.get(updateKey); - if (cached && update.messageTimestamp == cached.messageTimestamp) { + const secondsSinceEpoch = Math.floor(Date.now() / 1000) + console.log('CACHE:', {cached, updateKey, messageTimestamp: update.messageTimestamp, secondsSinceEpoch}); + + if ((update.messageTimestamp && update.messageTimestamp === cached) || (!update.messageTimestamp && secondsSinceEpoch === cached)) { this.logger.info(`Update Message duplicated ignored [avoid deadlock]: ${updateKey}`); continue; } - await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60); + if (update.messageTimestamp) { + await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60); + } else { + await this.baileysCache.set(updateKey, secondsSinceEpoch, 30 * 60); + } if (status[update.status] === 'READ' && key.fromMe) { if (this.configService.get('CHATWOOT').ENABLED && this.localChatwoot?.enabled) { @@ -1489,14 +1607,27 @@ export class BaileysStartupService extends ChannelStartupService { instanceId: this.instanceId, }; + if (update.message) { + message.message = update.message; + } + let findMessage: any; const configDatabaseData = this.configService.get('DATABASE').SAVE_DATA; if (configDatabaseData.HISTORIC || configDatabaseData.NEW_MESSAGE) { // Use raw SQL to avoid JSON path issues + const protocolMapKey = `protocol_${key.id}`; + const originalMessageId = (await this.baileysCache.get(protocolMapKey)) as string; + + if (originalMessageId) { + message.keyId = originalMessageId; + } + + const searchId = originalMessageId || key.id; + const messages = (await this.prismaRepository.$queryRaw` SELECT * FROM "Message" WHERE "instanceId" = ${this.instanceId} - AND "key"->>'id' = ${key.id} + AND "key"->>'id' = ${searchId} LIMIT 1 `) as any[]; findMessage = messages[0] || null; @@ -1509,7 +1640,7 @@ export class BaileysStartupService extends ChannelStartupService { } if (update.message === null && update.status === undefined) { - this.sendDataWebhook(Events.MESSAGES_DELETE, key); + this.sendDataWebhook(Events.MESSAGES_DELETE, { ...key, status: 'DELETED' }); if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) await this.prismaRepository.messageUpdate.create({ data: message }); @@ -1557,8 +1688,10 @@ export class BaileysStartupService extends ChannelStartupService { this.sendDataWebhook(Events.MESSAGES_UPDATE, message); - if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) - await this.prismaRepository.messageUpdate.create({ data: message }); + if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) { + const { message: _msg, ...messageData } = message; + await this.prismaRepository.messageUpdate.create({ data: messageData }); + } const existingChat = await this.prismaRepository.chat.findFirst({ where: { instanceId: this.instanceId, remoteJid: message.remoteJid }, @@ -1727,135 +1860,141 @@ export class BaileysStartupService extends ChannelStartupService { private eventHandler() { this.client.ev.process(async (events) => { - if (!this.endSession) { - const database = this.configService.get('DATABASE'); - const settings = await this.findSettings(); + this.eventProcessingQueue = this.eventProcessingQueue.then(async () => { + try { + if (!this.endSession) { + const database = this.configService.get('DATABASE'); + const settings = await this.findSettings(); - if (events.call) { - const call = events.call[0]; + if (events.call) { + const call = events.call[0]; - if (settings?.rejectCall && call.status == 'offer') { - this.client.rejectCall(call.id, call.from); - } + if (settings?.rejectCall && call.status == 'offer') { + this.client.rejectCall(call.id, call.from); + } - if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') { - if (call.from.endsWith('@lid')) { - call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string); + if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') { + if (call.from.endsWith('@lid')) { + call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string); + } + const msg = await this.client.sendMessage(call.from, { text: settings.msgCall }); + + this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' }); + } + + this.sendDataWebhook(Events.CALL, call); } - const msg = await this.client.sendMessage(call.from, { text: settings.msgCall }); - this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' }); - } + if (events['connection.update']) { + this.connectionUpdate(events['connection.update']); + } - this.sendDataWebhook(Events.CALL, call); - } + if (events['creds.update']) { + this.instance.authState.saveCreds(); + } - if (events['connection.update']) { - this.connectionUpdate(events['connection.update']); - } + if (events['messaging-history.set']) { + const payload = events['messaging-history.set']; + await this.messageHandle['messaging-history.set'](payload); + } - if (events['creds.update']) { - this.instance.authState.saveCreds(); - } + if (events['messages.upsert']) { + const payload = events['messages.upsert']; - if (events['messaging-history.set']) { - const payload = events['messaging-history.set']; - this.messageHandle['messaging-history.set'](payload); - } + // this.messageProcessor.processMessage(payload, settings); + await this.messageHandle['messages.upsert'](payload, settings); + } - if (events['messages.upsert']) { - const payload = events['messages.upsert']; + if (events['messages.update']) { + const payload = events['messages.update']; + await this.messageHandle['messages.update'](payload, settings); + } - this.messageProcessor.processMessage(payload, settings); - // this.messageHandle['messages.upsert'](payload, settings); - } + if (events['message-receipt.update']) { + const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[]; + const remotesJidMap: Record = {}; - if (events['messages.update']) { - const payload = events['messages.update']; - this.messageHandle['messages.update'](payload, settings); - } + for (const event of payload) { + if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') { + remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp; + } + } - if (events['message-receipt.update']) { - const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[]; - const remotesJidMap: Record = {}; + await Promise.all( + Object.keys(remotesJidMap).map(async (remoteJid) => + this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]), + ), + ); + } - for (const event of payload) { - if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') { - remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp; + if (events['presence.update']) { + const payload = events['presence.update']; + + if (settings?.groupsIgnore && payload.id.includes('@g.us')) { + return; + } + + this.sendDataWebhook(Events.PRESENCE_UPDATE, payload); + } + + if (!settings?.groupsIgnore) { + if (events['groups.upsert']) { + const payload = events['groups.upsert']; + this.groupHandler['groups.upsert'](payload); + } + + if (events['groups.update']) { + const payload = events['groups.update']; + this.groupHandler['groups.update'](payload); + } + + if (events['group-participants.update']) { + const payload = events['group-participants.update'] as any; + this.groupHandler['group-participants.update'](payload); + } + } + + if (events['chats.upsert']) { + const payload = events['chats.upsert']; + this.chatHandle['chats.upsert'](payload); + } + + if (events['chats.update']) { + const payload = events['chats.update']; + this.chatHandle['chats.update'](payload); + } + + if (events['chats.delete']) { + const payload = events['chats.delete']; + this.chatHandle['chats.delete'](payload); + } + + if (events['contacts.upsert']) { + const payload = events['contacts.upsert']; + this.contactHandle['contacts.upsert'](payload); + } + + if (events['contacts.update']) { + const payload = events['contacts.update']; + this.contactHandle['contacts.update'](payload); + } + + if (events[Events.LABELS_ASSOCIATION]) { + const payload = events[Events.LABELS_ASSOCIATION]; + this.labelHandle[Events.LABELS_ASSOCIATION](payload, database); + return; + } + + if (events[Events.LABELS_EDIT]) { + const payload = events[Events.LABELS_EDIT]; + this.labelHandle[Events.LABELS_EDIT](payload); + return; } } - - await Promise.all( - Object.keys(remotesJidMap).map(async (remoteJid) => - this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]), - ), - ); + } catch (error) { + this.logger.error(error); } - - if (events['presence.update']) { - const payload = events['presence.update']; - - if (settings?.groupsIgnore && payload.id.includes('@g.us')) { - return; - } - - this.sendDataWebhook(Events.PRESENCE_UPDATE, payload); - } - - if (!settings?.groupsIgnore) { - if (events['groups.upsert']) { - const payload = events['groups.upsert']; - this.groupHandler['groups.upsert'](payload); - } - - if (events['groups.update']) { - const payload = events['groups.update']; - this.groupHandler['groups.update'](payload); - } - - if (events['group-participants.update']) { - const payload = events['group-participants.update'] as any; - this.groupHandler['group-participants.update'](payload); - } - } - - if (events['chats.upsert']) { - const payload = events['chats.upsert']; - this.chatHandle['chats.upsert'](payload); - } - - if (events['chats.update']) { - const payload = events['chats.update']; - this.chatHandle['chats.update'](payload); - } - - if (events['chats.delete']) { - const payload = events['chats.delete']; - this.chatHandle['chats.delete'](payload); - } - - if (events['contacts.upsert']) { - const payload = events['contacts.upsert']; - this.contactHandle['contacts.upsert'](payload); - } - - if (events['contacts.update']) { - const payload = events['contacts.update']; - this.contactHandle['contacts.update'](payload); - } - - if (events[Events.LABELS_ASSOCIATION]) { - const payload = events[Events.LABELS_ASSOCIATION]; - this.labelHandle[Events.LABELS_ASSOCIATION](payload, database); - return; - } - - if (events[Events.LABELS_EDIT]) { - const payload = events[Events.LABELS_EDIT]; - this.labelHandle[Events.LABELS_EDIT](payload); - return; - } - } + }); }); }