feat: sqs

This commit is contained in:
Davidson Gomes
2023-11-20 17:52:36 -03:00
parent b0a0e805cf
commit 6c9e86e17a
21 changed files with 621 additions and 25 deletions

View File

@@ -0,0 +1,35 @@
import { Logger } from '../../config/logger.config';
import { initQueues } from '../../libs/sqs.server';
import { InstanceDto } from '../dto/instance.dto';
import { SqsDto } from '../dto/sqs.dto';
import { SqsRaw } from '../models';
import { WAMonitoringService } from './monitor.service';
export class SqsService {
constructor(private readonly waMonitor: WAMonitoringService) {}
private readonly logger = new Logger(SqsService.name);
public create(instance: InstanceDto, data: SqsDto) {
this.logger.verbose('create sqs: ' + instance.instanceName);
this.waMonitor.waInstances[instance.instanceName].setSqs(data);
initQueues(instance.instanceName, data.events);
return { sqs: { ...instance, sqs: data } };
}
public async find(instance: InstanceDto): Promise<SqsRaw> {
try {
this.logger.verbose('find sqs: ' + instance.instanceName);
const result = await this.waMonitor.waInstances[instance.instanceName].findSqs();
if (Object.keys(result).length === 0) {
throw new Error('Sqs not found');
}
return result;
} catch (error) {
return { enabled: false, events: [] };
}
}
}

View File

@@ -60,6 +60,7 @@ import {
Log,
QrCode,
Redis,
Sqs,
Webhook,
Websocket,
} from '../../config/env.config';
@@ -70,6 +71,7 @@ import { getAMQP, removeQueues } from '../../libs/amqp.server';
import { dbserver } from '../../libs/db.connect';
import { RedisCache } from '../../libs/redis.client';
import { getIO } from '../../libs/socket.server';
import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server';
import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db';
import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db';
import {
@@ -113,7 +115,7 @@ import {
SendTextDto,
StatusMessage,
} from '../dto/sendMessage.dto';
import { ChamaaiRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, TypebotRaw } from '../models';
import { ChamaaiRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models';
import { ChatRaw } from '../models/chat.model';
import { ChatwootRaw } from '../models/chatwoot.model';
import { ContactRaw } from '../models/contact.model';
@@ -149,6 +151,7 @@ export class WAStartupService {
private readonly localSettings: wa.LocalSettings = {};
private readonly localWebsocket: wa.LocalWebsocket = {};
private readonly localRabbitmq: wa.LocalRabbitmq = {};
private readonly localSqs: wa.LocalSqs = {};
public readonly localTypebot: wa.LocalTypebot = {};
private readonly localProxy: wa.LocalProxy = {};
private readonly localChamaai: wa.LocalChamaai = {};
@@ -504,6 +507,48 @@ export class WAStartupService {
}
}
private async loadSqs() {
this.logger.verbose('Loading sqs');
const data = await this.repository.sqs.find(this.instanceName);
this.localSqs.enabled = data?.enabled;
this.logger.verbose(`Sqs enabled: ${this.localSqs.enabled}`);
this.localSqs.events = data?.events;
this.logger.verbose(`Sqs events: ${this.localSqs.events}`);
this.logger.verbose('Sqs loaded');
}
public async setSqs(data: SqsRaw) {
this.logger.verbose('Setting sqs');
await this.repository.sqs.create(data, this.instanceName);
this.logger.verbose(`Sqs events: ${data.events}`);
Object.assign(this.localSqs, data);
this.logger.verbose('Sqs set');
}
public async findSqs() {
this.logger.verbose('Finding sqs');
const data = await this.repository.sqs.find(this.instanceName);
if (!data) {
this.logger.verbose('Sqs not found');
throw new NotFoundException('Sqs not found');
}
this.logger.verbose(`Sqs events: ${data.events}`);
return data;
}
public async removeSqsQueues() {
this.logger.verbose('Removing sqs');
if (this.localSqs.enabled) {
removeQueuesSQS(this.instanceName, this.localSqs.events);
}
}
private async loadTypebot() {
this.logger.verbose('Loading typebot');
const data = await this.repository.typebot.find(this.instanceName);
@@ -648,6 +693,7 @@ export class WAStartupService {
const webhookLocal = this.localWebhook.events;
const websocketLocal = this.localWebsocket.events;
const rabbitmqLocal = this.localRabbitmq.events;
const sqsLocal = this.localSqs.events;
const serverUrl = this.configService.get<HttpServer>('SERVER').URL;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const transformedWe = we.replace(/_/gm, '-').toLowerCase();
@@ -720,6 +766,76 @@ export class WAStartupService {
}
}
if (this.localSqs.enabled) {
const sqs = getSQS();
if (sqs) {
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const queueName = `${this.instanceName}_${eventFormatted}.fifo`;
const sqsConfig = this.configService.get<Sqs>('SQS');
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}
const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
MessageDeduplicationId: `${this.instanceName}_${eventFormatted}_${Date.now()}`,
QueueUrl: sqsUrl,
};
sqs.sendMessage(params, (err, data) => {
if (err) {
this.logger.error({
local: WAStartupService.name + '.sendData-SQS',
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: WAStartupService.name + '.sendData-SQS',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};
if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}
this.logger.log(logData);
}
}
});
}
}
}
if (this.configService.get<Websocket>('WEBSOCKET')?.ENABLED && this.localWebsocket.enabled) {
this.logger.verbose('Sending data to websocket on channel: ' + this.instance.name);
if (Array.isArray(websocketLocal) && websocketLocal.includes(we)) {
@@ -1165,6 +1281,7 @@ export class WAStartupService {
this.loadSettings();
this.loadWebsocket();
this.loadRabbitmq();
this.loadSqs();
this.loadTypebot();
this.loadProxy();
this.loadChamaai();
@@ -1520,7 +1637,6 @@ export class WAStartupService {
this.logger.verbose('Event received: messages.upsert');
const received = messages[0];
console.log(received, type);
if (
type !== 'notify' ||
!received?.message ||