refactor: improve session handling and validation in N8n integration

This commit enhances the N8n integration by refining session management and validation logic. Key changes include:
- Added error handling for session creation failures in the BaseChatbotService.
- Removed unused methods and properties in N8nService and N8nDto to streamline the codebase.
- Updated N8n schema to enforce required fields and improve validation checks.
- Simplified message processing logic to utilize base class methods, enhancing maintainability.

These improvements contribute to a more robust and efficient N8n integration.
This commit is contained in:
Guilherme Gomes 2025-05-27 15:10:47 -03:00
parent 39aaf29d54
commit 64fc7a05ac
5 changed files with 114 additions and 477 deletions

View File

@ -353,20 +353,25 @@ export abstract class BaseChatbotService<BotType = any, SettingsType = any> {
? pushName
: null;
session = (
await this.createNewSession(
{
instanceName: instance.instanceName,
instanceId: instance.instanceId,
},
{
remoteJid,
pushName: pushNameValue,
botId: (bot as any).id,
},
this.getBotType(),
)
)?.session;
const sessionResult = await this.createNewSession(
{
instanceName: instance.instanceName,
instanceId: instance.instanceId,
},
{
remoteJid,
pushName: pushNameValue,
botId: (bot as any).id,
},
this.getBotType(),
);
if (!sessionResult || !sessionResult.session) {
this.logger.error('Failed to create new session');
return;
}
session = sessionResult.session;
}
// Update session status to opened

View File

@ -110,58 +110,6 @@ export class N8nController extends BaseChatbotController<N8nModel, N8nDto> {
return super.createBot(instance, data);
}
public async findBot(instance: InstanceDto) {
if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled');
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bots = await this.botRepository.findMany({
where: {
instanceId: instanceId,
},
});
if (!bots.length) {
return null;
}
return bots;
}
public async fetchBot(instance: InstanceDto, botId: string) {
if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled');
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bot = await this.botRepository.findFirst({
where: {
id: botId,
},
});
if (!bot) {
throw new Error('N8n not found');
}
if (bot.instanceId !== instanceId) {
throw new Error('N8n not found');
}
return bot;
}
// Process N8n-specific bot logic
protected async processBot(
instance: any,
@ -173,6 +121,7 @@ export class N8nController extends BaseChatbotController<N8nModel, N8nDto> {
pushName?: string,
msg?: any,
) {
this.n8nService.process(instance, remoteJid, bot, session, settings, content, pushName, msg);
// Use the base class pattern instead of calling n8nService.process directly
await this.n8nService.process(instance, remoteJid, bot, session, settings, content, pushName, msg);
}
}

View File

@ -1,5 +1,3 @@
import { TriggerOperator, TriggerType } from '@prisma/client';
import { BaseChatbotDto, BaseChatbotSettingDto } from '../../base-chatbot.dto';
export class N8nDto extends BaseChatbotDto {
@ -7,26 +5,10 @@ export class N8nDto extends BaseChatbotDto {
webhookUrl?: string;
basicAuthUser?: string;
basicAuthPass?: string;
// Advanced bot properties (copied from DifyDto style)
triggerType: TriggerType;
triggerOperator?: TriggerOperator;
triggerValue?: string;
expire?: number;
keywordFinish?: string;
delayMessage?: number;
unknownMessage?: string;
listeningFromMe?: boolean;
stopBotFromMe?: boolean;
keepOpen?: boolean;
debounceTime?: number;
ignoreJids?: string[];
splitMessages?: boolean;
timePerChar?: number;
}
export class N8nSettingDto extends BaseChatbotSettingDto {
// N8n specific fields
// N8n has no specific fields
}
export class N8nMessageDto {

View File

@ -1,14 +1,12 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { ConfigService, HttpServer } from '@config/env.config';
import { IntegrationSession, N8n, N8nSetting } from '@prisma/client';
import { sendTelemetry } from '@utils/sendTelemetry';
import axios from 'axios';
import { BaseChatbotService } from '../../base-chatbot.service';
import { OpenaiService } from '../../openai/services/openai.service';
import { N8nDto } from '../dto/n8n.dto';
export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
private openaiService: OpenaiService;
@ -29,92 +27,6 @@ export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
return 'n8n';
}
/**
* Create a new N8n bot for the given instance.
*/
public async createBot(instanceId: string, data: N8nDto) {
try {
return await this.prismaRepository.n8n.create({
data: {
enabled: data.enabled ?? true,
description: data.description,
webhookUrl: data.webhookUrl,
basicAuthUser: data.basicAuthUser,
basicAuthPass: data.basicAuthPass,
instanceId,
},
});
} catch (error) {
this.logger.error(error);
throw error;
}
}
/**
* Find all N8n bots for the given instance.
*/
public async findBots(instanceId: string) {
try {
return await this.prismaRepository.n8n.findMany({ where: { instanceId } });
} catch (error) {
this.logger.error(error);
throw error;
}
}
/**
* Fetch a specific N8n bot by ID and instance.
*/
public async fetchBot(instanceId: string, n8nId: string) {
try {
const bot = await this.prismaRepository.n8n.findFirst({ where: { id: n8nId } });
if (!bot || bot.instanceId !== instanceId) throw new Error('N8n bot not found');
return bot;
} catch (error) {
this.logger.error(error);
throw error;
}
}
/**
* Update a specific N8n bot.
*/
public async updateBot(instanceId: string, n8nId: string, data: N8nDto) {
try {
await this.fetchBot(instanceId, n8nId);
return await this.prismaRepository.n8n.update({
where: { id: n8nId },
data: {
enabled: data.enabled,
description: data.description,
webhookUrl: data.webhookUrl,
basicAuthUser: data.basicAuthUser,
basicAuthPass: data.basicAuthPass,
},
});
} catch (error) {
this.logger.error(error);
throw error;
}
}
/**
* Delete a specific N8n bot.
*/
public async deleteBot(instanceId: string, n8nId: string) {
try {
await this.fetchBot(instanceId, n8nId);
return await this.prismaRepository.n8n.delete({ where: { id: n8nId } });
} catch (error) {
this.logger.error(error);
throw error;
}
}
public async createNewSession(instance: InstanceDto, data: any) {
return super.createNewSession(instance, data, 'n8n');
}
protected async sendMessageToBot(
instance: any,
session: IntegrationSession,
@ -126,6 +38,11 @@ export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
msg?: any,
) {
try {
if (!session) {
this.logger.error('Session is null in sendMessageToBot');
return;
}
const endpoint: string = n8n.webhookUrl;
const payload: any = {
chatInput: content,
@ -142,15 +59,12 @@ export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
if (this.isAudioMessage(content) && msg) {
try {
this.logger.debug(`[N8n] Downloading audio for Whisper transcription`);
const transcription = await this.openaiService.speechToText(msg);
const transcription = await this.openaiService.speechToText(msg, instance);
if (transcription) {
payload.chatInput = transcription;
} else {
payload.chatInput = '[Audio message could not be transcribed]';
}
} catch (err) {
this.logger.error(`[N8n] Failed to transcribe audio: ${err}`);
payload.chatInput = '[Audio message could not be transcribed]';
}
}
@ -161,7 +75,10 @@ export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
}
const response = await axios.post(endpoint, payload, { headers });
const message = response?.data?.output || response?.data?.answer;
// Use base class method instead of custom implementation
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
@ -176,277 +93,4 @@ export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
return;
}
}
protected async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: N8nSetting) {
const linkRegex = /(!?)\[(.*?)\]\((.*?)\)/g;
let textBuffer = '';
let lastIndex = 0;
let match: RegExpExecArray | null;
while ((match = linkRegex.exec(message)) !== null) {
const [fullMatch, exclamation, altText, url] = match;
const mediaType = this.getMediaType(url);
const beforeText = message.slice(lastIndex, match.index).trim();
if (beforeText) {
textBuffer += beforeText;
}
if (mediaType) {
const splitMessages = settings.splitMessages ?? false;
const timePerChar = settings.timePerChar ?? 0;
const minDelay = 1000;
const maxDelay = 20000;
if (textBuffer.trim()) {
if (splitMessages) {
const multipleMessages = textBuffer.trim().split('\n\n');
for (let index = 0; index < multipleMessages.length; index++) {
const message = multipleMessages[index];
const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: message,
},
false,
);
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
} else {
const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: textBuffer,
},
false,
);
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
}
textBuffer = '';
if (mediaType === 'image') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
caption: exclamation === '!' ? undefined : altText,
mediatype: 'image',
media: url,
});
} else if (mediaType === 'video') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
caption: exclamation === '!' ? undefined : altText,
mediatype: 'video',
media: url,
});
} else if (mediaType === 'audio') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
mediatype: 'audio',
media: url,
});
} else if (mediaType === 'document') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
caption: exclamation === '!' ? undefined : altText,
mediatype: 'document',
media: url,
fileName: altText || 'file',
});
}
} else {
textBuffer += `[${altText}](${url})`;
}
lastIndex = match.index + fullMatch.length;
}
const remainingText = message.slice(lastIndex).trim();
if (remainingText) {
textBuffer += remainingText;
}
if (textBuffer.trim()) {
const splitMessages = settings.splitMessages ?? false;
const timePerChar = settings.timePerChar ?? 0;
const minDelay = 1000;
const maxDelay = 20000;
if (splitMessages) {
const multipleMessages = textBuffer.trim().split('\n\n');
for (let index = 0; index < multipleMessages.length; index++) {
const message = multipleMessages[index];
const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: message,
},
false,
);
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
} else {
const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: textBuffer,
},
false,
);
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
}
}
protected async initNewSession(
instance: any,
remoteJid: string,
n8n: N8n,
settings: N8nSetting,
session: IntegrationSession,
content: string,
pushName?: string,
msg?: any,
) {
try {
await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName || '', content, msg);
} catch (error) {
this.logger.error(error);
return;
}
}
public async process(
instance: any,
remoteJid: string,
n8n: N8n,
session: IntegrationSession,
settings: N8nSetting,
content: string,
pushName?: string,
msg?: any,
) {
try {
// Handle keyword finish
if (settings?.keywordFinish?.includes(content.toLowerCase())) {
if (settings?.keepOpen) {
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'closed',
},
});
} else {
await this.prismaRepository.integrationSession.delete({
where: {
id: session.id,
},
});
}
return;
}
// If session is new or doesn't exist
if (!session) {
const data = {
remoteJid,
pushName,
botId: n8n.id,
};
const createSession = await this.createNewSession(
{ instanceName: instance.instanceName, instanceId: instance.instanceId },
data,
);
await this.initNewSession(instance, remoteJid, n8n, settings, createSession.session, content, pushName, msg);
await sendTelemetry('/n8n/session/start');
return;
}
// If session exists but is paused
if (session.status === 'paused') {
return;
}
// Regular message for ongoing session
await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName || '', content, msg);
} catch (error) {
this.logger.error(error);
return;
}
}
}

View File

@ -1,59 +1,116 @@
import { JSONSchema7 } from 'json-schema';
import { v4 } from 'uuid';
const isNotEmpty = (...propertyNames: string[]): JSONSchema7 => {
const properties = {};
propertyNames.forEach(
(property) =>
(properties[property] = {
minLength: 1,
description: `The "${property}" cannot be empty`,
}),
);
return {
if: {
propertyNames: {
enum: [...propertyNames],
},
},
then: { properties },
};
};
export const n8nSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
enabled: { type: 'boolean' },
description: { type: 'string' },
webhookUrl: { type: 'string', minLength: 1 },
webhookUrl: { type: 'string' },
basicAuthUser: { type: 'string' },
basicAuthPass: { type: 'string' },
},
required: ['enabled', 'webhookUrl'],
};
export const n8nMessageSchema: JSONSchema7 = {
type: 'object',
properties: {
chatInput: { type: 'string', minLength: 1 },
sessionId: { type: 'string', minLength: 1 },
},
required: ['chatInput', 'sessionId'],
};
export const n8nSettingSchema: JSONSchema7 = {
type: 'object',
properties: {
expire: { type: 'number' },
basicAuthPassword: { type: 'string' },
triggerType: { type: 'string', enum: ['all', 'keyword', 'none', 'advanced'] },
triggerOperator: { type: 'string', enum: ['equals', 'contains', 'startsWith', 'endsWith', 'regex'] },
triggerValue: { type: 'string' },
expire: { type: 'integer' },
keywordFinish: { type: 'string' },
delayMessage: { type: 'number' },
delayMessage: { type: 'integer' },
unknownMessage: { type: 'string' },
listeningFromMe: { type: 'boolean' },
stopBotFromMe: { type: 'boolean' },
keepOpen: { type: 'boolean' },
debounceTime: { type: 'number' },
n8nIdFallback: { type: 'string' },
debounceTime: { type: 'integer' },
ignoreJids: { type: 'array', items: { type: 'string' } },
splitMessages: { type: 'boolean' },
timePerChar: { type: 'number' },
timePerChar: { type: 'integer' },
},
required: [],
required: ['enabled', 'webhookUrl', 'triggerType'],
...isNotEmpty('enabled', 'webhookUrl', 'triggerType'),
};
export const n8nStatusSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
remoteJid: { type: 'string' },
status: { type: 'string', enum: ['opened', 'closed', 'delete', 'paused'] },
status: { type: 'string', enum: ['opened', 'closed', 'paused', 'delete'] },
},
required: ['remoteJid', 'status'],
...isNotEmpty('remoteJid', 'status'),
};
export const n8nSettingSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
expire: { type: 'integer' },
keywordFinish: { type: 'string' },
delayMessage: { type: 'integer' },
unknownMessage: { type: 'string' },
listeningFromMe: { type: 'boolean' },
stopBotFromMe: { type: 'boolean' },
keepOpen: { type: 'boolean' },
debounceTime: { type: 'integer' },
ignoreJids: { type: 'array', items: { type: 'string' } },
botIdFallback: { type: 'string' },
splitMessages: { type: 'boolean' },
timePerChar: { type: 'integer' },
},
required: [
'expire',
'keywordFinish',
'delayMessage',
'unknownMessage',
'listeningFromMe',
'stopBotFromMe',
'keepOpen',
'debounceTime',
'ignoreJids',
'splitMessages',
'timePerChar',
],
...isNotEmpty(
'expire',
'keywordFinish',
'delayMessage',
'unknownMessage',
'listeningFromMe',
'stopBotFromMe',
'keepOpen',
'debounceTime',
'ignoreJids',
'splitMessages',
'timePerChar',
),
};
export const n8nIgnoreJidSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
remoteJid: { type: 'string' },
action: { type: 'string', enum: ['add', 'remove'] },
},
required: ['remoteJid', 'action'],
...isNotEmpty('remoteJid', 'action'),
};