mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-12-18 19:32:21 -06:00
Merge pull request #2321 from Vitordotpy/fix/remotejid-normalization-and-cache-race
fix: normalize remoteJid in message updates and handle race condition in contact cache
This commit is contained in:
@@ -1565,8 +1565,15 @@ export class BaileysStartupService extends ChannelStartupService {
|
|||||||
|
|
||||||
for await (const { key, update } of args) {
|
for await (const { key, update } of args) {
|
||||||
const keyAny = key as any;
|
const keyAny = key as any;
|
||||||
const normalizedRemoteJid = keyAny.remoteJid?.replace(/:.*$/, '');
|
if (keyAny.remoteJid) {
|
||||||
const normalizedParticipant = keyAny.participant?.replace(/:.*$/, '');
|
keyAny.remoteJid = keyAny.remoteJid.replace(/:.*$/, '');
|
||||||
|
}
|
||||||
|
if (keyAny.participant) {
|
||||||
|
keyAny.participant = keyAny.participant.replace(/:.*$/, '');
|
||||||
|
}
|
||||||
|
|
||||||
|
const normalizedRemoteJid = keyAny.remoteJid;
|
||||||
|
const normalizedParticipant = keyAny.participant;
|
||||||
|
|
||||||
if (settings?.groupsIgnore && normalizedRemoteJid?.includes('@g.us')) {
|
if (settings?.groupsIgnore && normalizedRemoteJid?.includes('@g.us')) {
|
||||||
continue;
|
continue;
|
||||||
@@ -1644,18 +1651,48 @@ export class BaileysStartupService extends ChannelStartupService {
|
|||||||
|
|
||||||
const searchId = originalMessageId || key.id;
|
const searchId = originalMessageId || key.id;
|
||||||
|
|
||||||
const messages = (await this.prismaRepository.$queryRaw`
|
let retries = 0;
|
||||||
SELECT * FROM "Message"
|
const maxRetries = 3;
|
||||||
WHERE "instanceId" = ${this.instanceId}
|
const retryDelay = 500; // 500ms delay to avoid blocking for too long
|
||||||
AND "key"->>'id' = ${searchId}
|
|
||||||
LIMIT 1
|
while (retries < maxRetries) {
|
||||||
`) as any[];
|
const messages = (await this.prismaRepository.$queryRaw`
|
||||||
findMessage = messages[0] || null;
|
SELECT * FROM "Message"
|
||||||
|
WHERE "instanceId" = ${this.instanceId}
|
||||||
|
AND "key"->>'id' = ${searchId}
|
||||||
|
LIMIT 1
|
||||||
|
`) as any[];
|
||||||
|
findMessage = messages[0] || null;
|
||||||
|
|
||||||
|
if (findMessage?.id) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
retries++;
|
||||||
|
if (retries < maxRetries) {
|
||||||
|
await delay(retryDelay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!findMessage?.id) {
|
if (!findMessage?.id) {
|
||||||
this.logger.warn(`Original message not found for update. Skipping. Key: ${JSON.stringify(key)}`);
|
this.logger.verbose(
|
||||||
|
`Original message not found for update after ${maxRetries} retries. Skipping. This is expected for protocol messages or ephemeral events not saved to the database. Key: ${JSON.stringify(key)}`,
|
||||||
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync the incoming key.remoteJid with the stored one.
|
||||||
|
// This mutation is safe and necessary because Baileys events might use LIDs while we store Phone JIDs (or vice versa).
|
||||||
|
// Normalizing ensuring downstream logic uses the identifier that exists in our database.
|
||||||
|
if (findMessage?.key?.remoteJid && key.remoteJid !== findMessage.key.remoteJid) {
|
||||||
|
key.remoteJid = findMessage.key.remoteJid;
|
||||||
|
}
|
||||||
|
if (findMessage?.key?.remoteJid && findMessage.key.remoteJid !== key.remoteJid) {
|
||||||
|
this.logger.verbose(
|
||||||
|
`Updating key.remoteJid from ${key.remoteJid} to ${findMessage.key.remoteJid} based on stored message`,
|
||||||
|
);
|
||||||
|
key.remoteJid = findMessage.key.remoteJid;
|
||||||
|
}
|
||||||
message.messageId = findMessage.id;
|
message.messageId = findMessage.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { prismaRepository } from '@api/server.module';
|
import { prismaRepository } from '@api/server.module';
|
||||||
import { configService, Database } from '@config/env.config';
|
import { configService, Database } from '@config/env.config';
|
||||||
import { Logger } from '@config/logger.config';
|
import { Logger } from '@config/logger.config';
|
||||||
|
import { Prisma } from '@prisma/client';
|
||||||
import dayjs from 'dayjs';
|
import dayjs from 'dayjs';
|
||||||
|
|
||||||
const logger = new Logger('OnWhatsappCache');
|
const logger = new Logger('OnWhatsappCache');
|
||||||
@@ -164,9 +165,28 @@ export async function saveOnWhatsappCache(data: ISaveOnWhatsappCacheParams[]) {
|
|||||||
logger.verbose(
|
logger.verbose(
|
||||||
`[saveOnWhatsappCache] Register does not exist, creating: remoteJid=${remoteJid}, jidOptions=${dataPayload.jidOptions}, lid=${dataPayload.lid}`,
|
`[saveOnWhatsappCache] Register does not exist, creating: remoteJid=${remoteJid}, jidOptions=${dataPayload.jidOptions}, lid=${dataPayload.lid}`,
|
||||||
);
|
);
|
||||||
await prismaRepository.isOnWhatsapp.create({
|
try {
|
||||||
data: dataPayload,
|
await prismaRepository.isOnWhatsapp.create({
|
||||||
});
|
data: dataPayload,
|
||||||
|
});
|
||||||
|
} catch (error: any) {
|
||||||
|
// Check for unique constraint violation (Prisma error code P2002)
|
||||||
|
if (
|
||||||
|
error instanceof Prisma.PrismaClientKnownRequestError &&
|
||||||
|
error.code === 'P2002' &&
|
||||||
|
(error.meta?.target as string[])?.includes('remoteJid')
|
||||||
|
) {
|
||||||
|
logger.verbose(
|
||||||
|
`[saveOnWhatsappCache] Race condition detected for ${remoteJid}, updating existing record instead.`,
|
||||||
|
);
|
||||||
|
await prismaRepository.isOnWhatsapp.update({
|
||||||
|
where: { remoteJid: remoteJid },
|
||||||
|
data: dataPayload,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
// Loga o erro mas não para a execução dos outros promises
|
// Loga o erro mas não para a execução dos outros promises
|
||||||
|
|||||||
Reference in New Issue
Block a user