This commit is contained in:
luissantosjs 2025-08-22 22:49:40 +01:00
parent 7a99fba556
commit 4681576cfc

View File

@ -14,9 +14,6 @@ export class RabbitmqController extends EventController implements EventControll
private maxReconnectAttempts = 10; private maxReconnectAttempts = 10;
private reconnectDelay = 5000; // 5 seconds private reconnectDelay = 5000; // 5 seconds
private isReconnecting = false; private isReconnecting = false;
private reconnectTimer: NodeJS.Timeout | null = null;
private connectionStatus: 'connected' | 'disconnected' | 'connecting' | 'reconnecting' = 'disconnected';
private isShuttingDown = false;
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq'); super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
@ -30,91 +27,7 @@ export class RabbitmqController extends EventController implements EventControll
await this.connect(); await this.connect();
} }
public async shutdown(): Promise<void> {
this.logger.info('Shutting down RabbitMQ controller...');
this.isShuttingDown = true;
// Clear any pending reconnect timer
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
// Close channel and connection gracefully
await this.closeConnection();
this.logger.info('RabbitMQ controller shutdown complete');
}
private async closeConnection(): Promise<void> {
try {
if (this.amqpChannel) {
await new Promise<void>((resolve) => {
this.amqpChannel?.close((err) => {
if (err) {
this.logger.warn(`Error closing channel: ${err.message}`);
}
resolve();
});
});
this.amqpChannel = null;
}
if (this.amqpConnection) {
await new Promise<void>((resolve) => {
this.amqpConnection?.close((err) => {
if (err) {
this.logger.warn(`Error closing connection: ${err.message}`);
}
resolve();
});
});
this.amqpConnection = null;
}
} catch (error) {
this.logger.error({
local: 'RabbitmqController.closeConnection',
message: 'Error during connection cleanup',
error: error.message || error,
});
}
}
public getConnectionStatus(): string {
return this.connectionStatus;
}
public isConnected(): boolean {
return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null;
}
public async forceReconnect(): Promise<void> {
this.logger.info('Force reconnect requested');
// Reset reconnect attempts for forced reconnect
this.reconnectAttempts = 0;
// Close existing connections
await this.closeConnection();
// Clear any pending reconnect
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.isReconnecting = false;
// Attempt immediate reconnection
await this.connect();
}
private async connect(): Promise<void> { private async connect(): Promise<void> {
if (this.isShuttingDown) {
return;
}
this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting';
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
const uri = configService.get<Rabbitmq>('RABBITMQ').URI; const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX; const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX;
@ -134,7 +47,6 @@ export class RabbitmqController extends EventController implements EventControll
amqp.connect(connectionOptions, (error, connection) => { amqp.connect(connectionOptions, (error, connection) => {
if (error) { if (error) {
this.connectionStatus = 'disconnected';
this.logger.error({ this.logger.error({
local: 'RabbitmqController.connect', local: 'RabbitmqController.connect',
message: 'Failed to connect to RabbitMQ', message: 'Failed to connect to RabbitMQ',
@ -151,25 +63,16 @@ export class RabbitmqController extends EventController implements EventControll
message: 'RabbitMQ connection error', message: 'RabbitMQ connection error',
error: err.message || err, error: err.message || err,
}); });
this.handleConnectionLoss('connection_error', err); this.handleConnectionLoss();
}); });
connection.on('close', () => { connection.on('close', () => {
this.logger.warn('RabbitMQ connection closed'); this.logger.warn('RabbitMQ connection closed');
this.handleConnectionLoss('connection_closed'); this.handleConnectionLoss();
});
connection.on('blocked', (reason) => {
this.logger.warn(`RabbitMQ connection blocked: ${reason}`);
});
connection.on('unblocked', () => {
this.logger.info('RabbitMQ connection unblocked');
}); });
connection.createChannel((channelError, channel) => { connection.createChannel((channelError, channel) => {
if (channelError) { if (channelError) {
this.connectionStatus = 'disconnected';
this.logger.error({ this.logger.error({
local: 'RabbitmqController.createChannel', local: 'RabbitmqController.createChannel',
message: 'Failed to create RabbitMQ channel', message: 'Failed to create RabbitMQ channel',
@ -186,21 +89,12 @@ export class RabbitmqController extends EventController implements EventControll
message: 'RabbitMQ channel error', message: 'RabbitMQ channel error',
error: err.message || err, error: err.message || err,
}); });
this.handleConnectionLoss('channel_error', err); this.handleConnectionLoss();
}); });
channel.on('close', () => { channel.on('close', () => {
this.logger.warn('RabbitMQ channel closed'); this.logger.warn('RabbitMQ channel closed');
this.handleConnectionLoss('channel_closed'); this.handleConnectionLoss();
});
channel.on('return', (msg) => {
this.logger.warn('RabbitMQ message returned' + JSON.stringify({
exchange: msg.fields.exchange,
routingKey: msg.fields.routingKey,
replyCode: msg.fields.replyCode,
replyText: msg.fields.replyText,
}));
}); });
const exchangeName = rabbitmqExchangeName; const exchangeName = rabbitmqExchangeName;
@ -208,37 +102,25 @@ export class RabbitmqController extends EventController implements EventControll
channel.assertExchange(exchangeName, 'topic', { channel.assertExchange(exchangeName, 'topic', {
durable: true, durable: true,
autoDelete: false, autoDelete: false,
}, (exchangeError) => {
if (exchangeError) {
this.connectionStatus = 'disconnected';
this.logger.error({
local: 'RabbitmqController.assertExchange',
message: 'Failed to assert exchange',
error: exchangeError.message || exchangeError,
});
reject(exchangeError);
return;
}
this.amqpConnection = connection;
this.amqpChannel = channel;
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
this.isReconnecting = false;
this.connectionStatus = 'connected';
this.logger.info('AMQP initialized successfully');
resolve();
}); });
this.amqpConnection = connection;
this.amqpChannel = channel;
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
this.isReconnecting = false;
this.logger.info('AMQP initialized successfully');
resolve();
}); });
}); });
}) })
.then(() => { .then(() => {
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) { if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) {
return this.initGlobalQueues(); this.initGlobalQueues();
} }
}) })
.catch((error) => { .catch((error) => {
this.connectionStatus = 'disconnected';
this.logger.error({ this.logger.error({
local: 'RabbitmqController.init', local: 'RabbitmqController.init',
message: 'Failed to initialize AMQP', message: 'Failed to initialize AMQP',
@ -249,30 +131,21 @@ export class RabbitmqController extends EventController implements EventControll
}); });
} }
private handleConnectionLoss(reason?: string, error?: any): void { private handleConnectionLoss(): void {
if (this.isReconnecting || this.isShuttingDown) { if (this.isReconnecting) {
return; // Already attempting to reconnect or shutting down return; // Already attempting to reconnect
} }
this.logger.warn(`Connection lost due to: ${reason || 'unknown reason'}` + JSON.stringify(error ? { error: error.message || error } : {}));
this.connectionStatus = 'disconnected';
this.amqpChannel = null; this.amqpChannel = null;
this.amqpConnection = null; this.amqpConnection = null;
this.scheduleReconnect(); this.scheduleReconnect();
} }
private scheduleReconnect(): void { private scheduleReconnect(): void {
if (this.isShuttingDown) {
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) { if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.logger.error( this.logger.error(
`Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`,
); );
this.connectionStatus = 'disconnected';
return; return;
} }
@ -289,11 +162,7 @@ export class RabbitmqController extends EventController implements EventControll
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
); );
this.reconnectTimer = setTimeout(async () => { setTimeout(async () => {
if (this.isShuttingDown) {
return;
}
try { try {
this.logger.info( this.logger.info(
`Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
@ -308,8 +177,6 @@ export class RabbitmqController extends EventController implements EventControll
}); });
this.isReconnecting = false; this.isReconnecting = false;
this.scheduleReconnect(); this.scheduleReconnect();
} finally {
this.reconnectTimer = null;
} }
}, delay); }, delay);
} }
@ -323,9 +190,9 @@ export class RabbitmqController extends EventController implements EventControll
} }
private async ensureConnection(): Promise<boolean> { private async ensureConnection(): Promise<boolean> {
if (!this.amqpChannel || !this.isConnected()) { if (!this.amqpChannel) {
this.logger.warn('AMQP channel is not available, attempting to reconnect...'); this.logger.warn('AMQP channel is not available, attempting to reconnect...');
if (!this.isReconnecting && !this.isShuttingDown) { if (!this.isReconnecting) {
this.scheduleReconnect(); this.scheduleReconnect();
} }
return false; return false;
@ -333,25 +200,6 @@ export class RabbitmqController extends EventController implements EventControll
return true; return true;
} }
public async waitForConnection(timeoutMs: number = 30000): Promise<boolean> {
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
if (this.isConnected()) {
return true;
}
if (this.isShuttingDown) {
return false;
}
// Wait 100ms before checking again
await new Promise(resolve => setTimeout(resolve, 100));
}
return false;
}
public async emit({ public async emit({
instanceName, instanceName,
origin, origin,
@ -398,113 +246,106 @@ export class RabbitmqController extends EventController implements EventControll
if (instanceRabbitmq?.enabled && this.amqpChannel) { if (instanceRabbitmq?.enabled && this.amqpChannel) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = instanceName ?? rabbitmqExchangeName; const exchangeName = instanceName ?? rabbitmqExchangeName;
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local');
let retry = 0;
while (retry < 3) {
try {
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
const eventName = event.replace(/_/g, '.').toLowerCase();
const queueName = `${instanceName}.${eventName}`;
await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.amqpChannel.bindQueue(queueName, exchangeName, eventName);
await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ`,
...message,
};
this.logger.log(logData);
}
break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
}
}
} }
} }
if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) {
const exchangeName = rabbitmqExchangeName; const exchangeName = rabbitmqExchangeName;
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'global');
}
}
private async publishMessage( let retry = 0;
exchangeName: string,
event: string,
message: any,
instanceName: string,
origin: string,
logEnabled: boolean,
type: 'local' | 'global'
): Promise<void> {
let retry = 0;
const maxRetries = 3;
while (retry < maxRetries) { while (retry < 3) {
try { try {
if (!(await this.ensureConnection())) { await this.amqpChannel.assertExchange(exchangeName, 'topic', {
throw new Error('No AMQP connection available'); durable: true,
} autoDelete: false,
});
await this.amqpChannel.assertExchange(exchangeName, 'topic', { const queueName = prefixKey
durable: true,
autoDelete: false,
});
let queueName: string;
let routingKey: string;
if (type === 'local') {
const eventName = event.replace(/_/g, '.').toLowerCase();
queueName = `${instanceName}.${eventName}`;
routingKey = eventName;
} else {
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
queueName = prefixKey
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
: event.replace(/_/g, '.').toLowerCase(); : event.replace(/_/g, '.').toLowerCase();
routingKey = event;
}
await this.amqpChannel.assertQueue(queueName, { await this.amqpChannel.assertQueue(queueName, {
durable: true, durable: true,
autoDelete: false, autoDelete: false,
arguments: { arguments: {
'x-queue-type': 'quorum', 'x-queue-type': 'quorum',
}, },
}); });
await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey); await this.amqpChannel.bindQueue(queueName, exchangeName, event);
const published = await new Promise<boolean>((resolve) => { await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
const success = this.amqpChannel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message)),
{ persistent: true },
(err) => {
if (err) {
resolve(false);
} else {
resolve(true);
}
}
);
if (!success) { if (logEnabled) {
resolve(false); const logData = {
local: `${origin}.sendData-RabbitMQ-Global`,
...message,
};
this.logger.log(logData);
} }
});
if (!published) { break;
throw new Error('Failed to publish message - channel write buffer full'); } catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
} }
if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ${type === 'global' ? '-Global' : ''}`,
...message,
};
this.logger.log(logData);
}
break; // Success, exit retry loop
} catch (error) {
this.logger.error({
local: 'RabbitmqController.publishMessage',
message: `Error publishing ${type} RabbitMQ message (attempt ${retry + 1}/${maxRetries})`,
error: error.message || error,
});
retry++;
if (retry >= maxRetries) {
this.handleConnectionLoss('publish_error', error);
throw error;
}
// Wait before retry
await new Promise(resolve => setTimeout(resolve, 1000 * retry));
} }
} }
} }
@ -560,7 +401,7 @@ export class RabbitmqController extends EventController implements EventControll
message: `Failed to initialize global queue for event ${event}`, message: `Failed to initialize global queue for event ${event}`,
error: error.message || error, error: error.message || error,
}); });
this.handleConnectionLoss('queue_init_error', error); this.handleConnectionLoss();
break; break;
} }
} }