feat: enhance RabbitMQ controller with improved connection management and shutdown procedures

This commit is contained in:
luissantosjs 2025-08-20 12:29:45 +01:00
parent 9cdb897a0f
commit 7a99fba556

View File

@ -14,6 +14,9 @@ export class RabbitmqController extends EventController implements EventControll
private maxReconnectAttempts = 10; private maxReconnectAttempts = 10;
private reconnectDelay = 5000; // 5 seconds private reconnectDelay = 5000; // 5 seconds
private isReconnecting = false; private isReconnecting = false;
private reconnectTimer: NodeJS.Timeout | null = null;
private connectionStatus: 'connected' | 'disconnected' | 'connecting' | 'reconnecting' = 'disconnected';
private isShuttingDown = false;
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq'); super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
@ -27,7 +30,91 @@ export class RabbitmqController extends EventController implements EventControll
await this.connect(); await this.connect();
} }
public async shutdown(): Promise<void> {
this.logger.info('Shutting down RabbitMQ controller...');
this.isShuttingDown = true;
// Clear any pending reconnect timer
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
// Close channel and connection gracefully
await this.closeConnection();
this.logger.info('RabbitMQ controller shutdown complete');
}
private async closeConnection(): Promise<void> {
try {
if (this.amqpChannel) {
await new Promise<void>((resolve) => {
this.amqpChannel?.close((err) => {
if (err) {
this.logger.warn(`Error closing channel: ${err.message}`);
}
resolve();
});
});
this.amqpChannel = null;
}
if (this.amqpConnection) {
await new Promise<void>((resolve) => {
this.amqpConnection?.close((err) => {
if (err) {
this.logger.warn(`Error closing connection: ${err.message}`);
}
resolve();
});
});
this.amqpConnection = null;
}
} catch (error) {
this.logger.error({
local: 'RabbitmqController.closeConnection',
message: 'Error during connection cleanup',
error: error.message || error,
});
}
}
public getConnectionStatus(): string {
return this.connectionStatus;
}
public isConnected(): boolean {
return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null;
}
public async forceReconnect(): Promise<void> {
this.logger.info('Force reconnect requested');
// Reset reconnect attempts for forced reconnect
this.reconnectAttempts = 0;
// Close existing connections
await this.closeConnection();
// Clear any pending reconnect
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.isReconnecting = false;
// Attempt immediate reconnection
await this.connect();
}
private async connect(): Promise<void> { private async connect(): Promise<void> {
if (this.isShuttingDown) {
return;
}
this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting';
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
const uri = configService.get<Rabbitmq>('RABBITMQ').URI; const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX; const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX;
@ -47,6 +134,7 @@ export class RabbitmqController extends EventController implements EventControll
amqp.connect(connectionOptions, (error, connection) => { amqp.connect(connectionOptions, (error, connection) => {
if (error) { if (error) {
this.connectionStatus = 'disconnected';
this.logger.error({ this.logger.error({
local: 'RabbitmqController.connect', local: 'RabbitmqController.connect',
message: 'Failed to connect to RabbitMQ', message: 'Failed to connect to RabbitMQ',
@ -63,16 +151,25 @@ export class RabbitmqController extends EventController implements EventControll
message: 'RabbitMQ connection error', message: 'RabbitMQ connection error',
error: err.message || err, error: err.message || err,
}); });
this.handleConnectionLoss(); this.handleConnectionLoss('connection_error', err);
}); });
connection.on('close', () => { connection.on('close', () => {
this.logger.warn('RabbitMQ connection closed'); this.logger.warn('RabbitMQ connection closed');
this.handleConnectionLoss(); this.handleConnectionLoss('connection_closed');
});
connection.on('blocked', (reason) => {
this.logger.warn(`RabbitMQ connection blocked: ${reason}`);
});
connection.on('unblocked', () => {
this.logger.info('RabbitMQ connection unblocked');
}); });
connection.createChannel((channelError, channel) => { connection.createChannel((channelError, channel) => {
if (channelError) { if (channelError) {
this.connectionStatus = 'disconnected';
this.logger.error({ this.logger.error({
local: 'RabbitmqController.createChannel', local: 'RabbitmqController.createChannel',
message: 'Failed to create RabbitMQ channel', message: 'Failed to create RabbitMQ channel',
@ -89,12 +186,21 @@ export class RabbitmqController extends EventController implements EventControll
message: 'RabbitMQ channel error', message: 'RabbitMQ channel error',
error: err.message || err, error: err.message || err,
}); });
this.handleConnectionLoss(); this.handleConnectionLoss('channel_error', err);
}); });
channel.on('close', () => { channel.on('close', () => {
this.logger.warn('RabbitMQ channel closed'); this.logger.warn('RabbitMQ channel closed');
this.handleConnectionLoss(); this.handleConnectionLoss('channel_closed');
});
channel.on('return', (msg) => {
this.logger.warn('RabbitMQ message returned' + JSON.stringify({
exchange: msg.fields.exchange,
routingKey: msg.fields.routingKey,
replyCode: msg.fields.replyCode,
replyText: msg.fields.replyText,
}));
}); });
const exchangeName = rabbitmqExchangeName; const exchangeName = rabbitmqExchangeName;
@ -102,25 +208,37 @@ export class RabbitmqController extends EventController implements EventControll
channel.assertExchange(exchangeName, 'topic', { channel.assertExchange(exchangeName, 'topic', {
durable: true, durable: true,
autoDelete: false, autoDelete: false,
}, (exchangeError) => {
if (exchangeError) {
this.connectionStatus = 'disconnected';
this.logger.error({
local: 'RabbitmqController.assertExchange',
message: 'Failed to assert exchange',
error: exchangeError.message || exchangeError,
});
reject(exchangeError);
return;
}
this.amqpConnection = connection;
this.amqpChannel = channel;
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
this.isReconnecting = false;
this.connectionStatus = 'connected';
this.logger.info('AMQP initialized successfully');
resolve();
}); });
this.amqpConnection = connection;
this.amqpChannel = channel;
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
this.isReconnecting = false;
this.logger.info('AMQP initialized successfully');
resolve();
}); });
}); });
}) })
.then(() => { .then(() => {
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) { if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) {
this.initGlobalQueues(); return this.initGlobalQueues();
} }
}) })
.catch((error) => { .catch((error) => {
this.connectionStatus = 'disconnected';
this.logger.error({ this.logger.error({
local: 'RabbitmqController.init', local: 'RabbitmqController.init',
message: 'Failed to initialize AMQP', message: 'Failed to initialize AMQP',
@ -131,21 +249,30 @@ export class RabbitmqController extends EventController implements EventControll
}); });
} }
private handleConnectionLoss(): void { private handleConnectionLoss(reason?: string, error?: any): void {
if (this.isReconnecting) { if (this.isReconnecting || this.isShuttingDown) {
return; // Already attempting to reconnect return; // Already attempting to reconnect or shutting down
} }
this.logger.warn(`Connection lost due to: ${reason || 'unknown reason'}` + JSON.stringify(error ? { error: error.message || error } : {}));
this.connectionStatus = 'disconnected';
this.amqpChannel = null; this.amqpChannel = null;
this.amqpConnection = null; this.amqpConnection = null;
this.scheduleReconnect(); this.scheduleReconnect();
} }
private scheduleReconnect(): void { private scheduleReconnect(): void {
if (this.isShuttingDown) {
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) { if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.logger.error( this.logger.error(
`Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`,
); );
this.connectionStatus = 'disconnected';
return; return;
} }
@ -162,7 +289,11 @@ export class RabbitmqController extends EventController implements EventControll
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
); );
setTimeout(async () => { this.reconnectTimer = setTimeout(async () => {
if (this.isShuttingDown) {
return;
}
try { try {
this.logger.info( this.logger.info(
`Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
@ -177,6 +308,8 @@ export class RabbitmqController extends EventController implements EventControll
}); });
this.isReconnecting = false; this.isReconnecting = false;
this.scheduleReconnect(); this.scheduleReconnect();
} finally {
this.reconnectTimer = null;
} }
}, delay); }, delay);
} }
@ -190,9 +323,9 @@ export class RabbitmqController extends EventController implements EventControll
} }
private async ensureConnection(): Promise<boolean> { private async ensureConnection(): Promise<boolean> {
if (!this.amqpChannel) { if (!this.amqpChannel || !this.isConnected()) {
this.logger.warn('AMQP channel is not available, attempting to reconnect...'); this.logger.warn('AMQP channel is not available, attempting to reconnect...');
if (!this.isReconnecting) { if (!this.isReconnecting && !this.isShuttingDown) {
this.scheduleReconnect(); this.scheduleReconnect();
} }
return false; return false;
@ -200,6 +333,25 @@ export class RabbitmqController extends EventController implements EventControll
return true; return true;
} }
public async waitForConnection(timeoutMs: number = 30000): Promise<boolean> {
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
if (this.isConnected()) {
return true;
}
if (this.isShuttingDown) {
return false;
}
// Wait 100ms before checking again
await new Promise(resolve => setTimeout(resolve, 100));
}
return false;
}
public async emit({ public async emit({
instanceName, instanceName,
origin, origin,
@ -246,106 +398,113 @@ export class RabbitmqController extends EventController implements EventControll
if (instanceRabbitmq?.enabled && this.amqpChannel) { if (instanceRabbitmq?.enabled && this.amqpChannel) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = instanceName ?? rabbitmqExchangeName; const exchangeName = instanceName ?? rabbitmqExchangeName;
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local');
let retry = 0;
while (retry < 3) {
try {
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
const eventName = event.replace(/_/g, '.').toLowerCase();
const queueName = `${instanceName}.${eventName}`;
await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.amqpChannel.bindQueue(queueName, exchangeName, eventName);
await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ`,
...message,
};
this.logger.log(logData);
}
break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
}
}
} }
} }
if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) {
const exchangeName = rabbitmqExchangeName; const exchangeName = rabbitmqExchangeName;
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'global');
}
}
let retry = 0; private async publishMessage(
exchangeName: string,
event: string,
message: any,
instanceName: string,
origin: string,
logEnabled: boolean,
type: 'local' | 'global'
): Promise<void> {
let retry = 0;
const maxRetries = 3;
while (retry < 3) { while (retry < maxRetries) {
try { try {
await this.amqpChannel.assertExchange(exchangeName, 'topic', { if (!(await this.ensureConnection())) {
durable: true, throw new Error('No AMQP connection available');
autoDelete: false, }
});
const queueName = prefixKey await this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
let queueName: string;
let routingKey: string;
if (type === 'local') {
const eventName = event.replace(/_/g, '.').toLowerCase();
queueName = `${instanceName}.${eventName}`;
routingKey = eventName;
} else {
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
queueName = prefixKey
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
: event.replace(/_/g, '.').toLowerCase(); : event.replace(/_/g, '.').toLowerCase();
routingKey = event;
await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.amqpChannel.bindQueue(queueName, exchangeName, event);
await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ-Global`,
...message,
};
this.logger.log(logData);
}
break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
} }
await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey);
const published = await new Promise<boolean>((resolve) => {
const success = this.amqpChannel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message)),
{ persistent: true },
(err) => {
if (err) {
resolve(false);
} else {
resolve(true);
}
}
);
if (!success) {
resolve(false);
}
});
if (!published) {
throw new Error('Failed to publish message - channel write buffer full');
}
if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ${type === 'global' ? '-Global' : ''}`,
...message,
};
this.logger.log(logData);
}
break; // Success, exit retry loop
} catch (error) {
this.logger.error({
local: 'RabbitmqController.publishMessage',
message: `Error publishing ${type} RabbitMQ message (attempt ${retry + 1}/${maxRetries})`,
error: error.message || error,
});
retry++;
if (retry >= maxRetries) {
this.handleConnectionLoss('publish_error', error);
throw error;
}
// Wait before retry
await new Promise(resolve => setTimeout(resolve, 1000 * retry));
} }
} }
} }
@ -401,9 +560,9 @@ export class RabbitmqController extends EventController implements EventControll
message: `Failed to initialize global queue for event ${event}`, message: `Failed to initialize global queue for event ${event}`,
error: error.message || error, error: error.message || error,
}); });
this.handleConnectionLoss(); this.handleConnectionLoss('queue_init_error', error);
break; break;
} }
} }
} }
} }