fix: Refactor SQS controller to correct bug in sqs events by instance

- Implement dynamic queue creation based on enabled events - Add method to list existing queues for an instance - Improve error handling and logging for SQS operations - Remove unused queue removal methods - Update set method to handle queue creation/deletion on event changes - Add comments for future feature of forced queue deletion
This commit is contained in:
“fernandoralha”
2025-02-17 21:22:59 -03:00
parent 427c994993
commit 278add6b11
2 changed files with 199 additions and 72 deletions

View File

@@ -1,10 +1,11 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { SQS } from '@aws-sdk/client-sqs';
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
import { configService, Log, Sqs } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
import { EventDto } from '../event.dto';
export class SqsController extends EventController implements EventControllerInterface {
private sqs: SQS;
@@ -45,6 +46,39 @@ export class SqsController extends EventController implements EventControllerInt
return this.sqs;
}
override async set(instanceName: string, data: EventDto): Promise<any> {
if (!this.status) {
return;
}
if (!data[this.name]?.enabled) {
data[this.name].events = [];
} else {
if (0 === data[this.name].events.length) {
data[this.name].events = EventController.events;
}
}
await this.saveQueues(instanceName, data[this.name].events, data[this.name]?.enabled);
const payload: any = {
where: {
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
update: {
enabled: data[this.name]?.enabled,
events: data[this.name].events,
},
create: {
enabled: data[this.name]?.enabled,
events: data[this.name].events,
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
};
console.log('*** payload: ', payload);
return this.prisma[this.name].upsert(payload);
}
public async emit({
instanceName,
origin,
@@ -121,70 +155,92 @@ export class SqsController extends EventController implements EventControllerInt
}
}
public async initQueues(instanceName: string, events: string[]) {
if (!events || !events.length) return;
private async saveQueues(instanceName: string, events: string[], enable: boolean) {
if (enable) {
const eventsFinded = await this.listQueuesByInstance(instanceName);
console.log('eventsFinded', eventsFinded);
const queues = events.map((event) => {
return `${event.replace(/_/g, '_').toLowerCase()}`;
});
for (const event of events) {
const normalizedEvent = event.toLowerCase();
queues.forEach((event) => {
const queueName = `${instanceName}_${event}.fifo`;
if (eventsFinded.includes(normalizedEvent)) {
this.logger.info(`A queue para o evento "${normalizedEvent}" já existe. Ignorando criação.`);
continue;
}
this.sqs.createQueue(
{
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
},
},
(err, data) => {
if (err) {
this.logger.error(`Error creating queue ${queueName}: ${err.message}`);
} else {
this.logger.info(`Queue ${queueName} created: ${data.QueueUrl}`);
}
},
);
});
const queueName = `${instanceName}_${normalizedEvent}.fifo`;
try {
const createCommand = new CreateQueueCommand({
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
},
});
const data = await this.sqs.send(createCommand);
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
} catch (err: any) {
this.logger.error(`Erro ao criar queue ${queueName}: ${err.message}`);
}
}
}
}
public async removeQueues(instanceName: string, events: any) {
const eventsArray = Array.isArray(events) ? events.map((event) => String(event)) : [];
if (!events || !eventsArray.length) return;
private async listQueuesByInstance(instanceName: string) {
let existingQueues: string[] = [];
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
});
const listData = await this.sqs.send(listCommand);
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
// Extrai o nome da fila a partir da URL
existingQueues = listData.QueueUrls.map((queueUrl) => {
const parts = queueUrl.split('/');
return parts[parts.length - 1];
});
}
} catch (error: any) {
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
return;
}
const queues = eventsArray.map((event) => {
return `${event.replace(/_/g, '_').toLowerCase()}`;
});
// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
return existingQueues
.map((queueName) => {
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
}
return '';
})
.filter((event) => event !== '');
}
queues.forEach((event) => {
const queueName = `${instanceName}_${event}.fifo`;
// Para uma futura feature de exclusão forçada das queues
private async removeQueuesByInstance(instanceName: string) {
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
});
const listData = await this.sqs.send(listCommand);
this.sqs.getQueueUrl(
{
QueueName: queueName,
},
(err, data) => {
if (err) {
this.logger.error(`Error getting queue URL for ${queueName}: ${err.message}`);
} else {
const queueUrl = data.QueueUrl;
if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
this.logger.info(`No queues found for instance ${instanceName}`);
return;
}
this.sqs.deleteQueue(
{
QueueUrl: queueUrl,
},
(deleteErr) => {
if (deleteErr) {
this.logger.error(`Error deleting queue ${queueName}: ${deleteErr.message}`);
} else {
this.logger.info(`Queue ${queueName} deleted`);
}
},
);
}
},
);
});
for (const queueUrl of listData.QueueUrls) {
try {
const deleteCommand = new DeleteQueueCommand({ QueueUrl: queueUrl });
await this.sqs.send(deleteCommand);
this.logger.info(`Queue ${queueUrl} deleted`);
} catch (err: any) {
this.logger.error(`Error deleting queue ${queueUrl}: ${err.message}`);
}
}
} catch (err: any) {
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
}
}
}