From 82894a1c4f70cee9e9f074c2c2a8d5bec97cbd81 Mon Sep 17 00:00:00 2001 From: jaison-x Date: Mon, 25 Dec 2023 18:48:25 -0300 Subject: [PATCH 1/3] refactor: change the message ids cache in chatwoot to use a in memory cache Remove use of disc cache for optimize performance. To make this, we need change to use only one instance of ChatwootService in entire application. --- .../controllers/chatwoot.controller.ts | 4 +- .../controllers/instance.controller.ts | 6 ++ src/whatsapp/services/chatwoot.service.ts | 102 ++++-------------- src/whatsapp/services/whatsapp.service.ts | 4 +- 4 files changed, 32 insertions(+), 84 deletions(-) diff --git a/src/whatsapp/controllers/chatwoot.controller.ts b/src/whatsapp/controllers/chatwoot.controller.ts index 8f59ccac..d1090956 100644 --- a/src/whatsapp/controllers/chatwoot.controller.ts +++ b/src/whatsapp/controllers/chatwoot.controller.ts @@ -7,7 +7,7 @@ import { ChatwootDto } from '../dto/chatwoot.dto'; import { InstanceDto } from '../dto/instance.dto'; import { RepositoryBroker } from '../repository/repository.manager'; import { ChatwootService } from '../services/chatwoot.service'; -import { waMonitor } from '../whatsapp.module'; +import { instanceController } from '../whatsapp.module'; const logger = new Logger('ChatwootController'); @@ -94,7 +94,7 @@ export class ChatwootController { public async receiveWebhook(instance: InstanceDto, data: any) { logger.verbose('requested receiveWebhook from ' + instance.instanceName + ' instance'); - const chatwootService = new ChatwootService(waMonitor, this.configService, this.repository); + const chatwootService = instanceController.getChatwootService(); return chatwootService.receiveWebhook(instance, data); } diff --git a/src/whatsapp/controllers/instance.controller.ts b/src/whatsapp/controllers/instance.controller.ts index 0f06895e..d15e5c2b 100644 --- a/src/whatsapp/controllers/instance.controller.ts +++ b/src/whatsapp/controllers/instance.controller.ts @@ -659,4 +659,10 @@ export class InstanceController { this.logger.verbose('requested refreshToken'); return await this.authService.refreshToken(oldToken); } + + public getChatwootService() { + this.logger.verbose('getting chatwootService object instance'); + + return this.chatwootService; + } } diff --git a/src/whatsapp/services/chatwoot.service.ts b/src/whatsapp/services/chatwoot.service.ts index 26b0cce9..cf173050 100644 --- a/src/whatsapp/services/chatwoot.service.ts +++ b/src/whatsapp/services/chatwoot.service.ts @@ -1,14 +1,13 @@ import ChatwootClient from '@figuro/chatwoot-sdk'; import axios from 'axios'; import FormData from 'form-data'; -import { createReadStream, readFileSync, unlinkSync, writeFileSync } from 'fs'; +import { createReadStream, unlinkSync, writeFileSync } from 'fs'; import Jimp from 'jimp'; import mimeTypes from 'mime-types'; import path from 'path'; import { ConfigService, HttpServer } from '../../config/env.config'; import { Logger } from '../../config/logger.config'; -import { ROOT_DIR } from '../../config/path.config'; import { ChatwootDto } from '../dto/chatwoot.dto'; import { InstanceDto } from '../dto/instance.dto'; import { Options, Quoted, SendAudioDto, SendMediaDto, SendTextDto } from '../dto/sendMessage.dto'; @@ -18,8 +17,7 @@ import { Events } from '../types/wa.types'; import { WAMonitoringService } from './monitor.service'; export class ChatwootService { - private messageCacheFile: string; - private messageCache: Set; + private messageCache: Record>; private readonly logger = new Logger(ChatwootService.name); @@ -30,31 +28,25 @@ export class ChatwootService { private readonly configService: ConfigService, private readonly repository: RepositoryBroker, ) { - this.messageCache = new Set(); + this.messageCache = {}; } - private loadMessageCache(): Set { - this.logger.verbose('load message cache'); - try { - const cacheData = readFileSync(this.messageCacheFile, 'utf-8'); - const cacheArray = cacheData.split('\n'); - return new Set(cacheArray); - } catch (error) { - return new Set(); + private isMessageInCache(instance: InstanceDto, id: number) { + this.logger.verbose('check if message is in cache'); + if (!this.messageCache[instance.instanceName]) { + return false; } + + return this.messageCache[instance.instanceName].has(id); } - private saveMessageCache() { - this.logger.verbose('save message cache'); - const cacheData = Array.from(this.messageCache).join('\n'); - writeFileSync(this.messageCacheFile, cacheData, 'utf-8'); - this.logger.verbose('message cache saved'); - } + private addMessageToCache(instance: InstanceDto, id: number) { + this.logger.verbose('add message to cache'); - private clearMessageCache() { - this.logger.verbose('clear message cache'); - this.messageCache.clear(); - this.saveMessageCache(); + if (!this.messageCache[instance.instanceName]) { + this.messageCache[instance.instanceName] = new Set(); + } + this.messageCache[instance.instanceName].add(id); } private async getProvider(instance: InstanceDto) { @@ -1105,22 +1097,11 @@ export class ChatwootService { if (body.message_type === 'outgoing' && body?.conversation?.messages?.length && chatId !== '123456') { this.logger.verbose('check if is group'); - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - this.logger.verbose('cache file path: ' + this.messageCacheFile); - - this.messageCache = this.loadMessageCache(); - this.logger.verbose('cache file loaded'); - this.logger.verbose(this.messageCache); - - this.logger.verbose('check if message is cached'); - if (this.messageCache.has(body.id.toString())) { + if (this.isMessageInCache(instance, body.id)) { this.logger.verbose('message is cached'); return { message: 'bot' }; } - this.logger.verbose('clear cache'); - this.clearMessageCache(); - this.logger.verbose('Format message to send'); let formatText: string; if (senderName === null || senderName === undefined) { @@ -1597,14 +1578,7 @@ export class ChatwootService { return; } - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - - this.messageCache = this.loadMessageCache(); - - this.messageCache.add(send.id.toString()); - - this.logger.verbose('save message cache'); - this.saveMessageCache(); + this.addMessageToCache(instance, send.id); return send; } else { @@ -1618,14 +1592,7 @@ export class ChatwootService { return; } - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - - this.messageCache = this.loadMessageCache(); - - this.messageCache.add(send.id.toString()); - - this.logger.verbose('save message cache'); - this.saveMessageCache(); + this.addMessageToCache(instance, send.id); return send; } @@ -1650,11 +1617,7 @@ export class ChatwootService { this.logger.warn('message not sent'); return; } - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - this.messageCache = this.loadMessageCache(); - this.messageCache.add(send.id.toString()); - this.logger.verbose('save message cache'); - this.saveMessageCache(); + this.addMessageToCache(instance, send.id); } return; @@ -1711,14 +1674,7 @@ export class ChatwootService { return; } - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - - this.messageCache = this.loadMessageCache(); - - this.messageCache.add(send.id.toString()); - - this.logger.verbose('save message cache'); - this.saveMessageCache(); + this.addMessageToCache(instance, send.id); return send; } @@ -1746,14 +1702,7 @@ export class ChatwootService { return; } - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - - this.messageCache = this.loadMessageCache(); - - this.messageCache.add(send.id.toString()); - - this.logger.verbose('save message cache'); - this.saveMessageCache(); + this.addMessageToCache(instance, send.id); return send; } else { @@ -1767,14 +1716,7 @@ export class ChatwootService { return; } - this.messageCacheFile = path.join(ROOT_DIR, 'store', 'chatwoot', `${instance.instanceName}_cache.txt`); - - this.messageCache = this.loadMessageCache(); - - this.messageCache.add(send.id.toString()); - - this.logger.verbose('save message cache'); - this.saveMessageCache(); + this.addMessageToCache(instance, send.id); return send; } diff --git a/src/whatsapp/services/whatsapp.service.ts b/src/whatsapp/services/whatsapp.service.ts index f222d5b7..ce355486 100644 --- a/src/whatsapp/services/whatsapp.service.ts +++ b/src/whatsapp/services/whatsapp.service.ts @@ -75,6 +75,7 @@ import { getIO } from '../../libs/socket.server'; import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server'; import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db'; import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db'; +import { instanceController } from '../../whatsapp/whatsapp.module'; import { ArchiveChatDto, DeleteMessage, @@ -131,7 +132,6 @@ import { RepositoryBroker } from '../repository/repository.manager'; import { Events, MessageSubtype, TypeMediaMessage, wa } from '../types/wa.types'; import { waMonitor } from '../whatsapp.module'; import { ChamaaiService } from './chamaai.service'; -import { ChatwootService } from './chatwoot.service'; import { TypebotService } from './typebot.service'; const retryCache = {}; @@ -169,7 +169,7 @@ export class WAStartupService { private phoneNumber: string; - private chatwootService = new ChatwootService(waMonitor, this.configService, this.repository); + private chatwootService = instanceController.getChatwootService(); private typebotService = new TypebotService(waMonitor, this.configService, this.eventEmitter); From 8b5f73badd200601f689842a8c96b27b2fd5ce29 Mon Sep 17 00:00:00 2001 From: jaison-x Date: Wed, 27 Dec 2023 13:56:32 -0300 Subject: [PATCH 2/3] fix: added support to use source_id to check chatwoot's webhook needs to be ignored. With this, messageCache is used to support Chatwoot version <= 3.3.1. After this version we can remove use of message cache and use only the source_id field to check chatwoot's webhook needs to be ignored. It's have much better performance. --- src/whatsapp/services/chatwoot.service.ts | 71 ++++++++++++++++++++--- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/src/whatsapp/services/chatwoot.service.ts b/src/whatsapp/services/chatwoot.service.ts index cf173050..c62597f7 100644 --- a/src/whatsapp/services/chatwoot.service.ts +++ b/src/whatsapp/services/chatwoot.service.ts @@ -28,16 +28,24 @@ export class ChatwootService { private readonly configService: ConfigService, private readonly repository: RepositoryBroker, ) { + // messageCache is used to support Chatwoot version <= 3.3.1. + // after this version we can remove use of message cache and use source_id to check webhook needs to be ignored this.messageCache = {}; } - private isMessageInCache(instance: InstanceDto, id: number) { + private isMessageInCache(instance: InstanceDto, id: number, remove = true) { this.logger.verbose('check if message is in cache'); if (!this.messageCache[instance.instanceName]) { return false; } - return this.messageCache[instance.instanceName].has(id); + const hasId = this.messageCache[instance.instanceName].has(id); + + if (remove) { + this.messageCache[instance.instanceName].delete(id); + } + + return hasId; } private addMessageToCache(instance: InstanceDto, id: number) { @@ -636,6 +644,7 @@ export class ChatwootService { filename: string; }[], messageBody?: any, + sourceId?: string, ) { this.logger.verbose('create message to instance: ' + instance.instanceName); @@ -657,6 +666,7 @@ export class ChatwootService { message_type: messageType, attachments: attachments, private: privateMessage || false, + source_id: sourceId, content_attributes: { ...replyToIds, }, @@ -757,6 +767,7 @@ export class ChatwootService { content?: string, instance?: InstanceDto, messageBody?: any, + sourceId?: string, ) { this.logger.verbose('send data to chatwoot'); @@ -783,6 +794,10 @@ export class ChatwootService { } } + if (sourceId) { + data.append('source_id', sourceId); + } + this.logger.verbose('get client to instance: ' + this.provider.instanceName); const config = { method: 'post', @@ -1097,7 +1112,13 @@ export class ChatwootService { if (body.message_type === 'outgoing' && body?.conversation?.messages?.length && chatId !== '123456') { this.logger.verbose('check if is group'); - if (this.isMessageInCache(instance, body.id)) { + // messageCache is used to support Chatwoot version <= 3.3.1. + // after this version we can remove use of message cache and use only source_id value check + // use of source_id is better for performance + if ( + body?.conversation?.messages[0]?.source_id?.substring(0, 5) === 'WAID:' || + this.isMessageInCache(instance, body.id) + ) { this.logger.verbose('message is cached'); return { message: 'bot' }; } @@ -1571,7 +1592,15 @@ export class ChatwootService { } this.logger.verbose('send data to chatwoot'); - const send = await this.sendData(getConversation, fileName, messageType, content, instance, body); + const send = await this.sendData( + getConversation, + fileName, + messageType, + content, + instance, + body, + 'WAID:' + body.key.id, + ); if (!send) { this.logger.warn('message not sent'); @@ -1585,7 +1614,15 @@ export class ChatwootService { this.logger.verbose('message is not group'); this.logger.verbose('send data to chatwoot'); - const send = await this.sendData(getConversation, fileName, messageType, bodyMessage, instance, body); + const send = await this.sendData( + getConversation, + fileName, + messageType, + bodyMessage, + instance, + body, + 'WAID:' + body.key.id, + ); if (!send) { this.logger.warn('message not sent'); @@ -1612,6 +1649,7 @@ export class ChatwootService { { message: { extendedTextMessage: { contextInfo: { stanzaId: reactionMessage.key.id } } }, }, + 'WAID:' + body.key.id, ); if (!send) { this.logger.warn('message not sent'); @@ -1667,6 +1705,7 @@ export class ChatwootService { `${bodyMessage}\n\n\n**${title}**\n${description}\n${adsMessage.sourceUrl}`, instance, body, + 'WAID:' + body.key.id, ); if (!send) { @@ -1695,7 +1734,16 @@ export class ChatwootService { } this.logger.verbose('send data to chatwoot'); - const send = await this.createMessage(instance, getConversation, content, messageType, false, [], body); + const send = await this.createMessage( + instance, + getConversation, + content, + messageType, + false, + [], + body, + 'WAID:' + body.key.id, + ); if (!send) { this.logger.warn('message not sent'); @@ -1709,7 +1757,16 @@ export class ChatwootService { this.logger.verbose('message is not group'); this.logger.verbose('send data to chatwoot'); - const send = await this.createMessage(instance, getConversation, bodyMessage, messageType, false, [], body); + const send = await this.createMessage( + instance, + getConversation, + bodyMessage, + messageType, + false, + [], + body, + 'WAID:' + body.key.id, + ); if (!send) { this.logger.warn('message not sent'); From bfa7d429bd1d2f2cfea4d486624407d72ad9bf0f Mon Sep 17 00:00:00 2001 From: jaison-x Date: Thu, 28 Dec 2023 14:43:50 -0300 Subject: [PATCH 3/3] refactor(chatwoot): remove message ids cache in chatwoot to use chatwoot's api itself. Remove use of disc cache to optimize performance. BREAKING CHANGE: to make this, we need to use the param `source_id` from message in chatwoot. This param is only available from api in chatwoot version => 3.4.0. --- .../controllers/chatwoot.controller.ts | 4 +- .../controllers/instance.controller.ts | 6 --- src/whatsapp/services/chatwoot.service.ts | 53 ++----------------- src/whatsapp/services/whatsapp.service.ts | 4 +- 4 files changed, 7 insertions(+), 60 deletions(-) diff --git a/src/whatsapp/controllers/chatwoot.controller.ts b/src/whatsapp/controllers/chatwoot.controller.ts index d1090956..8f59ccac 100644 --- a/src/whatsapp/controllers/chatwoot.controller.ts +++ b/src/whatsapp/controllers/chatwoot.controller.ts @@ -7,7 +7,7 @@ import { ChatwootDto } from '../dto/chatwoot.dto'; import { InstanceDto } from '../dto/instance.dto'; import { RepositoryBroker } from '../repository/repository.manager'; import { ChatwootService } from '../services/chatwoot.service'; -import { instanceController } from '../whatsapp.module'; +import { waMonitor } from '../whatsapp.module'; const logger = new Logger('ChatwootController'); @@ -94,7 +94,7 @@ export class ChatwootController { public async receiveWebhook(instance: InstanceDto, data: any) { logger.verbose('requested receiveWebhook from ' + instance.instanceName + ' instance'); - const chatwootService = instanceController.getChatwootService(); + const chatwootService = new ChatwootService(waMonitor, this.configService, this.repository); return chatwootService.receiveWebhook(instance, data); } diff --git a/src/whatsapp/controllers/instance.controller.ts b/src/whatsapp/controllers/instance.controller.ts index d15e5c2b..0f06895e 100644 --- a/src/whatsapp/controllers/instance.controller.ts +++ b/src/whatsapp/controllers/instance.controller.ts @@ -659,10 +659,4 @@ export class InstanceController { this.logger.verbose('requested refreshToken'); return await this.authService.refreshToken(oldToken); } - - public getChatwootService() { - this.logger.verbose('getting chatwootService object instance'); - - return this.chatwootService; - } } diff --git a/src/whatsapp/services/chatwoot.service.ts b/src/whatsapp/services/chatwoot.service.ts index c62597f7..63c04b6f 100644 --- a/src/whatsapp/services/chatwoot.service.ts +++ b/src/whatsapp/services/chatwoot.service.ts @@ -17,8 +17,6 @@ import { Events } from '../types/wa.types'; import { WAMonitoringService } from './monitor.service'; export class ChatwootService { - private messageCache: Record>; - private readonly logger = new Logger(ChatwootService.name); private provider: any; @@ -27,35 +25,7 @@ export class ChatwootService { private readonly waMonitor: WAMonitoringService, private readonly configService: ConfigService, private readonly repository: RepositoryBroker, - ) { - // messageCache is used to support Chatwoot version <= 3.3.1. - // after this version we can remove use of message cache and use source_id to check webhook needs to be ignored - this.messageCache = {}; - } - - private isMessageInCache(instance: InstanceDto, id: number, remove = true) { - this.logger.verbose('check if message is in cache'); - if (!this.messageCache[instance.instanceName]) { - return false; - } - - const hasId = this.messageCache[instance.instanceName].has(id); - - if (remove) { - this.messageCache[instance.instanceName].delete(id); - } - - return hasId; - } - - private addMessageToCache(instance: InstanceDto, id: number) { - this.logger.verbose('add message to cache'); - - if (!this.messageCache[instance.instanceName]) { - this.messageCache[instance.instanceName] = new Set(); - } - this.messageCache[instance.instanceName].add(id); - } + ) {} private async getProvider(instance: InstanceDto) { this.logger.verbose('get provider to instance: ' + instance.instanceName); @@ -1112,14 +1082,8 @@ export class ChatwootService { if (body.message_type === 'outgoing' && body?.conversation?.messages?.length && chatId !== '123456') { this.logger.verbose('check if is group'); - // messageCache is used to support Chatwoot version <= 3.3.1. - // after this version we can remove use of message cache and use only source_id value check - // use of source_id is better for performance - if ( - body?.conversation?.messages[0]?.source_id?.substring(0, 5) === 'WAID:' || - this.isMessageInCache(instance, body.id) - ) { - this.logger.verbose('message is cached'); + if (body?.conversation?.messages[0]?.source_id?.substring(0, 5) === 'WAID:') { + this.logger.verbose('message sent directly from whatsapp. Webhook ignored.'); return { message: 'bot' }; } @@ -1607,8 +1571,6 @@ export class ChatwootService { return; } - this.addMessageToCache(instance, send.id); - return send; } else { this.logger.verbose('message is not group'); @@ -1629,8 +1591,6 @@ export class ChatwootService { return; } - this.addMessageToCache(instance, send.id); - return send; } } @@ -1655,7 +1615,6 @@ export class ChatwootService { this.logger.warn('message not sent'); return; } - this.addMessageToCache(instance, send.id); } return; @@ -1713,8 +1672,6 @@ export class ChatwootService { return; } - this.addMessageToCache(instance, send.id); - return send; } @@ -1750,8 +1707,6 @@ export class ChatwootService { return; } - this.addMessageToCache(instance, send.id); - return send; } else { this.logger.verbose('message is not group'); @@ -1773,8 +1728,6 @@ export class ChatwootService { return; } - this.addMessageToCache(instance, send.id); - return send; } } diff --git a/src/whatsapp/services/whatsapp.service.ts b/src/whatsapp/services/whatsapp.service.ts index ce355486..f222d5b7 100644 --- a/src/whatsapp/services/whatsapp.service.ts +++ b/src/whatsapp/services/whatsapp.service.ts @@ -75,7 +75,6 @@ import { getIO } from '../../libs/socket.server'; import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server'; import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db'; import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db'; -import { instanceController } from '../../whatsapp/whatsapp.module'; import { ArchiveChatDto, DeleteMessage, @@ -132,6 +131,7 @@ import { RepositoryBroker } from '../repository/repository.manager'; import { Events, MessageSubtype, TypeMediaMessage, wa } from '../types/wa.types'; import { waMonitor } from '../whatsapp.module'; import { ChamaaiService } from './chamaai.service'; +import { ChatwootService } from './chatwoot.service'; import { TypebotService } from './typebot.service'; const retryCache = {}; @@ -169,7 +169,7 @@ export class WAStartupService { private phoneNumber: string; - private chatwootService = instanceController.getChatwootService(); + private chatwootService = new ChatwootService(waMonitor, this.configService, this.repository); private typebotService = new TypebotService(waMonitor, this.configService, this.eventEmitter);