diff --git a/.DS_Store b/.DS_Store index 9880dac3..34664d41 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/package.json b/package.json index 7997c3fa..6d97cf0b 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "@sentry/node": "^7.59.2", "@whiskeysockets/baileys": "github:WhiskeySockets/Baileys#fix-lids", "amqplib": "^0.10.3", + "aws-sdk": "^2.1499.0", "axios": "^1.3.5", "class-validator": "^0.13.2", "compression": "^1.7.4", diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 873c54ce..118836b7 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -66,6 +66,14 @@ export type Rabbitmq = { URI: string; }; +export type Sqs = { + ENABLED: boolean; + ACCESS_KEY_ID: string; + SECRET_ACCESS_KEY: string; + ACCOUNT_ID: string; + REGION: string; +}; + export type Websocket = { ENABLED: boolean; }; @@ -135,6 +143,7 @@ export interface Env { DATABASE: Database; REDIS: Redis; RABBITMQ: Rabbitmq; + SQS: Sqs; WEBSOCKET: Websocket; LOG: Log; DEL_INSTANCE: DelInstance; @@ -226,6 +235,13 @@ export class ConfigService { ENABLED: process.env?.RABBITMQ_ENABLED === 'true', URI: process.env.RABBITMQ_URI || '', }, + SQS: { + ENABLED: process.env?.SQS_ENABLED === 'true', + ACCESS_KEY_ID: process.env.SQS_ACCESS_KEY_ID || '', + SECRET_ACCESS_KEY: process.env.SQS_SECRET_ACCESS_KEY || '', + ACCOUNT_ID: process.env.SQS_ACCOUNT_ID || '', + REGION: process.env.SQS_REGION || '', + }, WEBSOCKET: { ENABLED: process.env?.WEBSOCKET_ENABLED === 'true', }, diff --git a/src/libs/redis.client.ts b/src/libs/redis.client.ts index f03513ba..1d74ff15 100644 --- a/src/libs/redis.client.ts +++ b/src/libs/redis.client.ts @@ -5,49 +5,55 @@ import { Redis } from '../config/env.config'; import { Logger } from '../config/logger.config'; export class RedisCache { - async disconnect() { - await this.client.disconnect(); - this.statusConnection = false; - } - constructor() { - this.logger.verbose('instance created'); - process.on('beforeExit', async () => { - this.logger.verbose('instance destroyed'); - if (this.statusConnection) { - this.logger.verbose('instance disconnect'); - await this.client.disconnect(); - } - }); - } - + private readonly logger = new Logger(RedisCache.name); + private client: RedisClientType; private statusConnection = false; private instanceName: string; private redisEnv: Redis; + constructor() { + this.logger.verbose('RedisCache instance created'); + process.on('beforeExit', () => { + this.logger.verbose('RedisCache instance destroyed'); + this.disconnect(); + }); + } + public set reference(reference: string) { this.logger.verbose('set reference: ' + reference); this.instanceName = reference; } public async connect(redisEnv: Redis) { - this.logger.verbose('connecting'); + this.logger.verbose('Connecting to Redis...'); this.client = createClient({ url: redisEnv.URI }); - this.logger.verbose('connected in ' + redisEnv.URI); + this.client.on('error', (err) => this.logger.error('Redis Client Error ' + err)); + await this.client.connect(); this.statusConnection = true; this.redisEnv = redisEnv; + this.logger.verbose(`Connected to ${redisEnv.URI}`); } - private readonly logger = new Logger(RedisCache.name); - private client: RedisClientType; + public async disconnect() { + if (this.statusConnection) { + await this.client.disconnect(); + this.statusConnection = false; + this.logger.verbose('Redis client disconnected'); + } + } public async instanceKeys(): Promise { + const keys: string[] = []; try { - this.logger.verbose('instance keys: ' + this.redisEnv.PREFIX_KEY + ':*'); - return await this.client.sendCommand(['keys', this.redisEnv.PREFIX_KEY + ':*']); + this.logger.verbose('Fetching instance keys'); + for await (const key of this.client.scanIterator({ MATCH: `${this.redisEnv.PREFIX_KEY}:*` })) { + keys.push(key); + } } catch (error) { - this.logger.error(error); + this.logger.error('Error fetching instance keys ' + error); } + return keys; } public async keyExists(key?: string) { diff --git a/src/libs/sqs.server.ts b/src/libs/sqs.server.ts new file mode 100644 index 00000000..04184542 --- /dev/null +++ b/src/libs/sqs.server.ts @@ -0,0 +1,97 @@ +import { SQS } from 'aws-sdk'; + +import { configService, Sqs } from '../config/env.config'; +import { Logger } from '../config/logger.config'; + +const logger = new Logger('SQS'); + +let sqs: SQS; + +export const initSQS = () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + return new Promise((resolve, reject) => { + const awsConfig = configService.get('SQS'); + sqs = new SQS({ + accessKeyId: awsConfig.ACCESS_KEY_ID, + secretAccessKey: awsConfig.SECRET_ACCESS_KEY, + region: awsConfig.REGION, + }); + + logger.info('SQS initialized'); + resolve(); + }); +}; + +export const getSQS = (): SQS => { + return sqs; +}; + +export const initQueues = (instanceName: string, events: string[]) => { + if (!events || !events.length) return; + + const queues = events.map((event) => { + return `${event.replace(/_/g, '_').toLowerCase()}`; + }); + + const sqs = getSQS(); + + queues.forEach((event) => { + const queueName = `${instanceName}_${event}.fifo`; + + sqs.createQueue( + { + QueueName: queueName, + Attributes: { + FifoQueue: 'true', + }, + }, + (err, data) => { + if (err) { + logger.error(`Error creating queue ${queueName}: ${err.message}`); + } else { + logger.info(`Queue ${queueName} created: ${data.QueueUrl}`); + } + }, + ); + }); +}; + +export const removeQueues = (instanceName: string, events: string[]) => { + if (!events || !events.length) return; + + const sqs = getSQS(); + + const queues = events.map((event) => { + return `${event.replace(/_/g, '_').toLowerCase()}`; + }); + + queues.forEach((event) => { + const queueName = `${instanceName}_${event}.fifo`; + + sqs.getQueueUrl( + { + QueueName: queueName, + }, + (err, data) => { + if (err) { + logger.error(`Error getting queue URL for ${queueName}: ${err.message}`); + } else { + const queueUrl = data.QueueUrl; + + sqs.deleteQueue( + { + QueueUrl: queueUrl, + }, + (deleteErr) => { + if (deleteErr) { + logger.error(`Error deleting queue ${queueName}: ${deleteErr.message}`); + } else { + logger.info(`Queue ${queueName} deleted`); + } + }, + ); + } + }, + ); + }); +}; diff --git a/src/main.ts b/src/main.ts index 3dcd0f2d..52bdd798 100644 --- a/src/main.ts +++ b/src/main.ts @@ -6,13 +6,14 @@ import cors from 'cors'; import express, { json, NextFunction, Request, Response, urlencoded } from 'express'; import { join } from 'path'; -import { Auth, configService, Cors, HttpServer, Rabbitmq, Webhook } from './config/env.config'; +import { Auth, configService, Cors, HttpServer, Rabbitmq, Sqs, Webhook } from './config/env.config'; import { onUnexpectedError } from './config/error.config'; import { Logger } from './config/logger.config'; import { ROOT_DIR } from './config/path.config'; import { swaggerRouter } from './docs/swagger.conf'; import { initAMQP } from './libs/amqp.server'; import { initIO } from './libs/socket.server'; +import { initSQS } from './libs/sqs.server'; import { ServerUP } from './utils/server-up'; import { HttpStatus, router } from './whatsapp/routers/index.router'; import { waMonitor } from './whatsapp/whatsapp.module'; @@ -128,6 +129,8 @@ function bootstrap() { if (configService.get('RABBITMQ')?.ENABLED) initAMQP(); + if (configService.get('SQS')?.ENABLED) initSQS(); + onUnexpectedError(); } diff --git a/src/validate/validate.schema.ts b/src/validate/validate.schema.ts index b8a4c0ad..9781e18c 100644 --- a/src/validate/validate.schema.ts +++ b/src/validate/validate.schema.ts @@ -987,6 +987,49 @@ export const rabbitmqSchema: JSONSchema7 = { ...isNotEmpty('enabled'), }; +export const sqsSchema: JSONSchema7 = { + $id: v4(), + type: 'object', + properties: { + enabled: { type: 'boolean', enum: [true, false] }, + events: { + type: 'array', + minItems: 0, + items: { + type: 'string', + enum: [ + 'APPLICATION_STARTUP', + 'QRCODE_UPDATED', + 'MESSAGES_SET', + 'MESSAGES_UPSERT', + 'MESSAGES_UPDATE', + 'MESSAGES_DELETE', + 'SEND_MESSAGE', + 'CONTACTS_SET', + 'CONTACTS_UPSERT', + 'CONTACTS_UPDATE', + 'PRESENCE_UPDATE', + 'CHATS_SET', + 'CHATS_UPSERT', + 'CHATS_UPDATE', + 'CHATS_DELETE', + 'GROUPS_UPSERT', + 'GROUP_UPDATE', + 'GROUP_PARTICIPANTS_UPDATE', + 'CONNECTION_UPDATE', + 'CALL', + 'NEW_JWT_TOKEN', + 'TYPEBOT_START', + 'TYPEBOT_CHANGE_STATUS', + 'CHAMA_AI_ACTION', + ], + }, + }, + }, + required: ['enabled'], + ...isNotEmpty('enabled'), +}; + export const typebotSchema: JSONSchema7 = { $id: v4(), type: 'object', diff --git a/src/whatsapp/controllers/instance.controller.ts b/src/whatsapp/controllers/instance.controller.ts index d26c64cd..5409a7dd 100644 --- a/src/whatsapp/controllers/instance.controller.ts +++ b/src/whatsapp/controllers/instance.controller.ts @@ -13,6 +13,7 @@ import { ChatwootService } from '../services/chatwoot.service'; import { WAMonitoringService } from '../services/monitor.service'; import { RabbitmqService } from '../services/rabbitmq.service'; import { SettingsService } from '../services/settings.service'; +import { SqsService } from '../services/sqs.service'; import { TypebotService } from '../services/typebot.service'; import { WebhookService } from '../services/webhook.service'; import { WebsocketService } from '../services/websocket.service'; @@ -31,6 +32,7 @@ export class InstanceController { private readonly settingsService: SettingsService, private readonly websocketService: WebsocketService, private readonly rabbitmqService: RabbitmqService, + private readonly sqsService: SqsService, private readonly typebotService: TypebotService, private readonly cache: RedisCache, ) {} @@ -62,6 +64,8 @@ export class InstanceController { websocket_events, rabbitmq_enabled, rabbitmq_events, + sqs_enabled, + sqs_events, typebot_url, typebot, typebot_expire, @@ -243,6 +247,53 @@ export class InstanceController { } } + let sqsEvents: string[]; + + if (sqs_enabled) { + this.logger.verbose('creating sqs'); + try { + let newEvents: string[] = []; + if (sqs_events.length === 0) { + newEvents = [ + 'APPLICATION_STARTUP', + 'QRCODE_UPDATED', + 'MESSAGES_SET', + 'MESSAGES_UPSERT', + 'MESSAGES_UPDATE', + 'MESSAGES_DELETE', + 'SEND_MESSAGE', + 'CONTACTS_SET', + 'CONTACTS_UPSERT', + 'CONTACTS_UPDATE', + 'PRESENCE_UPDATE', + 'CHATS_SET', + 'CHATS_UPSERT', + 'CHATS_UPDATE', + 'CHATS_DELETE', + 'GROUPS_UPSERT', + 'GROUP_UPDATE', + 'GROUP_PARTICIPANTS_UPDATE', + 'CONNECTION_UPDATE', + 'CALL', + 'NEW_JWT_TOKEN', + 'TYPEBOT_START', + 'TYPEBOT_CHANGE_STATUS', + 'CHAMA_AI_ACTION', + ]; + } else { + newEvents = sqs_events; + } + this.sqsService.create(instance, { + enabled: true, + events: newEvents, + }); + + sqsEvents = (await this.sqsService.find(instance)).events; + } catch (error) { + this.logger.log(error); + } + } + if (typebot_url) { try { if (!isURL(typebot_url, { require_tld: false })) { @@ -310,6 +361,10 @@ export class InstanceController { enabled: rabbitmq_enabled, events: rabbitmqEvents, }, + sqs: { + enabled: sqs_enabled, + events: sqsEvents, + }, typebot: { enabled: typebot_url ? true : false, url: typebot_url, @@ -404,6 +459,10 @@ export class InstanceController { enabled: rabbitmq_enabled, events: rabbitmqEvents, }, + sqs: { + enabled: sqs_enabled, + events: sqsEvents, + }, typebot: { enabled: typebot_url ? true : false, url: typebot_url, diff --git a/src/whatsapp/controllers/sqs.controller.ts b/src/whatsapp/controllers/sqs.controller.ts new file mode 100644 index 00000000..063e29ed --- /dev/null +++ b/src/whatsapp/controllers/sqs.controller.ts @@ -0,0 +1,56 @@ +import { Logger } from '../../config/logger.config'; +import { InstanceDto } from '../dto/instance.dto'; +import { SqsDto } from '../dto/sqs.dto'; +import { SqsService } from '../services/sqs.service'; + +const logger = new Logger('SqsController'); + +export class SqsController { + constructor(private readonly sqsService: SqsService) {} + + public async createSqs(instance: InstanceDto, data: SqsDto) { + logger.verbose('requested createSqs from ' + instance.instanceName + ' instance'); + + if (!data.enabled) { + logger.verbose('sqs disabled'); + data.events = []; + } + + if (data.events.length === 0) { + logger.verbose('sqs events empty'); + data.events = [ + 'APPLICATION_STARTUP', + 'QRCODE_UPDATED', + 'MESSAGES_SET', + 'MESSAGES_UPSERT', + 'MESSAGES_UPDATE', + 'MESSAGES_DELETE', + 'SEND_MESSAGE', + 'CONTACTS_SET', + 'CONTACTS_UPSERT', + 'CONTACTS_UPDATE', + 'PRESENCE_UPDATE', + 'CHATS_SET', + 'CHATS_UPSERT', + 'CHATS_UPDATE', + 'CHATS_DELETE', + 'GROUPS_UPSERT', + 'GROUP_UPDATE', + 'GROUP_PARTICIPANTS_UPDATE', + 'CONNECTION_UPDATE', + 'CALL', + 'NEW_JWT_TOKEN', + 'TYPEBOT_START', + 'TYPEBOT_CHANGE_STATUS', + 'CHAMA_AI_ACTION', + ]; + } + + return this.sqsService.create(instance, data); + } + + public async findSqs(instance: InstanceDto) { + logger.verbose('requested findSqs from ' + instance.instanceName + ' instance'); + return this.sqsService.find(instance); + } +} diff --git a/src/whatsapp/dto/instance.dto.ts b/src/whatsapp/dto/instance.dto.ts index 700fa099..84e2b1de 100644 --- a/src/whatsapp/dto/instance.dto.ts +++ b/src/whatsapp/dto/instance.dto.ts @@ -23,6 +23,8 @@ export class InstanceDto { websocket_events?: string[]; rabbitmq_enabled?: boolean; rabbitmq_events?: string[]; + sqs_enabled?: boolean; + sqs_events?: string[]; typebot_url?: string; typebot?: string; typebot_expire?: number; diff --git a/src/whatsapp/dto/sqs.dto.ts b/src/whatsapp/dto/sqs.dto.ts new file mode 100644 index 00000000..9b8aeedd --- /dev/null +++ b/src/whatsapp/dto/sqs.dto.ts @@ -0,0 +1,4 @@ +export class SqsDto { + enabled: boolean; + events?: string[]; +} diff --git a/src/whatsapp/models/index.ts b/src/whatsapp/models/index.ts index e79093f9..7903e5b5 100644 --- a/src/whatsapp/models/index.ts +++ b/src/whatsapp/models/index.ts @@ -7,6 +7,7 @@ export * from './message.model'; export * from './proxy.model'; export * from './rabbitmq.model'; export * from './settings.model'; +export * from './sqs.model'; export * from './typebot.model'; export * from './webhook.model'; export * from './websocket.model'; diff --git a/src/whatsapp/models/sqs.model.ts b/src/whatsapp/models/sqs.model.ts new file mode 100644 index 00000000..2d5f432f --- /dev/null +++ b/src/whatsapp/models/sqs.model.ts @@ -0,0 +1,18 @@ +import { Schema } from 'mongoose'; + +import { dbserver } from '../../libs/db.connect'; + +export class SqsRaw { + _id?: string; + enabled?: boolean; + events?: string[]; +} + +const sqsSchema = new Schema({ + _id: { type: String, _id: true }, + enabled: { type: Boolean, required: true }, + events: { type: [String], required: true }, +}); + +export const SqsModel = dbserver?.model(SqsRaw.name, sqsSchema, 'sqs'); +export type ISqsModel = typeof SqsModel; diff --git a/src/whatsapp/repository/repository.manager.ts b/src/whatsapp/repository/repository.manager.ts index 1c16fdef..ab4da1e3 100644 --- a/src/whatsapp/repository/repository.manager.ts +++ b/src/whatsapp/repository/repository.manager.ts @@ -14,6 +14,7 @@ import { MessageUpRepository } from './messageUp.repository'; import { ProxyRepository } from './proxy.repository'; import { RabbitmqRepository } from './rabbitmq.repository'; import { SettingsRepository } from './settings.repository'; +import { SqsRepository } from './sqs.repository'; import { TypebotRepository } from './typebot.repository'; import { WebhookRepository } from './webhook.repository'; import { WebsocketRepository } from './websocket.repository'; @@ -28,6 +29,7 @@ export class RepositoryBroker { public readonly settings: SettingsRepository, public readonly websocket: WebsocketRepository, public readonly rabbitmq: RabbitmqRepository, + public readonly sqs: SqsRepository, public readonly typebot: TypebotRepository, public readonly proxy: ProxyRepository, public readonly chamaai: ChamaaiRepository, @@ -63,6 +65,7 @@ export class RepositoryBroker { const settingsDir = join(storePath, 'settings'); const websocketDir = join(storePath, 'websocket'); const rabbitmqDir = join(storePath, 'rabbitmq'); + const sqsDir = join(storePath, 'sqs'); const typebotDir = join(storePath, 'typebot'); const proxyDir = join(storePath, 'proxy'); const chamaaiDir = join(storePath, 'chamaai'); @@ -108,6 +111,10 @@ export class RepositoryBroker { this.logger.verbose('creating rabbitmq dir: ' + rabbitmqDir); fs.mkdirSync(rabbitmqDir, { recursive: true }); } + if (!fs.existsSync(sqsDir)) { + this.logger.verbose('creating sqs dir: ' + sqsDir); + fs.mkdirSync(sqsDir, { recursive: true }); + } if (!fs.existsSync(typebotDir)) { this.logger.verbose('creating typebot dir: ' + typebotDir); fs.mkdirSync(typebotDir, { recursive: true }); diff --git a/src/whatsapp/repository/sqs.repository.ts b/src/whatsapp/repository/sqs.repository.ts new file mode 100644 index 00000000..50ea1cd3 --- /dev/null +++ b/src/whatsapp/repository/sqs.repository.ts @@ -0,0 +1,62 @@ +import { readFileSync } from 'fs'; +import { join } from 'path'; + +import { ConfigService } from '../../config/env.config'; +import { Logger } from '../../config/logger.config'; +import { IInsert, Repository } from '../abstract/abstract.repository'; +import { ISqsModel, SqsRaw } from '../models'; + +export class SqsRepository extends Repository { + constructor(private readonly sqsModel: ISqsModel, private readonly configService: ConfigService) { + super(configService); + } + + private readonly logger = new Logger('SqsRepository'); + + public async create(data: SqsRaw, instance: string): Promise { + try { + this.logger.verbose('creating sqs'); + if (this.dbSettings.ENABLED) { + this.logger.verbose('saving sqs to db'); + const insert = await this.sqsModel.replaceOne({ _id: instance }, { ...data }, { upsert: true }); + + this.logger.verbose('sqs saved to db: ' + insert.modifiedCount + ' sqs'); + return { insertCount: insert.modifiedCount }; + } + + this.logger.verbose('saving sqs to store'); + + this.writeStore({ + path: join(this.storePath, 'sqs'), + fileName: instance, + data, + }); + + this.logger.verbose('sqs saved to store in path: ' + join(this.storePath, 'sqs') + '/' + instance); + + this.logger.verbose('sqs created'); + return { insertCount: 1 }; + } catch (error) { + return error; + } + } + + public async find(instance: string): Promise { + try { + this.logger.verbose('finding sqs'); + if (this.dbSettings.ENABLED) { + this.logger.verbose('finding sqs in db'); + return await this.sqsModel.findOne({ _id: instance }); + } + + this.logger.verbose('finding sqs in store'); + return JSON.parse( + readFileSync(join(this.storePath, 'sqs', instance + '.json'), { + encoding: 'utf-8', + }), + ) as SqsRaw; + } catch (error) { + return {}; + } + } +} diff --git a/src/whatsapp/routers/index.router.ts b/src/whatsapp/routers/index.router.ts index e35d21e4..fbe28ddd 100644 --- a/src/whatsapp/routers/index.router.ts +++ b/src/whatsapp/routers/index.router.ts @@ -13,6 +13,7 @@ import { ProxyRouter } from './proxy.router'; import { RabbitmqRouter } from './rabbitmq.router'; import { MessageRouter } from './sendMessage.router'; import { SettingsRouter } from './settings.router'; +import { SqsRouter } from './sqs.router'; import { TypebotRouter } from './typebot.router'; import { ViewsRouter } from './view.router'; import { WebhookRouter } from './webhook.router'; @@ -53,6 +54,7 @@ router .use('/settings', new SettingsRouter(...guards).router) .use('/websocket', new WebsocketRouter(...guards).router) .use('/rabbitmq', new RabbitmqRouter(...guards).router) + .use('/sqs', new SqsRouter(...guards).router) .use('/typebot', new TypebotRouter(...guards).router) .use('/proxy', new ProxyRouter(...guards).router) .use('/chamaai', new ChamaaiRouter(...guards).router); diff --git a/src/whatsapp/routers/sqs.router.ts b/src/whatsapp/routers/sqs.router.ts new file mode 100644 index 00000000..e1bf8e93 --- /dev/null +++ b/src/whatsapp/routers/sqs.router.ts @@ -0,0 +1,52 @@ +import { RequestHandler, Router } from 'express'; + +import { Logger } from '../../config/logger.config'; +import { instanceNameSchema, sqsSchema } from '../../validate/validate.schema'; +import { RouterBroker } from '../abstract/abstract.router'; +import { InstanceDto } from '../dto/instance.dto'; +import { SqsDto } from '../dto/sqs.dto'; +import { sqsController } from '../whatsapp.module'; +import { HttpStatus } from './index.router'; + +const logger = new Logger('SqsRouter'); + +export class SqsRouter extends RouterBroker { + constructor(...guards: RequestHandler[]) { + super(); + this.router + .post(this.routerPath('set'), ...guards, async (req, res) => { + logger.verbose('request received in setSqs'); + logger.verbose('request body: '); + logger.verbose(req.body); + + logger.verbose('request query: '); + logger.verbose(req.query); + const response = await this.dataValidate({ + request: req, + schema: sqsSchema, + ClassRef: SqsDto, + execute: (instance, data) => sqsController.createSqs(instance, data), + }); + + res.status(HttpStatus.CREATED).json(response); + }) + .get(this.routerPath('find'), ...guards, async (req, res) => { + logger.verbose('request received in findSqs'); + logger.verbose('request body: '); + logger.verbose(req.body); + + logger.verbose('request query: '); + logger.verbose(req.query); + const response = await this.dataValidate({ + request: req, + schema: instanceNameSchema, + ClassRef: InstanceDto, + execute: (instance) => sqsController.findSqs(instance), + }); + + res.status(HttpStatus.OK).json(response); + }); + } + + public readonly router = Router(); +} diff --git a/src/whatsapp/services/sqs.service.ts b/src/whatsapp/services/sqs.service.ts new file mode 100644 index 00000000..236d4ceb --- /dev/null +++ b/src/whatsapp/services/sqs.service.ts @@ -0,0 +1,35 @@ +import { Logger } from '../../config/logger.config'; +import { initQueues } from '../../libs/sqs.server'; +import { InstanceDto } from '../dto/instance.dto'; +import { SqsDto } from '../dto/sqs.dto'; +import { SqsRaw } from '../models'; +import { WAMonitoringService } from './monitor.service'; + +export class SqsService { + constructor(private readonly waMonitor: WAMonitoringService) {} + + private readonly logger = new Logger(SqsService.name); + + public create(instance: InstanceDto, data: SqsDto) { + this.logger.verbose('create sqs: ' + instance.instanceName); + this.waMonitor.waInstances[instance.instanceName].setSqs(data); + + initQueues(instance.instanceName, data.events); + return { sqs: { ...instance, sqs: data } }; + } + + public async find(instance: InstanceDto): Promise { + try { + this.logger.verbose('find sqs: ' + instance.instanceName); + const result = await this.waMonitor.waInstances[instance.instanceName].findSqs(); + + if (Object.keys(result).length === 0) { + throw new Error('Sqs not found'); + } + + return result; + } catch (error) { + return { enabled: false, events: [] }; + } + } +} diff --git a/src/whatsapp/services/whatsapp.service.ts b/src/whatsapp/services/whatsapp.service.ts index d1b5a62f..47a3bc50 100644 --- a/src/whatsapp/services/whatsapp.service.ts +++ b/src/whatsapp/services/whatsapp.service.ts @@ -60,6 +60,7 @@ import { Log, QrCode, Redis, + Sqs, Webhook, Websocket, } from '../../config/env.config'; @@ -70,6 +71,7 @@ import { getAMQP, removeQueues } from '../../libs/amqp.server'; import { dbserver } from '../../libs/db.connect'; import { RedisCache } from '../../libs/redis.client'; import { getIO } from '../../libs/socket.server'; +import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server'; import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db'; import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db'; import { @@ -113,7 +115,7 @@ import { SendTextDto, StatusMessage, } from '../dto/sendMessage.dto'; -import { ChamaaiRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, TypebotRaw } from '../models'; +import { ChamaaiRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models'; import { ChatRaw } from '../models/chat.model'; import { ChatwootRaw } from '../models/chatwoot.model'; import { ContactRaw } from '../models/contact.model'; @@ -149,6 +151,7 @@ export class WAStartupService { private readonly localSettings: wa.LocalSettings = {}; private readonly localWebsocket: wa.LocalWebsocket = {}; private readonly localRabbitmq: wa.LocalRabbitmq = {}; + private readonly localSqs: wa.LocalSqs = {}; public readonly localTypebot: wa.LocalTypebot = {}; private readonly localProxy: wa.LocalProxy = {}; private readonly localChamaai: wa.LocalChamaai = {}; @@ -504,6 +507,48 @@ export class WAStartupService { } } + private async loadSqs() { + this.logger.verbose('Loading sqs'); + const data = await this.repository.sqs.find(this.instanceName); + + this.localSqs.enabled = data?.enabled; + this.logger.verbose(`Sqs enabled: ${this.localSqs.enabled}`); + + this.localSqs.events = data?.events; + this.logger.verbose(`Sqs events: ${this.localSqs.events}`); + + this.logger.verbose('Sqs loaded'); + } + + public async setSqs(data: SqsRaw) { + this.logger.verbose('Setting sqs'); + await this.repository.sqs.create(data, this.instanceName); + this.logger.verbose(`Sqs events: ${data.events}`); + Object.assign(this.localSqs, data); + this.logger.verbose('Sqs set'); + } + + public async findSqs() { + this.logger.verbose('Finding sqs'); + const data = await this.repository.sqs.find(this.instanceName); + + if (!data) { + this.logger.verbose('Sqs not found'); + throw new NotFoundException('Sqs not found'); + } + + this.logger.verbose(`Sqs events: ${data.events}`); + return data; + } + + public async removeSqsQueues() { + this.logger.verbose('Removing sqs'); + + if (this.localSqs.enabled) { + removeQueuesSQS(this.instanceName, this.localSqs.events); + } + } + private async loadTypebot() { this.logger.verbose('Loading typebot'); const data = await this.repository.typebot.find(this.instanceName); @@ -648,6 +693,7 @@ export class WAStartupService { const webhookLocal = this.localWebhook.events; const websocketLocal = this.localWebsocket.events; const rabbitmqLocal = this.localRabbitmq.events; + const sqsLocal = this.localSqs.events; const serverUrl = this.configService.get('SERVER').URL; const we = event.replace(/[.-]/gm, '_').toUpperCase(); const transformedWe = we.replace(/_/gm, '-').toLowerCase(); @@ -720,6 +766,76 @@ export class WAStartupService { } } + if (this.localSqs.enabled) { + const sqs = getSQS(); + + if (sqs) { + if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) { + const eventFormatted = `${event.replace('.', '_').toLowerCase()}`; + + const queueName = `${this.instanceName}_${eventFormatted}.fifo`; + + const sqsConfig = this.configService.get('SQS'); + + const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`; + + const message = { + event, + instance: this.instance.name, + data, + server_url: serverUrl, + date_time: now, + sender: this.wuid, + }; + + if (expose && instanceApikey) { + message['apikey'] = instanceApikey; + } + + const params = { + MessageBody: JSON.stringify(message), + MessageGroupId: 'evolution', + MessageDeduplicationId: `${this.instanceName}_${eventFormatted}_${Date.now()}`, + QueueUrl: sqsUrl, + }; + + sqs.sendMessage(params, (err, data) => { + if (err) { + this.logger.error({ + local: WAStartupService.name + '.sendData-SQS', + message: err?.message, + hostName: err?.hostname, + code: err?.code, + stack: err?.stack, + name: err?.name, + url: queueName, + server_url: serverUrl, + }); + } else { + if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { + const logData = { + local: WAStartupService.name + '.sendData-SQS', + event, + instance: this.instance.name, + data, + server_url: serverUrl, + apikey: (expose && instanceApikey) || null, + date_time: now, + sender: this.wuid, + }; + + if (expose && instanceApikey) { + logData['apikey'] = instanceApikey; + } + + this.logger.log(logData); + } + } + }); + } + } + } + if (this.configService.get('WEBSOCKET')?.ENABLED && this.localWebsocket.enabled) { this.logger.verbose('Sending data to websocket on channel: ' + this.instance.name); if (Array.isArray(websocketLocal) && websocketLocal.includes(we)) { @@ -1165,6 +1281,7 @@ export class WAStartupService { this.loadSettings(); this.loadWebsocket(); this.loadRabbitmq(); + this.loadSqs(); this.loadTypebot(); this.loadProxy(); this.loadChamaai(); @@ -1520,7 +1637,6 @@ export class WAStartupService { this.logger.verbose('Event received: messages.upsert'); const received = messages[0]; - console.log(received, type); if ( type !== 'notify' || !received?.message || diff --git a/src/whatsapp/types/wa.types.ts b/src/whatsapp/types/wa.types.ts index 9f326c8a..27582001 100644 --- a/src/whatsapp/types/wa.types.ts +++ b/src/whatsapp/types/wa.types.ts @@ -84,6 +84,11 @@ export declare namespace wa { events?: string[]; }; + export type LocalSqs = { + enabled?: boolean; + events?: string[]; + }; + type Session = { remoteJid?: string; sessionId?: string; diff --git a/src/whatsapp/whatsapp.module.ts b/src/whatsapp/whatsapp.module.ts index a37e98ef..7211f8e6 100644 --- a/src/whatsapp/whatsapp.module.ts +++ b/src/whatsapp/whatsapp.module.ts @@ -12,6 +12,7 @@ import { ProxyController } from './controllers/proxy.controller'; import { RabbitmqController } from './controllers/rabbitmq.controller'; import { SendMessageController } from './controllers/sendMessage.controller'; import { SettingsController } from './controllers/settings.controller'; +import { SqsController } from './controllers/sqs.controller'; import { TypebotController } from './controllers/typebot.controller'; import { ViewsController } from './controllers/views.controller'; import { WebhookController } from './controllers/webhook.controller'; @@ -27,6 +28,7 @@ import { ProxyModel, RabbitmqModel, SettingsModel, + SqsModel, TypebotModel, WebhookModel, WebsocketModel, @@ -42,6 +44,7 @@ import { ProxyRepository } from './repository/proxy.repository'; import { RabbitmqRepository } from './repository/rabbitmq.repository'; import { RepositoryBroker } from './repository/repository.manager'; import { SettingsRepository } from './repository/settings.repository'; +import { SqsRepository } from './repository/sqs.repository'; import { TypebotRepository } from './repository/typebot.repository'; import { WebhookRepository } from './repository/webhook.repository'; import { WebsocketRepository } from './repository/websocket.repository'; @@ -52,6 +55,7 @@ import { WAMonitoringService } from './services/monitor.service'; import { ProxyService } from './services/proxy.service'; import { RabbitmqService } from './services/rabbitmq.service'; import { SettingsService } from './services/settings.service'; +import { SqsService } from './services/sqs.service'; import { TypebotService } from './services/typebot.service'; import { WebhookService } from './services/webhook.service'; import { WebsocketService } from './services/websocket.service'; @@ -68,6 +72,7 @@ const websocketRepository = new WebsocketRepository(WebsocketModel, configServic const proxyRepository = new ProxyRepository(ProxyModel, configService); const chamaaiRepository = new ChamaaiRepository(ChamaaiModel, configService); const rabbitmqRepository = new RabbitmqRepository(RabbitmqModel, configService); +const sqsRepository = new SqsRepository(SqsModel, configService); const chatwootRepository = new ChatwootRepository(ChatwootModel, configService); const settingsRepository = new SettingsRepository(SettingsModel, configService); const authRepository = new AuthRepository(AuthModel, configService); @@ -82,6 +87,7 @@ export const repository = new RepositoryBroker( settingsRepository, websocketRepository, rabbitmqRepository, + sqsRepository, typebotRepository, proxyRepository, chamaaiRepository, @@ -120,6 +126,10 @@ const rabbitmqService = new RabbitmqService(waMonitor); export const rabbitmqController = new RabbitmqController(rabbitmqService); +const sqsService = new SqsService(waMonitor); + +export const sqsController = new SqsController(sqsService); + const chatwootService = new ChatwootService(waMonitor, configService); export const chatwootController = new ChatwootController(chatwootService, configService); @@ -139,6 +149,7 @@ export const instanceController = new InstanceController( settingsService, websocketService, rabbitmqService, + sqsService, typebotService, cache, );