fix: Remove rabbitmq queues when delete instances

This commit is contained in:
Davidson Gomes 2023-09-30 07:20:57 -03:00
parent ba584974cb
commit 3fdb3fa673
7 changed files with 49 additions and 95 deletions

View File

@ -1,3 +1,8 @@
# 1.5.3 (develop)
### Fixed
* Remove rabbitmq queues when delete instances
# 1.5.2 (2023-09-28 17:56)

File diff suppressed because one or more lines are too long

View File

@ -71,3 +71,30 @@ export const initQueues = (instanceName: string, events: string[]) => {
amqp.bindQueue(queueName, exchangeName, event);
});
};
export const removeQueues = (instanceName: string, events: string[]) => {
if (!events || !events.length) return;
const channel = getAMQP();
const queues = events.map((event) => {
return `${event.replace(/_/g, '.').toLowerCase()}`;
});
const exchangeName = instanceName ?? 'evolution_exchange';
queues.forEach((event) => {
const amqp = getAMQP();
amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
const queueName = `${instanceName}.${event}`;
amqp.deleteQueue(queueName);
});
channel.deleteExchange(exchangeName);
};

View File

@ -534,6 +534,8 @@ export class InstanceController {
throw new BadRequestException('The "' + instanceName + '" instance needs to be disconnected');
}
try {
this.waMonitor.waInstances[instanceName]?.removeRabbitmqQueues();
if (instance.state === 'connecting') {
this.logger.verbose('logging out instance: ' + instanceName);

View File

@ -65,6 +65,7 @@ export async function instanceLoggedGuard(req: Request, _: Response, next: NextF
}
if (waMonitor.waInstances[instance.instanceName]) {
waMonitor.waInstances[instance.instanceName]?.removeRabbitmqQueues();
delete waMonitor.waInstances[instance.instanceName];
}
}

View File

@ -7,9 +7,7 @@ import { join } from 'path';
import { Auth, ConfigService, Database, DelInstance, HttpServer, Redis } from '../../config/env.config';
import { Logger } from '../../config/logger.config';
import { INSTANCE_DIR, STORE_DIR } from '../../config/path.config';
// inserido por francis inicio
import { NotFoundException } from '../../exceptions';
// inserido por francis fim
import { dbserver } from '../../libs/db.connect';
import { RedisCache } from '../../libs/redis.client';
import {
@ -66,8 +64,10 @@ export class WAMonitoringService {
await this.waInstances[instance]?.client?.logout('Log out instance: ' + instance);
this.waInstances[instance]?.client?.ws?.close();
this.waInstances[instance]?.client?.end(undefined);
this.waInstances[instance]?.removeRabbitmqQueues();
delete this.waInstances[instance];
} else {
this.waInstances[instance]?.removeRabbitmqQueues();
delete this.waInstances[instance];
this.eventEmitter.emit('remove.instance', instance, 'inner');
}
@ -75,68 +75,9 @@ export class WAMonitoringService {
}, 1000 * 60 * time);
}
}
/* ocultado por francis inicio
public async instanceInfo(instanceName?: string) {
this.logger.verbose('get instance info');
const urlServer = this.configService.get<HttpServer>('SERVER').URL;
const instances: any[] = await Promise.all(
Object.entries(this.waInstances).map(async ([key, value]) => {
const status = value?.connectionStatus?.state || 'unknown';
if (status === 'unknown') {
return null;
}
if (status === 'open') {
this.logger.verbose('instance: ' + key + ' - connectionStatus: open');
}
const instanceData: any = {
instance: {
instanceName: key,
owner: value.wuid,
profileName: (await value.getProfileName()) || 'not loaded',
profilePictureUrl: value.profilePictureUrl,
profileStatus: (await value.getProfileStatus()) || '',
status: status,
},
};
if (this.configService.get<Auth>('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES) {
instanceData.instance.serverUrl = urlServer;
instanceData.instance.apikey = (await this.repository.auth.find(key))?.apikey;
const findChatwoot = await this.waInstances[key].findChatwoot();
if (findChatwoot && findChatwoot.enabled) {
instanceData.instance.chatwoot = {
...findChatwoot,
webhook_url: `${urlServer}/chatwoot/webhook/${encodeURIComponent(key)}`,
};
}
}
return instanceData;
}),
).then((results) => results.filter((instance) => instance !== null));
this.logger.verbose('return instance info: ' + instances.length);
if (instanceName) {
const instance = instances.find((i) => i.instance.instanceName === instanceName);
return instance || [];
}
return instances;
}
ocultado por francis fim */
// inserido por francis inicio
public async instanceInfo(instanceName?: string) {
this.logger.verbose('get instance info');
if (instanceName && !this.waInstances[instanceName]) {
throw new NotFoundException(`Instance "${instanceName}" not found`);
}
@ -210,17 +151,6 @@ public async instanceInfo(instanceName?: string) {
return instances.find((i) => i.instance.instanceName === instanceName) ?? instances;
}
// inserido por francis fim
private delInstanceFiles() {
this.logger.verbose('cron to delete instance files started');
setInterval(async () => {

View File

@ -40,7 +40,6 @@ import EventEmitter2 from 'eventemitter2';
import fs, { existsSync, readFileSync } from 'fs';
import Long from 'long';
import NodeCache from 'node-cache';
import { getMIMEType } from 'node-mime-types';
import { release } from 'os';
import { join } from 'path';
import P from 'pino';
@ -66,7 +65,7 @@ import {
import { Logger } from '../../config/logger.config';
import { INSTANCE_DIR, ROOT_DIR } from '../../config/path.config';
import { BadRequestException, InternalServerErrorException, NotFoundException } from '../../exceptions';
import { getAMQP } from '../../libs/amqp.server';
import { getAMQP, removeQueues } from '../../libs/amqp.server';
import { dbserver } from '../../libs/db.connect';
import { RedisCache } from '../../libs/redis.client';
import { getIO } from '../../libs/socket.server';
@ -495,6 +494,14 @@ export class WAStartupService {
return data;
}
public async removeRabbitmqQueues() {
this.logger.verbose('Removing rabbitmq');
if (this.localRabbitmq.enabled) {
removeQueues(this.instanceName, this.localRabbitmq.events);
}
}
private async loadTypebot() {
this.logger.verbose('Loading typebot');
const data = await this.repository.typebot.find(this.instanceName);
@ -2308,37 +2315,18 @@ export class WAStartupService {
mediaMessage.fileName = arrayMatch[1];
this.logger.verbose('File name: ' + mediaMessage.fileName);
}
// *inserido francis inicio
let mimetype: string;
// *inserido francis final
if (mediaMessage.mediatype === 'image' && !mediaMessage.fileName) {
mediaMessage.fileName = 'image.png';
// inserido francis inicio
mimetype = 'image/png';
// inserido francis inicio
}
if (mediaMessage.mediatype === 'video' && !mediaMessage.fileName) {
mediaMessage.fileName = 'video.mp4';
// inserido francis inicio
mimetype = 'video/mp4';
// inserido francis final
}
// ocultado francis inicio
// let mimetype: string;
// if (isURL(mediaMessage.media)) {
// mimetype = getMIMEType(mediaMessage.media);
// } else {
// mimetype = getMIMEType(mediaMessage.fileName);
// }
// ocultado francis final
this.logger.verbose('Mimetype: ' + mimetype);
prepareMedia[mediaType].caption = mediaMessage?.caption;
@ -2714,6 +2702,7 @@ export class WAStartupService {
public async markMessageAsRead(data: ReadMessageDto) {
this.logger.verbose('Marking message as read');
try {
const keys: proto.IMessageKey[] = [];
data.read_messages.forEach((read) => {