mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-12-09 01:49:37 -06:00
Refactor message handling and polling updates
Refactor message handling and polling updates, including decryption logic for poll votes and cache management for message updates. Improved event processing flow and added handling for various message types.
This commit is contained in:
parent
250ddd2e89
commit
2408384b0f
@ -90,6 +90,7 @@ import useMultiFileAuthStatePrisma from '@utils/use-multi-file-auth-state-prisma
|
||||
import { AuthStateProvider } from '@utils/use-multi-file-auth-state-provider-files';
|
||||
import { useMultiFileAuthStateRedisDb } from '@utils/use-multi-file-auth-state-redis-db';
|
||||
import axios from 'axios';
|
||||
import { createHash } from 'crypto';
|
||||
import makeWASocket, {
|
||||
AnyMessageContent,
|
||||
BufferedEventData,
|
||||
@ -100,9 +101,11 @@ import makeWASocket, {
|
||||
ConnectionState,
|
||||
Contact,
|
||||
delay,
|
||||
decryptPollVote,
|
||||
DisconnectReason,
|
||||
downloadContentFromMessage,
|
||||
downloadMediaMessage,
|
||||
jidNormalizedUser,
|
||||
generateWAMessageFromContent,
|
||||
getAggregateVotesInPollMessage,
|
||||
GetCatalogOptions,
|
||||
@ -247,6 +250,7 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
private readonly userDevicesCache: CacheStore = new NodeCache({ stdTTL: 300000, useClones: false });
|
||||
private endSession = false;
|
||||
private logBaileys = this.configService.get<Log>('LOG').BAILEYS;
|
||||
private eventProcessingQueue: Promise<void> = Promise.resolve();
|
||||
|
||||
// Cache TTL constants (in seconds)
|
||||
private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing
|
||||
@ -1121,6 +1125,11 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
);
|
||||
|
||||
await this.sendDataWebhook(Events.MESSAGES_EDITED, editedMessage);
|
||||
|
||||
if (received.key?.id && editedMessage.key?.id) {
|
||||
await this.baileysCache.set(`protocol_${received.key.id}`, editedMessage.key.id, 60 * 60 * 24);
|
||||
}
|
||||
|
||||
const oldMessage = await this.getMessage(editedMessage.key, true);
|
||||
if ((oldMessage as any)?.id) {
|
||||
const editedMessageTimestamp = Long.isLong(received?.messageTimestamp)
|
||||
@ -1188,6 +1197,109 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
|
||||
const messageRaw = this.prepareMessage(received);
|
||||
|
||||
if (messageRaw.messageType === 'pollUpdateMessage') {
|
||||
const pollCreationKey = messageRaw.message.pollUpdateMessage.pollCreationMessageKey;
|
||||
const pollMessage = (await this.getMessage(pollCreationKey, true)) as proto.IWebMessageInfo;
|
||||
const pollMessageSecret = (await this.getMessage(pollCreationKey)) as any;
|
||||
|
||||
if (pollMessage) {
|
||||
const pollOptions =
|
||||
(pollMessage.message as any).pollCreationMessage?.options ||
|
||||
(pollMessage.message as any).pollCreationMessageV3?.options ||
|
||||
[];
|
||||
const pollVote = messageRaw.message.pollUpdateMessage.vote;
|
||||
|
||||
const voterJid = received.key.fromMe
|
||||
? this.instance.wuid
|
||||
: received.key.participant || received.key.remoteJid;
|
||||
|
||||
let pollEncKey = pollMessageSecret?.messageContextInfo?.messageSecret;
|
||||
|
||||
let successfulVoterJid = voterJid;
|
||||
|
||||
if (typeof pollEncKey === 'string') {
|
||||
pollEncKey = Buffer.from(pollEncKey, 'base64');
|
||||
} else if (pollEncKey?.type === 'Buffer' && Array.isArray(pollEncKey.data)) {
|
||||
pollEncKey = Buffer.from(pollEncKey.data);
|
||||
}
|
||||
|
||||
if (Buffer.isBuffer(pollEncKey) && pollEncKey.length === 44) {
|
||||
pollEncKey = Buffer.from(pollEncKey.toString('utf8'), 'base64');
|
||||
}
|
||||
|
||||
if (pollVote.encPayload && pollEncKey) {
|
||||
const creatorCandidates = [
|
||||
this.instance.wuid,
|
||||
this.client.user?.lid,
|
||||
pollMessage.key.participant,
|
||||
(pollMessage.key as any).participantAlt,
|
||||
pollMessage.key.remoteJid,
|
||||
];
|
||||
|
||||
const key = received.key as any;
|
||||
const voterCandidates = [
|
||||
this.instance.wuid,
|
||||
this.client.user?.lid,
|
||||
key.participant,
|
||||
key.participantAlt,
|
||||
key.remoteJidAlt,
|
||||
key.remoteJid,
|
||||
];
|
||||
|
||||
const uniqueCreators = [
|
||||
...new Set(creatorCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))),
|
||||
];
|
||||
const uniqueVoters = [
|
||||
...new Set(voterCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))),
|
||||
];
|
||||
|
||||
let decryptedVote;
|
||||
|
||||
for (const creator of uniqueCreators) {
|
||||
for (const voter of uniqueVoters) {
|
||||
try {
|
||||
decryptedVote = decryptPollVote(pollVote, {
|
||||
pollCreatorJid: creator,
|
||||
pollMsgId: pollMessage.key.id,
|
||||
pollEncKey,
|
||||
voterJid: voter,
|
||||
} as any);
|
||||
if (decryptedVote) {
|
||||
successfulVoterJid = voter;
|
||||
break;
|
||||
}
|
||||
} catch (err) {
|
||||
// Continue trying
|
||||
}
|
||||
}
|
||||
if (decryptedVote) break;
|
||||
}
|
||||
|
||||
if (decryptedVote) {
|
||||
Object.assign(pollVote, decryptedVote);
|
||||
}
|
||||
}
|
||||
|
||||
const selectedOptions = pollVote?.selectedOptions || [];
|
||||
|
||||
const selectedOptionNames = pollOptions
|
||||
.filter((option) => {
|
||||
const hash = createHash('sha256').update(option.optionName).digest();
|
||||
return selectedOptions.some((selected) => Buffer.compare(selected, hash) === 0);
|
||||
})
|
||||
.map((option) => option.optionName);
|
||||
|
||||
messageRaw.message.pollUpdateMessage.vote.selectedOptions = selectedOptionNames;
|
||||
|
||||
const pollUpdates = pollOptions.map((option) => ({
|
||||
name: option.optionName,
|
||||
voters: selectedOptionNames.includes(option.optionName) ? [successfulVoterJid] : [],
|
||||
}));
|
||||
|
||||
messageRaw.pollUpdates = pollUpdates;
|
||||
}
|
||||
}
|
||||
|
||||
const isMedia =
|
||||
received?.message?.imageMessage ||
|
||||
received?.message?.videoMessage ||
|
||||
@ -1237,7 +1349,8 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
}
|
||||
|
||||
if (this.configService.get<Database>('DATABASE').SAVE_DATA.NEW_MESSAGE) {
|
||||
const msg = await this.prismaRepository.message.create({ data: messageRaw });
|
||||
const { pollUpdates, ...messageData } = messageRaw;
|
||||
const msg = await this.prismaRepository.message.create({ data: messageData });
|
||||
|
||||
const { remoteJid } = received.key;
|
||||
const timestamp = msg.messageTimestamp;
|
||||
@ -1442,18 +1555,23 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (update.message === null && update.status === undefined) continue;
|
||||
|
||||
const updateKey = `${this.instance.id}_${key.id}_${update.status}`;
|
||||
|
||||
const cached = await this.baileysCache.get(updateKey);
|
||||
|
||||
if (cached && update.messageTimestamp == cached.messageTimestamp) {
|
||||
const secondsSinceEpoch = Math.floor(Date.now() / 1000)
|
||||
console.log('CACHE:', {cached, updateKey, messageTimestamp: update.messageTimestamp, secondsSinceEpoch});
|
||||
|
||||
if ((update.messageTimestamp && update.messageTimestamp === cached) || (!update.messageTimestamp && secondsSinceEpoch === cached)) {
|
||||
this.logger.info(`Update Message duplicated ignored [avoid deadlock]: ${updateKey}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60);
|
||||
if (update.messageTimestamp) {
|
||||
await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60);
|
||||
} else {
|
||||
await this.baileysCache.set(updateKey, secondsSinceEpoch, 30 * 60);
|
||||
}
|
||||
|
||||
if (status[update.status] === 'READ' && key.fromMe) {
|
||||
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot?.enabled) {
|
||||
@ -1489,14 +1607,27 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
instanceId: this.instanceId,
|
||||
};
|
||||
|
||||
if (update.message) {
|
||||
message.message = update.message;
|
||||
}
|
||||
|
||||
let findMessage: any;
|
||||
const configDatabaseData = this.configService.get<Database>('DATABASE').SAVE_DATA;
|
||||
if (configDatabaseData.HISTORIC || configDatabaseData.NEW_MESSAGE) {
|
||||
// Use raw SQL to avoid JSON path issues
|
||||
const protocolMapKey = `protocol_${key.id}`;
|
||||
const originalMessageId = (await this.baileysCache.get(protocolMapKey)) as string;
|
||||
|
||||
if (originalMessageId) {
|
||||
message.keyId = originalMessageId;
|
||||
}
|
||||
|
||||
const searchId = originalMessageId || key.id;
|
||||
|
||||
const messages = (await this.prismaRepository.$queryRaw`
|
||||
SELECT * FROM "Message"
|
||||
WHERE "instanceId" = ${this.instanceId}
|
||||
AND "key"->>'id' = ${key.id}
|
||||
AND "key"->>'id' = ${searchId}
|
||||
LIMIT 1
|
||||
`) as any[];
|
||||
findMessage = messages[0] || null;
|
||||
@ -1509,7 +1640,7 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
}
|
||||
|
||||
if (update.message === null && update.status === undefined) {
|
||||
this.sendDataWebhook(Events.MESSAGES_DELETE, key);
|
||||
this.sendDataWebhook(Events.MESSAGES_DELETE, { ...key, status: 'DELETED' });
|
||||
|
||||
if (this.configService.get<Database>('DATABASE').SAVE_DATA.MESSAGE_UPDATE)
|
||||
await this.prismaRepository.messageUpdate.create({ data: message });
|
||||
@ -1557,8 +1688,10 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
|
||||
this.sendDataWebhook(Events.MESSAGES_UPDATE, message);
|
||||
|
||||
if (this.configService.get<Database>('DATABASE').SAVE_DATA.MESSAGE_UPDATE)
|
||||
await this.prismaRepository.messageUpdate.create({ data: message });
|
||||
if (this.configService.get<Database>('DATABASE').SAVE_DATA.MESSAGE_UPDATE) {
|
||||
const { message: _msg, ...messageData } = message;
|
||||
await this.prismaRepository.messageUpdate.create({ data: messageData });
|
||||
}
|
||||
|
||||
const existingChat = await this.prismaRepository.chat.findFirst({
|
||||
where: { instanceId: this.instanceId, remoteJid: message.remoteJid },
|
||||
@ -1727,135 +1860,141 @@ export class BaileysStartupService extends ChannelStartupService {
|
||||
|
||||
private eventHandler() {
|
||||
this.client.ev.process(async (events) => {
|
||||
if (!this.endSession) {
|
||||
const database = this.configService.get<Database>('DATABASE');
|
||||
const settings = await this.findSettings();
|
||||
this.eventProcessingQueue = this.eventProcessingQueue.then(async () => {
|
||||
try {
|
||||
if (!this.endSession) {
|
||||
const database = this.configService.get<Database>('DATABASE');
|
||||
const settings = await this.findSettings();
|
||||
|
||||
if (events.call) {
|
||||
const call = events.call[0];
|
||||
if (events.call) {
|
||||
const call = events.call[0];
|
||||
|
||||
if (settings?.rejectCall && call.status == 'offer') {
|
||||
this.client.rejectCall(call.id, call.from);
|
||||
}
|
||||
if (settings?.rejectCall && call.status == 'offer') {
|
||||
this.client.rejectCall(call.id, call.from);
|
||||
}
|
||||
|
||||
if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') {
|
||||
if (call.from.endsWith('@lid')) {
|
||||
call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string);
|
||||
if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') {
|
||||
if (call.from.endsWith('@lid')) {
|
||||
call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string);
|
||||
}
|
||||
const msg = await this.client.sendMessage(call.from, { text: settings.msgCall });
|
||||
|
||||
this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' });
|
||||
}
|
||||
|
||||
this.sendDataWebhook(Events.CALL, call);
|
||||
}
|
||||
const msg = await this.client.sendMessage(call.from, { text: settings.msgCall });
|
||||
|
||||
this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' });
|
||||
}
|
||||
if (events['connection.update']) {
|
||||
this.connectionUpdate(events['connection.update']);
|
||||
}
|
||||
|
||||
this.sendDataWebhook(Events.CALL, call);
|
||||
}
|
||||
if (events['creds.update']) {
|
||||
this.instance.authState.saveCreds();
|
||||
}
|
||||
|
||||
if (events['connection.update']) {
|
||||
this.connectionUpdate(events['connection.update']);
|
||||
}
|
||||
if (events['messaging-history.set']) {
|
||||
const payload = events['messaging-history.set'];
|
||||
await this.messageHandle['messaging-history.set'](payload);
|
||||
}
|
||||
|
||||
if (events['creds.update']) {
|
||||
this.instance.authState.saveCreds();
|
||||
}
|
||||
if (events['messages.upsert']) {
|
||||
const payload = events['messages.upsert'];
|
||||
|
||||
if (events['messaging-history.set']) {
|
||||
const payload = events['messaging-history.set'];
|
||||
this.messageHandle['messaging-history.set'](payload);
|
||||
}
|
||||
// this.messageProcessor.processMessage(payload, settings);
|
||||
await this.messageHandle['messages.upsert'](payload, settings);
|
||||
}
|
||||
|
||||
if (events['messages.upsert']) {
|
||||
const payload = events['messages.upsert'];
|
||||
if (events['messages.update']) {
|
||||
const payload = events['messages.update'];
|
||||
await this.messageHandle['messages.update'](payload, settings);
|
||||
}
|
||||
|
||||
this.messageProcessor.processMessage(payload, settings);
|
||||
// this.messageHandle['messages.upsert'](payload, settings);
|
||||
}
|
||||
if (events['message-receipt.update']) {
|
||||
const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[];
|
||||
const remotesJidMap: Record<string, number> = {};
|
||||
|
||||
if (events['messages.update']) {
|
||||
const payload = events['messages.update'];
|
||||
this.messageHandle['messages.update'](payload, settings);
|
||||
}
|
||||
for (const event of payload) {
|
||||
if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') {
|
||||
remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp;
|
||||
}
|
||||
}
|
||||
|
||||
if (events['message-receipt.update']) {
|
||||
const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[];
|
||||
const remotesJidMap: Record<string, number> = {};
|
||||
await Promise.all(
|
||||
Object.keys(remotesJidMap).map(async (remoteJid) =>
|
||||
this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
for (const event of payload) {
|
||||
if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') {
|
||||
remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp;
|
||||
if (events['presence.update']) {
|
||||
const payload = events['presence.update'];
|
||||
|
||||
if (settings?.groupsIgnore && payload.id.includes('@g.us')) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendDataWebhook(Events.PRESENCE_UPDATE, payload);
|
||||
}
|
||||
|
||||
if (!settings?.groupsIgnore) {
|
||||
if (events['groups.upsert']) {
|
||||
const payload = events['groups.upsert'];
|
||||
this.groupHandler['groups.upsert'](payload);
|
||||
}
|
||||
|
||||
if (events['groups.update']) {
|
||||
const payload = events['groups.update'];
|
||||
this.groupHandler['groups.update'](payload);
|
||||
}
|
||||
|
||||
if (events['group-participants.update']) {
|
||||
const payload = events['group-participants.update'] as any;
|
||||
this.groupHandler['group-participants.update'](payload);
|
||||
}
|
||||
}
|
||||
|
||||
if (events['chats.upsert']) {
|
||||
const payload = events['chats.upsert'];
|
||||
this.chatHandle['chats.upsert'](payload);
|
||||
}
|
||||
|
||||
if (events['chats.update']) {
|
||||
const payload = events['chats.update'];
|
||||
this.chatHandle['chats.update'](payload);
|
||||
}
|
||||
|
||||
if (events['chats.delete']) {
|
||||
const payload = events['chats.delete'];
|
||||
this.chatHandle['chats.delete'](payload);
|
||||
}
|
||||
|
||||
if (events['contacts.upsert']) {
|
||||
const payload = events['contacts.upsert'];
|
||||
this.contactHandle['contacts.upsert'](payload);
|
||||
}
|
||||
|
||||
if (events['contacts.update']) {
|
||||
const payload = events['contacts.update'];
|
||||
this.contactHandle['contacts.update'](payload);
|
||||
}
|
||||
|
||||
if (events[Events.LABELS_ASSOCIATION]) {
|
||||
const payload = events[Events.LABELS_ASSOCIATION];
|
||||
this.labelHandle[Events.LABELS_ASSOCIATION](payload, database);
|
||||
return;
|
||||
}
|
||||
|
||||
if (events[Events.LABELS_EDIT]) {
|
||||
const payload = events[Events.LABELS_EDIT];
|
||||
this.labelHandle[Events.LABELS_EDIT](payload);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
Object.keys(remotesJidMap).map(async (remoteJid) =>
|
||||
this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]),
|
||||
),
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.error(error);
|
||||
}
|
||||
|
||||
if (events['presence.update']) {
|
||||
const payload = events['presence.update'];
|
||||
|
||||
if (settings?.groupsIgnore && payload.id.includes('@g.us')) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendDataWebhook(Events.PRESENCE_UPDATE, payload);
|
||||
}
|
||||
|
||||
if (!settings?.groupsIgnore) {
|
||||
if (events['groups.upsert']) {
|
||||
const payload = events['groups.upsert'];
|
||||
this.groupHandler['groups.upsert'](payload);
|
||||
}
|
||||
|
||||
if (events['groups.update']) {
|
||||
const payload = events['groups.update'];
|
||||
this.groupHandler['groups.update'](payload);
|
||||
}
|
||||
|
||||
if (events['group-participants.update']) {
|
||||
const payload = events['group-participants.update'] as any;
|
||||
this.groupHandler['group-participants.update'](payload);
|
||||
}
|
||||
}
|
||||
|
||||
if (events['chats.upsert']) {
|
||||
const payload = events['chats.upsert'];
|
||||
this.chatHandle['chats.upsert'](payload);
|
||||
}
|
||||
|
||||
if (events['chats.update']) {
|
||||
const payload = events['chats.update'];
|
||||
this.chatHandle['chats.update'](payload);
|
||||
}
|
||||
|
||||
if (events['chats.delete']) {
|
||||
const payload = events['chats.delete'];
|
||||
this.chatHandle['chats.delete'](payload);
|
||||
}
|
||||
|
||||
if (events['contacts.upsert']) {
|
||||
const payload = events['contacts.upsert'];
|
||||
this.contactHandle['contacts.upsert'](payload);
|
||||
}
|
||||
|
||||
if (events['contacts.update']) {
|
||||
const payload = events['contacts.update'];
|
||||
this.contactHandle['contacts.update'](payload);
|
||||
}
|
||||
|
||||
if (events[Events.LABELS_ASSOCIATION]) {
|
||||
const payload = events[Events.LABELS_ASSOCIATION];
|
||||
this.labelHandle[Events.LABELS_ASSOCIATION](payload, database);
|
||||
return;
|
||||
}
|
||||
|
||||
if (events[Events.LABELS_EDIT]) {
|
||||
const payload = events[Events.LABELS_EDIT];
|
||||
this.labelHandle[Events.LABELS_EDIT](payload);
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user