refactor(chatbot): refatorar conexão com PostgreSQL e melhorar tratamento de mensagens

- Alterado método de obtenção da conexão PostgreSQL para ser assíncrono, melhorando a gestão de conexões.
- Implementada lógica de retry para criação de mensagens e conversas, garantindo maior robustez em caso de falhas.
- Ajustadas chamadas de consulta ao banco de dados para utilizar a nova abordagem de conexão.
- Adicionada nova propriedade `messageBodyForRetry` para facilitar o reenvio de mensagens em caso de erro.
This commit is contained in:
Vitordotpy 2025-09-25 17:08:40 -03:00
parent d8268b0eb1
commit 093515555d

View File

@ -53,7 +53,9 @@ export class ChatwootService {
private readonly cache: CacheService, private readonly cache: CacheService,
) {} ) {}
private pgClient = postgresClient.getChatwootConnection(); private async getPgClient() {
return postgresClient.getChatwootConnection();
}
private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> { private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
const cacheKey = `${instance.instanceName}:getProvider`; const cacheKey = `${instance.instanceName}:getProvider`;
@ -382,7 +384,8 @@ export class ChatwootService {
if (!uri) return false; if (!uri) return false;
const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`; const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`;
const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0]; const pgClient = await this.getPgClient();
const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0];
let tagId = tagData?.id; let tagId = tagData?.id;
const taggingsCount = tagData?.taggings_count || 0; const taggingsCount = tagData?.taggings_count || 0;
@ -392,18 +395,18 @@ export class ChatwootService {
DO UPDATE SET taggings_count = tags.taggings_count + 1 DO UPDATE SET taggings_count = tags.taggings_count + 1
RETURNING id`; RETURNING id`;
tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id; tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
const sqlCheckTagging = `SELECT 1 FROM taggings const sqlCheckTagging = `SELECT 1 FROM taggings
WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`; WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`;
const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0; const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
if (!taggingExists) { if (!taggingExists) {
const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at) const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at)
VALUES ($1, 'Contact', $2, 'labels', NOW())`; VALUES ($1, 'Contact', $2, 'labels', NOW())`;
await this.pgClient.query(sqlInsertLabel, [tagId, contactId]); await pgClient.query(sqlInsertLabel, [tagId, contactId]);
} }
return true; return true;
@ -861,6 +864,7 @@ export class ChatwootService {
messageBody?: any, messageBody?: any,
sourceId?: string, sourceId?: string,
quotedMsg?: MessageModel, quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) { ) {
const client = await this.clientCw(instance); const client = await this.clientCw(instance);
@ -869,32 +873,66 @@ export class ChatwootService {
return null; return null;
} }
const replyToIds = await this.getReplyToIds(messageBody, instance); const doCreateMessage = async (convId: number) => {
const replyToIds = await this.getReplyToIds(messageBody, instance);
const sourceReplyId = quotedMsg?.chatwootMessageId || null; const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const message = await client.messages.create({ const message = await client.messages.create({
accountId: this.provider.accountId, accountId: this.provider.accountId,
conversationId: conversationId, conversationId: convId,
data: { data: {
content: content, content: content,
message_type: messageType, message_type: messageType,
attachments: attachments, attachments: attachments,
private: privateMessage || false, private: privateMessage || false,
source_id: sourceId, source_id: sourceId,
content_attributes: { content_attributes: {
...replyToIds, ...replyToIds,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
}, },
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null, });
},
});
if (!message) { if (!message) {
this.logger.warn('message not found'); this.logger.warn('message not found');
return null; return null;
}
return message;
};
try {
return await doCreateMessage(conversationId);
} catch (error) {
const errorMessage = error.toString().toLowerCase();
const status = error.response?.status;
if (errorMessage.includes('not found') || status === 404) {
this.logger.warn(`Conversation ${conversationId} not found. Retrying...`);
const bodyForRetry = messageBodyForRetry || messageBody;
if (!bodyForRetry) {
this.logger.error('Cannot retry createMessage without a message body for context.');
return null;
}
const remoteJid = bodyForRetry.key.remoteJid;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);
const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid}`);
return null;
}
this.logger.log(`Retrying message creation for ${remoteJid} with new conversation ${newConversationId}`);
return await doCreateMessage(newConversationId);
} else {
this.logger.error(`Error creating message: ${error}`);
throw error;
}
} }
return message;
} }
public async getOpenConversationByContact( public async getOpenConversationByContact(
@ -987,6 +1025,7 @@ export class ChatwootService {
messageBody?: any, messageBody?: any,
sourceId?: string, sourceId?: string,
quotedMsg?: MessageModel, quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) { ) {
if (sourceId && this.isImportHistoryAvailable()) { if (sourceId && this.isImportHistoryAvailable()) {
const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId); const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId);
@ -997,54 +1036,84 @@ export class ChatwootService {
} }
} }
} }
const data = new FormData(); const doSendData = async (convId: number) => {
const data = new FormData();
if (content) { if (content) {
data.append('content', content); data.append('content', content);
}
data.append('message_type', messageType);
data.append('attachments[]', fileStream, { filename: fileName });
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
} }
}
if (sourceReplyId) { data.append('message_type', messageType);
data.append('source_reply_id', sourceReplyId.toString());
}
if (sourceId) { data.append('attachments[]', fileStream, { filename: fileName });
data.append('source_id', sourceId);
}
const config = { const sourceReplyId = quotedMsg?.chatwootMessageId || null;
method: 'post',
maxBodyLength: Infinity, if (messageBody && instance) {
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`, const replyToIds = await this.getReplyToIds(messageBody, instance);
headers: {
api_access_token: this.provider.token, if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
...data.getHeaders(), const content = JSON.stringify({
}, ...replyToIds,
data: data, });
data.append('content_attributes', content);
}
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
if (sourceId) {
data.append('source_id', sourceId);
}
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
};
const { data: responseData } = await axios.request(config);
return responseData;
}; };
try { try {
const { data } = await axios.request(config); return await doSendData(conversationId);
return data;
} catch (error) { } catch (error) {
this.logger.error(error); const errorMessage = error.toString().toLowerCase();
const status = error.response?.status;
if (errorMessage.includes('not found') || status === 404) {
this.logger.warn(`Conversation ${conversationId} not found. Retrying...`);
const bodyForRetry = messageBodyForRetry || messageBody;
if (!bodyForRetry) {
this.logger.error('Cannot retry sendData without a message body for context.');
return null;
}
const remoteJid = bodyForRetry.key.remoteJid;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);
const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid}`);
return null;
}
this.logger.log(`Retrying sendData for ${remoteJid} with new conversation ${newConversationId}`);
return await doSendData(newConversationId);
} else {
this.logger.error(error);
return null;
}
} }
} }
@ -2032,6 +2101,7 @@ export class ChatwootService {
body, body,
'WAID:' + body.key.id, 'WAID:' + body.key.id,
quotedMsg, quotedMsg,
null,
); );
if (!send) { if (!send) {
@ -2051,6 +2121,7 @@ export class ChatwootService {
body, body,
'WAID:' + body.key.id, 'WAID:' + body.key.id,
quotedMsg, quotedMsg,
null,
); );
if (!send) { if (!send) {
@ -2076,6 +2147,7 @@ export class ChatwootService {
}, },
'WAID:' + body.key.id, 'WAID:' + body.key.id,
quotedMsg, quotedMsg,
body,
); );
if (!send) { if (!send) {
this.logger.warn('message not sent'); this.logger.warn('message not sent');
@ -2132,6 +2204,8 @@ export class ChatwootService {
instance, instance,
body, body,
'WAID:' + body.key.id, 'WAID:' + body.key.id,
quotedMsg,
null,
); );
if (!send) { if (!send) {
@ -2173,6 +2247,7 @@ export class ChatwootService {
body, body,
'WAID:' + body.key.id, 'WAID:' + body.key.id,
quotedMsg, quotedMsg,
null,
); );
if (!send) { if (!send) {
@ -2192,6 +2267,7 @@ export class ChatwootService {
body, body,
'WAID:' + body.key.id, 'WAID:' + body.key.id,
quotedMsg, quotedMsg,
null,
); );
if (!send) { if (!send) {
@ -2262,6 +2338,7 @@ export class ChatwootService {
}, },
'WAID:' + body.key.id, 'WAID:' + body.key.id,
null, null,
body,
); );
if (!send) { if (!send) {
this.logger.warn('edited message not sent'); this.logger.warn('edited message not sent');
@ -2515,7 +2592,8 @@ export class ChatwootService {
and created_at >= now() - interval '6h' and created_at >= now() - interval '6h'
order by created_at desc`; order by created_at desc`;
const messagesData = (await this.pgClient.query(sqlMessages))?.rows; const pgClient = await this.getPgClient();
const messagesData = (await pgClient.query(sqlMessages))?.rows;
const ids: string[] = messagesData const ids: string[] = messagesData
.filter((message) => !!message.source_id) .filter((message) => !!message.source_id)
.map((message) => message.source_id.replace('WAID:', '')); .map((message) => message.source_id.replace('WAID:', ''));