feat: Updates RabbitMQ with new optimizations and configurations

- Implements Retry and Reconnect system in the integration with RabbitMQ.
- Adds optimizations with parameterized configurations via environment variables (MESSAGE_TTL, MAX_LENGTH and MAX_LENGTH_BYTES).
- Introduces non-persistent messages to reduce disk usage and automatic cleaning of expired messages in queues.
- Updates the version to 1.8.6 in package.json and CHANGELOG.md.
This commit is contained in:
Davidson Gomes 2025-04-30 15:44:12 -03:00
parent 960efcecd5
commit 86c603b3a1
7 changed files with 54 additions and 6 deletions

View File

@ -1,3 +1,15 @@
# 1.8.6 (develop)
### Fixed
* Retry and Reconnect system in rabbitmq integration
### Feature
* RabbitMQ optimization with parameterized settings via environment variables (MESSAGE_TTL, MAX_LENGTH and MAX_LENGTH_BYTES)
* Non-persistent messages to reduce disk usage
* Automatic cleanup of expired messages in queues
# 1.8.5 (2025-02-03 12:32)
### Fixed

View File

@ -1,6 +1,6 @@
{
"name": "evolution-api",
"version": "1.8.5",
"version": "1.8.6",
"description": "Rest api for communication with WhatsApp",
"main": "./dist/src/main.js",
"scripts": {

View File

@ -127,8 +127,12 @@ export const getAMQP = (): amqp.Channel | null => {
export const initGlobalQueues = () => {
logger.info('Initializing global queues');
const events = configService.get<Rabbitmq>('RABBITMQ').EVENTS;
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
const rabbitmqConfig = configService.get<Rabbitmq>('RABBITMQ');
const events = rabbitmqConfig.EVENTS;
const prefixKey = rabbitmqConfig.PREFIX_KEY;
const messageTtl = rabbitmqConfig.MESSAGE_TTL;
const maxLength = rabbitmqConfig.MAX_LENGTH;
const maxLengthBytes = rabbitmqConfig.MAX_LENGTH_BYTES;
if (!events) {
logger.warn('No events to initialize on AMQP');
@ -160,6 +164,10 @@ export const initGlobalQueues = () => {
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
'x-message-ttl': messageTtl,
'x-max-length': maxLength,
'x-max-length-bytes': maxLengthBytes,
'x-overflow': 'reject-publish',
},
});
@ -172,6 +180,11 @@ export const initQueues = (instanceName: string, events: string[]) => {
return;
}
const rabbitmqConfig = configService.get<Rabbitmq>('RABBITMQ');
const messageTtl = rabbitmqConfig.MESSAGE_TTL;
const maxLength = rabbitmqConfig.MAX_LENGTH;
const maxLengthBytes = rabbitmqConfig.MAX_LENGTH_BYTES;
const queues = events.map((event) => {
return `${event.replace(/_/g, '.').toLowerCase()}`;
});
@ -192,6 +205,10 @@ export const initQueues = (instanceName: string, events: string[]) => {
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
'x-message-ttl': messageTtl,
'x-max-length': maxLength,
'x-max-length-bytes': maxLengthBytes,
'x-overflow': 'reject-publish',
},
});

View File

@ -1,3 +1,4 @@
import Long from 'long';
import { Schema } from 'mongoose';
import { dbserver } from '../../libs/db.connect';
@ -25,7 +26,7 @@ export class MessageRaw {
participant?: string;
message?: object;
messageType?: string;
messageTimestamp?: number | Long.Long;
messageTimestamp?: number | Long;
owner: string;
source?: 'android' | 'web' | 'ios' | 'unknown' | 'desktop';
source_id?: string;

View File

@ -781,7 +781,10 @@ export class ChannelStartupService {
message['apikey'] = instanceApikey;
}
await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)), {
persistent: false,
expiration: this.configService.get<Rabbitmq>('RABBITMQ').MESSAGE_TTL.toString(),
});
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
@ -849,7 +852,10 @@ export class ChannelStartupService {
message['apikey'] = instanceApikey;
}
await amqp.publish(exchangeName, queueName, Buffer.from(JSON.stringify(message)));
await amqp.publish(exchangeName, queueName, Buffer.from(JSON.stringify(message)), {
persistent: false,
expiration: this.configService.get<Rabbitmq>('RABBITMQ').MESSAGE_TTL.toString(),
});
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {

View File

@ -106,6 +106,9 @@ export type Rabbitmq = {
EXCHANGE_NAME: string;
PREFIX_KEY?: string;
GLOBAL_ENABLED: boolean;
MESSAGE_TTL: number;
MAX_LENGTH: number;
MAX_LENGTH_BYTES: number;
EVENTS: EventsRabbitmq;
};
@ -326,6 +329,9 @@ export class ConfigService {
EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange',
PREFIX_KEY: process.env?.RABBITMQ_PREFIX_KEY || '',
URI: process.env.RABBITMQ_URI || '',
MESSAGE_TTL: Number.parseInt(process.env?.RABBITMQ_MESSAGE_TTL) || 604800,
MAX_LENGTH: Number.parseInt(process.env?.RABBITMQ_MAX_LENGTH) || 10000,
MAX_LENGTH_BYTES: Number.parseInt(process.env?.RABBITMQ_MAX_LENGTH_BYTES) || 8192,
EVENTS: {
APPLICATION_STARTUP: process.env?.RABBITMQ_EVENTS_APPLICATION_STARTUP === 'true',
INSTANCE_CREATE: process.env?.RABBITMQ_EVENTS_INSTANCE_CREATE === 'true',

View File

@ -91,6 +91,12 @@ RABBITMQ:
EXCHANGE_NAME: evolution_exchange
PREFIX_KEY: evolution
GLOBAL_ENABLED: true
# Tempo de vida das mensagens: 1 hora em milissegundos (3600000 = 60 * 60 * 1000)
MESSAGE_TTL: 3600000
# Limite máximo de mensagens por fila (quando atingido, novas mensagens são rejeitadas)
MAX_LENGTH: 1000
# Tamanho máximo em bytes permitido para filas: 10MB (10485760 = 10 * 1024 * 1024)
MAX_LENGTH_BYTES: 10485760
EVENTS:
APPLICATION_STARTUP: false
INSTANCE_CREATE: false