mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-09-07 14:49:31 -06:00
ajustes rabbitmq
This commit is contained in:
parent
2b30c273a4
commit
12761cbfce
@ -285,7 +285,7 @@ export class ConfigService {
|
|||||||
},
|
},
|
||||||
RABBITMQ: {
|
RABBITMQ: {
|
||||||
ENABLED: process.env?.RABBITMQ_ENABLED === 'true',
|
ENABLED: process.env?.RABBITMQ_ENABLED === 'true',
|
||||||
MODE: (process.env?.RABBITMQ_MODE as Rabbitmq['MODE']) || 'isolated',
|
MODE: (process.env?.RABBITMQ_MODE as 'isolated' | 'global' | 'single') || 'isolated',
|
||||||
EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange',
|
EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange',
|
||||||
URI: process.env.RABBITMQ_URI || '',
|
URI: process.env.RABBITMQ_URI || '',
|
||||||
},
|
},
|
||||||
|
@ -40,7 +40,9 @@ export const initAMQP = () => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
channel.assertExchange(rabbitConfig.EXCHANGE_NAME, 'topic', {
|
const exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
|
||||||
|
|
||||||
|
channel.assertExchange(exchangeName, 'topic', {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
});
|
});
|
||||||
@ -61,13 +63,14 @@ export const getAMQP = (): Channel | null => {
|
|||||||
export const initQueues = (instanceName: string, events: string[]) => {
|
export const initQueues = (instanceName: string, events: string[]) => {
|
||||||
if (!instanceName || !events || !events.length) return;
|
if (!instanceName || !events || !events.length) return;
|
||||||
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
|
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
|
||||||
const TWO_DAYS_IN_MS = 2 * 24 * 60 * 60 * 1000;
|
// const TWO_DAYS_IN_MS = 2 * 24 * 60 * 60 * 1000;
|
||||||
const amqp = getAMQP();
|
const amqp = getAMQP();
|
||||||
|
|
||||||
let exchangeName = rabbitConfig.EXCHANGE_NAME;
|
const rabbitMode = rabbitConfig.MODE || 'isolated';
|
||||||
|
let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
|
||||||
|
|
||||||
const receivedEvents = events.map(parseEvtName);
|
const receivedEvents = events.map(parseEvtName);
|
||||||
if (rabbitConfig.MODE === 'isolated') {
|
if (rabbitMode === 'isolated') {
|
||||||
exchangeName = instanceName ?? 'evolution_exchange';
|
exchangeName = instanceName ?? 'evolution_exchange';
|
||||||
|
|
||||||
receivedEvents.forEach((event) => {
|
receivedEvents.forEach((event) => {
|
||||||
@ -80,7 +83,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
|
|||||||
amqp.assertQueue(queueName, {
|
amqp.assertQueue(queueName, {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
messageTtl: TWO_DAYS_IN_MS,
|
// messageTtl: TWO_DAYS_IN_MS,
|
||||||
arguments: {
|
arguments: {
|
||||||
'x-queue-type': 'quorum',
|
'x-queue-type': 'quorum',
|
||||||
},
|
},
|
||||||
@ -88,7 +91,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
|
|||||||
|
|
||||||
amqp.bindQueue(queueName, exchangeName, event);
|
amqp.bindQueue(queueName, exchangeName, event);
|
||||||
});
|
});
|
||||||
} else if (rabbitConfig.MODE === 'single') {
|
} else if (rabbitMode === 'single') {
|
||||||
amqp.assertExchange(exchangeName, 'topic', {
|
amqp.assertExchange(exchangeName, 'topic', {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
@ -98,7 +101,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
|
|||||||
amqp.assertQueue(queueName, {
|
amqp.assertQueue(queueName, {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
messageTtl: TWO_DAYS_IN_MS,
|
// messageTtl: TWO_DAYS_IN_MS,
|
||||||
arguments: {
|
arguments: {
|
||||||
'x-queue-type': 'quorum',
|
'x-queue-type': 'quorum',
|
||||||
},
|
},
|
||||||
@ -107,7 +110,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
|
|||||||
receivedEvents.forEach((event) => {
|
receivedEvents.forEach((event) => {
|
||||||
amqp.bindQueue(queueName, exchangeName, event);
|
amqp.bindQueue(queueName, exchangeName, event);
|
||||||
});
|
});
|
||||||
} else if (rabbitConfig.MODE === 'global') {
|
} else if (rabbitMode === 'global') {
|
||||||
const queues = Object.keys(globalQueues);
|
const queues = Object.keys(globalQueues);
|
||||||
|
|
||||||
const addQueues = queues.filter((evt) => {
|
const addQueues = queues.filter((evt) => {
|
||||||
@ -132,7 +135,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
|
|||||||
amqp.assertQueue(queueName, {
|
amqp.assertQueue(queueName, {
|
||||||
durable: true,
|
durable: true,
|
||||||
autoDelete: false,
|
autoDelete: false,
|
||||||
messageTtl: TWO_DAYS_IN_MS,
|
// messageTtl: TWO_DAYS_IN_MS,
|
||||||
arguments: {
|
arguments: {
|
||||||
'x-queue-type': 'quorum',
|
'x-queue-type': 'quorum',
|
||||||
},
|
},
|
||||||
@ -161,11 +164,13 @@ export const initQueues = (instanceName: string, events: string[]) => {
|
|||||||
export const removeQueues = (instanceName: string, events: string[]) => {
|
export const removeQueues = (instanceName: string, events: string[]) => {
|
||||||
if (!events || !events.length) return;
|
if (!events || !events.length) return;
|
||||||
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
|
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
|
||||||
let exchangeName = rabbitConfig.EXCHANGE_NAME;
|
|
||||||
|
const rabbitMode = rabbitConfig.MODE || 'isolated';
|
||||||
|
let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
|
||||||
const amqp = getAMQP();
|
const amqp = getAMQP();
|
||||||
|
|
||||||
const receivedEvents = events.map(parseEvtName);
|
const receivedEvents = events.map(parseEvtName);
|
||||||
if (rabbitConfig.MODE === 'isolated') {
|
if (rabbitMode === 'isolated') {
|
||||||
exchangeName = instanceName;
|
exchangeName = instanceName;
|
||||||
receivedEvents.forEach((event) => {
|
receivedEvents.forEach((event) => {
|
||||||
amqp.assertExchange(exchangeName, 'topic', {
|
amqp.assertExchange(exchangeName, 'topic', {
|
||||||
@ -190,8 +195,9 @@ interface SendEventData {
|
|||||||
|
|
||||||
export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
|
export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
|
||||||
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
|
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
|
||||||
let exchangeName = rabbitConfig.EXCHANGE_NAME;
|
const rabbitMode = rabbitConfig.MODE || 'isolated';
|
||||||
if (rabbitConfig.MODE === 'isolated') exchangeName = instanceName ?? 'evolution_exchange';
|
let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
|
||||||
|
if (rabbitMode === 'isolated') exchangeName = instanceName ?? 'evolution_exchange';
|
||||||
|
|
||||||
amqpChannel.assertExchange(exchangeName, 'topic', {
|
amqpChannel.assertExchange(exchangeName, 'topic', {
|
||||||
durable: true,
|
durable: true,
|
||||||
@ -200,9 +206,9 @@ export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendE
|
|||||||
|
|
||||||
let queueName = event;
|
let queueName = event;
|
||||||
|
|
||||||
if (rabbitConfig.MODE === 'single') {
|
if (rabbitMode === 'single') {
|
||||||
queueName = 'evolution';
|
queueName = 'evolution';
|
||||||
} else if (rabbitConfig.MODE === 'global') {
|
} else if (rabbitMode === 'global') {
|
||||||
let eventName = '';
|
let eventName = '';
|
||||||
|
|
||||||
Object.keys(globalQueues).forEach((key) => {
|
Object.keys(globalQueues).forEach((key) => {
|
||||||
@ -214,7 +220,7 @@ export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendE
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
queueName = eventName;
|
queueName = eventName;
|
||||||
} else if (rabbitConfig.MODE === 'isolated') {
|
} else if (rabbitMode === 'isolated') {
|
||||||
queueName = `${instanceName}.${event}`;
|
queueName = `${instanceName}.${event}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,6 +230,8 @@ export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendE
|
|||||||
arguments: { 'x-queue-type': 'quorum' },
|
arguments: { 'x-queue-type': 'quorum' },
|
||||||
});
|
});
|
||||||
|
|
||||||
|
console.log('envia na fila: ', queueName, exchangeName, event);
|
||||||
|
|
||||||
amqpChannel.bindQueue(queueName, exchangeName, event);
|
amqpChannel.bindQueue(queueName, exchangeName, event);
|
||||||
|
|
||||||
const serverUrl = configService.get<HttpServer>('SERVER').URL;
|
const serverUrl = configService.get<HttpServer>('SERVER').URL;
|
||||||
|
@ -685,6 +685,7 @@ export class WAStartupService {
|
|||||||
|
|
||||||
if (amqp) {
|
if (amqp) {
|
||||||
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
|
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
|
||||||
|
console.log('envia na fila: ', we);
|
||||||
sendEventData({
|
sendEventData({
|
||||||
data,
|
data,
|
||||||
event,
|
event,
|
||||||
|
Loading…
Reference in New Issue
Block a user