diff --git a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts index 5020bfa6..bca90082 100644 --- a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts +++ b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts @@ -53,7 +53,9 @@ export class ChatwootService { private readonly cache: CacheService, ) {} - private pgClient = postgresClient.getChatwootConnection(); + private async getPgClient() { + return postgresClient.getChatwootConnection(); + } private async getProvider(instance: InstanceDto): Promise { const cacheKey = `${instance.instanceName}:getProvider`; @@ -382,7 +384,8 @@ export class ChatwootService { if (!uri) return false; const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`; - const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0]; + const pgClient = await this.getPgClient(); + const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0]; let tagId = tagData?.id; const taggingsCount = tagData?.taggings_count || 0; @@ -392,18 +395,18 @@ export class ChatwootService { DO UPDATE SET taggings_count = tags.taggings_count + 1 RETURNING id`; - tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id; + tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id; const sqlCheckTagging = `SELECT 1 FROM taggings WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`; - const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0; + const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0; if (!taggingExists) { const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at) VALUES ($1, 'Contact', $2, 'labels', NOW())`; - await this.pgClient.query(sqlInsertLabel, [tagId, contactId]); + await pgClient.query(sqlInsertLabel, [tagId, contactId]); } return true; @@ -861,6 +864,7 @@ export class ChatwootService { messageBody?: any, sourceId?: string, quotedMsg?: MessageModel, + messageBodyForRetry?: any, ) { const client = await this.clientCw(instance); @@ -869,32 +873,66 @@ export class ChatwootService { return null; } - const replyToIds = await this.getReplyToIds(messageBody, instance); + const doCreateMessage = async (convId: number) => { + const replyToIds = await this.getReplyToIds(messageBody, instance); - const sourceReplyId = quotedMsg?.chatwootMessageId || null; + const sourceReplyId = quotedMsg?.chatwootMessageId || null; - const message = await client.messages.create({ - accountId: this.provider.accountId, - conversationId: conversationId, - data: { - content: content, - message_type: messageType, - attachments: attachments, - private: privateMessage || false, - source_id: sourceId, - content_attributes: { - ...replyToIds, + const message = await client.messages.create({ + accountId: this.provider.accountId, + conversationId: convId, + data: { + content: content, + message_type: messageType, + attachments: attachments, + private: privateMessage || false, + source_id: sourceId, + content_attributes: { + ...replyToIds, + }, + source_reply_id: sourceReplyId ? sourceReplyId.toString() : null, }, - source_reply_id: sourceReplyId ? sourceReplyId.toString() : null, - }, - }); + }); - if (!message) { - this.logger.warn('message not found'); - return null; + if (!message) { + this.logger.warn('message not found'); + return null; + } + + return message; + }; + + try { + return await doCreateMessage(conversationId); + } catch (error) { + const errorMessage = error.toString().toLowerCase(); + const status = error.response?.status; + if (errorMessage.includes('not found') || status === 404) { + this.logger.warn(`Conversation ${conversationId} not found. Retrying...`); + const bodyForRetry = messageBodyForRetry || messageBody; + + if (!bodyForRetry) { + this.logger.error('Cannot retry createMessage without a message body for context.'); + return null; + } + + const remoteJid = bodyForRetry.key.remoteJid; + const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`; + await this.cache.delete(cacheKey); + + const newConversationId = await this.createConversation(instance, bodyForRetry); + if (!newConversationId) { + this.logger.error(`Failed to create new conversation for ${remoteJid}`); + return null; + } + + this.logger.log(`Retrying message creation for ${remoteJid} with new conversation ${newConversationId}`); + return await doCreateMessage(newConversationId); + } else { + this.logger.error(`Error creating message: ${error}`); + throw error; + } } - - return message; } public async getOpenConversationByContact( @@ -987,6 +1025,7 @@ export class ChatwootService { messageBody?: any, sourceId?: string, quotedMsg?: MessageModel, + messageBodyForRetry?: any, ) { if (sourceId && this.isImportHistoryAvailable()) { const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId); @@ -997,54 +1036,84 @@ export class ChatwootService { } } } - const data = new FormData(); + const doSendData = async (convId: number) => { + const data = new FormData(); - if (content) { - data.append('content', content); - } - - data.append('message_type', messageType); - - data.append('attachments[]', fileStream, { filename: fileName }); - - const sourceReplyId = quotedMsg?.chatwootMessageId || null; - - if (messageBody && instance) { - const replyToIds = await this.getReplyToIds(messageBody, instance); - - if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) { - const content = JSON.stringify({ - ...replyToIds, - }); - data.append('content_attributes', content); + if (content) { + data.append('content', content); } - } - if (sourceReplyId) { - data.append('source_reply_id', sourceReplyId.toString()); - } + data.append('message_type', messageType); - if (sourceId) { - data.append('source_id', sourceId); - } + data.append('attachments[]', fileStream, { filename: fileName }); - const config = { - method: 'post', - maxBodyLength: Infinity, - url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`, - headers: { - api_access_token: this.provider.token, - ...data.getHeaders(), - }, - data: data, + const sourceReplyId = quotedMsg?.chatwootMessageId || null; + + if (messageBody && instance) { + const replyToIds = await this.getReplyToIds(messageBody, instance); + + if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) { + const content = JSON.stringify({ + ...replyToIds, + }); + data.append('content_attributes', content); + } + } + + if (sourceReplyId) { + data.append('source_reply_id', sourceReplyId.toString()); + } + + if (sourceId) { + data.append('source_id', sourceId); + } + + const config = { + method: 'post', + maxBodyLength: Infinity, + url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`, + headers: { + api_access_token: this.provider.token, + ...data.getHeaders(), + }, + data: data, + }; + + const { data: responseData } = await axios.request(config); + return responseData; }; try { - const { data } = await axios.request(config); - - return data; + return await doSendData(conversationId); } catch (error) { - this.logger.error(error); + const errorMessage = error.toString().toLowerCase(); + const status = error.response?.status; + + if (errorMessage.includes('not found') || status === 404) { + this.logger.warn(`Conversation ${conversationId} not found. Retrying...`); + const bodyForRetry = messageBodyForRetry || messageBody; + + if (!bodyForRetry) { + this.logger.error('Cannot retry sendData without a message body for context.'); + return null; + } + + const remoteJid = bodyForRetry.key.remoteJid; + const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`; + await this.cache.delete(cacheKey); + + const newConversationId = await this.createConversation(instance, bodyForRetry); + if (!newConversationId) { + this.logger.error(`Failed to create new conversation for ${remoteJid}`); + return null; + } + + this.logger.log(`Retrying sendData for ${remoteJid} with new conversation ${newConversationId}`); + return await doSendData(newConversationId); + } else { + this.logger.error(error); + return null; + } } } @@ -2032,6 +2101,7 @@ export class ChatwootService { body, 'WAID:' + body.key.id, quotedMsg, + null, ); if (!send) { @@ -2051,6 +2121,7 @@ export class ChatwootService { body, 'WAID:' + body.key.id, quotedMsg, + null, ); if (!send) { @@ -2076,6 +2147,7 @@ export class ChatwootService { }, 'WAID:' + body.key.id, quotedMsg, + body, ); if (!send) { this.logger.warn('message not sent'); @@ -2132,6 +2204,8 @@ export class ChatwootService { instance, body, 'WAID:' + body.key.id, + quotedMsg, + null, ); if (!send) { @@ -2173,6 +2247,7 @@ export class ChatwootService { body, 'WAID:' + body.key.id, quotedMsg, + null, ); if (!send) { @@ -2192,6 +2267,7 @@ export class ChatwootService { body, 'WAID:' + body.key.id, quotedMsg, + null, ); if (!send) { @@ -2262,6 +2338,7 @@ export class ChatwootService { }, 'WAID:' + body.key.id, null, + body, ); if (!send) { this.logger.warn('edited message not sent'); @@ -2515,7 +2592,8 @@ export class ChatwootService { and created_at >= now() - interval '6h' order by created_at desc`; - const messagesData = (await this.pgClient.query(sqlMessages))?.rows; + const pgClient = await this.getPgClient(); + const messagesData = (await pgClient.query(sqlMessages))?.rows; const ids: string[] = messagesData .filter((message) => !!message.source_id) .map((message) => message.source_id.replace('WAID:', ''));