From e241cf4ee00548d6c68e2c17d4c45d892c26da1e Mon Sep 17 00:00:00 2001 From: Judson Cairo Date: Wed, 25 Sep 2024 11:48:19 -0300 Subject: [PATCH] feat: Sync lost messages on chatwoot Runs the sync method every 30min --- package.json | 2 + .../whatsapp/whatsapp.baileys.service.ts | 27 +++++- .../controllers/chatwoot.controller.ts | 2 +- .../chatwoot/services/chatwoot.service.ts | 86 +++++++++++++++---- .../chatwoot/utils/chatwoot-import-helper.ts | 35 ++++++-- src/api/services/channel.service.ts | 2 +- src/config/error.config.ts | 2 +- 7 files changed, 129 insertions(+), 27 deletions(-) diff --git a/package.json b/package.json index 96c408a2..204c3b9c 100644 --- a/package.json +++ b/package.json @@ -81,6 +81,7 @@ "mime": "^3.0.0", "minio": "^8.0.1", "node-cache": "^5.1.2", + "node-cron": "^3.0.3", "node-windows": "^1.0.0-beta.8", "openai": "^4.52.7", "parse-bmfont-xml": "^1.1.4", @@ -106,6 +107,7 @@ "@types/json-schema": "^7.0.15", "@types/mime": "3.0.0", "@types/node": "^18.15.11", + "@types/node-cron": "^3.0.11", "@types/node-windows": "^0.1.2", "@types/qrcode": "^1.5.0", "@types/qrcode-terminal": "^0.12.0", diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 13eea7b5..e920c4f7 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -121,6 +121,7 @@ import { readFileSync } from 'fs'; import Long from 'long'; import mime from 'mime'; import NodeCache from 'node-cache'; +import cron from 'node-cron'; import { release } from 'os'; import { join } from 'path'; import P from 'pino'; @@ -367,7 +368,12 @@ export class BaileysStartupService extends ChannelStartupService { if (connection === 'open') { this.instance.wuid = this.client.user.id.replace(/:\d+/, ''); - this.instance.profilePictureUrl = (await this.profilePicture(this.instance.wuid)).profilePictureUrl; + try { + const profilePic = await this.profilePicture(this.instance.wuid); + this.instance.profilePictureUrl = profilePic.profilePictureUrl; + } catch (error) { + this.instance.profilePictureUrl = null; + } const formattedWuid = this.instance.wuid.split('@')[0].padEnd(30, ' '); const formattedName = this.instance.name; this.logger.info( @@ -402,6 +408,7 @@ export class BaileysStartupService extends ChannelStartupService { status: 'open', }, ); + this.syncChatwootLostMessages(); } } } @@ -3638,14 +3645,15 @@ export class BaileysStartupService extends ChannelStartupService { } private prepareMessage(message: proto.IWebMessageInfo): any { - const contentMsg = message?.message[getContentType(message.message)] as any; + const contentType = getContentType(message.message); + const contentMsg = message?.message[contentType] as any; const messageRaw = { key: message.key, pushName: message.pushName, message: { ...message.message }, contextInfo: contentMsg?.contextInfo, - messageType: getContentType(message.message) || 'unknown', + messageType: contentType || 'unknown', messageTimestamp: message.messageTimestamp as number, instanceId: this.instanceId, source: getDevice(message.key.id), @@ -3659,4 +3667,17 @@ export class BaileysStartupService extends ChannelStartupService { return messageRaw; } + + private async syncChatwootLostMessages() { + if (this.configService.get('CHATWOOT').ENABLED && this.localChatwoot?.enabled) { + const chatwootConfig = await this.findChatwoot(); + const prepare = (message: any) => this.prepareMessage(message); + this.chatwootService.syncLostMessages({ instanceName: this.instance.name }, chatwootConfig, prepare); + + const task = cron.schedule('0,30 * * * *', async () => { + this.chatwootService.syncLostMessages({ instanceName: this.instance.name }, chatwootConfig, prepare); + }); + task.start(); + } + } } diff --git a/src/api/integrations/chatbot/chatwoot/controllers/chatwoot.controller.ts b/src/api/integrations/chatbot/chatwoot/controllers/chatwoot.controller.ts index bfebad12..17cdce01 100644 --- a/src/api/integrations/chatbot/chatwoot/controllers/chatwoot.controller.ts +++ b/src/api/integrations/chatbot/chatwoot/controllers/chatwoot.controller.ts @@ -54,7 +54,7 @@ export class ChatwootController { return response; } - public async findChatwoot(instance: InstanceDto) { + public async findChatwoot(instance: InstanceDto): Promise { if (!this.configService.get('CHATWOOT').ENABLED) throw new BadRequestException('Chatwoot is disabled'); const result = await this.chatwootService.find(instance); diff --git a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts index e307c99d..fe1c0301 100644 --- a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts +++ b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts @@ -7,7 +7,7 @@ import { PrismaRepository } from '@api/repository/repository.service'; import { CacheService } from '@api/services/cache.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { Events } from '@api/types/wa.types'; -import { Chatwoot, ConfigService, HttpServer } from '@config/env.config'; +import { Chatwoot, ConfigService, Database, HttpServer } from '@config/env.config'; import { Logger } from '@config/logger.config'; import ChatwootClient, { ChatwootAPIConfig, @@ -24,6 +24,7 @@ import i18next from '@utils/i18n'; import { sendTelemetry } from '@utils/sendTelemetry'; import axios from 'axios'; import { proto } from 'baileys'; +import dayjs from 'dayjs'; import FormData from 'form-data'; import Jimp from 'jimp'; import Long from 'long'; @@ -53,7 +54,7 @@ export class ChatwootService { private pgClient = postgresClient.getChatwootConnection(); - private async getProvider(instance: InstanceDto) { + private async getProvider(instance: InstanceDto): Promise { const cacheKey = `${instance.instanceName}:getProvider`; if (await this.cache.has(cacheKey)) { const provider = (await this.cache.get(cacheKey)) as ChatwootModel; @@ -715,7 +716,7 @@ export class ChatwootService { } } - public async getInbox(instance: InstanceDto) { + public async getInbox(instance: InstanceDto): Promise { const cacheKey = `${instance.instanceName}:getInbox`; if (await this.cache.has(cacheKey)) { return (await this.cache.get(cacheKey)) as inbox; @@ -839,12 +840,6 @@ export class ChatwootService { return null; } - if (!this.configService.get('CHATWOOT').BOT_CONTACT) { - this.logger.log('Chatwoot bot contact is disabled'); - - return true; - } - const contact = await this.findContact(instance, '123456'); if (!contact) { @@ -1186,10 +1181,10 @@ export class ChatwootService { const cwBotContact = this.configService.get('CHATWOOT').BOT_CONTACT; - if (cwBotContact && chatId === '123456' && body.message_type === 'outgoing') { + if (chatId === '123456' && body.message_type === 'outgoing') { const command = messageReceived.replace('/', ''); - if (command.includes('init') || command.includes('iniciar')) { + if (cwBotContact && (command.includes('init') || command.includes('iniciar'))) { const state = waInstance?.connectionStatus?.state; if (state !== 'open') { @@ -1242,7 +1237,7 @@ export class ChatwootService { } } - if (command === 'disconnect' || command === 'desconectar') { + if (cwBotContact && (command === 'disconnect' || command === 'desconectar')) { const msgLogout = i18next.t('cw.inbox.disconnect', { inboxName: body.inbox.name, }); @@ -1532,7 +1527,7 @@ export class ChatwootService { 'audioMessage', 'videoMessage', 'stickerMessage', - 'viewOnceMessageV2' + 'viewOnceMessageV2', ]; const messageKeys = Object.keys(message); @@ -1586,8 +1581,10 @@ export class ChatwootService { liveLocationMessage: msg.liveLocationMessage, listMessage: msg.listMessage, listResponseMessage: msg.listResponseMessage, - viewOnceMessageV2: msg?.message?.viewOnceMessageV2?.message?.imageMessage?.url || msg?.message?.viewOnceMessageV2?.message?.videoMessage?.url || msg?.message?.viewOnceMessageV2?.message?.audioMessage?.url, - + viewOnceMessageV2: + msg?.message?.viewOnceMessageV2?.message?.imageMessage?.url || + msg?.message?.viewOnceMessageV2?.message?.videoMessage?.url || + msg?.message?.viewOnceMessageV2?.message?.audioMessage?.url, }; return types; @@ -2376,4 +2373,63 @@ export class ChatwootService { this.logger.error(`Error on update avatar in recent conversations: ${error.toString()}`); } } + + public async syncLostMessages( + instance: InstanceDto, + chatwootConfig: ChatwootDto, + prepareMessage: (message: any) => any, + ) { + if (!this.isImportHistoryAvailable()) { + return; + } + if (!this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) { + return; + } + + const inbox = await this.getInbox(instance); + + const sqlMessages = `select * from messages m + where account_id = ${chatwootConfig.accountId} + and inbox_id = ${inbox.id} + and created_at >= now() - interval '6h' + order by created_at desc`; + + const messagesData = (await this.pgClient.query(sqlMessages))?.rows; + const ids: string[] = messagesData + .filter((message) => !!message.source_id) + .map((message) => message.source_id.replace('WAID:', '')); + + const savedMessages = await this.prismaRepository.message.findMany({ + where: { + Instance: { name: instance.instanceName }, + messageTimestamp: { gte: dayjs().subtract(6, 'hours').unix() }, + AND: ids.map((id) => ({ key: { path: ['id'], not: id } })), + }, + }); + + const filteredMessages = savedMessages.filter( + (msg: any) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid), + ); + const messagesRaw: any[] = []; + for (const m of filteredMessages) { + if (!m.message || !m.key || !m.messageTimestamp) { + continue; + } + + if (Long.isLong(m?.messageTimestamp)) { + m.messageTimestamp = m.messageTimestamp?.toNumber(); + } + + messagesRaw.push(prepareMessage(m as any)); + } + + this.addHistoryMessages( + instance, + messagesRaw.filter((msg) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid)), + ); + + await chatwootImport.importHistoryMessages(instance, this, inbox, this.provider); + const waInstance = this.waMonitor.waInstances[instance.instanceName]; + waInstance.clearCacheChatwoot(); + } } diff --git a/src/api/integrations/chatbot/chatwoot/utils/chatwoot-import-helper.ts b/src/api/integrations/chatbot/chatwoot/utils/chatwoot-import-helper.ts index f331e494..8f1ae580 100644 --- a/src/api/integrations/chatbot/chatwoot/utils/chatwoot-import-helper.ts +++ b/src/api/integrations/chatbot/chatwoot/utils/chatwoot-import-helper.ts @@ -50,7 +50,7 @@ class ChatwootImport { const actualValue = this.historyMessages.has(instance.instanceName) ? this.historyMessages.get(instance.instanceName) : []; - this.historyMessages.set(instance.instanceName, actualValue.concat(messagesRaw)); + this.historyMessages.set(instance.instanceName, [...actualValue, ...messagesRaw]); } public addHistoryContacts(instance: InstanceDto, contactsRaw: Contact[]) { @@ -169,6 +169,24 @@ class ChatwootImport { } } + private async getExistingSourceIds(sourceIds: string[]): Promise> { + const existingSourceIdsSet = new Set(); + + if (sourceIds.length === 0) { + return existingSourceIdsSet; + } + + const query = 'SELECT source_id FROM messages WHERE source_id = ANY($1)'; + const pgClient = postgresClient.getChatwootConnection(); + const result = await pgClient.query(query, [sourceIds]); + + for (const row of result.rows) { + existingSourceIdsSet.add(row.source_id); + } + + return existingSourceIdsSet; + } + public async importHistoryMessages( instance: InstanceDto, chatwootService: ChatwootService, @@ -185,7 +203,7 @@ class ChatwootImport { let totalMessagesImported = 0; - const messagesOrdered = this.historyMessages.get(instance.instanceName) || []; + let messagesOrdered = this.historyMessages.get(instance.instanceName) || []; if (messagesOrdered.length === 0) { return 0; } @@ -216,6 +234,8 @@ class ChatwootImport { }); }); + const existingSourceIds = await this.getExistingSourceIds(messagesOrdered.map((message: any) => message.key.id)); + messagesOrdered = messagesOrdered.filter((message: any) => !existingSourceIds.has(message.key.id)); // processing messages in batch const batchSize = 4000; let messagesChunk: Message[] = this.sliceIntoChunks(messagesOrdered, batchSize); @@ -233,8 +253,8 @@ class ChatwootImport { // inserting messages in chatwoot db let sqlInsertMsg = `INSERT INTO messages - (content, account_id, inbox_id, conversation_id, message_type, private, content_type, - sender_type, sender_id, created_at, updated_at) VALUES `; + (content, processed_message_content, account_id, inbox_id, conversation_id, message_type, private, content_type, + sender_type, sender_id, source_id, created_at, updated_at) VALUES `; const bindInsertMsg = [provider.accountId, inbox.id]; messagesByPhoneNumber.forEach((messages: any[], phoneNumber: string) => { @@ -269,11 +289,14 @@ class ChatwootImport { bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id); const bindSenderId = `$${bindInsertMsg.length}`; + bindInsertMsg.push('WAID:' + message.key.id); + const bindSourceId = `$${bindInsertMsg.length}`; + bindInsertMsg.push(message.messageTimestamp as number); const bindmessageTimestamp = `$${bindInsertMsg.length}`; - sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0, - ${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`; + sqlInsertMsg += `(${bindContent}, ${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0, + ${bindSenderType},${bindSenderId},${bindSourceId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`; }); }); if (bindInsertMsg.length > 2) { diff --git a/src/api/services/channel.service.ts b/src/api/services/channel.service.ts index 7fe445e3..42a74f3a 100644 --- a/src/api/services/channel.service.ts +++ b/src/api/services/channel.service.ts @@ -294,7 +294,7 @@ export class ChannelStartupService { this.clearCacheChatwoot(); } - public async findChatwoot(): Promise { + public async findChatwoot(): Promise { if (!this.configService.get('CHATWOOT').ENABLED) { return null; } diff --git a/src/config/error.config.ts b/src/config/error.config.ts index 6449d52e..302c67d1 100644 --- a/src/config/error.config.ts +++ b/src/config/error.config.ts @@ -15,7 +15,7 @@ export function onUnexpectedError() { logger.error({ origin, stderr: process.stderr.fd, - error, }); + logger.error(error); }); }