import { PrismaRepository } from '@api/repository/repository.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { configService, Log, Rabbitmq } from '@config/env.config'; import { Logger } from '@config/logger.config'; import * as amqp from 'amqplib/callback_api'; import { EmitData, EventController, EventControllerInterface } from '../event.controller'; export class RabbitmqController extends EventController implements EventControllerInterface { public amqpChannel: amqp.Channel | null = null; private readonly logger = new Logger('RabbitmqController'); constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); } public async init(): Promise { if (!this.status) { return; } await new Promise((resolve, reject) => { const uri = configService.get('RABBITMQ').URI; const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; amqp.connect(uri, (error, connection) => { if (error) { reject(error); return; } connection.createChannel((channelError, channel) => { if (channelError) { reject(channelError); return; } const exchangeName = rabbitmqExchangeName; channel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, }); this.amqpChannel = channel; this.logger.info('AMQP initialized'); resolve(); }); }); }).then(() => { if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues(); }); } private set channel(channel: amqp.Channel) { this.amqpChannel = channel; } public get channel(): amqp.Channel { return this.amqpChannel; } public async emit({ instanceName, origin, event, data, serverUrl, dateTime, sender, apiKey, integration, }: EmitData): Promise { if (integration && !integration.includes('rabbitmq')) { return; } if (!this.status) { return; } const instanceRabbitmq = await this.get(instanceName); const rabbitmqLocal = instanceRabbitmq?.events; const rabbitmqGlobal = configService.get('RABBITMQ').GLOBAL_ENABLED; const rabbitmqEvents = configService.get('RABBITMQ').EVENTS; const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; const we = event.replace(/[.-]/gm, '_').toUpperCase(); const logEnabled = configService.get('LOG').LEVEL.includes('WEBHOOKS'); const message = { event, instance: instanceName, data, server_url: serverUrl, date_time: dateTime, sender, apikey: apiKey, }; if (instanceRabbitmq?.enabled && this.amqpChannel) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { const exchangeName = instanceName ?? rabbitmqExchangeName; let retry = 0; while (retry < 3) { try { await this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, }); const eventName = event.replace(/_/g, '.').toLowerCase(); const queueName = `${instanceName}.${eventName}`; await this.amqpChannel.assertQueue(queueName, { durable: true, autoDelete: false, arguments: { 'x-queue-type': 'quorum', }, }); await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); if (logEnabled) { const logData = { local: `${origin}.sendData-RabbitMQ`, ...message, }; this.logger.log(logData); } break; } catch (error) { retry++; } } } } if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { const exchangeName = rabbitmqExchangeName; let retry = 0; while (retry < 3) { try { await this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, }); const queueName = prefixKey ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` : event.replace(/_/g, '.').toLowerCase(); await this.amqpChannel.assertQueue(queueName, { durable: true, autoDelete: false, arguments: { 'x-queue-type': 'quorum', }, }); await this.amqpChannel.bindQueue(queueName, exchangeName, event); await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); if (logEnabled) { const logData = { local: `${origin}.sendData-RabbitMQ-Global`, ...message, }; this.logger.log(logData); } break; } catch (error) { retry++; } } } } private async initGlobalQueues(): Promise { this.logger.info('Initializing global queues'); const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; const events = configService.get('RABBITMQ').EVENTS; const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; if (!events) { this.logger.warn('No events to initialize on AMQP'); return; } const eventKeys = Object.keys(events); eventKeys.forEach((event) => { if (events[event] === false) return; const queueName = prefixKey !== '' ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` : `${event.replace(/_/g, '.').toLowerCase()}`; const exchangeName = rabbitmqExchangeName; this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, }); this.amqpChannel.assertQueue(queueName, { durable: true, autoDelete: false, arguments: { 'x-queue-type': 'quorum', }, }); this.amqpChannel.bindQueue(queueName, exchangeName, event); }); } }