feat: Add NATS integration support to the event system

- Added NATS package to dependencies
- Created Prisma schema models for NATS configuration
- Implemented NATS controller, router, and event management
- Updated instance controller and event manager to support NATS
- Added NATS configuration options in environment configuration
- Included NATS events in instance validation schema
This commit is contained in:
Davidson Gomes 2025-02-05 17:05:29 -03:00
parent 9a72b90ab2
commit d665474404
14 changed files with 388 additions and 9 deletions

23
package-lock.json generated
View File

@ -42,6 +42,7 @@
"mime-types": "^2.1.35",
"minio": "^8.0.3",
"multer": "^1.4.5-lts.1",
"nats": "^2.29.1",
"node-cache": "^5.1.2",
"node-cron": "^3.0.3",
"openai": "^4.77.3",
@ -8722,6 +8723,17 @@
"resolved": "https://registry.npmjs.org/napi-build-utils/-/napi-build-utils-2.0.0.tgz",
"integrity": "sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA=="
},
"node_modules/nats": {
"version": "2.29.1",
"resolved": "https://registry.npmjs.org/nats/-/nats-2.29.1.tgz",
"integrity": "sha512-OHVsxrQCITTdMKG3So0jhtnBd5jS2u1xpS91UCws7VklsaCbctwg5vT/8lYpVldPW0x3aHGF8uuAoMfCoJy7Sg==",
"dependencies": {
"nkeys.js": "1.1.0"
},
"engines": {
"node": ">= 14.0.0"
}
},
"node_modules/natural-compare": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz",
@ -8735,6 +8747,17 @@
"node": ">= 0.6"
}
},
"node_modules/nkeys.js": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/nkeys.js/-/nkeys.js-1.1.0.tgz",
"integrity": "sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg==",
"dependencies": {
"tweetnacl": "1.0.3"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/node-abi": {
"version": "3.73.0",
"resolved": "https://registry.npmjs.org/node-abi/-/node-abi-3.73.0.tgz",

View File

@ -82,6 +82,7 @@
"mime-types": "^2.1.35",
"minio": "^8.0.3",
"multer": "^1.4.5-lts.1",
"nats": "^2.29.1",
"node-cache": "^5.1.2",
"node-cron": "^3.0.3",
"openai": "^4.77.3",

View File

@ -86,6 +86,7 @@ model Instance {
Proxy Proxy?
Setting Setting?
Rabbitmq Rabbitmq?
Nats Nats?
Sqs Sqs?
Websocket Websocket?
Typebot Typebot[]
@ -116,18 +117,19 @@ model Session {
}
model Chat {
id String @id @default(cuid())
remoteJid String @db.VarChar(100)
name String? @db.VarChar(100)
labels Json? @db.Json
createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp
updatedAt DateTime? @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String
id String @id @default(cuid())
remoteJid String @db.VarChar(100)
name String? @db.VarChar(100)
labels Json? @db.Json
createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp
updatedAt DateTime? @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String
unreadMessages Int @default(0)
@@unique([instanceId, remoteJid])
@@index([instanceId])
@@index([remoteJid])
@@unique([instanceId, remoteJid])
}
model Contact {
@ -170,6 +172,7 @@ model Message {
sessionId String?
session IntegrationSession? @relation(fields: [sessionId], references: [id])
@@index([instanceId])
}
@ -185,6 +188,7 @@ model MessageUpdate {
messageId String
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String
@@index([instanceId])
@@index([messageId])
}
@ -201,6 +205,7 @@ model Webhook {
updatedAt DateTime @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String @unique
@@index([instanceId])
}
@ -269,6 +274,7 @@ model Setting {
updatedAt DateTime @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String @unique
@@index([instanceId])
}
@ -282,6 +288,16 @@ model Rabbitmq {
instanceId String @unique
}
model Nats {
id String @id @default(cuid())
enabled Boolean @default(false)
events Json @db.Json
createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp
updatedAt DateTime @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String @unique
}
model Sqs {
id String @id @default(cuid())
enabled Boolean @default(false)

View File

@ -86,6 +86,7 @@ model Instance {
Proxy Proxy?
Setting Setting?
Rabbitmq Rabbitmq?
Nats Nats?
Sqs Sqs?
Websocket Websocket?
Typebot Typebot[]
@ -125,6 +126,7 @@ model Chat {
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String
unreadMessages Int @default(0)
@@index([instanceId])
@@index([remoteJid])
}
@ -168,6 +170,7 @@ model Message {
sessionId String?
session IntegrationSession? @relation(fields: [sessionId], references: [id])
@@index([instanceId])
}
@ -183,6 +186,7 @@ model MessageUpdate {
messageId String
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String
@@index([instanceId])
@@index([messageId])
}
@ -199,6 +203,7 @@ model Webhook {
updatedAt DateTime @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String @unique
@@index([instanceId])
}
@ -269,6 +274,7 @@ model Setting {
updatedAt DateTime @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String @unique
@@index([instanceId])
}
@ -282,6 +288,16 @@ model Rabbitmq {
instanceId String @unique
}
model Nats {
id String @id @default(cuid())
enabled Boolean @default(false) @db.Boolean
events Json @db.JsonB
createdAt DateTime? @default(now()) @db.Timestamp
updatedAt DateTime @updatedAt @db.Timestamp
Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade)
instanceId String @unique
}
model Sqs {
id String @id @default(cuid())
enabled Boolean @default(false) @db.Boolean

View File

@ -170,6 +170,9 @@ export class InstanceController {
rabbitmq: {
enabled: instanceData?.rabbitmq?.enabled,
},
nats: {
enabled: instanceData?.nats?.enabled,
},
sqs: {
enabled: instanceData?.sqs?.enabled,
},
@ -258,6 +261,9 @@ export class InstanceController {
rabbitmq: {
enabled: instanceData?.rabbitmq?.enabled,
},
nats: {
enabled: instanceData?.nats?.enabled,
},
sqs: {
enabled: instanceData?.sqs?.enabled,
},

View File

@ -26,6 +26,11 @@ export class EventDto {
events?: string[];
};
nats?: {
enabled?: boolean;
events?: string[];
};
pusher?: {
enabled?: boolean;
appId?: string;
@ -63,6 +68,11 @@ export function EventInstanceMixin<TBase extends Constructor>(Base: TBase) {
events?: string[];
};
nats?: {
enabled?: boolean;
events?: string[];
};
pusher?: {
enabled?: boolean;
appId?: string;

View File

@ -1,3 +1,4 @@
import { NatsController } from '@api/integrations/event/nats/nats.controller';
import { PusherController } from '@api/integrations/event/pusher/pusher.controller';
import { RabbitmqController } from '@api/integrations/event/rabbitmq/rabbitmq.controller';
import { SqsController } from '@api/integrations/event/sqs/sqs.controller';
@ -13,6 +14,7 @@ export class EventManager {
private websocketController: WebsocketController;
private webhookController: WebhookController;
private rabbitmqController: RabbitmqController;
private natsController: NatsController;
private sqsController: SqsController;
private pusherController: PusherController;
@ -23,6 +25,7 @@ export class EventManager {
this.websocket = new WebsocketController(prismaRepository, waMonitor);
this.webhook = new WebhookController(prismaRepository, waMonitor);
this.rabbitmq = new RabbitmqController(prismaRepository, waMonitor);
this.nats = new NatsController(prismaRepository, waMonitor);
this.sqs = new SqsController(prismaRepository, waMonitor);
this.pusher = new PusherController(prismaRepository, waMonitor);
}
@ -67,6 +70,14 @@ export class EventManager {
return this.rabbitmqController;
}
public set nats(nats: NatsController) {
this.natsController = nats;
}
public get nats() {
return this.natsController;
}
public set sqs(sqs: SqsController) {
this.sqsController = sqs;
}
@ -85,6 +96,7 @@ export class EventManager {
public init(httpServer: Server): void {
this.websocket.init(httpServer);
this.rabbitmq.init();
this.nats.init();
this.sqs.init();
this.pusher.init();
}
@ -103,6 +115,7 @@ export class EventManager {
}): Promise<void> {
await this.websocket.emit(eventData);
await this.rabbitmq.emit(eventData);
await this.nats.emit(eventData);
await this.sqs.emit(eventData);
await this.webhook.emit(eventData);
await this.pusher.emit(eventData);
@ -125,6 +138,14 @@ export class EventManager {
},
});
if (data.nats)
await this.nats.set(instanceName, {
nats: {
enabled: true,
events: data.nats?.events,
},
});
if (data.sqs)
await this.sqs.set(instanceName, {
sqs: {

View File

@ -1,3 +1,4 @@
import { NatsRouter } from '@api/integrations/event/nats/nats.router';
import { PusherRouter } from '@api/integrations/event/pusher/pusher.router';
import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/rabbitmq.router';
import { SqsRouter } from '@api/integrations/event/sqs/sqs.router';
@ -14,6 +15,7 @@ export class EventRouter {
this.router.use('/webhook', new WebhookRouter(configService, ...guards).router);
this.router.use('/websocket', new WebsocketRouter(...guards).router);
this.router.use('/rabbitmq', new RabbitmqRouter(...guards).router);
this.router.use('/nats', new NatsRouter(...guards).router);
this.router.use('/pusher', new PusherRouter(...guards).router);
this.router.use('/sqs', new SqsRouter(...guards).router);
}

View File

@ -16,6 +16,9 @@ export const eventSchema: JSONSchema7 = {
rabbitmq: {
$ref: '#/$defs/event',
},
nats: {
$ref: '#/$defs/event',
},
sqs: {
$ref: '#/$defs/event',
},

View File

@ -0,0 +1,161 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { configService, Log, Nats } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { connect, NatsConnection, StringCodec } from 'nats';
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
export class NatsController extends EventController implements EventControllerInterface {
public natsClient: NatsConnection | null = null;
private readonly logger = new Logger('NatsController');
private readonly sc = StringCodec();
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<Nats>('NATS')?.ENABLED, 'nats');
}
public async init(): Promise<void> {
if (!this.status) {
return;
}
try {
const uri = configService.get<Nats>('NATS').URI;
this.natsClient = await connect({ servers: uri });
this.logger.info('NATS initialized');
if (configService.get<Nats>('NATS')?.GLOBAL_ENABLED) {
await this.initGlobalSubscriptions();
}
} catch (error) {
this.logger.error('Failed to connect to NATS:');
this.logger.error(error);
throw error;
}
}
public async emit({
instanceName,
origin,
event,
data,
serverUrl,
dateTime,
sender,
apiKey,
integration,
}: EmitData): Promise<void> {
if (integration && !integration.includes('nats')) {
return;
}
if (!this.status || !this.natsClient) {
return;
}
const instanceNats = await this.get(instanceName);
const natsLocal = instanceNats?.events;
const natsGlobal = configService.get<Nats>('NATS').GLOBAL_ENABLED;
const natsEvents = configService.get<Nats>('NATS').EVENTS;
const prefixKey = configService.get<Nats>('NATS').PREFIX_KEY;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const logEnabled = configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS');
const message = {
event,
instance: instanceName,
data,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};
// Instância específica
if (instanceNats?.enabled) {
if (Array.isArray(natsLocal) && natsLocal.includes(we)) {
const subject = `${instanceName}.${event.toLowerCase()}`;
try {
this.natsClient.publish(subject, this.sc.encode(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-NATS`,
...message,
};
this.logger.log(logData);
}
} catch (error) {
this.logger.error(`Failed to publish to NATS (instance): ${error}`);
}
}
}
// Global
if (natsGlobal && natsEvents[we]) {
try {
const subject = prefixKey ? `${prefixKey}.${event.toLowerCase()}` : event.toLowerCase();
this.natsClient.publish(subject, this.sc.encode(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-NATS-Global`,
...message,
};
this.logger.log(logData);
}
} catch (error) {
this.logger.error(`Failed to publish to NATS (global): ${error}`);
}
}
}
private async initGlobalSubscriptions(): Promise<void> {
this.logger.info('Initializing global subscriptions');
const events = configService.get<Nats>('NATS').EVENTS;
const prefixKey = configService.get<Nats>('NATS').PREFIX_KEY;
if (!events) {
this.logger.warn('No events to initialize on NATS');
return;
}
const eventKeys = Object.keys(events);
for (const event of eventKeys) {
if (events[event] === false) continue;
const subject = prefixKey ? `${prefixKey}.${event.toLowerCase()}` : event.toLowerCase();
// Criar uma subscription para cada evento
try {
const subscription = this.natsClient.subscribe(subject);
this.logger.info(`Subscribed to: ${subject}`);
// Processar mensagens (exemplo básico)
(async () => {
for await (const msg of subscription) {
try {
const data = JSON.parse(this.sc.decode(msg.data));
// Aqui você pode adicionar a lógica de processamento
this.logger.debug(`Received message on ${subject}:`);
this.logger.debug(data);
} catch (error) {
this.logger.error(`Error processing message on ${subject}:`);
this.logger.error(error);
}
}
})();
} catch (error) {
this.logger.error(`Failed to subscribe to ${subject}:`);
this.logger.error(error);
}
}
}
}

View File

@ -0,0 +1,36 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { InstanceDto } from '@api/dto/instance.dto';
import { EventDto } from '@api/integrations/event/event.dto';
import { HttpStatus } from '@api/routes/index.router';
import { eventManager } from '@api/server.module';
import { eventSchema, instanceSchema } from '@validate/validate.schema';
import { RequestHandler, Router } from 'express';
export class NatsRouter extends RouterBroker {
constructor(...guards: RequestHandler[]) {
super();
this.router
.post(this.routerPath('set'), ...guards, async (req, res) => {
const response = await this.dataValidate<EventDto>({
request: req,
schema: eventSchema,
ClassRef: EventDto,
execute: (instance, data) => eventManager.nats.set(instance.instanceName, data),
});
res.status(HttpStatus.CREATED).json(response);
})
.get(this.routerPath('find'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => eventManager.nats.get(instance.instanceName),
});
res.status(HttpStatus.OK).json(response);
});
}
public readonly router: Router = Router();
}

View File

@ -91,6 +91,7 @@ export class WAMonitoringService {
Chatwoot: true,
Proxy: true,
Rabbitmq: true,
Nats: true,
Sqs: true,
Websocket: true,
Setting: true,
@ -190,6 +191,7 @@ export class WAMonitoringService {
await this.prismaRepository.chatwoot.deleteMany({ where: { instanceId: instance.id } });
await this.prismaRepository.proxy.deleteMany({ where: { instanceId: instance.id } });
await this.prismaRepository.rabbitmq.deleteMany({ where: { instanceId: instance.id } });
await this.prismaRepository.nats.deleteMany({ where: { instanceId: instance.id } });
await this.prismaRepository.sqs.deleteMany({ where: { instanceId: instance.id } });
await this.prismaRepository.integrationSession.deleteMany({ where: { instanceId: instance.id } });
await this.prismaRepository.typebot.deleteMany({ where: { instanceId: instance.id } });

View File

@ -100,6 +100,15 @@ export type Rabbitmq = {
PREFIX_KEY?: string;
};
export type Nats = {
ENABLED: boolean;
URI: string;
EXCHANGE_NAME: string;
GLOBAL_ENABLED: boolean;
EVENTS: EventsRabbitmq;
PREFIX_KEY?: string;
};
export type Sqs = {
ENABLED: boolean;
ACCESS_KEY_ID: string;
@ -263,6 +272,7 @@ export interface Env {
PROVIDER: ProviderSession;
DATABASE: Database;
RABBITMQ: Rabbitmq;
NATS: Nats;
SQS: Sqs;
WEBSOCKET: Websocket;
WA_BUSINESS: WaBusiness;
@ -389,6 +399,42 @@ export class ConfigService {
TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
},
},
NATS: {
ENABLED: process.env?.NATS_ENABLED === 'true',
GLOBAL_ENABLED: process.env?.NATS_GLOBAL_ENABLED === 'true',
PREFIX_KEY: process.env?.NATS_PREFIX_KEY,
EXCHANGE_NAME: process.env?.NATS_EXCHANGE_NAME || 'evolution_exchange',
URI: process.env.NATS_URI || '',
EVENTS: {
APPLICATION_STARTUP: process.env?.NATS_EVENTS_APPLICATION_STARTUP === 'true',
INSTANCE_CREATE: process.env?.NATS_EVENTS_INSTANCE_CREATE === 'true',
INSTANCE_DELETE: process.env?.NATS_EVENTS_INSTANCE_DELETE === 'true',
QRCODE_UPDATED: process.env?.NATS_EVENTS_QRCODE_UPDATED === 'true',
MESSAGES_SET: process.env?.NATS_EVENTS_MESSAGES_SET === 'true',
MESSAGES_UPSERT: process.env?.NATS_EVENTS_MESSAGES_UPSERT === 'true',
MESSAGES_EDITED: process.env?.NATS_EVENTS_MESSAGES_EDITED === 'true',
MESSAGES_UPDATE: process.env?.NATS_EVENTS_MESSAGES_UPDATE === 'true',
MESSAGES_DELETE: process.env?.NATS_EVENTS_MESSAGES_DELETE === 'true',
SEND_MESSAGE: process.env?.NATS_EVENTS_SEND_MESSAGE === 'true',
CONTACTS_SET: process.env?.NATS_EVENTS_CONTACTS_SET === 'true',
CONTACTS_UPDATE: process.env?.NATS_EVENTS_CONTACTS_UPDATE === 'true',
CONTACTS_UPSERT: process.env?.NATS_EVENTS_CONTACTS_UPSERT === 'true',
PRESENCE_UPDATE: process.env?.NATS_EVENTS_PRESENCE_UPDATE === 'true',
CHATS_SET: process.env?.NATS_EVENTS_CHATS_SET === 'true',
CHATS_UPDATE: process.env?.NATS_EVENTS_CHATS_UPDATE === 'true',
CHATS_UPSERT: process.env?.NATS_EVENTS_CHATS_UPSERT === 'true',
CHATS_DELETE: process.env?.NATS_EVENTS_CHATS_DELETE === 'true',
CONNECTION_UPDATE: process.env?.NATS_EVENTS_CONNECTION_UPDATE === 'true',
LABELS_EDIT: process.env?.NATS_EVENTS_LABELS_EDIT === 'true',
LABELS_ASSOCIATION: process.env?.NATS_EVENTS_LABELS_ASSOCIATION === 'true',
GROUPS_UPSERT: process.env?.NATS_EVENTS_GROUPS_UPSERT === 'true',
GROUP_UPDATE: process.env?.NATS_EVENTS_GROUPS_UPDATE === 'true',
GROUP_PARTICIPANTS_UPDATE: process.env?.NATS_EVENTS_GROUP_PARTICIPANTS_UPDATE === 'true',
CALL: process.env?.NATS_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.NATS_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.NATS_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
},
},
SQS: {
ENABLED: process.env?.SQS_ENABLED === 'true',
ACCESS_KEY_ID: process.env.SQS_ACCESS_KEY_ID || '',

View File

@ -124,6 +124,42 @@ export const instanceSchema: JSONSchema7 = {
],
},
},
// NATS
natsEnabled: { type: 'boolean' },
natsEvents: {
type: 'array',
minItems: 0,
items: {
type: 'string',
enum: [
'APPLICATION_STARTUP',
'QRCODE_UPDATED',
'MESSAGES_SET',
'MESSAGES_UPSERT',
'MESSAGES_EDITED',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'SEND_MESSAGE',
'CONTACTS_SET',
'CONTACTS_UPSERT',
'CONTACTS_UPDATE',
'PRESENCE_UPDATE',
'CHATS_SET',
'CHATS_UPSERT',
'CHATS_UPDATE',
'CHATS_DELETE',
'GROUPS_UPSERT',
'GROUP_UPDATE',
'GROUP_PARTICIPANTS_UPDATE',
'CONNECTION_UPDATE',
'LABELS_EDIT',
'LABELS_ASSOCIATION',
'CALL',
'TYPEBOT_START',
'TYPEBOT_CHANGE_STATUS',
],
},
},
// SQS
sqsEnabled: { type: 'boolean' },
sqsEvents: {