merging with develop

This commit is contained in:
pedro-php
2025-03-28 10:36:34 -03:00
50 changed files with 2129 additions and 934 deletions

View File

@@ -152,5 +152,8 @@ export class EventController {
'TYPEBOT_CHANGE_STATUS',
'REMOVE_INSTANCE',
'LOGOUT_INSTANCE',
'INSTANCE_CREATE',
'INSTANCE_DELETE',
'STATUS_INSTANCE',
];
}

View File

@@ -26,6 +26,11 @@ export class EventDto {
events?: string[];
};
nats?: {
enabled?: boolean;
events?: string[];
};
pusher?: {
enabled?: boolean;
appId?: string;
@@ -63,6 +68,11 @@ export function EventInstanceMixin<TBase extends Constructor>(Base: TBase) {
events?: string[];
};
nats?: {
enabled?: boolean;
events?: string[];
};
pusher?: {
enabled?: boolean;
appId?: string;

View File

@@ -1,3 +1,4 @@
import { NatsController } from '@api/integrations/event/nats/nats.controller';
import { PusherController } from '@api/integrations/event/pusher/pusher.controller';
import { RabbitmqController } from '@api/integrations/event/rabbitmq/rabbitmq.controller';
import { SqsController } from '@api/integrations/event/sqs/sqs.controller';
@@ -13,6 +14,7 @@ export class EventManager {
private websocketController: WebsocketController;
private webhookController: WebhookController;
private rabbitmqController: RabbitmqController;
private natsController: NatsController;
private sqsController: SqsController;
private pusherController: PusherController;
@@ -23,6 +25,7 @@ export class EventManager {
this.websocket = new WebsocketController(prismaRepository, waMonitor);
this.webhook = new WebhookController(prismaRepository, waMonitor);
this.rabbitmq = new RabbitmqController(prismaRepository, waMonitor);
this.nats = new NatsController(prismaRepository, waMonitor);
this.sqs = new SqsController(prismaRepository, waMonitor);
this.pusher = new PusherController(prismaRepository, waMonitor);
}
@@ -67,6 +70,14 @@ export class EventManager {
return this.rabbitmqController;
}
public set nats(nats: NatsController) {
this.natsController = nats;
}
public get nats() {
return this.natsController;
}
public set sqs(sqs: SqsController) {
this.sqsController = sqs;
}
@@ -85,6 +96,7 @@ export class EventManager {
public init(httpServer: Server): void {
this.websocket.init(httpServer);
this.rabbitmq.init();
this.nats.init();
this.sqs.init();
this.pusher.init();
}
@@ -103,6 +115,7 @@ export class EventManager {
}): Promise<void> {
await this.websocket.emit(eventData);
await this.rabbitmq.emit(eventData);
await this.nats.emit(eventData);
await this.sqs.emit(eventData);
await this.webhook.emit(eventData);
await this.pusher.emit(eventData);
@@ -125,6 +138,14 @@ export class EventManager {
},
});
if (data.nats)
await this.nats.set(instanceName, {
nats: {
enabled: true,
events: data.nats?.events,
},
});
if (data.sqs)
await this.sqs.set(instanceName, {
sqs: {

View File

@@ -1,3 +1,4 @@
import { NatsRouter } from '@api/integrations/event/nats/nats.router';
import { PusherRouter } from '@api/integrations/event/pusher/pusher.router';
import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/rabbitmq.router';
import { SqsRouter } from '@api/integrations/event/sqs/sqs.router';
@@ -14,6 +15,7 @@ export class EventRouter {
this.router.use('/webhook', new WebhookRouter(configService, ...guards).router);
this.router.use('/websocket', new WebsocketRouter(...guards).router);
this.router.use('/rabbitmq', new RabbitmqRouter(...guards).router);
this.router.use('/nats', new NatsRouter(...guards).router);
this.router.use('/pusher', new PusherRouter(...guards).router);
this.router.use('/sqs', new SqsRouter(...guards).router);
}

View File

@@ -16,6 +16,9 @@ export const eventSchema: JSONSchema7 = {
rabbitmq: {
$ref: '#/$defs/event',
},
nats: {
$ref: '#/$defs/event',
},
sqs: {
$ref: '#/$defs/event',
},

View File

@@ -0,0 +1,161 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { configService, Log, Nats } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { connect, NatsConnection, StringCodec } from 'nats';
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
export class NatsController extends EventController implements EventControllerInterface {
public natsClient: NatsConnection | null = null;
private readonly logger = new Logger('NatsController');
private readonly sc = StringCodec();
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<Nats>('NATS')?.ENABLED, 'nats');
}
public async init(): Promise<void> {
if (!this.status) {
return;
}
try {
const uri = configService.get<Nats>('NATS').URI;
this.natsClient = await connect({ servers: uri });
this.logger.info('NATS initialized');
if (configService.get<Nats>('NATS')?.GLOBAL_ENABLED) {
await this.initGlobalSubscriptions();
}
} catch (error) {
this.logger.error('Failed to connect to NATS:');
this.logger.error(error);
throw error;
}
}
public async emit({
instanceName,
origin,
event,
data,
serverUrl,
dateTime,
sender,
apiKey,
integration,
}: EmitData): Promise<void> {
if (integration && !integration.includes('nats')) {
return;
}
if (!this.status || !this.natsClient) {
return;
}
const instanceNats = await this.get(instanceName);
const natsLocal = instanceNats?.events;
const natsGlobal = configService.get<Nats>('NATS').GLOBAL_ENABLED;
const natsEvents = configService.get<Nats>('NATS').EVENTS;
const prefixKey = configService.get<Nats>('NATS').PREFIX_KEY;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const logEnabled = configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS');
const message = {
event,
instance: instanceName,
data,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};
// Instância específica
if (instanceNats?.enabled) {
if (Array.isArray(natsLocal) && natsLocal.includes(we)) {
const subject = `${instanceName}.${event.toLowerCase()}`;
try {
this.natsClient.publish(subject, this.sc.encode(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-NATS`,
...message,
};
this.logger.log(logData);
}
} catch (error) {
this.logger.error(`Failed to publish to NATS (instance): ${error}`);
}
}
}
// Global
if (natsGlobal && natsEvents[we]) {
try {
const subject = prefixKey ? `${prefixKey}.${event.toLowerCase()}` : event.toLowerCase();
this.natsClient.publish(subject, this.sc.encode(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-NATS-Global`,
...message,
};
this.logger.log(logData);
}
} catch (error) {
this.logger.error(`Failed to publish to NATS (global): ${error}`);
}
}
}
private async initGlobalSubscriptions(): Promise<void> {
this.logger.info('Initializing global subscriptions');
const events = configService.get<Nats>('NATS').EVENTS;
const prefixKey = configService.get<Nats>('NATS').PREFIX_KEY;
if (!events) {
this.logger.warn('No events to initialize on NATS');
return;
}
const eventKeys = Object.keys(events);
for (const event of eventKeys) {
if (events[event] === false) continue;
const subject = prefixKey ? `${prefixKey}.${event.toLowerCase()}` : event.toLowerCase();
// Criar uma subscription para cada evento
try {
const subscription = this.natsClient.subscribe(subject);
this.logger.info(`Subscribed to: ${subject}`);
// Processar mensagens (exemplo básico)
(async () => {
for await (const msg of subscription) {
try {
const data = JSON.parse(this.sc.decode(msg.data));
// Aqui você pode adicionar a lógica de processamento
this.logger.debug(`Received message on ${subject}:`);
this.logger.debug(data);
} catch (error) {
this.logger.error(`Error processing message on ${subject}:`);
this.logger.error(error);
}
}
})();
} catch (error) {
this.logger.error(`Failed to subscribe to ${subject}:`);
this.logger.error(error);
}
}
}
}

View File

@@ -0,0 +1,36 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { InstanceDto } from '@api/dto/instance.dto';
import { EventDto } from '@api/integrations/event/event.dto';
import { HttpStatus } from '@api/routes/index.router';
import { eventManager } from '@api/server.module';
import { eventSchema, instanceSchema } from '@validate/validate.schema';
import { RequestHandler, Router } from 'express';
export class NatsRouter extends RouterBroker {
constructor(...guards: RequestHandler[]) {
super();
this.router
.post(this.routerPath('set'), ...guards, async (req, res) => {
const response = await this.dataValidate<EventDto>({
request: req,
schema: eventSchema,
ClassRef: EventDto,
execute: (instance, data) => eventManager.nats.set(instance.instanceName, data),
});
res.status(HttpStatus.CREATED).json(response);
})
.get(this.routerPath('find'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => eventManager.nats.get(instance.instanceName),
});
res.status(HttpStatus.OK).json(response);
});
}
public readonly router: Router = Router();
}

View File

@@ -1,10 +1,11 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { SQS } from '@aws-sdk/client-sqs';
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
import { configService, Log, Sqs } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
import { EventDto } from '../event.dto';
export class SqsController extends EventController implements EventControllerInterface {
private sqs: SQS;
@@ -45,6 +46,39 @@ export class SqsController extends EventController implements EventControllerInt
return this.sqs;
}
override async set(instanceName: string, data: EventDto): Promise<any> {
if (!this.status) {
return;
}
if (!data[this.name]?.enabled) {
data[this.name].events = [];
} else {
if (0 === data[this.name].events.length) {
data[this.name].events = EventController.events;
}
}
await this.saveQueues(instanceName, data[this.name].events, data[this.name]?.enabled);
const payload: any = {
where: {
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
update: {
enabled: data[this.name]?.enabled,
events: data[this.name].events,
},
create: {
enabled: data[this.name]?.enabled,
events: data[this.name].events,
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
};
console.log('*** payload: ', payload);
return this.prisma[this.name].upsert(payload);
}
public async emit({
instanceName,
origin,
@@ -121,70 +155,92 @@ export class SqsController extends EventController implements EventControllerInt
}
}
public async initQueues(instanceName: string, events: string[]) {
if (!events || !events.length) return;
private async saveQueues(instanceName: string, events: string[], enable: boolean) {
if (enable) {
const eventsFinded = await this.listQueuesByInstance(instanceName);
console.log('eventsFinded', eventsFinded);
const queues = events.map((event) => {
return `${event.replace(/_/g, '_').toLowerCase()}`;
});
for (const event of events) {
const normalizedEvent = event.toLowerCase();
queues.forEach((event) => {
const queueName = `${instanceName}_${event}.fifo`;
if (eventsFinded.includes(normalizedEvent)) {
this.logger.info(`A queue para o evento "${normalizedEvent}" já existe. Ignorando criação.`);
continue;
}
this.sqs.createQueue(
{
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
},
},
(err, data) => {
if (err) {
this.logger.error(`Error creating queue ${queueName}: ${err.message}`);
} else {
this.logger.info(`Queue ${queueName} created: ${data.QueueUrl}`);
}
},
);
});
const queueName = `${instanceName}_${normalizedEvent}.fifo`;
try {
const createCommand = new CreateQueueCommand({
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
},
});
const data = await this.sqs.send(createCommand);
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
} catch (err: any) {
this.logger.error(`Erro ao criar queue ${queueName}: ${err.message}`);
}
}
}
}
public async removeQueues(instanceName: string, events: any) {
const eventsArray = Array.isArray(events) ? events.map((event) => String(event)) : [];
if (!events || !eventsArray.length) return;
private async listQueuesByInstance(instanceName: string) {
let existingQueues: string[] = [];
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
});
const listData = await this.sqs.send(listCommand);
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
// Extrai o nome da fila a partir da URL
existingQueues = listData.QueueUrls.map((queueUrl) => {
const parts = queueUrl.split('/');
return parts[parts.length - 1];
});
}
} catch (error: any) {
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
return;
}
const queues = eventsArray.map((event) => {
return `${event.replace(/_/g, '_').toLowerCase()}`;
});
// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
return existingQueues
.map((queueName) => {
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
}
return '';
})
.filter((event) => event !== '');
}
queues.forEach((event) => {
const queueName = `${instanceName}_${event}.fifo`;
// Para uma futura feature de exclusão forçada das queues
private async removeQueuesByInstance(instanceName: string) {
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
});
const listData = await this.sqs.send(listCommand);
this.sqs.getQueueUrl(
{
QueueName: queueName,
},
(err, data) => {
if (err) {
this.logger.error(`Error getting queue URL for ${queueName}: ${err.message}`);
} else {
const queueUrl = data.QueueUrl;
if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
this.logger.info(`No queues found for instance ${instanceName}`);
return;
}
this.sqs.deleteQueue(
{
QueueUrl: queueUrl,
},
(deleteErr) => {
if (deleteErr) {
this.logger.error(`Error deleting queue ${queueName}: ${deleteErr.message}`);
} else {
this.logger.info(`Queue ${queueName} deleted`);
}
},
);
}
},
);
});
for (const queueUrl of listData.QueueUrls) {
try {
const deleteCommand = new DeleteQueueCommand({ QueueUrl: queueUrl });
await this.sqs.send(deleteCommand);
this.logger.info(`Queue ${queueUrl} deleted`);
} catch (err: any) {
this.logger.error(`Error deleting queue ${queueUrl}: ${err.message}`);
}
}
} catch (err: any) {
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
}
}
}

View File

@@ -6,7 +6,6 @@ import { configService, Log, Webhook } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { BadRequestException } from '@exceptions';
import axios, { AxiosInstance } from 'axios';
import { isURL } from 'class-validator';
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
@@ -18,7 +17,7 @@ export class WebhookController extends EventController implements EventControlle
}
override async set(instanceName: string, data: EventDto): Promise<wa.LocalWebHook> {
if (!isURL(data.webhook.url, { require_tld: false })) {
if (!/^(https?:\/\/)/.test(data.webhook.url)) {
throw new BadRequestException('Invalid "url" property');
}
@@ -78,6 +77,7 @@ export class WebhookController extends EventController implements EventControlle
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const transformedWe = we.replace(/_/gm, '-').toLowerCase();
const enabledLog = configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS');
const regex = /^(https?:\/\/)/;
const webhookData = {
event,
@@ -111,7 +111,7 @@ export class WebhookController extends EventController implements EventControlle
}
try {
if (instance?.enabled && isURL(instance.url, { require_tld: false })) {
if (instance?.enabled && regex.test(instance.url)) {
const httpService = axios.create({
baseURL,
headers: webhookHeaders as Record<string, string> | undefined,
@@ -155,7 +155,7 @@ export class WebhookController extends EventController implements EventControlle
}
try {
if (isURL(globalURL)) {
if (regex.test(globalURL)) {
const httpService = axios.create({ baseURL: globalURL });
await this.retryWebhookRequest(

View File

@@ -1,6 +1,6 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { configService, Cors, Log, Websocket } from '@config/env.config';
import { Auth, configService, Cors, Log, Websocket } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { Server } from 'http';
import { Server as SocketIO } from 'socket.io';
@@ -24,8 +24,40 @@ export class WebsocketController extends EventController implements EventControl
}
this.socket = new SocketIO(httpServer, {
cors: {
origin: this.cors,
cors: { origin: this.cors },
allowRequest: async (req, callback) => {
try {
const url = new URL(req.url || '', 'http://localhost');
const params = new URLSearchParams(url.search);
// Permite conexões internas do Socket.IO (EIO=4 é o Engine.IO v4)
if (params.has('EIO')) {
return callback(null, true);
}
const apiKey = params.get('apikey') || (req.headers.apikey as string);
if (!apiKey) {
this.logger.error('Connection rejected: apiKey not provided');
return callback('apiKey is required', false);
}
const instance = await this.prismaRepository.instance.findFirst({ where: { token: apiKey } });
if (!instance) {
const globalToken = configService.get<Auth>('AUTHENTICATION').API_KEY.KEY;
if (apiKey !== globalToken) {
this.logger.error('Connection rejected: invalid global token');
return callback('Invalid global token', false);
}
}
callback(null, true);
} catch (error) {
this.logger.error('Authentication error:');
this.logger.error(error);
callback('Authentication error', false);
}
},
});
@@ -101,10 +133,7 @@ export class WebsocketController extends EventController implements EventControl
this.socket.emit(event, message);
if (logEnabled) {
this.logger.log({
local: `${origin}.sendData-WebsocketGlobal`,
...message,
});
this.logger.log({ local: `${origin}.sendData-WebsocketGlobal`, ...message });
}
}
@@ -119,10 +148,7 @@ export class WebsocketController extends EventController implements EventControl
this.socket.of(`/${instanceName}`).emit(event, message);
if (logEnabled) {
this.logger.log({
local: `${origin}.sendData-Websocket`,
...message,
});
this.logger.log({ local: `${origin}.sendData-Websocket`, ...message });
}
}
} catch (err) {