Compare commits

...

11 Commits

Author SHA1 Message Date
Davidson Gomes
d458c978f3 Merge branch 'develop' into develop 2025-07-14 14:42:18 -03:00
Davidson Gomes
e6ec706a38 Merge pull request #1665 from pauloboc/fix-prisma-type-mysql
Fix prisma type mysql
2025-07-02 11:22:48 -03:00
Davidson Gomes
53101d4571 Merge pull request #1664 from pauloboc/fix-setup-mysql
(mysql): remove out-of-order wavoipToken migration
2025-07-01 08:15:26 -03:00
Davidson Gomes
c7b5abce6e Merge pull request #1670 from Santosl2/fix/typebot-variables
fix: correçao do typebot não conseguir ouvir mensagens de input
2025-07-01 08:14:28 -03:00
Santosl2
5b1b5ff9d2 fix: bind applyFormatting method in processMessages to maintain context 2025-06-29 20:23:31 -03:00
Paulo Ferreira
3efe69ada3 fix(prisma) Mysql: update data types for N8n, N8nSetting, Evoai, and EvoaiSetting models 2025-06-28 09:34:52 -03:00
Paulo Ferreira
287c679ce4 (mysql): remove out-of-order wavoipToken migration
Delete prisma/mysql-migrations/1707735894523_add_wavoip_token_to_settings_table, which executes before the initial Setting table is created and breaks fresh MySQL installs.

The later migration 20250214181954_add_wavoip_token_column, line 145, already adds the column correctly, so keeping only that directory guarantees a clean deploy.
2025-06-28 09:27:13 -03:00
Davidson Gomes
918697866f fix(package-lock): update baileys dependency to latest commit hash 2025-06-27 09:59:40 -03:00
Davidson Gomes
3c917af602 Merge pull request #1660 from KokeroO/develop
fix(whatsapp-baileys): Verifica eventos com falhas e fallback para erro ao baixar mídias
2025-06-27 09:55:22 -03:00
Davidson Gomes
ef31b6de1f Merge pull request #1655 from KokeroO/develop
fix/references-instanceName-typebot
2025-06-26 17:10:58 -03:00
Davidson Gomes
72fb2f408b feat(rabbitmq): implement robust connection handling with reconnection logic and error logging 2025-06-26 14:25:37 -03:00
3 changed files with 209 additions and 57 deletions

View File

@@ -1,9 +0,0 @@
/*
Warnings:
- A unique constraint covering the columns `[remoteJid,instanceId]` on the table `Chat` will be added. If there are existing duplicate values, this will fail.
*/
-- AlterTable
ALTER TABLE `Setting`
ADD COLUMN IF NOT EXISTS `wavoipToken` VARCHAR(100);

View File

@@ -652,16 +652,16 @@ model N8n {
webhookUrl String? @db.VarChar(255)
basicAuthUser String? @db.VarChar(255)
basicAuthPass String? @db.VarChar(255)
expire Int? @default(0) @db.Integer
expire Int? @default(0) @db.Int
keywordFinish String? @db.VarChar(100)
delayMessage Int? @db.Integer
unknownMessage String? @db.VarChar(100)
listeningFromMe Boolean? @default(false) @db.Boolean
stopBotFromMe Boolean? @default(false) @db.Boolean
keepOpen Boolean? @default(false) @db.Boolean
listeningFromMe Boolean? @default(false)
stopBotFromMe Boolean? @default(false)
keepOpen Boolean? @default(false)
debounceTime Int? @db.Integer
ignoreJids Json?
splitMessages Boolean? @default(false) @db.Boolean
splitMessages Boolean? @default(false)
timePerChar Int? @default(50) @db.Integer
triggerType TriggerType?
triggerOperator TriggerOperator?
@@ -675,16 +675,16 @@ model N8n {
model N8nSetting {
id String @id @default(cuid())
expire Int? @default(0) @db.Integer
expire Int? @default(0) @db.Int
keywordFinish String? @db.VarChar(100)
delayMessage Int? @db.Integer
unknownMessage String? @db.VarChar(100)
listeningFromMe Boolean? @default(false) @db.Boolean
stopBotFromMe Boolean? @default(false) @db.Boolean
keepOpen Boolean? @default(false) @db.Boolean
listeningFromMe Boolean? @default(false)
stopBotFromMe Boolean? @default(false)
keepOpen Boolean? @default(false)
debounceTime Int? @db.Integer
ignoreJids Json?
splitMessages Boolean? @default(false) @db.Boolean
splitMessages Boolean? @default(false)
timePerChar Int? @default(50) @db.Integer
createdAt DateTime? @default(now()) @db.Timestamp
updatedAt DateTime @updatedAt @db.Timestamp
@@ -700,16 +700,16 @@ model Evoai {
description String? @db.VarChar(255)
agentUrl String? @db.VarChar(255)
apiKey String? @db.VarChar(255)
expire Int? @default(0) @db.Integer
expire Int? @default(0) @db.Int
keywordFinish String? @db.VarChar(100)
delayMessage Int? @db.Integer
unknownMessage String? @db.VarChar(100)
listeningFromMe Boolean? @default(false) @db.Boolean
stopBotFromMe Boolean? @default(false) @db.Boolean
keepOpen Boolean? @default(false) @db.Boolean
listeningFromMe Boolean? @default(false)
stopBotFromMe Boolean? @default(false)
keepOpen Boolean? @default(false)
debounceTime Int? @db.Integer
ignoreJids Json?
splitMessages Boolean? @default(false) @db.Boolean
splitMessages Boolean? @default(false)
timePerChar Int? @default(50) @db.Integer
triggerType TriggerType?
triggerOperator TriggerOperator?
@@ -723,16 +723,16 @@ model Evoai {
model EvoaiSetting {
id String @id @default(cuid())
expire Int? @default(0) @db.Integer
expire Int? @default(0) @db.Int
keywordFinish String? @db.VarChar(100)
delayMessage Int? @db.Integer
unknownMessage String? @db.VarChar(100)
listeningFromMe Boolean? @default(false) @db.Boolean
stopBotFromMe Boolean? @default(false) @db.Boolean
keepOpen Boolean? @default(false) @db.Boolean
listeningFromMe Boolean? @default(false)
stopBotFromMe Boolean? @default(false)
keepOpen Boolean? @default(false)
debounceTime Int? @db.Integer
ignoreJids Json?
splitMessages Boolean? @default(false) @db.Boolean
splitMessages Boolean? @default(false)
timePerChar Int? @default(50) @db.Integer
createdAt DateTime? @default(now()) @db.Timestamp
updatedAt DateTime @updatedAt @db.Timestamp

View File

@@ -8,7 +8,12 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co
export class RabbitmqController extends EventController implements EventControllerInterface {
public amqpChannel: amqp.Channel | null = null;
private amqpConnection: amqp.Connection | null = null;
private readonly logger = new Logger('RabbitmqController');
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private reconnectDelay = 5000; // 5 seconds
private isReconnecting = false;
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
@@ -19,7 +24,11 @@ export class RabbitmqController extends EventController implements EventControll
return;
}
await new Promise<void>((resolve, reject) => {
await this.connect();
}
private async connect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX;
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
@@ -33,22 +42,61 @@ export class RabbitmqController extends EventController implements EventControll
password: url.password || 'guest',
vhost: url.pathname.slice(1) || '/',
frameMax: frameMax,
heartbeat: 30, // Add heartbeat of 30 seconds
};
amqp.connect(connectionOptions, (error, connection) => {
if (error) {
this.logger.error({
local: 'RabbitmqController.connect',
message: 'Failed to connect to RabbitMQ',
error: error.message || error,
});
reject(error);
return;
}
// Connection event handlers
connection.on('error', (err) => {
this.logger.error({
local: 'RabbitmqController.connectionError',
message: 'RabbitMQ connection error',
error: err.message || err,
});
this.handleConnectionLoss();
});
connection.on('close', () => {
this.logger.warn('RabbitMQ connection closed');
this.handleConnectionLoss();
});
connection.createChannel((channelError, channel) => {
if (channelError) {
this.logger.error({
local: 'RabbitmqController.createChannel',
message: 'Failed to create RabbitMQ channel',
error: channelError.message || channelError,
});
reject(channelError);
return;
}
// Channel event handlers
channel.on('error', (err) => {
this.logger.error({
local: 'RabbitmqController.channelError',
message: 'RabbitMQ channel error',
error: err.message || err,
});
this.handleConnectionLoss();
});
channel.on('close', () => {
this.logger.warn('RabbitMQ channel closed');
this.handleConnectionLoss();
});
const exchangeName = rabbitmqExchangeName;
channel.assertExchange(exchangeName, 'topic', {
@@ -56,16 +104,81 @@ export class RabbitmqController extends EventController implements EventControll
autoDelete: false,
});
this.amqpConnection = connection;
this.amqpChannel = channel;
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
this.isReconnecting = false;
this.logger.info('AMQP initialized');
this.logger.info('AMQP initialized successfully');
resolve();
});
});
}).then(() => {
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues();
});
})
.then(() => {
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) {
this.initGlobalQueues();
}
})
.catch((error) => {
this.logger.error({
local: 'RabbitmqController.init',
message: 'Failed to initialize AMQP',
error: error.message || error,
});
this.scheduleReconnect();
throw error;
});
}
private handleConnectionLoss(): void {
if (this.isReconnecting) {
return; // Already attempting to reconnect
}
this.amqpChannel = null;
this.amqpConnection = null;
this.scheduleReconnect();
}
private scheduleReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.logger.error(
`Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`,
);
return;
}
if (this.isReconnecting) {
return; // Already scheduled
}
this.isReconnecting = true;
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay
this.logger.info(
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
);
setTimeout(async () => {
try {
this.logger.info(
`Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
);
await this.connect();
this.logger.info('Successfully reconnected to RabbitMQ');
} catch (error) {
this.logger.error({
local: 'RabbitmqController.scheduleReconnect',
message: `Reconnection attempt ${this.reconnectAttempts} failed`,
error: error.message || error,
});
this.isReconnecting = false;
this.scheduleReconnect();
}
}, delay);
}
private set channel(channel: amqp.Channel) {
@@ -76,6 +189,17 @@ export class RabbitmqController extends EventController implements EventControll
return this.amqpChannel;
}
private async ensureConnection(): Promise<boolean> {
if (!this.amqpChannel) {
this.logger.warn('AMQP channel is not available, attempting to reconnect...');
if (!this.isReconnecting) {
this.scheduleReconnect();
}
return false;
}
return true;
}
public async emit({
instanceName,
origin,
@@ -95,6 +219,11 @@ export class RabbitmqController extends EventController implements EventControll
return;
}
if (!(await this.ensureConnection())) {
this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`);
return;
}
const instanceRabbitmq = await this.get(instanceName);
const rabbitmqLocal = instanceRabbitmq?.events;
const rabbitmqGlobal = configService.get<Rabbitmq>('RABBITMQ').GLOBAL_ENABLED;
@@ -154,7 +283,15 @@ export class RabbitmqController extends EventController implements EventControll
break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
}
}
}
@@ -199,7 +336,15 @@ export class RabbitmqController extends EventController implements EventControll
break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
}
}
}
@@ -208,41 +353,57 @@ export class RabbitmqController extends EventController implements EventControll
private async initGlobalQueues(): Promise<void> {
this.logger.info('Initializing global queues');
if (!(await this.ensureConnection())) {
this.logger.error('Cannot initialize global queues: No AMQP connection');
return;
}
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
const events = configService.get<Rabbitmq>('RABBITMQ').EVENTS;
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
if (!events) {
this.logger.warn('No events to initialize on AMQP');
return;
}
const eventKeys = Object.keys(events);
eventKeys.forEach((event) => {
if (events[event] === false) return;
for (const event of eventKeys) {
if (events[event] === false) continue;
const queueName =
prefixKey !== ''
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
: `${event.replace(/_/g, '.').toLowerCase()}`;
const exchangeName = rabbitmqExchangeName;
try {
const queueName =
prefixKey !== ''
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
: `${event.replace(/_/g, '.').toLowerCase()}`;
const exchangeName = rabbitmqExchangeName;
this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
this.amqpChannel.bindQueue(queueName, exchangeName, event);
});
await this.amqpChannel.bindQueue(queueName, exchangeName, event);
this.logger.info(`Global queue initialized: ${queueName}`);
} catch (error) {
this.logger.error({
local: 'RabbitmqController.initGlobalQueues',
message: `Failed to initialize global queue for event ${event}`,
error: error.message || error,
});
this.handleConnectionLoss();
break;
}
}
}
}