feat: Added rabbitmq to send events

This commit is contained in:
Davidson Gomes 2023-08-02 17:28:52 -03:00
parent 24c880343b
commit e05c48979d
18 changed files with 414 additions and 7 deletions

View File

@ -47,6 +47,7 @@
"@hapi/boom": "^10.0.1",
"@sentry/node": "^7.59.2",
"@whiskeysockets/baileys": "github:EvolutionAPI/Baileys",
"amqplib": "^0.10.3",
"axios": "^1.3.5",
"class-validator": "^0.13.2",
"compression": "^1.7.4",

37
src/libs/amqp.server.ts Normal file
View File

@ -0,0 +1,37 @@
import * as amqp from 'amqplib/callback_api';
import { Logger } from '../config/logger.config';
const logger = new Logger('AMQP');
let amqpChannel: amqp.Channel | null = null;
export const initAMQP = () => {
return new Promise<void>((resolve, reject) => {
amqp.connect('amqp://admin:admin@localhost:5672', (error, connection) => {
if (error) {
reject(error);
return;
}
connection.createChannel((channelError, channel) => {
if (channelError) {
reject(channelError);
return;
}
const exchangeName = 'evolution_exchange';
channel.assertExchange(exchangeName, 'topic', { durable: false });
amqpChannel = channel;
logger.log('Serviço do RabbitMQ inicializado com sucesso.');
resolve();
});
});
});
};
export const getAMQP = (): amqp.Channel | null => {
return amqpChannel;
};

View File

@ -9,7 +9,8 @@ import { configService, Cors, HttpServer } from './config/env.config';
import { onUnexpectedError } from './config/error.config';
import { Logger } from './config/logger.config';
import { ROOT_DIR } from './config/path.config';
import { initIO } from './libs/socket';
import { initAMQP } from './libs/amqp.server';
import { initIO } from './libs/socket.server';
import { ServerUP } from './utils/server-up';
import { HttpStatus, router } from './whatsapp/routers/index.router';
import { waMonitor } from './whatsapp/whatsapp.module';
@ -86,6 +87,8 @@ function bootstrap() {
initIO(server);
initAMQP();
onUnexpectedError();
}

View File

@ -934,3 +934,43 @@ export const websocketSchema: JSONSchema7 = {
required: ['enabled'],
...isNotEmpty('enabled'),
};
export const rabbitmqSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
enabled: { type: 'boolean', enum: [true, false] },
events: {
type: 'array',
minItems: 0,
items: {
type: 'string',
enum: [
'APPLICATION_STARTUP',
'QRCODE_UPDATED',
'MESSAGES_SET',
'MESSAGES_UPSERT',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'SEND_MESSAGE',
'CONTACTS_SET',
'CONTACTS_UPSERT',
'CONTACTS_UPDATE',
'PRESENCE_UPDATE',
'CHATS_SET',
'CHATS_UPSERT',
'CHATS_UPDATE',
'CHATS_DELETE',
'GROUPS_UPSERT',
'GROUP_UPDATE',
'GROUP_PARTICIPANTS_UPDATE',
'CONNECTION_UPDATE',
'CALL',
'NEW_JWT_TOKEN',
],
},
},
},
required: ['enabled'],
...isNotEmpty('enabled'),
};

View File

@ -11,6 +11,7 @@ import { RepositoryBroker } from '../repository/repository.manager';
import { AuthService, OldToken } from '../services/auth.service';
import { ChatwootService } from '../services/chatwoot.service';
import { WAMonitoringService } from '../services/monitor.service';
import { RabbitmqService } from '../services/rabbitmq.service';
import { SettingsService } from '../services/settings.service';
import { WebhookService } from '../services/webhook.service';
import { WebsocketService } from '../services/websocket.service';
@ -28,6 +29,7 @@ export class InstanceController {
private readonly chatwootService: ChatwootService,
private readonly settingsService: SettingsService,
private readonly websocketService: WebsocketService,
private readonly rebbitmqService: RabbitmqService,
private readonly cache: RedisCache,
) {}

View File

@ -0,0 +1,53 @@
import { Logger } from '../../config/logger.config';
import { InstanceDto } from '../dto/instance.dto';
import { RabbitmqDto } from '../dto/rabbitmq.dto';
import { RabbitmqService } from '../services/rabbitmq.service';
const logger = new Logger('RabbitmqController');
export class RabbitmqController {
constructor(private readonly rabbitmqService: RabbitmqService) {}
public async createRabbitmq(instance: InstanceDto, data: RabbitmqDto) {
logger.verbose('requested createRabbitmq from ' + instance.instanceName + ' instance');
if (!data.enabled) {
logger.verbose('rabbitmq disabled');
data.events = [];
}
if (data.events.length === 0) {
logger.verbose('rabbitmq events empty');
data.events = [
'APPLICATION_STARTUP',
'QRCODE_UPDATED',
'MESSAGES_SET',
'MESSAGES_UPSERT',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'SEND_MESSAGE',
'CONTACTS_SET',
'CONTACTS_UPSERT',
'CONTACTS_UPDATE',
'PRESENCE_UPDATE',
'CHATS_SET',
'CHATS_UPSERT',
'CHATS_UPDATE',
'CHATS_DELETE',
'GROUPS_UPSERT',
'GROUP_UPDATE',
'GROUP_PARTICIPANTS_UPDATE',
'CONNECTION_UPDATE',
'CALL',
'NEW_JWT_TOKEN',
];
}
return this.rabbitmqService.create(instance, data);
}
public async findRabbitmq(instance: InstanceDto) {
logger.verbose('requested findRabbitmq from ' + instance.instanceName + ' instance');
return this.rabbitmqService.find(instance);
}
}

View File

@ -0,0 +1,4 @@
export class RabbitmqDto {
enabled: boolean;
events?: string[];
}

View File

@ -3,6 +3,7 @@ export * from './chat.model';
export * from './chatwoot.model';
export * from './contact.model';
export * from './message.model';
export * from './rabbitmq.model';
export * from './settings.model';
export * from './webhook.model';
export * from './websocket.model';

View File

@ -0,0 +1,18 @@
import { Schema } from 'mongoose';
import { dbserver } from '../../libs/db.connect';
export class RabbitmqRaw {
_id?: string;
enabled?: boolean;
events?: string[];
}
const rabbitmqSchema = new Schema<RabbitmqRaw>({
_id: { type: String, _id: true },
enabled: { type: Boolean, required: true },
events: { type: [String], required: true },
});
export const RabbitmqModel = dbserver?.model(RabbitmqRaw.name, rabbitmqSchema, 'rabbitmq');
export type IRabbitmqModel = typeof RabbitmqModel;

View File

@ -0,0 +1,62 @@
import { readFileSync } from 'fs';
import { join } from 'path';
import { ConfigService } from '../../config/env.config';
import { Logger } from '../../config/logger.config';
import { IInsert, Repository } from '../abstract/abstract.repository';
import { IRabbitmqModel, RabbitmqRaw } from '../models';
export class RabbitmqRepository extends Repository {
constructor(private readonly rabbitmqModel: IRabbitmqModel, private readonly configService: ConfigService) {
super(configService);
}
private readonly logger = new Logger('RabbitmqRepository');
public async create(data: RabbitmqRaw, instance: string): Promise<IInsert> {
try {
this.logger.verbose('creating rabbitmq');
if (this.dbSettings.ENABLED) {
this.logger.verbose('saving rabbitmq to db');
const insert = await this.rabbitmqModel.replaceOne({ _id: instance }, { ...data }, { upsert: true });
this.logger.verbose('rabbitmq saved to db: ' + insert.modifiedCount + ' rabbitmq');
return { insertCount: insert.modifiedCount };
}
this.logger.verbose('saving rabbitmq to store');
this.writeStore<RabbitmqRaw>({
path: join(this.storePath, 'rabbitmq'),
fileName: instance,
data,
});
this.logger.verbose('rabbitmq saved to store in path: ' + join(this.storePath, 'rabbitmq') + '/' + instance);
this.logger.verbose('rabbitmq created');
return { insertCount: 1 };
} catch (error) {
return error;
}
}
public async find(instance: string): Promise<RabbitmqRaw> {
try {
this.logger.verbose('finding rabbitmq');
if (this.dbSettings.ENABLED) {
this.logger.verbose('finding rabbitmq in db');
return await this.rabbitmqModel.findOne({ _id: instance });
}
this.logger.verbose('finding rabbitmq in store');
return JSON.parse(
readFileSync(join(this.storePath, 'rabbitmq', instance + '.json'), {
encoding: 'utf-8',
}),
) as RabbitmqRaw;
} catch (error) {
return {};
}
}
}

View File

@ -10,6 +10,7 @@ import { ChatwootRepository } from './chatwoot.repository';
import { ContactRepository } from './contact.repository';
import { MessageRepository } from './message.repository';
import { MessageUpRepository } from './messageUp.repository';
import { RabbitmqRepository } from './rabbitmq.repository';
import { SettingsRepository } from './settings.repository';
import { WebhookRepository } from './webhook.repository';
import { WebsocketRepository } from './websocket.repository';
@ -23,6 +24,7 @@ export class RepositoryBroker {
public readonly chatwoot: ChatwootRepository,
public readonly settings: SettingsRepository,
public readonly websocket: WebsocketRepository,
public readonly rabbitmq: RabbitmqRepository,
public readonly auth: AuthRepository,
private configService: ConfigService,
dbServer?: MongoClient,
@ -54,6 +56,7 @@ export class RepositoryBroker {
const chatwootDir = join(storePath, 'chatwoot');
const settingsDir = join(storePath, 'settings');
const websocketDir = join(storePath, 'websocket');
const rabbitmqDir = join(storePath, 'rabbitmq');
const tempDir = join(storePath, 'temp');
if (!fs.existsSync(authDir)) {
@ -92,6 +95,10 @@ export class RepositoryBroker {
this.logger.verbose('creating websocket dir: ' + websocketDir);
fs.mkdirSync(websocketDir, { recursive: true });
}
if (!fs.existsSync(rabbitmqDir)) {
this.logger.verbose('creating rabbitmq dir: ' + rabbitmqDir);
fs.mkdirSync(rabbitmqDir, { recursive: true });
}
if (!fs.existsSync(tempDir)) {
this.logger.verbose('creating temp dir: ' + tempDir);
fs.mkdirSync(tempDir, { recursive: true });

View File

@ -8,6 +8,7 @@ import { ChatRouter } from './chat.router';
import { ChatwootRouter } from './chatwoot.router';
import { GroupRouter } from './group.router';
import { InstanceRouter } from './instance.router';
import { RabbitmqRouter } from './rabbitmq.router';
import { MessageRouter } from './sendMessage.router';
import { SettingsRouter } from './settings.router';
import { ViewsRouter } from './view.router';
@ -46,6 +47,7 @@ router
.use('/webhook', new WebhookRouter(...guards).router)
.use('/chatwoot', new ChatwootRouter(...guards).router)
.use('/settings', new SettingsRouter(...guards).router)
.use('/websocket', new WebsocketRouter(...guards).router);
.use('/websocket', new WebsocketRouter(...guards).router)
.use('/rabbitmq', new RabbitmqRouter(...guards).router);
export { HttpStatus, router };

View File

@ -0,0 +1,52 @@
import { RequestHandler, Router } from 'express';
import { Logger } from '../../config/logger.config';
import { instanceNameSchema, rabbitmqSchema } from '../../validate/validate.schema';
import { RouterBroker } from '../abstract/abstract.router';
import { InstanceDto } from '../dto/instance.dto';
import { RabbitmqDto } from '../dto/rabbitmq.dto';
import { rabbitmqController } from '../whatsapp.module';
import { HttpStatus } from './index.router';
const logger = new Logger('RabbitmqRouter');
export class RabbitmqRouter extends RouterBroker {
constructor(...guards: RequestHandler[]) {
super();
this.router
.post(this.routerPath('set'), ...guards, async (req, res) => {
logger.verbose('request received in setRabbitmq');
logger.verbose('request body: ');
logger.verbose(req.body);
logger.verbose('request query: ');
logger.verbose(req.query);
const response = await this.dataValidate<RabbitmqDto>({
request: req,
schema: rabbitmqSchema,
ClassRef: RabbitmqDto,
execute: (instance, data) => rabbitmqController.createRabbitmq(instance, data),
});
res.status(HttpStatus.CREATED).json(response);
})
.get(this.routerPath('find'), ...guards, async (req, res) => {
logger.verbose('request received in findRabbitmq');
logger.verbose('request body: ');
logger.verbose(req.body);
logger.verbose('request query: ');
logger.verbose(req.query);
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceNameSchema,
ClassRef: InstanceDto,
execute: (instance) => rabbitmqController.findRabbitmq(instance),
});
res.status(HttpStatus.OK).json(response);
});
}
public readonly router = Router();
}

View File

@ -0,0 +1,33 @@
import { Logger } from '../../config/logger.config';
import { InstanceDto } from '../dto/instance.dto';
import { RabbitmqDto } from '../dto/rabbitmq.dto';
import { RabbitmqRaw } from '../models';
import { WAMonitoringService } from './monitor.service';
export class RabbitmqService {
constructor(private readonly waMonitor: WAMonitoringService) {}
private readonly logger = new Logger(RabbitmqService.name);
public create(instance: InstanceDto, data: RabbitmqDto) {
this.logger.verbose('create rabbitmq: ' + instance.instanceName);
this.waMonitor.waInstances[instance.instanceName].setRabbitmq(data);
return { rabbitmq: { ...instance, rabbitmq: data } };
}
public async find(instance: InstanceDto): Promise<RabbitmqRaw> {
try {
this.logger.verbose('find rabbitmq: ' + instance.instanceName);
const result = await this.waMonitor.waInstances[instance.instanceName].findRabbitmq();
if (Object.keys(result).length === 0) {
throw new Error('Rabbitmq not found');
}
return result;
} catch (error) {
return { enabled: false, events: [] };
}
}
}

View File

@ -64,9 +64,10 @@ import {
import { Logger } from '../../config/logger.config';
import { INSTANCE_DIR, ROOT_DIR } from '../../config/path.config';
import { BadRequestException, InternalServerErrorException, NotFoundException } from '../../exceptions';
import { getAMQP } from '../../libs/amqp.server';
import { dbserver } from '../../libs/db.connect';
import { RedisCache } from '../../libs/redis.client';
import { getIO } from '../../libs/socket';
import { getIO } from '../../libs/socket.server';
import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db';
import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db';
import {
@ -110,7 +111,7 @@ import {
SendTextDto,
StatusMessage,
} from '../dto/sendMessage.dto';
import { SettingsRaw } from '../models';
import { RabbitmqRaw, SettingsRaw } from '../models';
import { ChatRaw } from '../models/chat.model';
import { ChatwootRaw } from '../models/chatwoot.model';
import { ContactRaw } from '../models/contact.model';
@ -144,6 +145,7 @@ export class WAStartupService {
private readonly localChatwoot: wa.LocalChatwoot = {};
private readonly localSettings: wa.LocalSettings = {};
private readonly localWebsocket: wa.LocalWebsocket = {};
private readonly localRabbitmq: wa.LocalRabbitmq = {};
public stateConnection: wa.StateConnection = { state: 'close' };
public readonly storePath = join(ROOT_DIR, 'store');
private readonly msgRetryCounterCache: CacheStore = new NodeCache();
@ -447,10 +449,45 @@ export class WAStartupService {
return data;
}
private async loadRabbitmq() {
this.logger.verbose('Loading rabbitmq');
const data = await this.repository.rabbitmq.find(this.instanceName);
this.localRabbitmq.enabled = data?.enabled;
this.logger.verbose(`Rabbitmq enabled: ${this.localRabbitmq.enabled}`);
this.localRabbitmq.events = data?.events;
this.logger.verbose(`Rabbitmq events: ${this.localRabbitmq.events}`);
this.logger.verbose('Rabbitmq loaded');
}
public async setRabbitmq(data: RabbitmqRaw) {
this.logger.verbose('Setting rabbitmq');
await this.repository.rabbitmq.create(data, this.instanceName);
this.logger.verbose(`Rabbitmq events: ${data.events}`);
Object.assign(this.localRabbitmq, data);
this.logger.verbose('Rabbitmq set');
}
public async findRabbitmq() {
this.logger.verbose('Finding rabbitmq');
const data = await this.repository.rabbitmq.find(this.instanceName);
if (!data) {
this.logger.verbose('Rabbitmq not found');
throw new NotFoundException('Rabbitmq not found');
}
this.logger.verbose(`Rabbitmq events: ${data.events}`);
return data;
}
public async sendDataWebhook<T = any>(event: Events, data: T, local = true) {
const webhookGlobal = this.configService.get<Webhook>('WEBHOOK');
const webhookLocal = this.localWebhook.events;
const websocketLocal = this.localWebsocket.events;
const rabbitmqLocal = this.localRabbitmq.events;
const serverUrl = this.configService.get<HttpServer>('SERVER').URL;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const transformedWe = we.replace(/_/gm, '-').toLowerCase();
@ -459,18 +496,56 @@ export class WAStartupService {
const tokenStore = await this.repository.auth.find(this.instanceName);
const instanceApikey = tokenStore?.apikey || 'Apikey not found';
if (this.localRabbitmq.enabled) {
const amqp = getAMQP();
if (amqp) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = 'evolution_exchange';
amqp.assertExchange(exchangeName, 'topic', { durable: false });
const queueName = `${this.instanceName}.${event}`;
amqp.assertQueue(queueName, { durable: false });
amqp.bindQueue(queueName, exchangeName, event);
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
}
}
}
if (this.localWebsocket.enabled) {
this.logger.verbose('Sending data to websocket on channel: ' + this.instance.name);
if (Array.isArray(websocketLocal) && websocketLocal.includes(we)) {
this.logger.verbose('Sending data to websocket on event: ' + event);
const io = getIO();
this.logger.verbose('Sending data to socket.io in channel: ' + this.instance.name);
io.of(`/${this.instance.name}`).emit(event, {
const message = {
event,
instance: this.instance.name,
data,
});
server_url: serverUrl,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
this.logger.verbose('Sending data to socket.io in channel: ' + this.instance.name);
io.of(`/${this.instance.name}`).emit(event, message);
}
}
@ -867,6 +942,7 @@ export class WAStartupService {
this.loadChatwoot();
this.loadSettings();
this.loadWebsocket();
this.loadRabbitmq();
this.instance.authState = await this.defineAuthState();

View File

@ -75,6 +75,11 @@ export declare namespace wa {
events?: string[];
};
export type LocalRabbitmq = {
enabled?: boolean;
events?: string[];
};
export type StateConnection = {
instance?: string;
state?: WAConnectionState | 'refused';

View File

@ -7,6 +7,7 @@ import { ChatController } from './controllers/chat.controller';
import { ChatwootController } from './controllers/chatwoot.controller';
import { GroupController } from './controllers/group.controller';
import { InstanceController } from './controllers/instance.controller';
import { RabbitmqController } from './controllers/rabbitmq.controller';
import { SendMessageController } from './controllers/sendMessage.controller';
import { SettingsController } from './controllers/settings.controller';
import { ViewsController } from './controllers/views.controller';
@ -19,6 +20,7 @@ import {
ContactModel,
MessageModel,
MessageUpModel,
RabbitmqModel,
SettingsModel,
WebhookModel,
WebsocketModel,
@ -29,6 +31,7 @@ import { ChatwootRepository } from './repository/chatwoot.repository';
import { ContactRepository } from './repository/contact.repository';
import { MessageRepository } from './repository/message.repository';
import { MessageUpRepository } from './repository/messageUp.repository';
import { RabbitmqRepository } from './repository/rabbitmq.repository';
import { RepositoryBroker } from './repository/repository.manager';
import { SettingsRepository } from './repository/settings.repository';
import { WebhookRepository } from './repository/webhook.repository';
@ -36,6 +39,7 @@ import { WebsocketRepository } from './repository/websocket.repository';
import { AuthService } from './services/auth.service';
import { ChatwootService } from './services/chatwoot.service';
import { WAMonitoringService } from './services/monitor.service';
import { RabbitmqService } from './services/rabbitmq.service';
import { SettingsService } from './services/settings.service';
import { WebhookService } from './services/webhook.service';
import { WebsocketService } from './services/websocket.service';
@ -48,6 +52,7 @@ const contactRepository = new ContactRepository(ContactModel, configService);
const messageUpdateRepository = new MessageUpRepository(MessageUpModel, configService);
const webhookRepository = new WebhookRepository(WebhookModel, configService);
const websocketRepository = new WebsocketRepository(WebsocketModel, configService);
const rabbitmqRepository = new RabbitmqRepository(RabbitmqModel, configService);
const chatwootRepository = new ChatwootRepository(ChatwootModel, configService);
const settingsRepository = new SettingsRepository(SettingsModel, configService);
const authRepository = new AuthRepository(AuthModel, configService);
@ -61,6 +66,7 @@ export const repository = new RepositoryBroker(
chatwootRepository,
settingsRepository,
websocketRepository,
rabbitmqRepository,
authRepository,
configService,
dbserver?.getClient(),
@ -80,6 +86,10 @@ const websocketService = new WebsocketService(waMonitor);
export const websocketController = new WebsocketController(websocketService);
const rabbitmqService = new RabbitmqService(waMonitor);
export const rabbitmqController = new RabbitmqController(rabbitmqService);
const chatwootService = new ChatwootService(waMonitor, configService);
export const chatwootController = new ChatwootController(chatwootService, configService);
@ -98,6 +108,7 @@ export const instanceController = new InstanceController(
chatwootService,
settingsService,
websocketService,
rabbitmqService,
cache,
);
export const viewsController = new ViewsController(waMonitor, configService);