diff --git a/Docker/.env.example b/Docker/.env.example index 5aa74a5c..ca2dc377 100644 --- a/Docker/.env.example +++ b/Docker/.env.example @@ -47,6 +47,7 @@ REDIS_URI=redis://redis:6379 REDIS_PREFIX_KEY=evdocker RABBITMQ_ENABLED=false +RABBITMQ_GLOBAL_EVENT_QUEUE=false RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672 WEBSOCKET_ENABLED=false diff --git a/Dockerfile b/Dockerfile index 7758c484..0c6752aa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,6 +62,7 @@ ENV REDIS_URI=redis://redis:6379 ENV REDIS_PREFIX_KEY=evolution ENV RABBITMQ_ENABLED=false +ENV RABBITMQ_GLOBAL_EVENT_QUEUE=false ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672 ENV WEBSOCKET_ENABLED=false diff --git a/src/config/env.config.ts b/src/config/env.config.ts index ae4f0951..95a3e7f8 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -71,6 +71,7 @@ export type Redis = { export type Rabbitmq = { ENABLED: boolean; + GLOBAL_EVENT_QUEUE: boolean; URI: string; }; @@ -282,6 +283,7 @@ export class ConfigService { }, RABBITMQ: { ENABLED: process.env?.RABBITMQ_ENABLED === 'true', + GLOBAL_EVENT_QUEUE: process.env?.RABBITMQ_GLOBAL_EVENT_QUEUE === 'true', URI: process.env.RABBITMQ_URI || '', }, SQS: { diff --git a/src/dev-env.yml b/src/dev-env.yml index b05176bc..02b9fa54 100644 --- a/src/dev-env.yml +++ b/src/dev-env.yml @@ -83,6 +83,7 @@ REDIS: RABBITMQ: ENABLED: false + GLOBAL_EVENT_QUEUE: false URI: "amqp://guest:guest@localhost:5672" SQS: diff --git a/src/libs/amqp.server.ts b/src/libs/amqp.server.ts index c861916b..866cf0c4 100644 --- a/src/libs/amqp.server.ts +++ b/src/libs/amqp.server.ts @@ -1,6 +1,6 @@ import * as amqp from 'amqplib/callback_api'; -import { configService, Rabbitmq } from '../config/env.config'; +import { configService, HttpServer, Rabbitmq } from '../config/env.config'; import { Logger } from '../config/logger.config'; const logger = new Logger('AMQP'); @@ -9,8 +9,8 @@ let amqpChannel: amqp.Channel | null = null; export const initAMQP = () => { return new Promise((resolve, reject) => { - const uri = configService.get('RABBITMQ').URI; - amqp.connect(uri, (error, connection) => { + const rabbitConfig = configService.get('RABBITMQ'); + amqp.connect(rabbitConfig.URI, (error, connection) => { if (error) { reject(error); return; @@ -45,6 +45,7 @@ export const getAMQP = (): amqp.Channel | null => { export const initQueues = (instanceName: string, events: string[]) => { if (!instanceName || !events || !events.length) return; + const rabbitConfig = configService.get('RABBITMQ'); const queues = events.map((event) => { return `${event.replace(/_/g, '.').toLowerCase()}`; @@ -60,7 +61,7 @@ export const initQueues = (instanceName: string, events: string[]) => { assert: true, }); - const queueName = `${instanceName}.${event}`; + const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`; amqp.assertQueue(queueName, { durable: true, @@ -76,6 +77,7 @@ export const initQueues = (instanceName: string, events: string[]) => { export const removeQueues = (instanceName: string, events: string[]) => { if (!events || !events.length) return; + const rabbitConfig = configService.get('RABBITMQ'); const channel = getAMQP(); @@ -94,10 +96,64 @@ export const removeQueues = (instanceName: string, events: string[]) => { assert: true, }); - const queueName = `${instanceName}.${event}`; + const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`; amqp.deleteQueue(queueName); }); channel.deleteExchange(exchangeName); }; + +interface SendEventData { + instanceName: string; + wuid: string; + event: string; + apiKey?: string; + data: any; +} + +export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => { + const exchangeName = instanceName ?? 'evolution_exchange'; + + amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + assert: true, + }); + + const rabbitConfig = configService.get('RABBITMQ'); + const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`; + + amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { 'x-queue-type': 'quorum' }, + }); + + amqpChannel.bindQueue(queueName, exchangeName, event); + + const serverUrl = configService.get('SERVER').URL; + const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds + const localISOTime = new Date(Date.now() - tzoffset).toISOString(); + const now = localISOTime; + + const message = { + event, + instance: instanceName, + data, + server_url: serverUrl, + date_time: now, + sender: wuid, + }; + + if (apiKey) { + message['apikey'] = apiKey; + } + + logger.log({ + queueName, + exchangeName, + event, + }); + amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); +}; diff --git a/src/whatsapp/services/whatsapp.service.ts b/src/whatsapp/services/whatsapp.service.ts index 5000bcb7..ded8fa27 100644 --- a/src/whatsapp/services/whatsapp.service.ts +++ b/src/whatsapp/services/whatsapp.service.ts @@ -20,7 +20,7 @@ import { import { Logger } from '../../config/logger.config'; import { ROOT_DIR } from '../../config/path.config'; import { NotFoundException } from '../../exceptions'; -import { getAMQP, removeQueues } from '../../libs/amqp.server'; +import { getAMQP, removeQueues, sendEventData } from '../../libs/amqp.server'; import { getIO } from '../../libs/socket.server'; import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server'; import { ChamaaiRaw, IntegrationRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models'; @@ -685,40 +685,13 @@ export class WAStartupService { if (amqp) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { - const exchangeName = this.instanceName ?? 'evolution_exchange'; - - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - assert: true, - }); - - const queueName = `${this.instanceName}.${event}`; - - amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - amqp.bindQueue(queueName, exchangeName, event); - - const message = { - event, - instance: this.instance.name, + sendEventData({ data, - server_url: serverUrl, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - message['apikey'] = instanceApikey; - } - - amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + event, + instanceName: this.instanceName, + wuid: this.wuid, + apiKey: expose && instanceApikey ? instanceApikey : undefined, + }); if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { const logData = {