From 7a99fba556c183194bdf64e7dc4aed5026a3f55a Mon Sep 17 00:00:00 2001 From: luissantosjs Date: Wed, 20 Aug 2025 12:29:45 +0100 Subject: [PATCH] feat: enhance RabbitMQ controller with improved connection management and shutdown procedures --- .../event/rabbitmq/rabbitmq.controller.ts | 381 +++++++++++++----- 1 file changed, 270 insertions(+), 111 deletions(-) diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index be73b157..a6d1e565 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -14,6 +14,9 @@ export class RabbitmqController extends EventController implements EventControll private maxReconnectAttempts = 10; private reconnectDelay = 5000; // 5 seconds private isReconnecting = false; + private reconnectTimer: NodeJS.Timeout | null = null; + private connectionStatus: 'connected' | 'disconnected' | 'connecting' | 'reconnecting' = 'disconnected'; + private isShuttingDown = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -27,7 +30,91 @@ export class RabbitmqController extends EventController implements EventControll await this.connect(); } + public async shutdown(): Promise { + this.logger.info('Shutting down RabbitMQ controller...'); + this.isShuttingDown = true; + + // Clear any pending reconnect timer + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + // Close channel and connection gracefully + await this.closeConnection(); + this.logger.info('RabbitMQ controller shutdown complete'); + } + + private async closeConnection(): Promise { + try { + if (this.amqpChannel) { + await new Promise((resolve) => { + this.amqpChannel?.close((err) => { + if (err) { + this.logger.warn(`Error closing channel: ${err.message}`); + } + resolve(); + }); + }); + this.amqpChannel = null; + } + + if (this.amqpConnection) { + await new Promise((resolve) => { + this.amqpConnection?.close((err) => { + if (err) { + this.logger.warn(`Error closing connection: ${err.message}`); + } + resolve(); + }); + }); + this.amqpConnection = null; + } + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.closeConnection', + message: 'Error during connection cleanup', + error: error.message || error, + }); + } + } + + public getConnectionStatus(): string { + return this.connectionStatus; + } + + public isConnected(): boolean { + return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null; + } + + public async forceReconnect(): Promise { + this.logger.info('Force reconnect requested'); + + // Reset reconnect attempts for forced reconnect + this.reconnectAttempts = 0; + + // Close existing connections + await this.closeConnection(); + + // Clear any pending reconnect + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + this.isReconnecting = false; + + // Attempt immediate reconnection + await this.connect(); + } + private async connect(): Promise { + if (this.isShuttingDown) { + return; + } + + this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting'; + return new Promise((resolve, reject) => { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; @@ -47,6 +134,7 @@ export class RabbitmqController extends EventController implements EventControll amqp.connect(connectionOptions, (error, connection) => { if (error) { + this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.connect', message: 'Failed to connect to RabbitMQ', @@ -63,16 +151,25 @@ export class RabbitmqController extends EventController implements EventControll message: 'RabbitMQ connection error', error: err.message || err, }); - this.handleConnectionLoss(); + this.handleConnectionLoss('connection_error', err); }); connection.on('close', () => { this.logger.warn('RabbitMQ connection closed'); - this.handleConnectionLoss(); + this.handleConnectionLoss('connection_closed'); + }); + + connection.on('blocked', (reason) => { + this.logger.warn(`RabbitMQ connection blocked: ${reason}`); + }); + + connection.on('unblocked', () => { + this.logger.info('RabbitMQ connection unblocked'); }); connection.createChannel((channelError, channel) => { if (channelError) { + this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.createChannel', message: 'Failed to create RabbitMQ channel', @@ -89,12 +186,21 @@ export class RabbitmqController extends EventController implements EventControll message: 'RabbitMQ channel error', error: err.message || err, }); - this.handleConnectionLoss(); + this.handleConnectionLoss('channel_error', err); }); channel.on('close', () => { this.logger.warn('RabbitMQ channel closed'); - this.handleConnectionLoss(); + this.handleConnectionLoss('channel_closed'); + }); + + channel.on('return', (msg) => { + this.logger.warn('RabbitMQ message returned' + JSON.stringify({ + exchange: msg.fields.exchange, + routingKey: msg.fields.routingKey, + replyCode: msg.fields.replyCode, + replyText: msg.fields.replyText, + })); }); const exchangeName = rabbitmqExchangeName; @@ -102,25 +208,37 @@ export class RabbitmqController extends EventController implements EventControll channel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, + }, (exchangeError) => { + if (exchangeError) { + this.connectionStatus = 'disconnected'; + this.logger.error({ + local: 'RabbitmqController.assertExchange', + message: 'Failed to assert exchange', + error: exchangeError.message || exchangeError, + }); + reject(exchangeError); + return; + } + + this.amqpConnection = connection; + this.amqpChannel = channel; + this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection + this.isReconnecting = false; + this.connectionStatus = 'connected'; + + this.logger.info('AMQP initialized successfully'); + resolve(); }); - - this.amqpConnection = connection; - this.amqpChannel = channel; - this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection - this.isReconnecting = false; - - this.logger.info('AMQP initialized successfully'); - - resolve(); }); }); }) .then(() => { if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { - this.initGlobalQueues(); + return this.initGlobalQueues(); } }) .catch((error) => { + this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.init', message: 'Failed to initialize AMQP', @@ -131,21 +249,30 @@ export class RabbitmqController extends EventController implements EventControll }); } - private handleConnectionLoss(): void { - if (this.isReconnecting) { - return; // Already attempting to reconnect + private handleConnectionLoss(reason?: string, error?: any): void { + if (this.isReconnecting || this.isShuttingDown) { + return; // Already attempting to reconnect or shutting down } + this.logger.warn(`Connection lost due to: ${reason || 'unknown reason'}` + JSON.stringify(error ? { error: error.message || error } : {})); + + this.connectionStatus = 'disconnected'; this.amqpChannel = null; this.amqpConnection = null; + this.scheduleReconnect(); } private scheduleReconnect(): void { + if (this.isShuttingDown) { + return; + } + if (this.reconnectAttempts >= this.maxReconnectAttempts) { this.logger.error( `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, ); + this.connectionStatus = 'disconnected'; return; } @@ -162,7 +289,11 @@ export class RabbitmqController extends EventController implements EventControll `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, ); - setTimeout(async () => { + this.reconnectTimer = setTimeout(async () => { + if (this.isShuttingDown) { + return; + } + try { this.logger.info( `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, @@ -177,6 +308,8 @@ export class RabbitmqController extends EventController implements EventControll }); this.isReconnecting = false; this.scheduleReconnect(); + } finally { + this.reconnectTimer = null; } }, delay); } @@ -190,9 +323,9 @@ export class RabbitmqController extends EventController implements EventControll } private async ensureConnection(): Promise { - if (!this.amqpChannel) { + if (!this.amqpChannel || !this.isConnected()) { this.logger.warn('AMQP channel is not available, attempting to reconnect...'); - if (!this.isReconnecting) { + if (!this.isReconnecting && !this.isShuttingDown) { this.scheduleReconnect(); } return false; @@ -200,6 +333,25 @@ export class RabbitmqController extends EventController implements EventControll return true; } + public async waitForConnection(timeoutMs: number = 30000): Promise { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + if (this.isConnected()) { + return true; + } + + if (this.isShuttingDown) { + return false; + } + + // Wait 100ms before checking again + await new Promise(resolve => setTimeout(resolve, 100)); + } + + return false; + } + public async emit({ instanceName, origin, @@ -246,106 +398,113 @@ export class RabbitmqController extends EventController implements EventControll if (instanceRabbitmq?.enabled && this.amqpChannel) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { const exchangeName = instanceName ?? rabbitmqExchangeName; - - let retry = 0; - - while (retry < 3) { - try { - await this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const eventName = event.replace(/_/g, '.').toLowerCase(); - - const queueName = `${instanceName}.${eventName}`; - - await this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); - - await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); - - if (logEnabled) { - const logData = { - local: `${origin}.sendData-RabbitMQ`, - ...message, - }; - - this.logger.log(logData); - } - - break; - } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); - retry++; - if (retry >= 3) { - this.handleConnectionLoss(); - } - } - } + await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local'); } } if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { const exchangeName = rabbitmqExchangeName; + await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'global'); + } + } - let retry = 0; + private async publishMessage( + exchangeName: string, + event: string, + message: any, + instanceName: string, + origin: string, + logEnabled: boolean, + type: 'local' | 'global' + ): Promise { + let retry = 0; + const maxRetries = 3; - while (retry < 3) { - try { - await this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + while (retry < maxRetries) { + try { + if (!(await this.ensureConnection())) { + throw new Error('No AMQP connection available'); + } - const queueName = prefixKey + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + let queueName: string; + let routingKey: string; + + if (type === 'local') { + const eventName = event.replace(/_/g, '.').toLowerCase(); + queueName = `${instanceName}.${eventName}`; + routingKey = eventName; + } else { + const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; + queueName = prefixKey ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` : event.replace(/_/g, '.').toLowerCase(); - - await this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - await this.amqpChannel.bindQueue(queueName, exchangeName, event); - - await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); - - if (logEnabled) { - const logData = { - local: `${origin}.sendData-RabbitMQ-Global`, - ...message, - }; - - this.logger.log(logData); - } - - break; - } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); - retry++; - if (retry >= 3) { - this.handleConnectionLoss(); - } + routingKey = event; } + + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey); + + const published = await new Promise((resolve) => { + const success = this.amqpChannel.publish( + exchangeName, + routingKey, + Buffer.from(JSON.stringify(message)), + { persistent: true }, + (err) => { + if (err) { + resolve(false); + } else { + resolve(true); + } + } + ); + + if (!success) { + resolve(false); + } + }); + + if (!published) { + throw new Error('Failed to publish message - channel write buffer full'); + } + + if (logEnabled) { + const logData = { + local: `${origin}.sendData-RabbitMQ${type === 'global' ? '-Global' : ''}`, + ...message, + }; + + this.logger.log(logData); + } + + break; // Success, exit retry loop + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.publishMessage', + message: `Error publishing ${type} RabbitMQ message (attempt ${retry + 1}/${maxRetries})`, + error: error.message || error, + }); + retry++; + + if (retry >= maxRetries) { + this.handleConnectionLoss('publish_error', error); + throw error; + } + + // Wait before retry + await new Promise(resolve => setTimeout(resolve, 1000 * retry)); } } } @@ -401,9 +560,9 @@ export class RabbitmqController extends EventController implements EventControll message: `Failed to initialize global queue for event ${event}`, error: error.message || error, }); - this.handleConnectionLoss(); + this.handleConnectionLoss('queue_init_error', error); break; } } } -} +} \ No newline at end of file