From ac6e9ae9949a5a04b1dd2561c64a0234d1a6a05c Mon Sep 17 00:00:00 2001 From: Judson Cairo Date: Mon, 4 Mar 2024 20:24:15 -0300 Subject: [PATCH] Improvements in RabbitMQ, added 2 new modes --- Docker/.env.example | 3 +- Dockerfile | 3 +- package.json | 1 + src/config/env.config.ts | 6 +- src/dev-env.yml | 3 +- src/libs/amqp.server.ts | 184 +++++++++++++----- .../services/whatsapp.baileys.service.ts | 3 +- 7 files changed, 148 insertions(+), 55 deletions(-) diff --git a/Docker/.env.example b/Docker/.env.example index ca2dc377..6c3865ed 100644 --- a/Docker/.env.example +++ b/Docker/.env.example @@ -47,7 +47,8 @@ REDIS_URI=redis://redis:6379 REDIS_PREFIX_KEY=evdocker RABBITMQ_ENABLED=false -RABBITMQ_GLOBAL_EVENT_QUEUE=false +RABBITMQ_RABBITMQ_MODE=isolated +RABBITMQ_EXCHANGE_NAME=evolution_exchange RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672 WEBSOCKET_ENABLED=false diff --git a/Dockerfile b/Dockerfile index 0c6752aa..2752c8ce 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,7 +62,8 @@ ENV REDIS_URI=redis://redis:6379 ENV REDIS_PREFIX_KEY=evolution ENV RABBITMQ_ENABLED=false -ENV RABBITMQ_GLOBAL_EVENT_QUEUE=false +ENV RABBITMQ_MODE=isolated +ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672 ENV WEBSOCKET_ENABLED=false diff --git a/package.json b/package.json index ef023149..1df6df41 100644 --- a/package.json +++ b/package.json @@ -91,6 +91,7 @@ "yamljs": "^0.3.0" }, "devDependencies": { + "@types/amqplib": "^0.10.5", "@types/compression": "^1.7.2", "@types/cors": "^2.8.13", "@types/express": "^4.17.17", diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 95a3e7f8..c8e23944 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -71,7 +71,8 @@ export type Redis = { export type Rabbitmq = { ENABLED: boolean; - GLOBAL_EVENT_QUEUE: boolean; + MODE: 'isolated' | 'global' | 'single'; + EXCHANGE_NAME: string; // available for global and single, isolated mode will use instance name as exchange URI: string; }; @@ -283,7 +284,8 @@ export class ConfigService { }, RABBITMQ: { ENABLED: process.env?.RABBITMQ_ENABLED === 'true', - GLOBAL_EVENT_QUEUE: process.env?.RABBITMQ_GLOBAL_EVENT_QUEUE === 'true', + MODE: (process.env?.RABBITMQ_MODE as Rabbitmq['MODE']) || 'single', + EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange', URI: process.env.RABBITMQ_URI || '', }, SQS: { diff --git a/src/dev-env.yml b/src/dev-env.yml index 02b9fa54..1edecc07 100644 --- a/src/dev-env.yml +++ b/src/dev-env.yml @@ -83,7 +83,8 @@ REDIS: RABBITMQ: ENABLED: false - GLOBAL_EVENT_QUEUE: false + MODE: "global" + EXCHANGE_NAME: "evolution_exchange" URI: "amqp://guest:guest@localhost:5672" SQS: diff --git a/src/libs/amqp.server.ts b/src/libs/amqp.server.ts index 866cf0c4..093a4aaa 100644 --- a/src/libs/amqp.server.ts +++ b/src/libs/amqp.server.ts @@ -1,16 +1,35 @@ -import * as amqp from 'amqplib/callback_api'; +import { Channel, connect } from 'amqplib/callback_api'; import { configService, HttpServer, Rabbitmq } from '../config/env.config'; import { Logger } from '../config/logger.config'; +import { Events } from '../whatsapp/types/wa.types'; const logger = new Logger('AMQP'); -let amqpChannel: amqp.Channel | null = null; +const parseEvtName = (evt: string) => evt.replace(/_/g, '.').toLowerCase(); + +const globalQueues: { [key: string]: Events[] } = { + contacts: [Events.CONTACTS_SET, Events.CONTACTS_UPDATE, Events.CONTACTS_UPSERT], + messages: [ + Events.MESSAGES_DELETE, + Events.MESSAGES_SET, + Events.MESSAGES_UPDATE, + Events.MESSAGES_UPSERT, + Events.MESSAGING_HISTORY_SET, + Events.SEND_MESSAGE, + ], + chats: [Events.CHATS_DELETE, Events.CHATS_SET, Events.CHATS_UPDATE, Events.CHATS_UPSERT], + groups: [Events.GROUPS_UPDATE, Events.GROUPS_UPSERT, Events.GROUP_PARTICIPANTS_UPDATE], + others: [], // All other events not included in the above categories +}; + +let amqpChannel: Channel | null = null; export const initAMQP = () => { return new Promise((resolve, reject) => { const rabbitConfig = configService.get('RABBITMQ'); - amqp.connect(rabbitConfig.URI, (error, connection) => { + console.log(rabbitConfig); + connect(rabbitConfig.URI, (error, connection) => { if (error) { reject(error); return; @@ -22,12 +41,9 @@ export const initAMQP = () => { return; } - const exchangeName = 'evolution_exchange'; - - channel.assertExchange(exchangeName, 'topic', { + channel.assertExchange(rabbitConfig.EXCHANGE_NAME, 'topic', { durable: true, autoDelete: false, - assert: true, }); amqpChannel = channel; @@ -39,69 +55,131 @@ export const initAMQP = () => { }); }; -export const getAMQP = (): amqp.Channel | null => { +export const getAMQP = (): Channel | null => { return amqpChannel; }; export const initQueues = (instanceName: string, events: string[]) => { if (!instanceName || !events || !events.length) return; const rabbitConfig = configService.get('RABBITMQ'); + const TWO_DAYS_IN_MS = 2 * 24 * 60 * 60 * 1000; + const amqp = getAMQP(); - const queues = events.map((event) => { - return `${event.replace(/_/g, '.').toLowerCase()}`; - }); + let exchangeName = rabbitConfig.EXCHANGE_NAME; - queues.forEach((event) => { - const amqp = getAMQP(); - const exchangeName = instanceName ?? 'evolution_exchange'; + const receivedEvents = events.map(parseEvtName); + if (rabbitConfig.MODE === 'isolated') { + exchangeName = instanceName; + receivedEvents.forEach((event) => { + amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + const queueName = event; + amqp.assertQueue(queueName, { + durable: true, + autoDelete: false, + messageTtl: TWO_DAYS_IN_MS, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + amqp.bindQueue(queueName, exchangeName, event); + }); + } else if (rabbitConfig.MODE === 'single') { amqp.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, - assert: true, }); - const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`; - + const queueName = 'evolution'; amqp.assertQueue(queueName, { durable: true, autoDelete: false, + messageTtl: TWO_DAYS_IN_MS, arguments: { 'x-queue-type': 'quorum', }, }); - amqp.bindQueue(queueName, exchangeName, event); - }); + receivedEvents.forEach((event) => { + amqp.bindQueue(queueName, exchangeName, event); + }); + } else if (rabbitConfig.MODE === 'global') { + const queues = Object.keys(globalQueues); + + const addQueues = queues.filter((evt) => { + if (evt === 'others') { + return receivedEvents.some( + (e) => + !Object.values(globalQueues) + .flat() + .includes(e as Events), + ); + } + return globalQueues[evt].some((e) => receivedEvents.includes(e)); + }); + + addQueues.forEach((event) => { + amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + const queueName = event; + amqp.assertQueue(queueName, { + durable: true, + autoDelete: false, + messageTtl: TWO_DAYS_IN_MS, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + if (globalQueues[event].length === 0) { + // Other events + const otherEvents = Object.values(globalQueues).flat(); + for (const subEvent in Events) { + const eventCode = Events[subEvent]; + if (otherEvents.includes(eventCode)) continue; + if (!receivedEvents.includes(eventCode)) continue; + amqp.bindQueue(queueName, exchangeName, eventCode); + } + } else { + globalQueues[event].forEach((subEvent) => { + amqp.bindQueue(queueName, exchangeName, subEvent); + }); + } + }); + } else { + throw new Error('Invalid RabbitMQ mode'); + } }; export const removeQueues = (instanceName: string, events: string[]) => { if (!events || !events.length) return; const rabbitConfig = configService.get('RABBITMQ'); + let exchangeName = rabbitConfig.EXCHANGE_NAME; + const amqp = getAMQP(); - const channel = getAMQP(); + const receivedEvents = events.map(parseEvtName); + if (rabbitConfig.MODE === 'isolated') { + exchangeName = instanceName; + receivedEvents.forEach((event) => { + amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - const queues = events.map((event) => { - return `${event.replace(/_/g, '.').toLowerCase()}`; - }); + const queueName = event; - const exchangeName = instanceName ?? 'evolution_exchange'; - - queues.forEach((event) => { - const amqp = getAMQP(); - - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - assert: true, + amqp.unbindQueue(queueName, exchangeName, event); }); - - const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`; - - amqp.deleteQueue(queueName); - }); - - channel.deleteExchange(exchangeName); + amqp.deleteExchange(instanceName); + } }; interface SendEventData { @@ -113,30 +191,40 @@ interface SendEventData { } export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => { - const exchangeName = instanceName ?? 'evolution_exchange'; + const rabbitConfig = configService.get('RABBITMQ'); + let exchangeName = rabbitConfig.EXCHANGE_NAME; + if (rabbitConfig.MODE === 'isolated') exchangeName = instanceName; amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, - assert: true, }); - - const rabbitConfig = configService.get('RABBITMQ'); - const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`; - + let queueName = event; + if (rabbitConfig.MODE === 'single') { + queueName = 'evolution'; + } else if (rabbitConfig.MODE === 'global') { + let eventName = ''; + Object.keys(globalQueues).forEach((key) => { + if (globalQueues[key].includes(event as Events)) { + eventName = key; + } + if (eventName === '' && key === 'others') { + eventName = key; + } + }); + queueName = eventName; + } amqpChannel.assertQueue(queueName, { durable: true, autoDelete: false, arguments: { 'x-queue-type': 'quorum' }, }); - amqpChannel.bindQueue(queueName, exchangeName, event); const serverUrl = configService.get('SERVER').URL; const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds const localISOTime = new Date(Date.now() - tzoffset).toISOString(); const now = localISOTime; - const message = { event, instance: instanceName, @@ -145,11 +233,9 @@ export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendE date_time: now, sender: wuid, }; - if (apiKey) { message['apikey'] = apiKey; } - logger.log({ queueName, exchangeName, diff --git a/src/whatsapp/services/whatsapp.baileys.service.ts b/src/whatsapp/services/whatsapp.baileys.service.ts index 1ed58471..4e3a4960 100644 --- a/src/whatsapp/services/whatsapp.baileys.service.ts +++ b/src/whatsapp/services/whatsapp.baileys.service.ts @@ -1124,7 +1124,8 @@ export class BaileysStartupService extends WAStartupService { 5: 'PLAYED', }; for await (const { key, update } of args) { - if (settings?.groups_ignore && key.remoteJid.includes('@g.us')) { + console.log(key); + if (settings?.groups_ignore && key.remoteJid?.includes('@g.us')) { this.logger.verbose('group ignored'); return; }