Merge pull request #2017 from Vitordotpy/fix/enhanced-chatwoot-database-connection
Some checks are pending
Check Code Quality / check-lint-and-build (push) Waiting to run
Build Docker image / Build and Deploy (push) Waiting to run
Security Scan / CodeQL Analysis (javascript) (push) Waiting to run
Security Scan / Dependency Review (push) Waiting to run

Fix Chatwoot DB Connection Instability and Implement Stale Conversation Cache Handling
This commit is contained in:
Davidson Gomes 2025-09-26 07:35:26 -03:00 committed by GitHub
commit bd0c43feac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -53,7 +53,9 @@ export class ChatwootService {
private readonly cache: CacheService,
) {}
private pgClient = postgresClient.getChatwootConnection();
private async getPgClient() {
return postgresClient.getChatwootConnection();
}
private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
const cacheKey = `${instance.instanceName}:getProvider`;
@ -382,7 +384,8 @@ export class ChatwootService {
if (!uri) return false;
const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`;
const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0];
const pgClient = await this.getPgClient();
const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0];
let tagId = tagData?.id;
const taggingsCount = tagData?.taggings_count || 0;
@ -392,18 +395,18 @@ export class ChatwootService {
DO UPDATE SET taggings_count = tags.taggings_count + 1
RETURNING id`;
tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
const sqlCheckTagging = `SELECT 1 FROM taggings
WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`;
const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
if (!taggingExists) {
const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at)
VALUES ($1, 'Contact', $2, 'labels', NOW())`;
await this.pgClient.query(sqlInsertLabel, [tagId, contactId]);
await pgClient.query(sqlInsertLabel, [tagId, contactId]);
}
return true;
@ -861,6 +864,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
const client = await this.clientCw(instance);
@ -869,32 +873,86 @@ export class ChatwootService {
return null;
}
const replyToIds = await this.getReplyToIds(messageBody, instance);
const doCreateMessage = async (convId: number) => {
const replyToIds = await this.getReplyToIds(messageBody, instance);
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: conversationId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: convId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
});
});
if (!message) {
this.logger.warn('message not found');
return null;
if (!message) {
this.logger.warn('message not found');
return null;
}
return message;
};
try {
return await doCreateMessage(conversationId);
} catch (error) {
return this.handleStaleConversationError(
error,
instance,
conversationId,
messageBody,
messageBodyForRetry,
'createMessage',
(newConvId) => doCreateMessage(newConvId),
);
}
}
return message;
private async handleStaleConversationError(
error: any,
instance: InstanceDto,
conversationId: number,
messageBody: any,
messageBodyForRetry: any,
functionName: string,
originalFunction: (newConversationId: number) => Promise<any>,
) {
if (axios.isAxiosError(error) && error.response?.status === 404) {
this.logger.warn(
`Conversation ${conversationId} not found in Chatwoot. Retrying operation from ${functionName}...`,
);
const bodyForRetry = messageBodyForRetry || messageBody;
if (!bodyForRetry || !bodyForRetry.key?.remoteJid) {
this.logger.error(`Cannot retry ${functionName} without a message body for context.`);
return null;
}
const { remoteJid } = bodyForRetry.key;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);
const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid} during retry.`);
return null;
}
this.logger.log(`Retrying ${functionName} for ${remoteJid} with new conversation ${newConversationId}`);
return await originalFunction(newConversationId);
} else {
this.logger.error(`Error in ${functionName}: ${error}`);
throw error;
}
}
public async getOpenConversationByContact(
@ -987,6 +1045,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
if (sourceId && this.isImportHistoryAvailable()) {
const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId);
@ -997,54 +1056,65 @@ export class ChatwootService {
}
}
}
const data = new FormData();
const doSendData = async (convId: number) => {
const data = new FormData();
if (content) {
data.append('content', content);
}
data.append('message_type', messageType);
data.append('attachments[]', fileStream, { filename: fileName });
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
if (content) {
data.append('content', content);
}
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
data.append('message_type', messageType);
if (sourceId) {
data.append('source_id', sourceId);
}
data.append('attachments[]', fileStream, { filename: fileName });
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
}
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
if (sourceId) {
data.append('source_id', sourceId);
}
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
};
const { data: responseData } = await axios.request(config);
return responseData;
};
try {
const { data } = await axios.request(config);
return data;
return await doSendData(conversationId);
} catch (error) {
this.logger.error(error);
return this.handleStaleConversationError(
error,
instance,
conversationId,
messageBody,
messageBodyForRetry,
'sendData',
(newConvId) => doSendData(newConvId),
);
}
}
@ -2032,6 +2102,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@ -2051,6 +2122,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@ -2076,6 +2148,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
quotedMsg,
body,
);
if (!send) {
this.logger.warn('message not sent');
@ -2132,6 +2205,8 @@ export class ChatwootService {
instance,
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@ -2173,6 +2248,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@ -2192,6 +2268,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);
if (!send) {
@ -2262,6 +2339,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
null,
body,
);
if (!send) {
this.logger.warn('edited message not sent');
@ -2515,7 +2593,8 @@ export class ChatwootService {
and created_at >= now() - interval '6h'
order by created_at desc`;
const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
const pgClient = await this.getPgClient();
const messagesData = (await pgClient.query(sqlMessages))?.rows;
const ids: string[] = messagesData
.filter((message) => !!message.source_id)
.map((message) => message.source_id.replace('WAID:', ''));