diff --git a/src/libs/amqp.server.ts b/src/libs/amqp.server.ts index 6b850624..fc95b33c 100644 --- a/src/libs/amqp.server.ts +++ b/src/libs/amqp.server.ts @@ -1,34 +1,16 @@ -import { Channel, connect } from 'amqplib/callback_api'; +import * as amqp from 'amqplib/callback_api'; -import { configService, HttpServer, Rabbitmq } from '../config/env.config'; +import { configService, Rabbitmq } from '../config/env.config'; import { Logger } from '../config/logger.config'; -import { Events } from '../whatsapp/types/wa.types'; const logger = new Logger('AMQP'); -const parseEvtName = (evt: string) => evt.replace(/_/g, '.').toLowerCase(); - -const globalQueues: { [key: string]: Events[] } = { - contacts: [Events.CONTACTS_SET, Events.CONTACTS_UPDATE, Events.CONTACTS_UPSERT], - messages: [ - Events.MESSAGES_DELETE, - Events.MESSAGES_SET, - Events.MESSAGES_UPDATE, - Events.MESSAGES_UPSERT, - Events.MESSAGING_HISTORY_SET, - Events.SEND_MESSAGE, - ], - chats: [Events.CHATS_DELETE, Events.CHATS_SET, Events.CHATS_UPDATE, Events.CHATS_UPSERT], - groups: [Events.GROUPS_UPDATE, Events.GROUPS_UPSERT, Events.GROUP_PARTICIPANTS_UPDATE], - others: [], // All other events not included in the above categories -}; - -let amqpChannel: Channel | null = null; +let amqpChannel: amqp.Channel | null = null; export const initAMQP = () => { return new Promise((resolve, reject) => { - const rabbitConfig = configService.get('RABBITMQ'); - connect(rabbitConfig.URI, (error, connection) => { + const uri = configService.get('RABBITMQ').URI; + amqp.connect(uri, (error, connection) => { if (error) { reject(error); return; @@ -40,7 +22,7 @@ export const initAMQP = () => { return; } - const exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange'; + const exchangeName = 'evolution_exchange'; channel.assertExchange(exchangeName, 'topic', { durable: true, @@ -56,46 +38,28 @@ export const initAMQP = () => { }); }; -export const getAMQP = (): Channel | null => { +export const getAMQP = (): amqp.Channel | null => { return amqpChannel; }; export const initQueues = (instanceName: string, events: string[]) => { - if (!instanceName || !events || !events.length) return; - const rabbitConfig = configService.get('RABBITMQ'); - const amqp = getAMQP(); + if (!events || !events.length) return; - const rabbitMode = rabbitConfig.MODE || 'isolated'; - let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange'; + const queues = events.map((event) => { + return `${event.replace(/_/g, '.').toLowerCase()}`; + }); - const receivedEvents = events.map(parseEvtName); - if (rabbitMode === 'isolated') { - exchangeName = instanceName ?? 'evolution_exchange'; + queues.forEach((event) => { + const amqp = getAMQP(); + const exchangeName = instanceName ?? 'evolution_exchange'; amqp.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, }); - receivedEvents.forEach((event) => { - const queueName = `${instanceName}.${event}`; - amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); + const queueName = `${instanceName}.${event}`; - amqp.bindQueue(queueName, exchangeName, event); - }); - } else if (rabbitMode === 'single') { - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const queueName = 'evolution'; amqp.assertQueue(queueName, { durable: true, autoDelete: false, @@ -104,157 +68,33 @@ export const initQueues = (instanceName: string, events: string[]) => { }, }); - receivedEvents.forEach((event) => { - amqp.bindQueue(queueName, exchangeName, event); - }); - } else if (rabbitMode === 'global') { - const queues = Object.keys(globalQueues); - - const addQueues = queues.filter((evt) => { - if (evt === 'others') { - return receivedEvents.some( - (e) => - !Object.values(globalQueues) - .flat() - .includes(e as Events), - ); - } - return globalQueues[evt].some((e) => receivedEvents.includes(e)); - }); - - addQueues.forEach((event) => { - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const queueName = event; - amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - if (globalQueues[event].length === 0) { - // Other events - const otherEvents = Object.values(globalQueues).flat(); - for (const subEvent in Events) { - const eventCode = Events[subEvent]; - if (otherEvents.includes(eventCode)) continue; - if (!receivedEvents.includes(eventCode)) continue; - amqp.bindQueue(queueName, exchangeName, eventCode); - } - } else { - globalQueues[event].forEach((subEvent) => { - amqp.bindQueue(queueName, exchangeName, subEvent); - }); - } - }); - } else { - throw new Error('Invalid RabbitMQ mode'); - } + amqp.bindQueue(queueName, exchangeName, event); + }); }; export const removeQueues = (instanceName: string, events: string[]) => { if (!events || !events.length) return; - const rabbitConfig = configService.get('RABBITMQ'); - const rabbitMode = rabbitConfig.MODE || 'isolated'; - let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange'; - const amqp = getAMQP(); + const channel = getAMQP(); - const receivedEvents = events.map(parseEvtName); - if (rabbitMode === 'isolated') { - exchangeName = instanceName; - receivedEvents.forEach((event) => { - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + const queues = events.map((event) => { + return `${event.replace(/_/g, '.').toLowerCase()}`; + }); - const queueName = `${instanceName}.${event}`; - amqp.deleteQueue(queueName); + const exchangeName = instanceName ?? 'evolution_exchange'; + + queues.forEach((event) => { + const amqp = getAMQP(); + + amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, }); - amqp.deleteExchange(instanceName); - } -}; - -interface SendEventData { - instanceName: string; - wuid: string; - event: string; - apiKey?: string; - data: any; -} - -export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => { - const rabbitConfig = configService.get('RABBITMQ'); - const rabbitMode = rabbitConfig.MODE || 'isolated'; - let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange'; - if (rabbitMode === 'isolated') exchangeName = instanceName ?? 'evolution_exchange'; - - console.log('exchangeName: ', exchangeName); - console.log('rabbitMode: ', rabbitMode); - - amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - let queueName = event; - - if (rabbitMode === 'single') { - queueName = 'evolution'; - } else if (rabbitMode === 'global') { - let eventName = ''; - - Object.keys(globalQueues).forEach((key) => { - if (globalQueues[key].includes(event as Events)) { - eventName = key; - } - if (eventName === '' && key === 'others') { - eventName = key; - } - }); - queueName = eventName; - } else if (rabbitMode === 'isolated') { - queueName = `${instanceName}.${event}`; - } - - amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { 'x-queue-type': 'quorum' }, - }); - - console.log('envia na fila: ', queueName, exchangeName, event); - - amqpChannel.bindQueue(queueName, exchangeName, event); - - const serverUrl = configService.get('SERVER').URL; - const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds - const localISOTime = new Date(Date.now() - tzoffset).toISOString(); - const now = localISOTime; - - const message = { - event, - instance: instanceName, - data, - server_url: serverUrl, - date_time: now, - sender: wuid, - }; - - if (apiKey) { - message['apikey'] = apiKey; - } - - logger.log({ - queueName, - exchangeName, - event, - }); - amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + + const queueName = `${instanceName}.${event}`; + + amqp.deleteQueue(queueName); + }); + + channel.deleteExchange(exchangeName); }; diff --git a/src/whatsapp/services/whatsapp.service.ts b/src/whatsapp/services/whatsapp.service.ts index 140d34e0..29c58f6c 100644 --- a/src/whatsapp/services/whatsapp.service.ts +++ b/src/whatsapp/services/whatsapp.service.ts @@ -20,7 +20,7 @@ import { import { Logger } from '../../config/logger.config'; import { ROOT_DIR } from '../../config/path.config'; import { NotFoundException } from '../../exceptions'; -import { getAMQP, removeQueues, sendEventData } from '../../libs/amqp.server'; +import { getAMQP, removeQueues } from '../../libs/amqp.server'; import { getIO } from '../../libs/socket.server'; import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server'; import { ChamaaiRaw, IntegrationRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models'; @@ -685,15 +685,40 @@ export class WAStartupService { if (amqp) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { - console.log('envia na fila: ', we); - sendEventData({ - data, - event, - instanceName: this.instanceName, - wuid: this.wuid, - apiKey: expose && instanceApikey ? instanceApikey : undefined, + const exchangeName = this.instanceName ?? 'evolution_exchange'; + + amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, }); + const queueName = `${this.instanceName}.${event}`; + + amqp.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + amqp.bindQueue(queueName, exchangeName, event); + + const message = { + event, + instance: this.instance.name, + data, + server_url: serverUrl, + date_time: now, + sender: this.wuid, + }; + + if (expose && instanceApikey) { + message['apikey'] = instanceApikey; + } + + amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { const logData = { local: WAStartupService.name + '.sendData-RabbitMQ',