Merge branch 'develop' into main

This commit is contained in:
dersonbsb2022
2025-10-06 15:21:10 -03:00
committed by GitHub
14 changed files with 396 additions and 145 deletions

View File

@@ -33,6 +33,8 @@ import mimeTypes from 'mime-types';
import path from 'path';
import { Readable } from 'stream';
const MIN_CONNECTION_NOTIFICATION_INTERVAL_MS = 30000; // 30 seconds
interface ChatwootMessage {
messageId?: number;
inboxId?: number;
@@ -72,7 +74,9 @@ export class ChatwootService {
private readonly cache: CacheService,
) {}
private pgClient = postgresClient.getChatwootConnection();
private async getPgClient() {
return postgresClient.getChatwootConnection();
}
private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
const cacheKey = `${instance.instanceName}:getProvider`;
@@ -401,7 +405,8 @@ export class ChatwootService {
if (!uri) return false;
const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`;
const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0];
const pgClient = await this.getPgClient();
const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0];
let tagId = tagData?.id;
const taggingsCount = tagData?.taggings_count || 0;
@@ -411,18 +416,18 @@ export class ChatwootService {
DO UPDATE SET taggings_count = tags.taggings_count + 1
RETURNING id`;
tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
const sqlCheckTagging = `SELECT 1 FROM taggings
WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`;
const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
if (!taggingExists) {
const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at)
VALUES ($1, 'Contact', $2, 'labels', NOW())`;
await this.pgClient.query(sqlInsertLabel, [tagId, contactId]);
await pgClient.query(sqlInsertLabel, [tagId, contactId]);
}
return true;
@@ -620,12 +625,7 @@ export class ChatwootService {
this.logger.verbose(`--- Start createConversation ---`);
this.logger.verbose(`Instance: ${JSON.stringify(instance)}`);
// If it already exists in the cache, return conversationId
if (await this.cache.has(cacheKey)) {
const conversationId = (await this.cache.get(cacheKey)) as number;
this.logger.verbose(`Found conversation to: ${remoteJid}, conversation ID: ${conversationId}`);
return conversationId;
}
// Always check Chatwoot first, cache only as fallback
// If lock already exists, wait until release or timeout
if (await this.cache.has(lockKey)) {
@@ -651,12 +651,9 @@ export class ChatwootService {
try {
/*
Double check after lock
Utilizei uma nova verificação para evitar que outra thread execute entre o terminio do while e o set lock
Double check after lock - REMOVED
This was causing the system to use cached conversations instead of checking Chatwoot
*/
if (await this.cache.has(cacheKey)) {
return (await this.cache.get(cacheKey)) as number;
}
const client = await this.clientCw(instance);
if (!client) return null;
@@ -763,34 +760,39 @@ export class ChatwootService {
return null;
}
let inboxConversation = contactConversations.payload.find(
(conversation) => conversation.inbox_id == filterInbox.id,
);
if (inboxConversation) {
if (this.provider.reopenConversation) {
this.logger.verbose(`Found conversation in reopenConversation mode: ${JSON.stringify(inboxConversation)}`);
if (inboxConversation && this.provider.conversationPending && inboxConversation.status !== 'open') {
await client.conversations.toggleStatus({
accountId: this.provider.accountId,
conversationId: inboxConversation.id,
data: {
status: 'pending',
},
});
}
} else {
inboxConversation = contactConversations.payload.find(
(conversation) =>
conversation && conversation.status !== 'resolved' && conversation.inbox_id == filterInbox.id,
);
this.logger.verbose(`Found conversation: ${JSON.stringify(inboxConversation)}`);
}
let inboxConversation = null;
if (this.provider.reopenConversation) {
inboxConversation = this.findOpenConversation(contactConversations.payload, filterInbox.id);
if (inboxConversation) {
this.logger.verbose(`Returning existing conversation ID: ${inboxConversation.id}`);
this.cache.set(cacheKey, inboxConversation.id);
return inboxConversation.id;
this.logger.verbose(
`Found open conversation in reopenConversation mode: ${JSON.stringify(inboxConversation)}`,
);
} else {
inboxConversation = await this.findAndReopenResolvedConversation(
client,
contactConversations.payload,
filterInbox.id,
);
}
} else {
inboxConversation = this.findOpenConversation(contactConversations.payload, filterInbox.id);
this.logger.verbose(`Found conversation: ${JSON.stringify(inboxConversation)}`);
}
if (inboxConversation) {
this.logger.verbose(`Returning existing conversation ID: ${inboxConversation.id}`);
this.cache.set(cacheKey, inboxConversation.id);
return inboxConversation.id;
}
if (await this.cache.has(cacheKey)) {
const conversationId = (await this.cache.get(cacheKey)) as number;
this.logger.warn(
`No active conversations found in Chatwoot, using cached conversation ID: ${conversationId} as fallback`,
);
return conversationId;
}
const data = {
@@ -833,6 +835,45 @@ export class ChatwootService {
}
}
private findOpenConversation(conversations: any[], inboxId: number): any | null {
const openConversation = conversations.find(
(conversation) => conversation && conversation.status !== 'resolved' && conversation.inbox_id == inboxId,
);
if (openConversation) {
this.logger.verbose(`Found open conversation: ${JSON.stringify(openConversation)}`);
}
return openConversation || null;
}
private async findAndReopenResolvedConversation(
client: any,
conversations: any[],
inboxId: number,
): Promise<any | null> {
const resolvedConversation = conversations.find(
(conversation) => conversation && conversation.status === 'resolved' && conversation.inbox_id == inboxId,
);
if (resolvedConversation) {
this.logger.verbose(`Found resolved conversation to reopen: ${JSON.stringify(resolvedConversation)}`);
if (this.provider.conversationPending && resolvedConversation.status !== 'open') {
await client.conversations.toggleStatus({
accountId: this.provider.accountId,
conversationId: resolvedConversation.id,
data: {
status: 'pending',
},
});
this.logger.verbose(`Reopened resolved conversation ID: ${resolvedConversation.id}`);
}
return resolvedConversation;
}
return null;
}
public async getInbox(instance: InstanceDto): Promise<inbox | null> {
const cacheKey = `${instance.instanceName}:getInbox`;
if (await this.cache.has(cacheKey)) {
@@ -880,6 +921,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
const client = await this.clientCw(instance);
@@ -888,32 +930,86 @@ export class ChatwootService {
return null;
}
const replyToIds = await this.getReplyToIds(messageBody, instance);
const doCreateMessage = async (convId: number) => {
const replyToIds = await this.getReplyToIds(messageBody, instance);
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: conversationId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: convId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
});
});
if (!message) {
this.logger.warn('message not found');
return null;
if (!message) {
this.logger.warn('message not found');
return null;
}
return message;
};
try {
return await doCreateMessage(conversationId);
} catch (error) {
return this.handleStaleConversationError(
error,
instance,
conversationId,
messageBody,
messageBodyForRetry,
'createMessage',
(newConvId) => doCreateMessage(newConvId),
);
}
}
return message;
private async handleStaleConversationError(
error: any,
instance: InstanceDto,
conversationId: number,
messageBody: any,
messageBodyForRetry: any,
functionName: string,
originalFunction: (newConversationId: number) => Promise<any>,
) {
if (axios.isAxiosError(error) && error.response?.status === 404) {
this.logger.warn(
`Conversation ${conversationId} not found in Chatwoot. Retrying operation from ${functionName}...`,
);
const bodyForRetry = messageBodyForRetry || messageBody;
if (!bodyForRetry || !bodyForRetry.key?.remoteJid) {
this.logger.error(`Cannot retry ${functionName} without a message body for context.`);
return null;
}
const { remoteJid } = bodyForRetry.key;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);
const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid} during retry.`);
return null;
}
this.logger.log(`Retrying ${functionName} for ${remoteJid} with new conversation ${newConversationId}`);
return await originalFunction(newConversationId);
} else {
this.logger.error(`Error in ${functionName}: ${error}`);
throw error;
}
}
public async getOpenConversationByContact(
@@ -1006,6 +1102,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
if (sourceId && this.isImportHistoryAvailable()) {
const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId);
@@ -1016,54 +1113,65 @@ export class ChatwootService {
}
}
}
const data = new FormData();
const doSendData = async (convId: number) => {
const data = new FormData();
if (content) {
data.append('content', content);
}
data.append('message_type', messageType);
data.append('attachments[]', fileData, { filename: fileName });
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
if (content) {
data.append('content', content);
}
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
data.append('message_type', messageType);
if (sourceId) {
data.append('source_id', sourceId);
}
data.append('attachments[]', fileStream, { filename: fileName });
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
}
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
if (sourceId) {
data.append('source_id', sourceId);
}
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
};
const { data: responseData } = await axios.request(config);
return responseData;
};
try {
const { data } = await axios.request(config);
return data;
return await doSendData(conversationId);
} catch (error) {
this.logger.error(error);
return this.handleStaleConversationError(
error,
instance,
conversationId,
messageBody,
messageBodyForRetry,
'sendData',
(newConvId) => doSendData(newConvId),
);
}
}
@@ -2304,6 +2412,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@@ -2323,6 +2432,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@@ -2348,6 +2458,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
quotedMsg,
body,
);
if (!send) {
this.logger.warn('message not sent');
@@ -2399,6 +2510,8 @@ export class ChatwootService {
instance,
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@@ -2440,6 +2553,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@@ -2459,6 +2573,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@@ -2587,6 +2702,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
null,
body,
);
if (!send) {
@@ -2673,15 +2789,30 @@ export class ChatwootService {
await this.createBotMessage(instance, msgStatus, 'incoming');
}
if (event === 'connection.update') {
if (body.status === 'open') {
// if we have qrcode count then we understand that a new connection was established
if (this.waMonitor.waInstances[instance.instanceName].qrCode.count > 0) {
const msgConnection = i18next.t('cw.inbox.connected');
await this.createBotMessage(instance, msgConnection, 'incoming');
this.waMonitor.waInstances[instance.instanceName].qrCode.count = 0;
chatwootImport.clearAll(instance);
}
if (event === 'connection.update' && body.status === 'open') {
const waInstance = this.waMonitor.waInstances[instance.instanceName];
if (!waInstance) return;
const now = Date.now();
const timeSinceLastNotification = now - (waInstance.lastConnectionNotification || 0);
// Se a conexão foi estabelecida via QR code, notifica imediatamente.
if (waInstance.qrCode && waInstance.qrCode.count > 0) {
const msgConnection = i18next.t('cw.inbox.connected');
await this.createBotMessage(instance, msgConnection, 'incoming');
waInstance.qrCode.count = 0;
waInstance.lastConnectionNotification = now;
chatwootImport.clearAll(instance);
}
// Se não foi via QR code, verifica o throttling.
else if (timeSinceLastNotification >= MIN_CONNECTION_NOTIFICATION_INTERVAL_MS) {
const msgConnection = i18next.t('cw.inbox.connected');
await this.createBotMessage(instance, msgConnection, 'incoming');
waInstance.lastConnectionNotification = now;
} else {
this.logger.warn(
`Connection notification skipped for ${instance.instanceName} - too frequent (${timeSinceLastNotification}ms since last)`,
);
}
}
@@ -2861,7 +2992,8 @@ export class ChatwootService {
and created_at >= now() - interval '6h'
order by created_at desc`;
const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
const pgClient = await this.getPgClient();
const messagesData = (await pgClient.query(sqlMessages))?.rows;
const ids: string[] = messagesData
.filter((message) => !!message.source_id)
.map((message) => message.source_id.replace('WAID:', ''));