diff --git a/CHANGELOG.md b/CHANGELOG.md index 66a2879b..35705cd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +# 1.8.6 (develop) + +### Fixed + +* Retry and Reconnect system in rabbitmq integration + +### Feature + +* RabbitMQ optimization with parameterized settings via environment variables (MESSAGE_TTL, MAX_LENGTH and MAX_LENGTH_BYTES) +* Non-persistent messages to reduce disk usage +* Automatic cleanup of expired messages in queues + # 1.8.5 (2025-02-03 12:32) ### Fixed diff --git a/package.json b/package.json index ffbeeda3..895a8fab 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "evolution-api", - "version": "1.8.5", + "version": "1.8.6", "description": "Rest api for communication with WhatsApp", "main": "./dist/src/main.js", "scripts": { diff --git a/src/api/integrations/rabbitmq/libs/amqp.server.ts b/src/api/integrations/rabbitmq/libs/amqp.server.ts index 5ad888ec..7dfc4113 100644 --- a/src/api/integrations/rabbitmq/libs/amqp.server.ts +++ b/src/api/integrations/rabbitmq/libs/amqp.server.ts @@ -127,8 +127,12 @@ export const getAMQP = (): amqp.Channel | null => { export const initGlobalQueues = () => { logger.info('Initializing global queues'); - const events = configService.get('RABBITMQ').EVENTS; - const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; + const rabbitmqConfig = configService.get('RABBITMQ'); + const events = rabbitmqConfig.EVENTS; + const prefixKey = rabbitmqConfig.PREFIX_KEY; + const messageTtl = rabbitmqConfig.MESSAGE_TTL; + const maxLength = rabbitmqConfig.MAX_LENGTH; + const maxLengthBytes = rabbitmqConfig.MAX_LENGTH_BYTES; if (!events) { logger.warn('No events to initialize on AMQP'); @@ -160,6 +164,10 @@ export const initGlobalQueues = () => { autoDelete: false, arguments: { 'x-queue-type': 'quorum', + 'x-message-ttl': messageTtl, + 'x-max-length': maxLength, + 'x-max-length-bytes': maxLengthBytes, + 'x-overflow': 'reject-publish', }, }); @@ -172,6 +180,11 @@ export const initQueues = (instanceName: string, events: string[]) => { return; } + const rabbitmqConfig = configService.get('RABBITMQ'); + const messageTtl = rabbitmqConfig.MESSAGE_TTL; + const maxLength = rabbitmqConfig.MAX_LENGTH; + const maxLengthBytes = rabbitmqConfig.MAX_LENGTH_BYTES; + const queues = events.map((event) => { return `${event.replace(/_/g, '.').toLowerCase()}`; }); @@ -192,6 +205,10 @@ export const initQueues = (instanceName: string, events: string[]) => { autoDelete: false, arguments: { 'x-queue-type': 'quorum', + 'x-message-ttl': messageTtl, + 'x-max-length': maxLength, + 'x-max-length-bytes': maxLengthBytes, + 'x-overflow': 'reject-publish', }, }); diff --git a/src/api/models/message.model.ts b/src/api/models/message.model.ts index 95cb5513..6fb14f23 100644 --- a/src/api/models/message.model.ts +++ b/src/api/models/message.model.ts @@ -1,3 +1,4 @@ +import Long from 'long'; import { Schema } from 'mongoose'; import { dbserver } from '../../libs/db.connect'; @@ -25,7 +26,7 @@ export class MessageRaw { participant?: string; message?: object; messageType?: string; - messageTimestamp?: number | Long.Long; + messageTimestamp?: number | Long; owner: string; source?: 'android' | 'web' | 'ios' | 'unknown' | 'desktop'; source_id?: string; diff --git a/src/api/services/channel.service.ts b/src/api/services/channel.service.ts index 9343e812..702b8b81 100644 --- a/src/api/services/channel.service.ts +++ b/src/api/services/channel.service.ts @@ -781,7 +781,10 @@ export class ChannelStartupService { message['apikey'] = instanceApikey; } - await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)), { + persistent: false, + expiration: this.configService.get('RABBITMQ').MESSAGE_TTL.toString(), + }); if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { const logData = { @@ -849,7 +852,10 @@ export class ChannelStartupService { message['apikey'] = instanceApikey; } - await amqp.publish(exchangeName, queueName, Buffer.from(JSON.stringify(message))); + await amqp.publish(exchangeName, queueName, Buffer.from(JSON.stringify(message)), { + persistent: false, + expiration: this.configService.get('RABBITMQ').MESSAGE_TTL.toString(), + }); if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { const logData = { diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 659858fd..b7bccdb9 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -106,6 +106,9 @@ export type Rabbitmq = { EXCHANGE_NAME: string; PREFIX_KEY?: string; GLOBAL_ENABLED: boolean; + MESSAGE_TTL: number; + MAX_LENGTH: number; + MAX_LENGTH_BYTES: number; EVENTS: EventsRabbitmq; }; @@ -326,6 +329,9 @@ export class ConfigService { EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange', PREFIX_KEY: process.env?.RABBITMQ_PREFIX_KEY || '', URI: process.env.RABBITMQ_URI || '', + MESSAGE_TTL: Number.parseInt(process.env?.RABBITMQ_MESSAGE_TTL) || 604800, + MAX_LENGTH: Number.parseInt(process.env?.RABBITMQ_MAX_LENGTH) || 10000, + MAX_LENGTH_BYTES: Number.parseInt(process.env?.RABBITMQ_MAX_LENGTH_BYTES) || 8192, EVENTS: { APPLICATION_STARTUP: process.env?.RABBITMQ_EVENTS_APPLICATION_STARTUP === 'true', INSTANCE_CREATE: process.env?.RABBITMQ_EVENTS_INSTANCE_CREATE === 'true', diff --git a/src/dev-env.yml b/src/dev-env.yml index 03d5e84c..3630702a 100644 --- a/src/dev-env.yml +++ b/src/dev-env.yml @@ -91,6 +91,12 @@ RABBITMQ: EXCHANGE_NAME: evolution_exchange PREFIX_KEY: evolution GLOBAL_ENABLED: true + # Tempo de vida das mensagens: 1 hora em milissegundos (3600000 = 60 * 60 * 1000) + MESSAGE_TTL: 3600000 + # Limite máximo de mensagens por fila (quando atingido, novas mensagens são rejeitadas) + MAX_LENGTH: 1000 + # Tamanho máximo em bytes permitido para filas: 10MB (10485760 = 10 * 1024 * 1024) + MAX_LENGTH_BYTES: 10485760 EVENTS: APPLICATION_STARTUP: false INSTANCE_CREATE: false