diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index 07694f5f..be73b157 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -8,7 +8,12 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co export class RabbitmqController extends EventController implements EventControllerInterface { public amqpChannel: amqp.Channel | null = null; + private amqpConnection: amqp.Connection | null = null; private readonly logger = new Logger('RabbitmqController'); + private reconnectAttempts = 0; + private maxReconnectAttempts = 10; + private reconnectDelay = 5000; // 5 seconds + private isReconnecting = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -19,7 +24,11 @@ export class RabbitmqController extends EventController implements EventControll return; } - await new Promise((resolve, reject) => { + await this.connect(); + } + + private async connect(): Promise { + return new Promise((resolve, reject) => { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; @@ -33,22 +42,61 @@ export class RabbitmqController extends EventController implements EventControll password: url.password || 'guest', vhost: url.pathname.slice(1) || '/', frameMax: frameMax, + heartbeat: 30, // Add heartbeat of 30 seconds }; amqp.connect(connectionOptions, (error, connection) => { if (error) { + this.logger.error({ + local: 'RabbitmqController.connect', + message: 'Failed to connect to RabbitMQ', + error: error.message || error, + }); reject(error); - return; } + // Connection event handlers + connection.on('error', (err) => { + this.logger.error({ + local: 'RabbitmqController.connectionError', + message: 'RabbitMQ connection error', + error: err.message || err, + }); + this.handleConnectionLoss(); + }); + + connection.on('close', () => { + this.logger.warn('RabbitMQ connection closed'); + this.handleConnectionLoss(); + }); + connection.createChannel((channelError, channel) => { if (channelError) { + this.logger.error({ + local: 'RabbitmqController.createChannel', + message: 'Failed to create RabbitMQ channel', + error: channelError.message || channelError, + }); reject(channelError); - return; } + // Channel event handlers + channel.on('error', (err) => { + this.logger.error({ + local: 'RabbitmqController.channelError', + message: 'RabbitMQ channel error', + error: err.message || err, + }); + this.handleConnectionLoss(); + }); + + channel.on('close', () => { + this.logger.warn('RabbitMQ channel closed'); + this.handleConnectionLoss(); + }); + const exchangeName = rabbitmqExchangeName; channel.assertExchange(exchangeName, 'topic', { @@ -56,16 +104,81 @@ export class RabbitmqController extends EventController implements EventControll autoDelete: false, }); + this.amqpConnection = connection; this.amqpChannel = channel; + this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection + this.isReconnecting = false; - this.logger.info('AMQP initialized'); + this.logger.info('AMQP initialized successfully'); resolve(); }); }); - }).then(() => { - if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues(); - }); + }) + .then(() => { + if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { + this.initGlobalQueues(); + } + }) + .catch((error) => { + this.logger.error({ + local: 'RabbitmqController.init', + message: 'Failed to initialize AMQP', + error: error.message || error, + }); + this.scheduleReconnect(); + throw error; + }); + } + + private handleConnectionLoss(): void { + if (this.isReconnecting) { + return; // Already attempting to reconnect + } + + this.amqpChannel = null; + this.amqpConnection = null; + this.scheduleReconnect(); + } + + private scheduleReconnect(): void { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + this.logger.error( + `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, + ); + return; + } + + if (this.isReconnecting) { + return; // Already scheduled + } + + this.isReconnecting = true; + this.reconnectAttempts++; + + const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay + + this.logger.info( + `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, + ); + + setTimeout(async () => { + try { + this.logger.info( + `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, + ); + await this.connect(); + this.logger.info('Successfully reconnected to RabbitMQ'); + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.scheduleReconnect', + message: `Reconnection attempt ${this.reconnectAttempts} failed`, + error: error.message || error, + }); + this.isReconnecting = false; + this.scheduleReconnect(); + } + }, delay); } private set channel(channel: amqp.Channel) { @@ -76,6 +189,17 @@ export class RabbitmqController extends EventController implements EventControll return this.amqpChannel; } + private async ensureConnection(): Promise { + if (!this.amqpChannel) { + this.logger.warn('AMQP channel is not available, attempting to reconnect...'); + if (!this.isReconnecting) { + this.scheduleReconnect(); + } + return false; + } + return true; + } + public async emit({ instanceName, origin, @@ -95,6 +219,11 @@ export class RabbitmqController extends EventController implements EventControll return; } + if (!(await this.ensureConnection())) { + this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`); + return; + } + const instanceRabbitmq = await this.get(instanceName); const rabbitmqLocal = instanceRabbitmq?.events; const rabbitmqGlobal = configService.get('RABBITMQ').GLOBAL_ENABLED; @@ -154,7 +283,15 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { + this.logger.error({ + local: 'RabbitmqController.emit', + message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } } } } @@ -199,7 +336,15 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { + this.logger.error({ + local: 'RabbitmqController.emit', + message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } } } } @@ -208,41 +353,57 @@ export class RabbitmqController extends EventController implements EventControll private async initGlobalQueues(): Promise { this.logger.info('Initializing global queues'); + if (!(await this.ensureConnection())) { + this.logger.error('Cannot initialize global queues: No AMQP connection'); + return; + } + const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; const events = configService.get('RABBITMQ').EVENTS; const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; if (!events) { this.logger.warn('No events to initialize on AMQP'); - return; } const eventKeys = Object.keys(events); - eventKeys.forEach((event) => { - if (events[event] === false) return; + for (const event of eventKeys) { + if (events[event] === false) continue; - const queueName = - prefixKey !== '' - ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` - : `${event.replace(/_/g, '.').toLowerCase()}`; - const exchangeName = rabbitmqExchangeName; + try { + const queueName = + prefixKey !== '' + ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` + : `${event.replace(/_/g, '.').toLowerCase()}`; + const exchangeName = rabbitmqExchangeName; - this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); - this.amqpChannel.bindQueue(queueName, exchangeName, event); - }); + await this.amqpChannel.bindQueue(queueName, exchangeName, event); + + this.logger.info(`Global queue initialized: ${queueName}`); + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.initGlobalQueues', + message: `Failed to initialize global queue for event ${event}`, + error: error.message || error, + }); + this.handleConnectionLoss(); + break; + } + } } }