From 7d9dd64303ae42a10edd7ce8c1e3bf054ac8e7f1 Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Mon, 4 Sep 2023 15:06:18 -0300 Subject: [PATCH 1/7] Update monitor.service.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Isso permite que as operações assíncronas sejam executadas em paralelo --- src/whatsapp/services/monitor.service.ts | 101 +++++++++-------------- 1 file changed, 41 insertions(+), 60 deletions(-) diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index 69aae5aa..e0bdfd73 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -74,81 +74,62 @@ export class WAMonitoringService { } } - public async instanceInfo(instanceName?: string) { + public async instanceInfo(instanceName?: string) { this.logger.verbose('get instance info'); - if (instanceName && !this.waInstances[instanceName]) { - throw new NotFoundException(`Instance "${instanceName}" not found`); - } - const instances: any[] = []; + const urlServer = this.configService.get('SERVER').URL; - for await (const [key, value] of Object.entries(this.waInstances)) { - if (value) { - this.logger.verbose('get instance info: ' + key); - let chatwoot: any; - - const urlServer = this.configService.get('SERVER').URL; - - const findChatwoot = await this.waInstances[key].findChatwoot(); - - if (findChatwoot && findChatwoot.enabled) { - chatwoot = { - ...findChatwoot, - webhook_url: `${urlServer}/chatwoot/webhook/${encodeURIComponent(key)}`, + const instances: any[] = await Promise.all( + Object.entries(this.waInstances).map(async ([key, value]) => { + if (!value || !value.connectionStatus || value.connectionStatus.state !== 'open') { + return { + instance: { + instanceName: key, + status: value?.connectionStatus?.state || 'unknown', + }, }; } - if (value.connectionStatus.state === 'open') { - this.logger.verbose('instance: ' + key + ' - connectionStatus: open'); + this.logger.verbose('instance: ' + key + ' - connectionStatus: open'); - const instanceData = { - instance: { - instanceName: key, - owner: value.wuid, - profileName: (await value.getProfileName()) || 'not loaded', - profilePictureUrl: value.profilePictureUrl, - profileStatus: (await value.getProfileStatus()) || '', - status: value.connectionStatus.state, - }, - }; + const instanceData: any = { + instance: { + instanceName: key, + owner: value.wuid, + profileName: (await value.getProfileName()) || 'not loaded', + profilePictureUrl: value.profilePictureUrl, + profileStatus: (await value.getProfileStatus()) || '', + status: 'open', + }, + }; - if (this.configService.get('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES) { - instanceData.instance['serverUrl'] = this.configService.get('SERVER').URL; + if (this.configService.get('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES) { + instanceData.instance.serverUrl = urlServer; + instanceData.instance.apikey = (await this.repository.auth.find(key))?.apikey; - instanceData.instance['apikey'] = (await this.repository.auth.find(key))?.apikey; - - instanceData.instance['chatwoot'] = chatwoot; + const findChatwoot = await this.waInstances[key].findChatwoot(); + if (findChatwoot && findChatwoot.enabled) { + instanceData.instance.chatwoot = { + ...findChatwoot, + webhook_url: `${urlServer}/chatwoot/webhook/${encodeURIComponent(key)}`, + }; } - - instances.push(instanceData); - } else { - this.logger.verbose('instance: ' + key + ' - connectionStatus: ' + value.connectionStatus.state); - - const instanceData = { - instance: { - instanceName: key, - status: value.connectionStatus.state, - }, - }; - - if (this.configService.get('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES) { - instanceData.instance['serverUrl'] = this.configService.get('SERVER').URL; - - instanceData.instance['apikey'] = (await this.repository.auth.find(key))?.apikey; - - instanceData.instance['chatwoot'] = chatwoot; - } - - instances.push(instanceData); } - } - } + + return instanceData; + }), + ); this.logger.verbose('return instance info: ' + instances.length); - return instances.find((i) => i.instance.instanceName === instanceName) ?? instances; - } + if (instanceName) { + const instance = instances.find((i) => i.instance.instanceName === instanceName); + return instance || []; + } + return instances; + } + private delInstanceFiles() { this.logger.verbose('cron to delete instance files started'); setInterval(async () => { From b554d8c19ca22af911d3b2098ed73f16583bfa8d Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Mon, 4 Sep 2023 15:11:23 -0300 Subject: [PATCH 2/7] Update monitor.service.ts (loadInstances) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Melhorando é organizando o loadInstances --- src/whatsapp/services/monitor.service.ts | 130 +++++++++++++---------- 1 file changed, 74 insertions(+), 56 deletions(-) diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index e0bdfd73..ec5106e9 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -74,7 +74,7 @@ export class WAMonitoringService { } } - public async instanceInfo(instanceName?: string) { + public async instanceInfo(instanceName?: string) { this.logger.verbose('get instance info'); const urlServer = this.configService.get('SERVER').URL; @@ -129,7 +129,7 @@ export class WAMonitoringService { return instances; } - + private delInstanceFiles() { this.logger.verbose('cron to delete instance files started'); setInterval(async () => { @@ -226,69 +226,87 @@ export class WAMonitoringService { } public async loadInstance() { - this.logger.verbose('load instances'); - const set = async (name: string) => { - const instance = new WAStartupService(this.configService, this.eventEmitter, this.repository, this.cache); - instance.instanceName = name; - this.logger.verbose('instance loaded: ' + name); - - await instance.connectToWhatsapp(); - this.logger.verbose('connectToWhatsapp: ' + name); - - this.waInstances[name] = instance; - }; + this.logger.verbose('Loading instances'); try { if (this.redis.ENABLED) { - this.logger.verbose('redis enabled'); - await this.cache.connect(this.redis as Redis); - const keys = await this.cache.instanceKeys(); - if (keys?.length > 0) { - this.logger.verbose('reading instance keys and setting instances'); - keys.forEach(async (k) => await set(k.split(':')[1])); - } else { - this.logger.verbose('no instance keys found'); - } - this.cache.disconnect(); - return; - } - - if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { - this.logger.verbose('database enabled'); - await this.repository.dbServer.connect(); - const collections: any[] = await this.dbInstance.collections(); - if (collections.length > 0) { - this.logger.verbose('reading collections and setting instances'); - collections.forEach(async (coll) => await set(coll.namespace.replace(/^[\w-]+\./, ''))); - } else { - this.logger.verbose('no collections found'); - } - return; - } - - this.logger.verbose('store in files enabled'); - const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' }); - for await (const dirent of dir) { - if (dirent.isDirectory()) { - this.logger.verbose('reading instance files and setting instances'); - const files = readdirSync(join(INSTANCE_DIR, dirent.name), { - encoding: 'utf-8', - }); - if (files.length === 0) { - rmSync(join(INSTANCE_DIR, dirent.name), { recursive: true, force: true }); - break; - } - - await set(dirent.name); - } else { - this.logger.verbose('no instance files found'); - } + await this.loadInstancesFromRedis(); + } else if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { + await this.loadInstancesFromDatabase(); + } else { + await this.loadInstancesFromFiles(); } } catch (error) { this.logger.error(error); } } + private async setInstance(name: string) { + const instance = new WAStartupService(this.configService, this.eventEmitter, this.repository, this.cache); + instance.instanceName = name; + this.logger.verbose('Instance loaded: ' + name); + + await instance.connectToWhatsapp(); + this.logger.verbose('connectToWhatsapp: ' + name); + + this.waInstances[name] = instance; + } + + private async loadInstancesFromRedis() { + this.logger.verbose('Redis enabled'); + await this.cache.connect(this.redis as Redis); + const keys = await this.cache.instanceKeys(); + + if (keys?.length > 0) { + this.logger.verbose('Reading instance keys and setting instances'); + await Promise.all(keys.map((k) => this.setInstance(k.split(':')[1]))); + } else { + this.logger.verbose('No instance keys found'); + } + + this.cache.disconnect(); + } + + private async loadInstancesFromDatabase() { + this.logger.verbose('Database enabled'); + await this.repository.dbServer.connect(); + const collections: any[] = await this.dbInstance.collections(); + + if (collections.length > 0) { + this.logger.verbose('Reading collections and setting instances'); + await Promise.all(collections.map((coll) => this.setInstance(coll.namespace.replace(/^[\w-]+\./, '')))); + } else { + this.logger.verbose('No collections found'); + } + } + + private async loadInstancesFromFiles() { + this.logger.verbose('Store in files enabled'); + const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' }); + const instanceDirs = []; + + for await (const dirent of dir) { + if (dirent.isDirectory()) { + instanceDirs.push(dirent.name); + } else { + this.logger.verbose('No instance files found'); + } + } + + await Promise.all( + instanceDirs.map(async (instanceName) => { + this.logger.verbose('Reading instance files and setting instances: ' + instanceName); + const files = readdirSync(join(INSTANCE_DIR, instanceName), { encoding: 'utf-8' }); + + if (files.length === 0) { + rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); + } else { + await this.setInstance(instanceName); + } + }), + ); + } + private removeInstance() { this.eventEmitter.on('remove.instance', async (instanceName: string) => { this.logger.verbose('remove instance: ' + instanceName); From 402d5af19a766acb43b64a17fcd977b0009621ff Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Mon, 4 Sep 2023 15:18:32 -0300 Subject: [PATCH 3/7] Update monitor.service.ts --- src/whatsapp/services/monitor.service.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index ec5106e9..9acf0fd6 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -7,7 +7,6 @@ import { join } from 'path'; import { Auth, ConfigService, Database, DelInstance, HttpServer, Redis } from '../../config/env.config'; import { Logger } from '../../config/logger.config'; import { INSTANCE_DIR, STORE_DIR } from '../../config/path.config'; -import { NotFoundException } from '../../exceptions'; import { dbserver } from '../../libs/db.connect'; import { RedisCache } from '../../libs/redis.client'; import { @@ -180,7 +179,6 @@ export class WAMonitoringService { this.logger.verbose('cleaning up instance in redis: ' + instanceName); this.cache.reference = instanceName; await this.cache.delAll(); - this.cache.disconnect(); return; } @@ -263,8 +261,6 @@ export class WAMonitoringService { } else { this.logger.verbose('No instance keys found'); } - - this.cache.disconnect(); } private async loadInstancesFromDatabase() { From 8652d4031c1c00919a7086d41f43f57cbe74da86 Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Mon, 4 Sep 2023 19:00:42 -0300 Subject: [PATCH 4/7] Update monitor.service.ts --- src/whatsapp/services/monitor.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index 9acf0fd6..3e99a912 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -80,7 +80,7 @@ export class WAMonitoringService { const instances: any[] = await Promise.all( Object.entries(this.waInstances).map(async ([key, value]) => { - if (!value || !value.connectionStatus || value.connectionStatus.state !== 'open') { + if (!value || !value.connectionStatus || !value.connectionStatus.state) { return { instance: { instanceName: key, From 1dc5bb8bd17e5b7550543b223b5ee2d18ed4d26c Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Tue, 5 Sep 2023 11:47:24 -0300 Subject: [PATCH 5/7] Update monitor.service.ts --- src/whatsapp/services/monitor.service.ts | 322 +---------------------- 1 file changed, 9 insertions(+), 313 deletions(-) diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index 3e99a912..97a32f95 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -1,78 +1,3 @@ -import { execSync } from 'child_process'; -import EventEmitter2 from 'eventemitter2'; -import { opendirSync, readdirSync, rmSync } from 'fs'; -import { Db } from 'mongodb'; -import { join } from 'path'; - -import { Auth, ConfigService, Database, DelInstance, HttpServer, Redis } from '../../config/env.config'; -import { Logger } from '../../config/logger.config'; -import { INSTANCE_DIR, STORE_DIR } from '../../config/path.config'; -import { dbserver } from '../../libs/db.connect'; -import { RedisCache } from '../../libs/redis.client'; -import { - AuthModel, - ChatwootModel, - ContactModel, - MessageModel, - MessageUpModel, - SettingsModel, - WebhookModel, -} from '../models'; -import { RepositoryBroker } from '../repository/repository.manager'; -import { WAStartupService } from './whatsapp.service'; - -export class WAMonitoringService { - constructor( - private readonly eventEmitter: EventEmitter2, - private readonly configService: ConfigService, - private readonly repository: RepositoryBroker, - private readonly cache: RedisCache, - ) { - this.logger.verbose('instance created'); - - this.removeInstance(); - this.noConnection(); - this.delInstanceFiles(); - - Object.assign(this.db, configService.get('DATABASE')); - Object.assign(this.redis, configService.get('REDIS')); - - this.dbInstance = this.db.ENABLED - ? this.repository.dbServer?.db(this.db.CONNECTION.DB_PREFIX_NAME + '-instances') - : undefined; - } - - private readonly db: Partial = {}; - private readonly redis: Partial = {}; - - private dbInstance: Db; - - private dbStore = dbserver; - - private readonly logger = new Logger(WAMonitoringService.name); - public readonly waInstances: Record = {}; - - public delInstanceTime(instance: string) { - const time = this.configService.get('DEL_INSTANCE'); - if (typeof time === 'number' && time > 0) { - this.logger.verbose(`Instance "${instance}" don't have connection, will be removed in ${time} minutes`); - - setTimeout(async () => { - if (this.waInstances[instance]?.connectionStatus?.state !== 'open') { - if (this.waInstances[instance]?.connectionStatus?.state === 'connecting') { - await this.waInstances[instance]?.client?.logout('Log out instance: ' + instance); - this.waInstances[instance]?.client?.ws?.close(); - this.waInstances[instance]?.client?.end(undefined); - delete this.waInstances[instance]; - } else { - delete this.waInstances[instance]; - this.eventEmitter.emit('remove.instance', instance, 'inner'); - } - } - }, 1000 * 60 * time); - } - } - public async instanceInfo(instanceName?: string) { this.logger.verbose('get instance info'); @@ -80,16 +5,15 @@ export class WAMonitoringService { const instances: any[] = await Promise.all( Object.entries(this.waInstances).map(async ([key, value]) => { - if (!value || !value.connectionStatus || !value.connectionStatus.state) { - return { - instance: { - instanceName: key, - status: value?.connectionStatus?.state || 'unknown', - }, - }; + const status = value?.connectionStatus?.state || 'unknown'; + + if (status === 'unknown') { + return null; } - this.logger.verbose('instance: ' + key + ' - connectionStatus: open'); + if (status === 'open') { + this.logger.verbose('instance: ' + key + ' - connectionStatus: open'); + } const instanceData: any = { instance: { @@ -98,7 +22,7 @@ export class WAMonitoringService { profileName: (await value.getProfileName()) || 'not loaded', profilePictureUrl: value.profilePictureUrl, profileStatus: (await value.getProfileStatus()) || '', - status: 'open', + status: status, }, }; @@ -117,7 +41,7 @@ export class WAMonitoringService { return instanceData; }), - ); + ).then((results) => results.filter((instance) => instance !== null)); this.logger.verbose('return instance info: ' + instances.length); @@ -128,231 +52,3 @@ export class WAMonitoringService { return instances; } - - private delInstanceFiles() { - this.logger.verbose('cron to delete instance files started'); - setInterval(async () => { - if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { - const collections = await this.dbInstance.collections(); - collections.forEach(async (collection) => { - const name = collection.namespace.replace(/^[\w-]+./, ''); - await this.dbInstance.collection(name).deleteMany({ - $or: [{ _id: { $regex: /^app.state.*/ } }, { _id: { $regex: /^session-.*/ } }], - }); - this.logger.verbose('instance files deleted: ' + name); - }); - } else if (!this.redis.ENABLED) { - const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' }); - for await (const dirent of dir) { - if (dirent.isDirectory()) { - const files = readdirSync(join(INSTANCE_DIR, dirent.name), { - encoding: 'utf-8', - }); - files.forEach(async (file) => { - if (file.match(/^app.state.*/) || file.match(/^session-.*/)) { - rmSync(join(INSTANCE_DIR, dirent.name, file), { - recursive: true, - force: true, - }); - } - }); - this.logger.verbose('instance files deleted: ' + dirent.name); - } - } - } - }, 3600 * 1000 * 2); - } - - public async cleaningUp(instanceName: string) { - this.logger.verbose('cleaning up instance: ' + instanceName); - if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { - this.logger.verbose('cleaning up instance in database: ' + instanceName); - await this.repository.dbServer.connect(); - const collections: any[] = await this.dbInstance.collections(); - if (collections.length > 0) { - await this.dbInstance.dropCollection(instanceName); - } - return; - } - - if (this.redis.ENABLED) { - this.logger.verbose('cleaning up instance in redis: ' + instanceName); - this.cache.reference = instanceName; - await this.cache.delAll(); - return; - } - - this.logger.verbose('cleaning up instance in files: ' + instanceName); - rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); - } - - public async cleaningStoreFiles(instanceName: string) { - if (!this.db.ENABLED) { - this.logger.verbose('cleaning store files instance: ' + instanceName); - rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); - - execSync(`rm -rf ${join(STORE_DIR, 'chats', instanceName)}`); - execSync(`rm -rf ${join(STORE_DIR, 'contacts', instanceName)}`); - execSync(`rm -rf ${join(STORE_DIR, 'message-up', instanceName)}`); - execSync(`rm -rf ${join(STORE_DIR, 'messages', instanceName)}`); - - execSync(`rm -rf ${join(STORE_DIR, 'auth', 'apikey', instanceName + '.json')}`); - execSync(`rm -rf ${join(STORE_DIR, 'webhook', instanceName + '.json')}`); - execSync(`rm -rf ${join(STORE_DIR, 'chatwoot', instanceName + '*')}`); - execSync(`rm -rf ${join(STORE_DIR, 'chamaai', instanceName + '*')}`); - execSync(`rm -rf ${join(STORE_DIR, 'proxy', instanceName + '*')}`); - execSync(`rm -rf ${join(STORE_DIR, 'rabbitmq', instanceName + '*')}`); - execSync(`rm -rf ${join(STORE_DIR, 'typebot', instanceName + '*')}`); - execSync(`rm -rf ${join(STORE_DIR, 'websocket', instanceName + '*')}`); - execSync(`rm -rf ${join(STORE_DIR, 'settings', instanceName + '*')}`); - - return; - } - - this.logger.verbose('cleaning store database instance: ' + instanceName); - - await AuthModel.deleteMany({ owner: instanceName }); - await ContactModel.deleteMany({ owner: instanceName }); - await MessageModel.deleteMany({ owner: instanceName }); - await MessageUpModel.deleteMany({ owner: instanceName }); - await AuthModel.deleteMany({ _id: instanceName }); - await WebhookModel.deleteMany({ _id: instanceName }); - await ChatwootModel.deleteMany({ _id: instanceName }); - await SettingsModel.deleteMany({ _id: instanceName }); - - return; - } - - public async loadInstance() { - this.logger.verbose('Loading instances'); - - try { - if (this.redis.ENABLED) { - await this.loadInstancesFromRedis(); - } else if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { - await this.loadInstancesFromDatabase(); - } else { - await this.loadInstancesFromFiles(); - } - } catch (error) { - this.logger.error(error); - } - } - - private async setInstance(name: string) { - const instance = new WAStartupService(this.configService, this.eventEmitter, this.repository, this.cache); - instance.instanceName = name; - this.logger.verbose('Instance loaded: ' + name); - - await instance.connectToWhatsapp(); - this.logger.verbose('connectToWhatsapp: ' + name); - - this.waInstances[name] = instance; - } - - private async loadInstancesFromRedis() { - this.logger.verbose('Redis enabled'); - await this.cache.connect(this.redis as Redis); - const keys = await this.cache.instanceKeys(); - - if (keys?.length > 0) { - this.logger.verbose('Reading instance keys and setting instances'); - await Promise.all(keys.map((k) => this.setInstance(k.split(':')[1]))); - } else { - this.logger.verbose('No instance keys found'); - } - } - - private async loadInstancesFromDatabase() { - this.logger.verbose('Database enabled'); - await this.repository.dbServer.connect(); - const collections: any[] = await this.dbInstance.collections(); - - if (collections.length > 0) { - this.logger.verbose('Reading collections and setting instances'); - await Promise.all(collections.map((coll) => this.setInstance(coll.namespace.replace(/^[\w-]+\./, '')))); - } else { - this.logger.verbose('No collections found'); - } - } - - private async loadInstancesFromFiles() { - this.logger.verbose('Store in files enabled'); - const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' }); - const instanceDirs = []; - - for await (const dirent of dir) { - if (dirent.isDirectory()) { - instanceDirs.push(dirent.name); - } else { - this.logger.verbose('No instance files found'); - } - } - - await Promise.all( - instanceDirs.map(async (instanceName) => { - this.logger.verbose('Reading instance files and setting instances: ' + instanceName); - const files = readdirSync(join(INSTANCE_DIR, instanceName), { encoding: 'utf-8' }); - - if (files.length === 0) { - rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); - } else { - await this.setInstance(instanceName); - } - }), - ); - } - - private removeInstance() { - this.eventEmitter.on('remove.instance', async (instanceName: string) => { - this.logger.verbose('remove instance: ' + instanceName); - try { - this.logger.verbose('instance: ' + instanceName + ' - removing from memory'); - this.waInstances[instanceName] = undefined; - } catch (error) { - this.logger.error(error); - } - - try { - this.logger.verbose('request cleaning up instance: ' + instanceName); - this.cleaningUp(instanceName); - this.cleaningStoreFiles(instanceName); - } finally { - this.logger.warn(`Instance "${instanceName}" - REMOVED`); - } - }); - this.eventEmitter.on('logout.instance', async (instanceName: string) => { - this.logger.verbose('logout instance: ' + instanceName); - try { - this.logger.verbose('request cleaning up instance: ' + instanceName); - this.cleaningUp(instanceName); - } finally { - this.logger.warn(`Instance "${instanceName}" - LOGOUT`); - } - }); - } - - private noConnection() { - this.logger.verbose('checking instances without connection'); - this.eventEmitter.on('no.connection', async (instanceName) => { - try { - this.logger.verbose('logging out instance: ' + instanceName); - await this.waInstances[instanceName]?.client?.logout('Log out instance: ' + instanceName); - - this.logger.verbose('close connection instance: ' + instanceName); - this.waInstances[instanceName]?.client?.ws?.close(); - - this.waInstances[instanceName].instance.qrcode = { count: 0 }; - this.waInstances[instanceName].stateConnection.state = 'close'; - } catch (error) { - this.logger.error({ - localError: 'noConnection', - warn: 'Error deleting instance from memory.', - error, - }); - } finally { - this.logger.warn(`Instance "${instanceName}" - NOT CONNECTION`); - } - }); - } -} From 240a77dccee59c332b86a2d8fe62255742472205 Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Tue, 5 Sep 2023 11:48:38 -0300 Subject: [PATCH 6/7] Update monitor.service.ts Fix monitor --- src/whatsapp/services/monitor.service.ts | 303 +++++++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index 97a32f95..66b07bae 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -1,3 +1,78 @@ +import { execSync } from 'child_process'; +import EventEmitter2 from 'eventemitter2'; +import { opendirSync, readdirSync, rmSync } from 'fs'; +import { Db } from 'mongodb'; +import { join } from 'path'; + +import { Auth, ConfigService, Database, DelInstance, HttpServer, Redis } from '../../config/env.config'; +import { Logger } from '../../config/logger.config'; +import { INSTANCE_DIR, STORE_DIR } from '../../config/path.config'; +import { dbserver } from '../../libs/db.connect'; +import { RedisCache } from '../../libs/redis.client'; +import { + AuthModel, + ChatwootModel, + ContactModel, + MessageModel, + MessageUpModel, + SettingsModel, + WebhookModel, +} from '../models'; +import { RepositoryBroker } from '../repository/repository.manager'; +import { WAStartupService } from './whatsapp.service'; + +export class WAMonitoringService { + constructor( + private readonly eventEmitter: EventEmitter2, + private readonly configService: ConfigService, + private readonly repository: RepositoryBroker, + private readonly cache: RedisCache, + ) { + this.logger.verbose('instance created'); + + this.removeInstance(); + this.noConnection(); + this.delInstanceFiles(); + + Object.assign(this.db, configService.get('DATABASE')); + Object.assign(this.redis, configService.get('REDIS')); + + this.dbInstance = this.db.ENABLED + ? this.repository.dbServer?.db(this.db.CONNECTION.DB_PREFIX_NAME + '-instances') + : undefined; + } + + private readonly db: Partial = {}; + private readonly redis: Partial = {}; + + private dbInstance: Db; + + private dbStore = dbserver; + + private readonly logger = new Logger(WAMonitoringService.name); + public readonly waInstances: Record = {}; + + public delInstanceTime(instance: string) { + const time = this.configService.get('DEL_INSTANCE'); + if (typeof time === 'number' && time > 0) { + this.logger.verbose(`Instance "${instance}" don't have connection, will be removed in ${time} minutes`); + + setTimeout(async () => { + if (this.waInstances[instance]?.connectionStatus?.state !== 'open') { + if (this.waInstances[instance]?.connectionStatus?.state === 'connecting') { + await this.waInstances[instance]?.client?.logout('Log out instance: ' + instance); + this.waInstances[instance]?.client?.ws?.close(); + this.waInstances[instance]?.client?.end(undefined); + delete this.waInstances[instance]; + } else { + delete this.waInstances[instance]; + this.eventEmitter.emit('remove.instance', instance, 'inner'); + } + } + }, 1000 * 60 * time); + } + } + public async instanceInfo(instanceName?: string) { this.logger.verbose('get instance info'); @@ -52,3 +127,231 @@ return instances; } + + private delInstanceFiles() { + this.logger.verbose('cron to delete instance files started'); + setInterval(async () => { + if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { + const collections = await this.dbInstance.collections(); + collections.forEach(async (collection) => { + const name = collection.namespace.replace(/^[\w-]+./, ''); + await this.dbInstance.collection(name).deleteMany({ + $or: [{ _id: { $regex: /^app.state.*/ } }, { _id: { $regex: /^session-.*/ } }], + }); + this.logger.verbose('instance files deleted: ' + name); + }); + } else if (!this.redis.ENABLED) { + const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' }); + for await (const dirent of dir) { + if (dirent.isDirectory()) { + const files = readdirSync(join(INSTANCE_DIR, dirent.name), { + encoding: 'utf-8', + }); + files.forEach(async (file) => { + if (file.match(/^app.state.*/) || file.match(/^session-.*/)) { + rmSync(join(INSTANCE_DIR, dirent.name, file), { + recursive: true, + force: true, + }); + } + }); + this.logger.verbose('instance files deleted: ' + dirent.name); + } + } + } + }, 3600 * 1000 * 2); + } + + public async cleaningUp(instanceName: string) { + this.logger.verbose('cleaning up instance: ' + instanceName); + if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { + this.logger.verbose('cleaning up instance in database: ' + instanceName); + await this.repository.dbServer.connect(); + const collections: any[] = await this.dbInstance.collections(); + if (collections.length > 0) { + await this.dbInstance.dropCollection(instanceName); + } + return; + } + + if (this.redis.ENABLED) { + this.logger.verbose('cleaning up instance in redis: ' + instanceName); + this.cache.reference = instanceName; + await this.cache.delAll(); + return; + } + + this.logger.verbose('cleaning up instance in files: ' + instanceName); + rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); + } + + public async cleaningStoreFiles(instanceName: string) { + if (!this.db.ENABLED) { + this.logger.verbose('cleaning store files instance: ' + instanceName); + rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); + + execSync(`rm -rf ${join(STORE_DIR, 'chats', instanceName)}`); + execSync(`rm -rf ${join(STORE_DIR, 'contacts', instanceName)}`); + execSync(`rm -rf ${join(STORE_DIR, 'message-up', instanceName)}`); + execSync(`rm -rf ${join(STORE_DIR, 'messages', instanceName)}`); + + execSync(`rm -rf ${join(STORE_DIR, 'auth', 'apikey', instanceName + '.json')}`); + execSync(`rm -rf ${join(STORE_DIR, 'webhook', instanceName + '.json')}`); + execSync(`rm -rf ${join(STORE_DIR, 'chatwoot', instanceName + '*')}`); + execSync(`rm -rf ${join(STORE_DIR, 'chamaai', instanceName + '*')}`); + execSync(`rm -rf ${join(STORE_DIR, 'proxy', instanceName + '*')}`); + execSync(`rm -rf ${join(STORE_DIR, 'rabbitmq', instanceName + '*')}`); + execSync(`rm -rf ${join(STORE_DIR, 'typebot', instanceName + '*')}`); + execSync(`rm -rf ${join(STORE_DIR, 'websocket', instanceName + '*')}`); + execSync(`rm -rf ${join(STORE_DIR, 'settings', instanceName + '*')}`); + + return; + } + + this.logger.verbose('cleaning store database instance: ' + instanceName); + + await AuthModel.deleteMany({ owner: instanceName }); + await ContactModel.deleteMany({ owner: instanceName }); + await MessageModel.deleteMany({ owner: instanceName }); + await MessageUpModel.deleteMany({ owner: instanceName }); + await AuthModel.deleteMany({ _id: instanceName }); + await WebhookModel.deleteMany({ _id: instanceName }); + await ChatwootModel.deleteMany({ _id: instanceName }); + await SettingsModel.deleteMany({ _id: instanceName }); + + return; + } + + public async loadInstance() { + this.logger.verbose('Loading instances'); + + try { + if (this.redis.ENABLED) { + await this.loadInstancesFromRedis(); + } else if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) { + await this.loadInstancesFromDatabase(); + } else { + await this.loadInstancesFromFiles(); + } + } catch (error) { + this.logger.error(error); + } + } + + private async setInstance(name: string) { + const instance = new WAStartupService(this.configService, this.eventEmitter, this.repository, this.cache); + instance.instanceName = name; + this.logger.verbose('Instance loaded: ' + name); + + await instance.connectToWhatsapp(); + this.logger.verbose('connectToWhatsapp: ' + name); + + this.waInstances[name] = instance; + } + + private async loadInstancesFromRedis() { + this.logger.verbose('Redis enabled'); + await this.cache.connect(this.redis as Redis); + const keys = await this.cache.instanceKeys(); + + if (keys?.length > 0) { + this.logger.verbose('Reading instance keys and setting instances'); + await Promise.all(keys.map((k) => this.setInstance(k.split(':')[1]))); + } else { + this.logger.verbose('No instance keys found'); + } + } + + private async loadInstancesFromDatabase() { + this.logger.verbose('Database enabled'); + await this.repository.dbServer.connect(); + const collections: any[] = await this.dbInstance.collections(); + + if (collections.length > 0) { + this.logger.verbose('Reading collections and setting instances'); + await Promise.all(collections.map((coll) => this.setInstance(coll.namespace.replace(/^[\w-]+\./, '')))); + } else { + this.logger.verbose('No collections found'); + } + } + + private async loadInstancesFromFiles() { + this.logger.verbose('Store in files enabled'); + const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' }); + const instanceDirs = []; + + for await (const dirent of dir) { + if (dirent.isDirectory()) { + instanceDirs.push(dirent.name); + } else { + this.logger.verbose('No instance files found'); + } + } + + await Promise.all( + instanceDirs.map(async (instanceName) => { + this.logger.verbose('Reading instance files and setting instances: ' + instanceName); + const files = readdirSync(join(INSTANCE_DIR, instanceName), { encoding: 'utf-8' }); + + if (files.length === 0) { + rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true }); + } else { + await this.setInstance(instanceName); + } + }), + ); + } + + private removeInstance() { + this.eventEmitter.on('remove.instance', async (instanceName: string) => { + this.logger.verbose('remove instance: ' + instanceName); + try { + this.logger.verbose('instance: ' + instanceName + ' - removing from memory'); + this.waInstances[instanceName] = undefined; + } catch (error) { + this.logger.error(error); + } + + try { + this.logger.verbose('request cleaning up instance: ' + instanceName); + this.cleaningUp(instanceName); + this.cleaningStoreFiles(instanceName); + } finally { + this.logger.warn(`Instance "${instanceName}" - REMOVED`); + } + }); + this.eventEmitter.on('logout.instance', async (instanceName: string) => { + this.logger.verbose('logout instance: ' + instanceName); + try { + this.logger.verbose('request cleaning up instance: ' + instanceName); + this.cleaningUp(instanceName); + } finally { + this.logger.warn(`Instance "${instanceName}" - LOGOUT`); + } + }); + } + + private noConnection() { + this.logger.verbose('checking instances without connection'); + this.eventEmitter.on('no.connection', async (instanceName) => { + try { + this.logger.verbose('logging out instance: ' + instanceName); + await this.waInstances[instanceName]?.client?.logout('Log out instance: ' + instanceName); + + this.logger.verbose('close connection instance: ' + instanceName); + this.waInstances[instanceName]?.client?.ws?.close(); + + this.waInstances[instanceName].instance.qrcode = { count: 0 }; + this.waInstances[instanceName].stateConnection.state = 'close'; + } catch (error) { + this.logger.error({ + localError: 'noConnection', + warn: 'Error deleting instance from memory.', + error, + }); + } finally { + this.logger.warn(`Instance "${instanceName}" - NOT CONNECTION`); + } + }); + } +} From cdbe839b35ac9cafc7cd08d504583dc1f02f6981 Mon Sep 17 00:00:00 2001 From: Wender Teixeira Date: Tue, 5 Sep 2023 11:49:02 -0300 Subject: [PATCH 7/7] Update monitor.service.ts