From 72fb2f408b3d8f9de318344803ef283d381566ef Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Thu, 26 Jun 2025 14:25:37 -0300 Subject: [PATCH 1/5] feat(rabbitmq): implement robust connection handling with reconnection logic and error logging --- .../event/rabbitmq/rabbitmq.controller.ts | 217 +++++++++++++++--- 1 file changed, 189 insertions(+), 28 deletions(-) diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index 07694f5f..be73b157 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -8,7 +8,12 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co export class RabbitmqController extends EventController implements EventControllerInterface { public amqpChannel: amqp.Channel | null = null; + private amqpConnection: amqp.Connection | null = null; private readonly logger = new Logger('RabbitmqController'); + private reconnectAttempts = 0; + private maxReconnectAttempts = 10; + private reconnectDelay = 5000; // 5 seconds + private isReconnecting = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -19,7 +24,11 @@ export class RabbitmqController extends EventController implements EventControll return; } - await new Promise((resolve, reject) => { + await this.connect(); + } + + private async connect(): Promise { + return new Promise((resolve, reject) => { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; @@ -33,22 +42,61 @@ export class RabbitmqController extends EventController implements EventControll password: url.password || 'guest', vhost: url.pathname.slice(1) || '/', frameMax: frameMax, + heartbeat: 30, // Add heartbeat of 30 seconds }; amqp.connect(connectionOptions, (error, connection) => { if (error) { + this.logger.error({ + local: 'RabbitmqController.connect', + message: 'Failed to connect to RabbitMQ', + error: error.message || error, + }); reject(error); - return; } + // Connection event handlers + connection.on('error', (err) => { + this.logger.error({ + local: 'RabbitmqController.connectionError', + message: 'RabbitMQ connection error', + error: err.message || err, + }); + this.handleConnectionLoss(); + }); + + connection.on('close', () => { + this.logger.warn('RabbitMQ connection closed'); + this.handleConnectionLoss(); + }); + connection.createChannel((channelError, channel) => { if (channelError) { + this.logger.error({ + local: 'RabbitmqController.createChannel', + message: 'Failed to create RabbitMQ channel', + error: channelError.message || channelError, + }); reject(channelError); - return; } + // Channel event handlers + channel.on('error', (err) => { + this.logger.error({ + local: 'RabbitmqController.channelError', + message: 'RabbitMQ channel error', + error: err.message || err, + }); + this.handleConnectionLoss(); + }); + + channel.on('close', () => { + this.logger.warn('RabbitMQ channel closed'); + this.handleConnectionLoss(); + }); + const exchangeName = rabbitmqExchangeName; channel.assertExchange(exchangeName, 'topic', { @@ -56,16 +104,81 @@ export class RabbitmqController extends EventController implements EventControll autoDelete: false, }); + this.amqpConnection = connection; this.amqpChannel = channel; + this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection + this.isReconnecting = false; - this.logger.info('AMQP initialized'); + this.logger.info('AMQP initialized successfully'); resolve(); }); }); - }).then(() => { - if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues(); - }); + }) + .then(() => { + if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { + this.initGlobalQueues(); + } + }) + .catch((error) => { + this.logger.error({ + local: 'RabbitmqController.init', + message: 'Failed to initialize AMQP', + error: error.message || error, + }); + this.scheduleReconnect(); + throw error; + }); + } + + private handleConnectionLoss(): void { + if (this.isReconnecting) { + return; // Already attempting to reconnect + } + + this.amqpChannel = null; + this.amqpConnection = null; + this.scheduleReconnect(); + } + + private scheduleReconnect(): void { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + this.logger.error( + `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, + ); + return; + } + + if (this.isReconnecting) { + return; // Already scheduled + } + + this.isReconnecting = true; + this.reconnectAttempts++; + + const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay + + this.logger.info( + `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, + ); + + setTimeout(async () => { + try { + this.logger.info( + `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, + ); + await this.connect(); + this.logger.info('Successfully reconnected to RabbitMQ'); + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.scheduleReconnect', + message: `Reconnection attempt ${this.reconnectAttempts} failed`, + error: error.message || error, + }); + this.isReconnecting = false; + this.scheduleReconnect(); + } + }, delay); } private set channel(channel: amqp.Channel) { @@ -76,6 +189,17 @@ export class RabbitmqController extends EventController implements EventControll return this.amqpChannel; } + private async ensureConnection(): Promise { + if (!this.amqpChannel) { + this.logger.warn('AMQP channel is not available, attempting to reconnect...'); + if (!this.isReconnecting) { + this.scheduleReconnect(); + } + return false; + } + return true; + } + public async emit({ instanceName, origin, @@ -95,6 +219,11 @@ export class RabbitmqController extends EventController implements EventControll return; } + if (!(await this.ensureConnection())) { + this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`); + return; + } + const instanceRabbitmq = await this.get(instanceName); const rabbitmqLocal = instanceRabbitmq?.events; const rabbitmqGlobal = configService.get('RABBITMQ').GLOBAL_ENABLED; @@ -154,7 +283,15 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { + this.logger.error({ + local: 'RabbitmqController.emit', + message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } } } } @@ -199,7 +336,15 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { + this.logger.error({ + local: 'RabbitmqController.emit', + message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } } } } @@ -208,41 +353,57 @@ export class RabbitmqController extends EventController implements EventControll private async initGlobalQueues(): Promise { this.logger.info('Initializing global queues'); + if (!(await this.ensureConnection())) { + this.logger.error('Cannot initialize global queues: No AMQP connection'); + return; + } + const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; const events = configService.get('RABBITMQ').EVENTS; const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; if (!events) { this.logger.warn('No events to initialize on AMQP'); - return; } const eventKeys = Object.keys(events); - eventKeys.forEach((event) => { - if (events[event] === false) return; + for (const event of eventKeys) { + if (events[event] === false) continue; - const queueName = - prefixKey !== '' - ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` - : `${event.replace(/_/g, '.').toLowerCase()}`; - const exchangeName = rabbitmqExchangeName; + try { + const queueName = + prefixKey !== '' + ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` + : `${event.replace(/_/g, '.').toLowerCase()}`; + const exchangeName = rabbitmqExchangeName; - this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); - this.amqpChannel.bindQueue(queueName, exchangeName, event); - }); + await this.amqpChannel.bindQueue(queueName, exchangeName, event); + + this.logger.info(`Global queue initialized: ${queueName}`); + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.initGlobalQueues', + message: `Failed to initialize global queue for event ${event}`, + error: error.message || error, + }); + this.handleConnectionLoss(); + break; + } + } } } From 918697866f8fa5f8845e74af6fe3c5bc27e8f614 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Fri, 27 Jun 2025 09:59:40 -0300 Subject: [PATCH 2/5] fix(package-lock): update baileys dependency to latest commit hash --- package-lock.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package-lock.json b/package-lock.json index be131e39..3bc5d2af 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5061,7 +5061,7 @@ }, "node_modules/baileys": { "version": "6.7.18", - "resolved": "git+ssh://git@github.com/EvolutionAPI/Baileys.git#210338c74732ffafd31c67727e88d9c5b7dc80c0", + "resolved": "git+ssh://git@github.com/EvolutionAPI/Baileys.git#f1bb8c62bf8750ddf49cda2310f29ff9be0f44b8", "hasInstallScript": true, "license": "MIT", "dependencies": { From 287c679ce4540757cdbb4992c37d5a4f569226bd Mon Sep 17 00:00:00 2001 From: Paulo Ferreira Date: Sat, 28 Jun 2025 09:27:13 -0300 Subject: [PATCH 3/5] (mysql): remove out-of-order wavoipToken migration Delete prisma/mysql-migrations/1707735894523_add_wavoip_token_to_settings_table, which executes before the initial Setting table is created and breaks fresh MySQL installs. The later migration 20250214181954_add_wavoip_token_column, line 145, already adds the column correctly, so keeping only that directory guarantees a clean deploy. --- .../migration.sql | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 prisma/mysql-migrations/1707735894523_add_wavoip_token_to_settings_table/migration.sql diff --git a/prisma/mysql-migrations/1707735894523_add_wavoip_token_to_settings_table/migration.sql b/prisma/mysql-migrations/1707735894523_add_wavoip_token_to_settings_table/migration.sql deleted file mode 100644 index 2b634b17..00000000 --- a/prisma/mysql-migrations/1707735894523_add_wavoip_token_to_settings_table/migration.sql +++ /dev/null @@ -1,9 +0,0 @@ -/* - Warnings: - - - A unique constraint covering the columns `[remoteJid,instanceId]` on the table `Chat` will be added. If there are existing duplicate values, this will fail. -*/ - --- AlterTable -ALTER TABLE `Setting` - ADD COLUMN IF NOT EXISTS `wavoipToken` VARCHAR(100); From 3efe69ada34f37e6017fc5bb9d88f6f677481eea Mon Sep 17 00:00:00 2001 From: Paulo Ferreira Date: Sat, 28 Jun 2025 09:34:52 -0300 Subject: [PATCH 4/5] fix(prisma) Mysql: update data types for N8n, N8nSetting, Evoai, and EvoaiSetting models --- prisma/mysql-schema.prisma | 40 +++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/prisma/mysql-schema.prisma b/prisma/mysql-schema.prisma index 782e4d85..33d09d2b 100644 --- a/prisma/mysql-schema.prisma +++ b/prisma/mysql-schema.prisma @@ -652,16 +652,16 @@ model N8n { webhookUrl String? @db.VarChar(255) basicAuthUser String? @db.VarChar(255) basicAuthPass String? @db.VarChar(255) - expire Int? @default(0) @db.Integer + expire Int? @default(0) @db.Int keywordFinish String? @db.VarChar(100) delayMessage Int? @db.Integer unknownMessage String? @db.VarChar(100) - listeningFromMe Boolean? @default(false) @db.Boolean - stopBotFromMe Boolean? @default(false) @db.Boolean - keepOpen Boolean? @default(false) @db.Boolean + listeningFromMe Boolean? @default(false) + stopBotFromMe Boolean? @default(false) + keepOpen Boolean? @default(false) debounceTime Int? @db.Integer ignoreJids Json? - splitMessages Boolean? @default(false) @db.Boolean + splitMessages Boolean? @default(false) timePerChar Int? @default(50) @db.Integer triggerType TriggerType? triggerOperator TriggerOperator? @@ -675,16 +675,16 @@ model N8n { model N8nSetting { id String @id @default(cuid()) - expire Int? @default(0) @db.Integer + expire Int? @default(0) @db.Int keywordFinish String? @db.VarChar(100) delayMessage Int? @db.Integer unknownMessage String? @db.VarChar(100) - listeningFromMe Boolean? @default(false) @db.Boolean - stopBotFromMe Boolean? @default(false) @db.Boolean - keepOpen Boolean? @default(false) @db.Boolean + listeningFromMe Boolean? @default(false) + stopBotFromMe Boolean? @default(false) + keepOpen Boolean? @default(false) debounceTime Int? @db.Integer ignoreJids Json? - splitMessages Boolean? @default(false) @db.Boolean + splitMessages Boolean? @default(false) timePerChar Int? @default(50) @db.Integer createdAt DateTime? @default(now()) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp @@ -700,16 +700,16 @@ model Evoai { description String? @db.VarChar(255) agentUrl String? @db.VarChar(255) apiKey String? @db.VarChar(255) - expire Int? @default(0) @db.Integer + expire Int? @default(0) @db.Int keywordFinish String? @db.VarChar(100) delayMessage Int? @db.Integer unknownMessage String? @db.VarChar(100) - listeningFromMe Boolean? @default(false) @db.Boolean - stopBotFromMe Boolean? @default(false) @db.Boolean - keepOpen Boolean? @default(false) @db.Boolean + listeningFromMe Boolean? @default(false) + stopBotFromMe Boolean? @default(false) + keepOpen Boolean? @default(false) debounceTime Int? @db.Integer ignoreJids Json? - splitMessages Boolean? @default(false) @db.Boolean + splitMessages Boolean? @default(false) timePerChar Int? @default(50) @db.Integer triggerType TriggerType? triggerOperator TriggerOperator? @@ -723,16 +723,16 @@ model Evoai { model EvoaiSetting { id String @id @default(cuid()) - expire Int? @default(0) @db.Integer + expire Int? @default(0) @db.Int keywordFinish String? @db.VarChar(100) delayMessage Int? @db.Integer unknownMessage String? @db.VarChar(100) - listeningFromMe Boolean? @default(false) @db.Boolean - stopBotFromMe Boolean? @default(false) @db.Boolean - keepOpen Boolean? @default(false) @db.Boolean + listeningFromMe Boolean? @default(false) + stopBotFromMe Boolean? @default(false) + keepOpen Boolean? @default(false) debounceTime Int? @db.Integer ignoreJids Json? - splitMessages Boolean? @default(false) @db.Boolean + splitMessages Boolean? @default(false) timePerChar Int? @default(50) @db.Integer createdAt DateTime? @default(now()) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp From 5b1b5ff9d20bbe0b4d129d937ef07a2114c6ce0b Mon Sep 17 00:00:00 2001 From: Santosl2 Date: Sun, 29 Jun 2025 20:23:31 -0300 Subject: [PATCH 5/5] fix: bind applyFormatting method in processMessages to maintain context --- .../integrations/chatbot/typebot/services/typebot.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/integrations/chatbot/typebot/services/typebot.service.ts b/src/api/integrations/chatbot/typebot/services/typebot.service.ts index 01346fa6..e7ae24e6 100644 --- a/src/api/integrations/chatbot/typebot/services/typebot.service.ts +++ b/src/api/integrations/chatbot/typebot/services/typebot.service.ts @@ -186,7 +186,7 @@ export class TypebotService extends BaseChatbotService { messages, input, clientSideActions, - this.applyFormatting, + this.applyFormatting.bind(this), this.prismaRepository, ).catch((err) => { console.error('Erro ao processar mensagens:', err);