mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-07-16 04:02:54 -06:00
feat(rabbitmq): implement robust connection handling with reconnection logic and error logging
This commit is contained in:
parent
e86b6463fd
commit
72fb2f408b
@ -8,7 +8,12 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co
|
|||||||
|
|
||||||
export class RabbitmqController extends EventController implements EventControllerInterface {
|
export class RabbitmqController extends EventController implements EventControllerInterface {
|
||||||
public amqpChannel: amqp.Channel | null = null;
|
public amqpChannel: amqp.Channel | null = null;
|
||||||
|
private amqpConnection: amqp.Connection | null = null;
|
||||||
private readonly logger = new Logger('RabbitmqController');
|
private readonly logger = new Logger('RabbitmqController');
|
||||||
|
private reconnectAttempts = 0;
|
||||||
|
private maxReconnectAttempts = 10;
|
||||||
|
private reconnectDelay = 5000; // 5 seconds
|
||||||
|
private isReconnecting = false;
|
||||||
|
|
||||||
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
|
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
|
||||||
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
|
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
|
||||||
@ -19,7 +24,11 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await new Promise<void>((resolve, reject) => {
|
await this.connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async connect(): Promise<void> {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
|
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
|
||||||
const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX;
|
const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX;
|
||||||
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
|
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
|
||||||
@ -33,22 +42,61 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
password: url.password || 'guest',
|
password: url.password || 'guest',
|
||||||
vhost: url.pathname.slice(1) || '/',
|
vhost: url.pathname.slice(1) || '/',
|
||||||
frameMax: frameMax,
|
frameMax: frameMax,
|
||||||
|
heartbeat: 30, // Add heartbeat of 30 seconds
|
||||||
};
|
};
|
||||||
|
|
||||||
amqp.connect(connectionOptions, (error, connection) => {
|
amqp.connect(connectionOptions, (error, connection) => {
|
||||||
if (error) {
|
if (error) {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.connect',
|
||||||
|
message: 'Failed to connect to RabbitMQ',
|
||||||
|
error: error.message || error,
|
||||||
|
});
|
||||||
reject(error);
|
reject(error);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Connection event handlers
|
||||||
|
connection.on('error', (err) => {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.connectionError',
|
||||||
|
message: 'RabbitMQ connection error',
|
||||||
|
error: err.message || err,
|
||||||
|
});
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
});
|
||||||
|
|
||||||
|
connection.on('close', () => {
|
||||||
|
this.logger.warn('RabbitMQ connection closed');
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
});
|
||||||
|
|
||||||
connection.createChannel((channelError, channel) => {
|
connection.createChannel((channelError, channel) => {
|
||||||
if (channelError) {
|
if (channelError) {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.createChannel',
|
||||||
|
message: 'Failed to create RabbitMQ channel',
|
||||||
|
error: channelError.message || channelError,
|
||||||
|
});
|
||||||
reject(channelError);
|
reject(channelError);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Channel event handlers
|
||||||
|
channel.on('error', (err) => {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.channelError',
|
||||||
|
message: 'RabbitMQ channel error',
|
||||||
|
error: err.message || err,
|
||||||
|
});
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.on('close', () => {
|
||||||
|
this.logger.warn('RabbitMQ channel closed');
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
});
|
||||||
|
|
||||||
const exchangeName = rabbitmqExchangeName;
|
const exchangeName = rabbitmqExchangeName;
|
||||||
|
|
||||||
channel.assertExchange(exchangeName, 'topic', {
|
channel.assertExchange(exchangeName, 'topic', {
|
||||||
@ -56,16 +104,81 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.amqpConnection = connection;
|
||||||
this.amqpChannel = channel;
|
this.amqpChannel = channel;
|
||||||
|
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
|
||||||
|
this.isReconnecting = false;
|
||||||
|
|
||||||
this.logger.info('AMQP initialized');
|
this.logger.info('AMQP initialized successfully');
|
||||||
|
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}).then(() => {
|
})
|
||||||
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues();
|
.then(() => {
|
||||||
});
|
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) {
|
||||||
|
this.initGlobalQueues();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch((error) => {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.init',
|
||||||
|
message: 'Failed to initialize AMQP',
|
||||||
|
error: error.message || error,
|
||||||
|
});
|
||||||
|
this.scheduleReconnect();
|
||||||
|
throw error;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private handleConnectionLoss(): void {
|
||||||
|
if (this.isReconnecting) {
|
||||||
|
return; // Already attempting to reconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
this.amqpChannel = null;
|
||||||
|
this.amqpConnection = null;
|
||||||
|
this.scheduleReconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
private scheduleReconnect(): void {
|
||||||
|
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||||
|
this.logger.error(
|
||||||
|
`Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.isReconnecting) {
|
||||||
|
return; // Already scheduled
|
||||||
|
}
|
||||||
|
|
||||||
|
this.isReconnecting = true;
|
||||||
|
this.reconnectAttempts++;
|
||||||
|
|
||||||
|
const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay
|
||||||
|
|
||||||
|
this.logger.info(
|
||||||
|
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
|
||||||
|
);
|
||||||
|
|
||||||
|
setTimeout(async () => {
|
||||||
|
try {
|
||||||
|
this.logger.info(
|
||||||
|
`Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
|
||||||
|
);
|
||||||
|
await this.connect();
|
||||||
|
this.logger.info('Successfully reconnected to RabbitMQ');
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.scheduleReconnect',
|
||||||
|
message: `Reconnection attempt ${this.reconnectAttempts} failed`,
|
||||||
|
error: error.message || error,
|
||||||
|
});
|
||||||
|
this.isReconnecting = false;
|
||||||
|
this.scheduleReconnect();
|
||||||
|
}
|
||||||
|
}, delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
private set channel(channel: amqp.Channel) {
|
private set channel(channel: amqp.Channel) {
|
||||||
@ -76,6 +189,17 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
return this.amqpChannel;
|
return this.amqpChannel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async ensureConnection(): Promise<boolean> {
|
||||||
|
if (!this.amqpChannel) {
|
||||||
|
this.logger.warn('AMQP channel is not available, attempting to reconnect...');
|
||||||
|
if (!this.isReconnecting) {
|
||||||
|
this.scheduleReconnect();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
public async emit({
|
public async emit({
|
||||||
instanceName,
|
instanceName,
|
||||||
origin,
|
origin,
|
||||||
@ -95,6 +219,11 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!(await this.ensureConnection())) {
|
||||||
|
this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const instanceRabbitmq = await this.get(instanceName);
|
const instanceRabbitmq = await this.get(instanceName);
|
||||||
const rabbitmqLocal = instanceRabbitmq?.events;
|
const rabbitmqLocal = instanceRabbitmq?.events;
|
||||||
const rabbitmqGlobal = configService.get<Rabbitmq>('RABBITMQ').GLOBAL_ENABLED;
|
const rabbitmqGlobal = configService.get<Rabbitmq>('RABBITMQ').GLOBAL_ENABLED;
|
||||||
@ -154,7 +283,15 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
|
|
||||||
break;
|
break;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.emit',
|
||||||
|
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
|
||||||
|
error: error.message || error,
|
||||||
|
});
|
||||||
retry++;
|
retry++;
|
||||||
|
if (retry >= 3) {
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -199,7 +336,15 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
|
|
||||||
break;
|
break;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.emit',
|
||||||
|
message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`,
|
||||||
|
error: error.message || error,
|
||||||
|
});
|
||||||
retry++;
|
retry++;
|
||||||
|
if (retry >= 3) {
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -208,41 +353,57 @@ export class RabbitmqController extends EventController implements EventControll
|
|||||||
private async initGlobalQueues(): Promise<void> {
|
private async initGlobalQueues(): Promise<void> {
|
||||||
this.logger.info('Initializing global queues');
|
this.logger.info('Initializing global queues');
|
||||||
|
|
||||||
|
if (!(await this.ensureConnection())) {
|
||||||
|
this.logger.error('Cannot initialize global queues: No AMQP connection');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
|
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
|
||||||
const events = configService.get<Rabbitmq>('RABBITMQ').EVENTS;
|
const events = configService.get<Rabbitmq>('RABBITMQ').EVENTS;
|
||||||
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
|
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
|
||||||
|
|
||||||
if (!events) {
|
if (!events) {
|
||||||
this.logger.warn('No events to initialize on AMQP');
|
this.logger.warn('No events to initialize on AMQP');
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const eventKeys = Object.keys(events);
|
const eventKeys = Object.keys(events);
|
||||||
|
|
||||||
eventKeys.forEach((event) => {
|
for (const event of eventKeys) {
|
||||||
if (events[event] === false) return;
|
if (events[event] === false) continue;
|
||||||
|
|
||||||
const queueName =
|
try {
|
||||||
prefixKey !== ''
|
const queueName =
|
||||||
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
|
prefixKey !== ''
|
||||||
: `${event.replace(/_/g, '.').toLowerCase()}`;
|
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
|
||||||
const exchangeName = rabbitmqExchangeName;
|
: `${event.replace(/_/g, '.').toLowerCase()}`;
|
||||||
|
const exchangeName = rabbitmqExchangeName;
|
||||||
|
|
||||||
this.amqpChannel.assertExchange(exchangeName, 'topic', {
|
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
this.amqpChannel.assertQueue(queueName, {
|
await this.amqpChannel.assertQueue(queueName, {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
arguments: {
|
arguments: {
|
||||||
'x-queue-type': 'quorum',
|
'x-queue-type': 'quorum',
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
this.amqpChannel.bindQueue(queueName, exchangeName, event);
|
await this.amqpChannel.bindQueue(queueName, exchangeName, event);
|
||||||
});
|
|
||||||
|
this.logger.info(`Global queue initialized: ${queueName}`);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error({
|
||||||
|
local: 'RabbitmqController.initGlobalQueues',
|
||||||
|
message: `Failed to initialize global queue for event ${event}`,
|
||||||
|
error: error.message || error,
|
||||||
|
});
|
||||||
|
this.handleConnectionLoss();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user