refactor: event folder

This commit is contained in:
Davidson Gomes
2024-08-20 15:27:32 -03:00
parent 64ed0faa83
commit 7b79591e42
35 changed files with 1100 additions and 1539 deletions

View File

@@ -1,27 +1,20 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { ProxyDto } from '@api/dto/proxy.dto';
import { SettingsDto } from '@api/dto/settings.dto';
import { WebhookDto } from '@api/dto/webhook.dto';
import { ChatwootDto } from '@api/integrations/chatbot/chatwoot/dto/chatwoot.dto';
import { ChatwootService } from '@api/integrations/chatbot/chatwoot/services/chatwoot.service';
import { DifyService } from '@api/integrations/chatbot/dify/services/dify.service';
import { OpenaiService } from '@api/integrations/chatbot/openai/services/openai.service';
import { TypebotService } from '@api/integrations/chatbot/typebot/services/typebot.service';
import { RabbitmqDto } from '@api/integrations/event/rabbitmq/dto/rabbitmq.dto';
import { getAMQP, removeQueues } from '@api/integrations/event/rabbitmq/libs/amqp.server';
import { SqsDto } from '@api/integrations/event/sqs/dto/sqs.dto';
import { getSQS, removeQueues as removeQueuesSQS } from '@api/integrations/event/sqs/libs/sqs.server';
import { PrismaRepository, Query } from '@api/repository/repository.service';
import { eventController, waMonitor } from '@api/server.module';
import { Events, wa } from '@api/types/wa.types';
import { Auth, Chatwoot, ConfigService, HttpServer, Log, Rabbitmq, Sqs, Webhook } from '@config/env.config';
import { Auth, Chatwoot, ConfigService, HttpServer } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { ROOT_DIR } from '@config/path.config';
import { NotFoundException } from '@exceptions';
import { Contact, Message } from '@prisma/client';
import axios from 'axios';
import { WASocket } from 'baileys';
import { isURL } from 'class-validator';
import EventEmitter2 from 'eventemitter2';
import { join } from 'path';
import { v4 } from 'uuid';
@@ -40,10 +33,7 @@ export class ChannelStartupService {
public client: WASocket;
public readonly instance: wa.Instance = {};
public readonly localWebhook: wa.LocalWebHook = {};
public readonly localChatwoot: wa.LocalChatwoot = {};
public readonly localRabbitmq: wa.LocalRabbitmq = {};
public readonly localSqs: wa.LocalSqs = {};
public readonly localProxy: wa.LocalProxy = {};
public readonly localSettings: wa.LocalSettings = {};
public readonly storePath = join(ROOT_DIR, 'store');
@@ -215,73 +205,6 @@ export class ChannelStartupService {
};
}
public async loadWebhook() {
const data = await this.prismaRepository.webhook.findUnique({
where: {
instanceId: this.instanceId,
},
});
this.localWebhook.url = data?.url;
this.localWebhook.enabled = data?.enabled;
this.localWebhook.events = data?.events;
this.localWebhook.webhookByEvents = data?.webhookByEvents;
this.localWebhook.webhookBase64 = data?.webhookBase64;
}
public async setWebhook(data: WebhookDto) {
const findWebhook = await this.prismaRepository.webhook.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (findWebhook) {
await this.prismaRepository.webhook.update({
where: {
instanceId: this.instanceId,
},
data: {
url: data.url,
enabled: data.enabled,
events: data.events,
webhookByEvents: data.webhookByEvents,
webhookBase64: data.webhookBase64,
},
});
Object.assign(this.localWebhook, data);
return;
}
await this.prismaRepository.webhook.create({
data: {
url: data.url,
enabled: data.enabled,
events: data.events,
webhookByEvents: data.webhookByEvents,
webhookBase64: data.webhookBase64,
instanceId: this.instanceId,
},
});
Object.assign(this.localWebhook, data);
return;
}
public async findWebhook() {
const data = await this.prismaRepository.webhook.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (!data) {
throw new NotFoundException('Webhook not found');
}
return data;
}
public async loadChatwoot() {
if (!this.configService.get<Chatwoot>('CHATWOOT').ENABLED) {
return;
@@ -422,136 +345,6 @@ export class ChannelStartupService {
}
}
public async loadRabbitmq() {
const data = await this.prismaRepository.rabbitmq.findUnique({
where: {
instanceId: this.instanceId,
},
});
this.localRabbitmq.enabled = data?.enabled;
this.localRabbitmq.events = data?.events;
}
public async setRabbitmq(data: RabbitmqDto) {
const findRabbitmq = await this.prismaRepository.rabbitmq.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (findRabbitmq) {
await this.prismaRepository.rabbitmq.update({
where: {
instanceId: this.instanceId,
},
data: {
enabled: data.enabled,
events: data.events,
},
});
Object.assign(this.localRabbitmq, data);
return;
}
await this.prismaRepository.rabbitmq.create({
data: {
enabled: data.enabled,
events: data.events,
instanceId: this.instanceId,
},
});
Object.assign(this.localRabbitmq, data);
return;
}
public async findRabbitmq() {
const data = await this.prismaRepository.rabbitmq.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (!data) {
throw new NotFoundException('Rabbitmq not found');
}
return data;
}
public async removeRabbitmqQueues() {
if (this.localRabbitmq.enabled) {
removeQueues(this.instanceName, this.localRabbitmq.events);
}
}
public async loadSqs() {
const data = await this.prismaRepository.sqs.findUnique({
where: {
instanceId: this.instanceId,
},
});
this.localSqs.enabled = data?.enabled;
this.localSqs.events = data?.events;
}
public async setSqs(data: SqsDto) {
const findSqs = await this.prismaRepository.sqs.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (findSqs) {
await this.prismaRepository.sqs.update({
where: {
instanceId: this.instanceId,
},
data: {
enabled: data.enabled,
events: data.events,
},
});
Object.assign(this.localSqs, data);
return;
}
await this.prismaRepository.sqs.create({
data: {
enabled: data.enabled,
events: data.events,
instanceId: this.instanceId,
},
});
Object.assign(this.localSqs, data);
return;
}
public async findSqs() {
const data = await this.prismaRepository.sqs.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (!data) {
throw new NotFoundException('Sqs not found');
}
return data;
}
public async removeSqsQueues() {
if (this.localSqs.enabled) {
removeQueuesSQS(this.instanceName, this.localSqs.events);
}
}
public async loadProxy() {
const data = await this.prismaRepository.proxy.findUnique({
where: {
@@ -598,16 +391,7 @@ export class ChannelStartupService {
}
public async sendDataWebhook<T = any>(event: Events, data: T, local = true) {
const webhookGlobal = this.configService.get<Webhook>('WEBHOOK');
const webhookLocal = this.localWebhook.events;
const rabbitmqLocal = this.localRabbitmq.events;
const sqsLocal = this.localSqs.events;
const serverUrl = this.configService.get<HttpServer>('SERVER').URL;
const rabbitmqEnabled = this.configService.get<Rabbitmq>('RABBITMQ').ENABLED;
const rabbitmqGlobal = this.configService.get<Rabbitmq>('RABBITMQ').GLOBAL_ENABLED;
const rabbitmqEvents = this.configService.get<Rabbitmq>('RABBITMQ').EVENTS;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const transformedWe = we.replace(/_/gm, '-').toLowerCase();
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
const localISOTime = new Date(Date.now() - tzoffset).toISOString();
const now = localISOTime;
@@ -616,360 +400,17 @@ export class ChannelStartupService {
const instanceApikey = this.token || 'Apikey not found';
if (rabbitmqEnabled) {
const amqp = getAMQP();
if (this.localRabbitmq.enabled && amqp) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = this.instanceName ?? 'evolution_exchange';
let retry = 0;
while (retry < 3) {
try {
await amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
const eventName = event.replace(/_/g, '.').toLowerCase();
const queueName = `${this.instanceName}.${eventName}`;
await amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await amqp.bindQueue(queueName, exchangeName, eventName);
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-RabbitMQ',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
break;
} catch (error) {
retry++;
}
}
}
}
if (rabbitmqGlobal && rabbitmqEvents[we] && amqp) {
const exchangeName = 'evolution_exchange';
let retry = 0;
while (retry < 3) {
try {
await amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
const queueName = event;
await amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await amqp.bindQueue(queueName, exchangeName, event);
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-RabbitMQ-Global',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
break;
} catch (error) {
retry++;
}
}
}
}
if (this.localSqs.enabled) {
const sqs = getSQS();
if (sqs) {
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const queueName = `${this.instanceName}_${eventFormatted}.fifo`;
const sqsConfig = this.configService.get<Sqs>('SQS');
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
MessageDeduplicationId: `${this.instanceName}_${eventFormatted}_${Date.now()}`,
QueueUrl: sqsUrl,
};
sqs.sendMessage(params, (err, data) => {
if (err) {
this.logger.error({
local: ChannelStartupService.name + '.sendData-SQS',
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-SQS',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
}
});
}
}
}
await eventController.emit({
instanceName: this.instance.name,
origin: ChannelStartupService.name,
event,
data: {
...data,
sender: this.wuid,
apikey: (expose && instanceApikey) || null,
},
data,
serverUrl,
dateTime: now,
sender: this.wuid,
apiKey: expose && instanceApikey ? instanceApikey : null,
local,
});
const globalApiKey = this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY;
if (local) {
if (Array.isArray(webhookLocal) && webhookLocal.includes(we)) {
let baseURL: string;
if (this.localWebhook.webhookByEvents) {
baseURL = `${this.localWebhook.url}/${transformedWe}`;
} else {
baseURL = this.localWebhook.url;
}
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendDataWebhook-local',
url: baseURL,
event,
instance: this.instance.name,
data,
destination: this.localWebhook.url,
date_time: now,
sender: this.wuid,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
try {
if (this.localWebhook.enabled && isURL(this.localWebhook.url, { require_tld: false })) {
const httpService = axios.create({ baseURL });
const postData = {
event,
instance: this.instance.name,
data,
destination: this.localWebhook.url,
date_time: now,
sender: this.wuid,
server_url: serverUrl,
};
if (expose && instanceApikey) {
postData['apikey'] = instanceApikey;
}
await httpService.post('', postData);
}
} catch (error) {
this.logger.error({
local: ChannelStartupService.name + '.sendDataWebhook-local',
message: error?.message,
hostName: error?.hostname,
syscall: error?.syscall,
code: error?.code,
error: error?.errno,
stack: error?.stack,
name: error?.name,
url: baseURL,
server_url: serverUrl,
});
}
}
}
if (webhookGlobal.GLOBAL?.ENABLED) {
if (webhookGlobal.EVENTS[we]) {
const globalWebhook = this.configService.get<Webhook>('WEBHOOK').GLOBAL;
let globalURL;
if (webhookGlobal.GLOBAL.WEBHOOK_BY_EVENTS) {
globalURL = `${globalWebhook.URL}/${transformedWe}`;
} else {
globalURL = globalWebhook.URL;
}
const localUrl = this.localWebhook.url;
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendDataWebhook-global',
url: globalURL,
event,
instance: this.instance.name,
data,
destination: localUrl,
date_time: now,
sender: this.wuid,
server_url: serverUrl,
};
if (expose && globalApiKey) {
logData['apikey'] = globalApiKey;
}
this.logger.log(logData);
}
try {
if (globalWebhook && globalWebhook?.ENABLED && isURL(globalURL)) {
const httpService = axios.create({ baseURL: globalURL });
const postData = {
event,
instance: this.instance.name,
data,
destination: localUrl,
date_time: now,
sender: this.wuid,
server_url: serverUrl,
};
if (expose && globalApiKey) {
postData['apikey'] = globalApiKey;
}
await httpService.post('', postData);
}
} catch (error) {
this.logger.error({
local: ChannelStartupService.name + '.sendDataWebhook-global',
message: error?.message,
hostName: error?.hostname,
syscall: error?.syscall,
code: error?.code,
error: error?.errno,
stack: error?.stack,
name: error?.name,
url: globalURL,
server_url: serverUrl,
});
}
}
}
}
// Check if the number is MX or AR

View File

@@ -654,11 +654,8 @@ export class BaileysStartupService extends ChannelStartupService {
public async connectToWhatsapp(number?: string): Promise<WASocket> {
try {
this.loadWebhook();
this.loadChatwoot();
this.loadSettings();
this.loadRabbitmq();
this.loadSqs();
this.loadProxy();
return await this.createClient(number);
@@ -1184,7 +1181,7 @@ export class BaileysStartupService extends ChannelStartupService {
}
}
if (isMedia && !this.configService.get<S3>('S3').ENABLE && this.localWebhook.webhookBase64 === true) {
if (isMedia && !this.configService.get<S3>('S3').ENABLE) {
const buffer = await downloadMediaMessage(
{ key: received.key, message: received?.message },
'buffer',

View File

@@ -129,10 +129,7 @@ export class BusinessStartupService extends ChannelStartupService {
const content = data.entry[0].changes[0].value;
try {
this.loadWebhook();
this.loadChatwoot();
this.loadRabbitmq();
this.loadSqs();
this.eventHandler(content);

View File

@@ -1,8 +1,7 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { ProviderFiles } from '@api/provider/sessions';
import { PrismaRepository } from '@api/repository/repository.service';
import { websocketController } from '@api/server.module';
import { Integration } from '@api/types/wa.types';
import { Events, Integration } from '@api/types/wa.types';
import { CacheConf, Chatwoot, ConfigService, Database, DelInstance, ProviderSession } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { INSTANCE_DIR, STORE_DIR } from '@config/path.config';
@@ -52,10 +51,8 @@ export class WAMonitoringService {
this.waInstances[instance]?.client?.ws?.close();
this.waInstances[instance]?.client?.end(undefined);
}
this.waInstances[instance]?.removeRabbitmqQueues();
this.eventEmitter.emit('remove.instance', instance, 'inner');
} else {
this.waInstances[instance]?.removeRabbitmqQueues();
this.eventEmitter.emit('remove.instance', instance, 'inner');
}
}
@@ -340,12 +337,7 @@ export class WAMonitoringService {
private removeInstance() {
this.eventEmitter.on('remove.instance', async (instanceName: string) => {
try {
await websocketController.emit({
instanceName,
origin: WAMonitoringService.name,
event: 'remove.instance',
data: null,
});
await this.waInstances[instanceName]?.sendDataWebhook(Events.REMOVE_INSTANCE, null);
this.cleaningUp(instanceName);
this.cleaningStoreData(instanceName);
@@ -361,12 +353,7 @@ export class WAMonitoringService {
});
this.eventEmitter.on('logout.instance', async (instanceName: string) => {
try {
await websocketController.emit({
instanceName,
origin: WAMonitoringService.name,
event: 'logout.instance',
data: null,
});
await this.waInstances[instanceName]?.sendDataWebhook(Events.LOGOUT_INSTANCE, null);
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED) {
this.waInstances[instanceName]?.clearCacheChatwoot();

View File

@@ -1,82 +0,0 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { WebhookDto } from '@api/dto/webhook.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { Logger } from '@config/logger.config';
import { Webhook } from '@prisma/client';
import axios from 'axios';
import { WAMonitoringService } from './monitor.service';
export class WebhookService {
constructor(private readonly waMonitor: WAMonitoringService, public readonly prismaRepository: PrismaRepository) {}
private readonly logger = new Logger('WebhookService');
public create(instance: InstanceDto, data: WebhookDto) {
this.waMonitor.waInstances[instance.instanceName].setWebhook(data);
return { webhook: { ...instance, webhook: data } };
}
public async find(instance: InstanceDto): Promise<Webhook> {
try {
const result = await this.waMonitor.waInstances[instance.instanceName].findWebhook();
if (Object.keys(result).length === 0) {
throw new Error('Webhook not found');
}
return result;
} catch (error) {
return null;
}
}
public async receiveWebhook(data: any) {
if (data.object === 'whatsapp_business_account') {
if (data.entry[0]?.changes[0]?.field === 'message_template_status_update') {
const template = await this.prismaRepository.template.findFirst({
where: { templateId: `${data.entry[0].changes[0].value.message_template_id}` },
});
if (!template) {
console.log('template not found');
return;
}
const { webhookUrl } = template;
await axios.post(webhookUrl, data.entry[0].changes[0].value, {
headers: {
'Content-Type': 'application/json',
},
});
return;
}
data.entry?.forEach(async (entry: any) => {
const numberId = entry.changes[0].value.metadata.phone_number_id;
if (!numberId) {
this.logger.error('WebhookService -> receiveWebhook -> numberId not found');
return;
}
const instance = await this.prismaRepository.instance.findFirst({
where: { number: numberId },
});
if (!instance) {
this.logger.error('WebhookService -> receiveWebhook -> instance not found');
return;
}
await this.waMonitor.waInstances[instance.name].connectToWhatsapp(data);
return;
});
}
return;
}
}