From cca929e7fc8f2b916dc0a4a9f2f4d677598fb450 Mon Sep 17 00:00:00 2001 From: Marlon Alves Date: Mon, 1 Sep 2025 19:18:55 -0300 Subject: [PATCH] feat/add global SQS mode with single-queue-per-event and payload size control --- .env.example | 30 +++ .../integrations/event/sqs/sqs.controller.ts | 204 +++++++++++------- src/config/env.config.ts | 66 ++++++ 3 files changed, 221 insertions(+), 79 deletions(-) diff --git a/.env.example b/.env.example index 679d15f6..3a863279 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,4 @@ +SERVER_NAME=evolution SERVER_TYPE=http SERVER_PORT=8080 # Server URL - Set your application url @@ -96,6 +97,35 @@ SQS_SECRET_ACCESS_KEY= SQS_ACCOUNT_ID= SQS_REGION= +SQS_GLOBAL_ENABLED=false +SQS_GLOBAL_APPLICATION_STARTUP=false +SQS_GLOBAL_CALL=false +SQS_GLOBAL_CHATS_DELETE=false +SQS_GLOBAL_CHATS_SET=false +SQS_GLOBAL_CHATS_UPDATE=false +SQS_GLOBAL_CHATS_UPSERT=false +SQS_GLOBAL_CONNECTION_UPDATE=false +SQS_GLOBAL_CONTACTS_SET=false +SQS_GLOBAL_CONTACTS_UPDATE=false +SQS_GLOBAL_CONTACTS_UPSERT=false +SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false +SQS_GLOBAL_GROUP_UPDATE=false +SQS_GLOBAL_GROUPS_UPSERT=false +SQS_GLOBAL_LABELS_ASSOCIATION=false +SQS_GLOBAL_LABELS_EDIT=false +SQS_GLOBAL_LOGOUT_INSTANCE=false +SQS_GLOBAL_MESSAGES_DELETE=false +SQS_GLOBAL_MESSAGES_EDITED=false +SQS_GLOBAL_MESSAGES_SET=false +SQS_GLOBAL_MESSAGES_UPDATE=false +SQS_GLOBAL_MESSAGES_UPSERT=false +SQS_GLOBAL_PRESENCE_UPDATE=false +SQS_GLOBAL_QRCODE_UPDATED=false +SQS_GLOBAL_REMOVE_INSTANCE=false +SQS_GLOBAL_SEND_MESSAGE=false +SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false +SQS_GLOBAL_TYPEBOT_START=false + # Websocket - Environment variables WEBSOCKET_ENABLED=false WEBSOCKET_GLOBAL_EVENTS=false diff --git a/src/api/integrations/event/sqs/sqs.controller.ts b/src/api/integrations/event/sqs/sqs.controller.ts index 05bf618b..d570d33c 100644 --- a/src/api/integrations/event/sqs/sqs.controller.ts +++ b/src/api/integrations/event/sqs/sqs.controller.ts @@ -1,7 +1,8 @@ +import * as s3Service from '@api/integrations/storage/s3/libs/minio.server'; import { PrismaRepository } from '@api/repository/repository.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs'; -import { configService, Log, Sqs } from '@config/env.config'; +import { configService, HttpServer, Log, S3, Sqs } from '@config/env.config'; import { Logger } from '@config/logger.config'; import { EmitData, EventController, EventControllerInterface } from '../event.controller'; @@ -15,27 +16,29 @@ export class SqsController extends EventController implements EventControllerInt super(prismaRepository, waMonitor, configService.get('SQS')?.ENABLED, 'sqs'); } - public init(): void { + public async init(): Promise { if (!this.status) { return; } - new Promise((resolve) => { - const awsConfig = configService.get('SQS'); + const awsConfig = configService.get('SQS'); - this.sqs = new SQS({ - credentials: { - accessKeyId: awsConfig.ACCESS_KEY_ID, - secretAccessKey: awsConfig.SECRET_ACCESS_KEY, - }, + this.sqs = new SQS({ + credentials: { + accessKeyId: awsConfig.ACCESS_KEY_ID, + secretAccessKey: awsConfig.SECRET_ACCESS_KEY, + }, - region: awsConfig.REGION, - }); - - this.logger.info('SQS initialized'); - - resolve(); + region: awsConfig.REGION, }); + + this.logger.info('SQS initialized'); + + const sqsConfig = configService.get('SQS'); + if (this.sqs && sqsConfig.GLOBAL_ENABLED) { + const sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]); + await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true); + } } private set channel(sqs: SQS) { @@ -47,7 +50,7 @@ export class SqsController extends EventController implements EventControllerInt } override async set(instanceName: string, data: EventDto): Promise { - if (!this.status) { + if (!this.status || configService.get('SQS').GLOBAL_ENABLED) { return; } @@ -75,6 +78,7 @@ export class SqsController extends EventController implements EventControllerInt instanceId: this.monitor.waInstances[instanceName].instanceId, }, }; + console.log('*** payload: ', payload); return this.prisma[this.name].upsert(payload); } @@ -98,66 +102,104 @@ export class SqsController extends EventController implements EventControllerInt return; } - const instanceSqs = await this.get(instanceName); - const sqsLocal = instanceSqs?.events; - const we = event.replace(/[.-]/gm, '_').toUpperCase(); + if (this.sqs) { + const sqsConfig = configService.get('SQS'); - if (instanceSqs?.enabled) { - if (this.sqs) { - if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) { - const eventFormatted = `${event.replace('.', '_').toLowerCase()}`; - const queueName = `${instanceName}_${eventFormatted}.fifo`; - const sqsConfig = configService.get('SQS'); - const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`; + const we = event.replace(/[.-]/gm, '_').toUpperCase(); - const message = { - event, - instance: instanceName, - data, - server_url: serverUrl, - date_time: dateTime, - sender, - apikey: apiKey, - }; - - const params = { - MessageBody: JSON.stringify(message), - MessageGroupId: 'evolution', - MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`, - QueueUrl: sqsUrl, - }; - - this.sqs.sendMessage(params, (err) => { - if (err) { - this.logger.error({ - local: `${origin}.sendData-SQS`, - message: err?.message, - hostName: err?.hostname, - code: err?.code, - stack: err?.stack, - name: err?.name, - url: queueName, - server_url: serverUrl, - }); - } else { - if (configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: `${origin}.sendData-SQS`, - ...message, - }; - - this.logger.log(logData); - } - } - }); + let sqsEvents = []; + if (sqsConfig.GLOBAL_ENABLED) { + sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]); + } else { + const instanceSqs = await this.get(instanceName); + if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) { + sqsEvents = instanceSqs?.events; } } + + if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) { + const eventFormatted = `${event.replace('.', '_').toLowerCase()}`; + const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName; + const queueName = `${prefixName}_${eventFormatted}.fifo`; + + const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`; + + const message = { + event, + instance: instanceName, + dataType: 'json', + data, + server: configService.get('SERVER').NAME, + server_url: serverUrl, + date_time: dateTime, + sender, + apikey: apiKey, + }; + + const jsonStr = JSON.stringify(message); + const size = Buffer.byteLength(jsonStr, 'utf8'); + if (size > sqsConfig.MAX_PAYLOAD_SIZE) { + if (!configService.get('S3').ENABLE) { + this.logger.error( + `${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`, + ); + return; + } + + const buffer = Buffer.from(jsonStr, 'utf8'); + const fullName = `messages/${instanceName}_${eventFormatted}_${Date.now()}.json`; + + await s3Service.uploadFile(fullName, buffer, size, { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-store', + }); + + const fileUrl = await s3Service.getObjectUrl(fullName); + + message.data = { fileUrl }; + message.dataType = 's3'; + } + + const isGlobalEnabled = configService.get('SQS').GLOBAL_ENABLED; + const params = { + MessageBody: JSON.stringify(message), + MessageGroupId: 'evolution', + QueueUrl: sqsUrl, + ...(!isGlobalEnabled && { + MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`, + }), + }; + + this.sqs.sendMessage(params, (err) => { + if (err) { + this.logger.error({ + local: `${origin}.sendData-SQS`, + params: JSON.stringify(message), + sqsUrl: sqsUrl, + message: err?.message, + hostName: err?.hostname, + code: err?.code, + stack: err?.stack, + name: err?.name, + url: queueName, + server_url: serverUrl, + }); + } else if (configService.get('LOG').LEVEL.includes('WEBHOOKS')) { + const logData = { + local: `${origin}.sendData-SQS`, + ...message, + }; + + this.logger.log(logData); + } + }); + } } } - private async saveQueues(instanceName: string, events: string[], enable: boolean) { + private async saveQueues(prefixName: string, events: string[], enable: boolean) { if (enable) { - const eventsFinded = await this.listQueuesByInstance(instanceName); + const eventsFinded = await this.listQueues(prefixName); console.log('eventsFinded', eventsFinded); for (const event of events) { @@ -168,15 +210,17 @@ export class SqsController extends EventController implements EventControllerInt continue; } - const queueName = `${instanceName}_${normalizedEvent}.fifo`; - + const queueName = `${prefixName}_${normalizedEvent}.fifo`; try { + const isGlobalEnabled = configService.get('SQS').GLOBAL_ENABLED; const createCommand = new CreateQueueCommand({ QueueName: queueName, Attributes: { FifoQueue: 'true', + ...(isGlobalEnabled && { ContentBasedDeduplication: 'true' }), }, }); + const data = await this.sqs.send(createCommand); this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`); } catch (err: any) { @@ -186,12 +230,14 @@ export class SqsController extends EventController implements EventControllerInt } } - private async listQueuesByInstance(instanceName: string) { + private async listQueues(prefixName: string) { let existingQueues: string[] = []; + try { const listCommand = new ListQueuesCommand({ - QueueNamePrefix: `${instanceName}_`, + QueueNamePrefix: `${prefixName}_`, }); + const listData = await this.sqs.send(listCommand); if (listData.QueueUrls && listData.QueueUrls.length > 0) { // Extrai o nome da fila a partir da URL @@ -201,7 +247,7 @@ export class SqsController extends EventController implements EventControllerInt }); } } catch (error: any) { - this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`); + this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`); return; } @@ -209,8 +255,8 @@ export class SqsController extends EventController implements EventControllerInt return existingQueues .map((queueName) => { // Espera-se que o nome seja `${instanceName}_${event}.fifo` - if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) { - return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase(); + if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) { + return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase(); } return ''; }) @@ -218,15 +264,15 @@ export class SqsController extends EventController implements EventControllerInt } // Para uma futura feature de exclusão forçada das queues - private async removeQueuesByInstance(instanceName: string) { + private async removeQueuesByInstance(prefixName: string) { try { const listCommand = new ListQueuesCommand({ - QueueNamePrefix: `${instanceName}_`, + QueueNamePrefix: `${prefixName}_`, }); const listData = await this.sqs.send(listCommand); if (!listData.QueueUrls || listData.QueueUrls.length === 0) { - this.logger.info(`No queues found for instance ${instanceName}`); + this.logger.info(`No queues found for ${prefixName}`); return; } @@ -240,7 +286,7 @@ export class SqsController extends EventController implements EventControllerInt } } } catch (err: any) { - this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`); + this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`); } } } diff --git a/src/config/env.config.ts b/src/config/env.config.ts index c59acd38..dcf2a0ea 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -4,6 +4,7 @@ import dotenv from 'dotenv'; dotenv.config(); export type HttpServer = { + NAME: string; TYPE: 'http' | 'https'; PORT: number; URL: string; @@ -113,10 +114,42 @@ export type Nats = { export type Sqs = { ENABLED: boolean; + GLOBAL_ENABLED: boolean; + GLOBAL_PREFIX_NAME: string; ACCESS_KEY_ID: string; SECRET_ACCESS_KEY: string; ACCOUNT_ID: string; REGION: string; + MAX_PAYLOAD_SIZE: number; + EVENTS: { + APPLICATION_STARTUP: boolean; + CALL: boolean; + CHATS_DELETE: boolean; + CHATS_SET: boolean; + CHATS_UPDATE: boolean; + CHATS_UPSERT: boolean; + CONNECTION_UPDATE: boolean; + CONTACTS_SET: boolean; + CONTACTS_UPDATE: boolean; + CONTACTS_UPSERT: boolean; + GROUP_PARTICIPANTS_UPDATE: boolean; + GROUP_UPDATE: boolean; + GROUPS_UPSERT: boolean; + LABELS_ASSOCIATION: boolean; + LABELS_EDIT: boolean; + LOGOUT_INSTANCE: boolean; + MESSAGES_DELETE: boolean; + MESSAGES_EDITED: boolean; + MESSAGES_SET: boolean; + MESSAGES_UPDATE: boolean; + MESSAGES_UPSERT: boolean; + PRESENCE_UPDATE: boolean; + QRCODE_UPDATED: boolean; + REMOVE_INSTANCE: boolean; + SEND_MESSAGE: boolean; + TYPEBOT_CHANGE_STATUS: boolean; + TYPEBOT_START: boolean; + }; }; export type Websocket = { @@ -344,6 +377,7 @@ export class ConfigService { private envProcess(): Env { return { SERVER: { + NAME: process.env?.SERVER_NAME || 'evolution', TYPE: (process.env.SERVER_TYPE as 'http' | 'https') || 'http', PORT: Number.parseInt(process.env.SERVER_PORT) || 8080, URL: process.env.SERVER_URL, @@ -465,10 +499,42 @@ export class ConfigService { }, SQS: { ENABLED: process.env?.SQS_ENABLED === 'true', + GLOBAL_ENABLED: process.env?.SQS_GLOBAL_ENABLED === 'true', + GLOBAL_PREFIX_NAME: process.env?.SQS_GLOBAL_PREFIX_NAME || 'global', ACCESS_KEY_ID: process.env.SQS_ACCESS_KEY_ID || '', SECRET_ACCESS_KEY: process.env.SQS_SECRET_ACCESS_KEY || '', ACCOUNT_ID: process.env.SQS_ACCOUNT_ID || '', REGION: process.env.SQS_REGION || '', + MAX_PAYLOAD_SIZE: Number.parseInt(process.env.SQS_MAX_PAYLOAD_SIZE ?? '1048576'), + EVENTS: { + APPLICATION_STARTUP: process.env?.SQS_GLOBAL_APPLICATION_STARTUP === 'true', + CALL: process.env?.SQS_GLOBAL_CALL === 'true', + CHATS_DELETE: process.env?.SQS_GLOBAL_CHATS_DELETE === 'true', + CHATS_SET: process.env?.SQS_GLOBAL_CHATS_SET === 'true', + CHATS_UPDATE: process.env?.SQS_GLOBAL_CHATS_UPDATE === 'true', + CHATS_UPSERT: process.env?.SQS_GLOBAL_CHATS_UPSERT === 'true', + CONNECTION_UPDATE: process.env?.SQS_GLOBAL_CONNECTION_UPDATE === 'true', + CONTACTS_SET: process.env?.SQS_GLOBAL_CONTACTS_SET === 'true', + CONTACTS_UPDATE: process.env?.SQS_GLOBAL_CONTACTS_UPDATE === 'true', + CONTACTS_UPSERT: process.env?.SQS_GLOBAL_CONTACTS_UPSERT === 'true', + GROUP_PARTICIPANTS_UPDATE: process.env?.SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE === 'true', + GROUP_UPDATE: process.env?.SQS_GLOBAL_GROUP_UPDATE === 'true', + GROUPS_UPSERT: process.env?.SQS_GLOBAL_GROUPS_UPSERT === 'true', + LABELS_ASSOCIATION: process.env?.SQS_GLOBAL_LABELS_ASSOCIATION === 'true', + LABELS_EDIT: process.env?.SQS_GLOBAL_LABELS_EDIT === 'true', + LOGOUT_INSTANCE: process.env?.SQS_GLOBAL_LOGOUT_INSTANCE === 'true', + MESSAGES_DELETE: process.env?.SQS_GLOBAL_MESSAGES_DELETE === 'true', + MESSAGES_EDITED: process.env?.SQS_GLOBAL_MESSAGES_EDITED === 'true', + MESSAGES_SET: process.env?.SQS_GLOBAL_MESSAGES_SET === 'true', + MESSAGES_UPDATE: process.env?.SQS_GLOBAL_MESSAGES_UPDATE === 'true', + MESSAGES_UPSERT: process.env?.SQS_GLOBAL_MESSAGES_UPSERT === 'true', + PRESENCE_UPDATE: process.env?.SQS_GLOBAL_PRESENCE_UPDATE === 'true', + QRCODE_UPDATED: process.env?.SQS_GLOBAL_QRCODE_UPDATED === 'true', + REMOVE_INSTANCE: process.env?.SQS_GLOBAL_REMOVE_INSTANCE === 'true', + SEND_MESSAGE: process.env?.SQS_GLOBAL_SEND_MESSAGE === 'true', + TYPEBOT_CHANGE_STATUS: process.env?.SQS_GLOBAL_TYPEBOT_CHANGE_STATUS === 'true', + TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true' + }, }, WEBSOCKET: { ENABLED: process.env?.WEBSOCKET_ENABLED === 'true',