Merge pull request #789 from stenioanibal/v2.0.0

V2.0.0 - Refactor websocket structure
This commit is contained in:
Davidson Gomes
2024-08-20 11:19:36 -03:00
committed by GitHub
14 changed files with 251 additions and 270 deletions

View File

@@ -11,12 +11,10 @@ import { getAMQP, removeQueues } from '@api/integrations/rabbitmq/libs/amqp.serv
import { SqsDto } from '@api/integrations/sqs/dto/sqs.dto';
import { getSQS, removeQueues as removeQueuesSQS } from '@api/integrations/sqs/libs/sqs.server';
import { TypebotService } from '@api/integrations/typebot/services/typebot.service';
import { WebsocketDto } from '@api/integrations/websocket/dto/websocket.dto';
import { getIO } from '@api/integrations/websocket/libs/socket.server';
import { PrismaRepository, Query } from '@api/repository/repository.service';
import { waMonitor } from '@api/server.module';
import { waMonitor, websocketController } from '@api/server.module';
import { Events, wa } from '@api/types/wa.types';
import { Auth, Chatwoot, ConfigService, HttpServer, Log, Rabbitmq, Sqs, Webhook, Websocket } from '@config/env.config';
import { Auth, Chatwoot, ConfigService, HttpServer, Log, Rabbitmq, Sqs, Webhook } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { ROOT_DIR } from '@config/path.config';
import { NotFoundException } from '@exceptions';
@@ -44,7 +42,6 @@ export class ChannelStartupService {
public readonly instance: wa.Instance = {};
public readonly localWebhook: wa.LocalWebHook = {};
public readonly localChatwoot: wa.LocalChatwoot = {};
public readonly localWebsocket: wa.LocalWebsocket = {};
public readonly localRabbitmq: wa.LocalRabbitmq = {};
public readonly localSqs: wa.LocalSqs = {};
public readonly localProxy: wa.LocalProxy = {};
@@ -425,43 +422,6 @@ export class ChannelStartupService {
}
}
public async loadWebsocket() {
const data = await this.prismaRepository.websocket.findUnique({
where: {
instanceId: this.instanceId,
},
});
this.localWebsocket.enabled = data?.enabled;
this.localWebsocket.events = data?.events;
}
public async setWebsocket(data: WebsocketDto) {
await this.prismaRepository.websocket.create({
data: {
enabled: data.enabled,
events: data.events,
instanceId: this.instanceId,
},
});
Object.assign(this.localWebsocket, data);
}
public async findWebsocket() {
const data = await this.prismaRepository.websocket.findUnique({
where: {
instanceId: this.instanceId,
},
});
if (!data) {
throw new NotFoundException('Websocket not found');
}
return data;
}
public async loadRabbitmq() {
const data = await this.prismaRepository.rabbitmq.findUnique({
where: {
@@ -640,7 +600,6 @@ export class ChannelStartupService {
public async sendDataWebhook<T = any>(event: Events, data: T, local = true) {
const webhookGlobal = this.configService.get<Webhook>('WEBHOOK');
const webhookLocal = this.localWebhook.events;
const websocketLocal = this.localWebsocket.events;
const rabbitmqLocal = this.localRabbitmq.events;
const sqsLocal = this.localSqs.events;
const serverUrl = this.configService.get<HttpServer>('SERVER').URL;
@@ -862,72 +821,16 @@ export class ChannelStartupService {
}
}
if (this.configService.get<Websocket>('WEBSOCKET')?.ENABLED) {
const io = getIO();
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
await websocketController.emit({
instanceName: this.instance.name,
origin: ChannelStartupService.name,
event,
data: {
...data,
sender: this.wuid,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
if (this.configService.get<Websocket>('WEBSOCKET')?.GLOBAL_EVENTS) {
io.emit(event, message);
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-WebsocketGlobal',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
}
if (this.localWebsocket.enabled && Array.isArray(websocketLocal) && websocketLocal.includes(we)) {
io.of(`/${this.instance.name}`).emit(event, message);
if (this.configService.get<Websocket>('WEBSOCKET')?.GLOBAL_EVENTS) {
io.emit(event, message);
}
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-Websocket',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
}
}
apikey: (expose && instanceApikey) || null,
},
});
const globalApiKey = this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY;

View File

@@ -657,7 +657,6 @@ export class BaileysStartupService extends ChannelStartupService {
this.loadWebhook();
this.loadChatwoot();
this.loadSettings();
this.loadWebsocket();
this.loadRabbitmq();
this.loadSqs();
this.loadProxy();

View File

@@ -131,7 +131,6 @@ export class BusinessStartupService extends ChannelStartupService {
try {
this.loadWebhook();
this.loadChatwoot();
this.loadWebsocket();
this.loadRabbitmq();
this.loadSqs();

View File

@@ -1,6 +1,7 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { ProviderFiles } from '@api/provider/sessions';
import { PrismaRepository } from '@api/repository/repository.service';
import { websocketController } from '@api/server.module';
import { Integration } from '@api/types/wa.types';
import { CacheConf, Chatwoot, ConfigService, Database, DelInstance, ProviderSession } from '@config/env.config';
import { Logger } from '@config/logger.config';
@@ -52,10 +53,9 @@ export class WAMonitoringService {
this.waInstances[instance]?.client?.end(undefined);
}
this.waInstances[instance]?.removeRabbitmqQueues();
delete this.waInstances[instance];
this.eventEmitter.emit('remove.instance', instance, 'inner');
} else {
this.waInstances[instance]?.removeRabbitmqQueues();
delete this.waInstances[instance];
this.eventEmitter.emit('remove.instance', instance, 'inner');
}
}
@@ -340,6 +340,13 @@ export class WAMonitoringService {
private removeInstance() {
this.eventEmitter.on('remove.instance', async (instanceName: string) => {
try {
await websocketController.emit({
instanceName,
origin: WAMonitoringService.name,
event: 'remove.instance',
data: null,
});
this.cleaningUp(instanceName);
this.cleaningStoreData(instanceName);
} finally {
@@ -354,7 +361,17 @@ export class WAMonitoringService {
});
this.eventEmitter.on('logout.instance', async (instanceName: string) => {
try {
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED) this.waInstances[instanceName]?.clearCacheChatwoot();
await websocketController.emit({
instanceName,
origin: WAMonitoringService.name,
event: 'logout.instance',
data: null,
});
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED) {
this.waInstances[instanceName]?.clearCacheChatwoot();
}
this.cleaningUp(instanceName);
} finally {
this.logger.warn(`Instance "${instanceName}" - LOGOUT`);