diff --git a/src/api/integrations/chatbot/base-chatbot.service.ts b/src/api/integrations/chatbot/base-chatbot.service.ts index eddf2a38..f19cb9d4 100644 --- a/src/api/integrations/chatbot/base-chatbot.service.ts +++ b/src/api/integrations/chatbot/base-chatbot.service.ts @@ -353,20 +353,25 @@ export abstract class BaseChatbotService { ? pushName : null; - session = ( - await this.createNewSession( - { - instanceName: instance.instanceName, - instanceId: instance.instanceId, - }, - { - remoteJid, - pushName: pushNameValue, - botId: (bot as any).id, - }, - this.getBotType(), - ) - )?.session; + const sessionResult = await this.createNewSession( + { + instanceName: instance.instanceName, + instanceId: instance.instanceId, + }, + { + remoteJid, + pushName: pushNameValue, + botId: (bot as any).id, + }, + this.getBotType(), + ); + + if (!sessionResult || !sessionResult.session) { + this.logger.error('Failed to create new session'); + return; + } + + session = sessionResult.session; } // Update session status to opened diff --git a/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts b/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts index 34e5828a..0eb7f7a6 100644 --- a/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts +++ b/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts @@ -110,58 +110,6 @@ export class N8nController extends BaseChatbotController { return super.createBot(instance, data); } - public async findBot(instance: InstanceDto) { - if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); - - const instanceId = await this.prismaRepository.instance - .findFirst({ - where: { - name: instance.instanceName, - }, - }) - .then((instance) => instance.id); - - const bots = await this.botRepository.findMany({ - where: { - instanceId: instanceId, - }, - }); - - if (!bots.length) { - return null; - } - - return bots; - } - - public async fetchBot(instance: InstanceDto, botId: string) { - if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); - - const instanceId = await this.prismaRepository.instance - .findFirst({ - where: { - name: instance.instanceName, - }, - }) - .then((instance) => instance.id); - - const bot = await this.botRepository.findFirst({ - where: { - id: botId, - }, - }); - - if (!bot) { - throw new Error('N8n not found'); - } - - if (bot.instanceId !== instanceId) { - throw new Error('N8n not found'); - } - - return bot; - } - // Process N8n-specific bot logic protected async processBot( instance: any, @@ -173,6 +121,7 @@ export class N8nController extends BaseChatbotController { pushName?: string, msg?: any, ) { - this.n8nService.process(instance, remoteJid, bot, session, settings, content, pushName, msg); + // Use the base class pattern instead of calling n8nService.process directly + await this.n8nService.process(instance, remoteJid, bot, session, settings, content, pushName, msg); } } diff --git a/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts b/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts index c26608fd..c844f76e 100644 --- a/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts +++ b/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts @@ -1,5 +1,3 @@ -import { TriggerOperator, TriggerType } from '@prisma/client'; - import { BaseChatbotDto, BaseChatbotSettingDto } from '../../base-chatbot.dto'; export class N8nDto extends BaseChatbotDto { @@ -7,26 +5,10 @@ export class N8nDto extends BaseChatbotDto { webhookUrl?: string; basicAuthUser?: string; basicAuthPass?: string; - - // Advanced bot properties (copied from DifyDto style) - triggerType: TriggerType; - triggerOperator?: TriggerOperator; - triggerValue?: string; - expire?: number; - keywordFinish?: string; - delayMessage?: number; - unknownMessage?: string; - listeningFromMe?: boolean; - stopBotFromMe?: boolean; - keepOpen?: boolean; - debounceTime?: number; - ignoreJids?: string[]; - splitMessages?: boolean; - timePerChar?: number; } export class N8nSettingDto extends BaseChatbotSettingDto { - // N8n specific fields + // N8n has no specific fields } export class N8nMessageDto { diff --git a/src/api/integrations/chatbot/n8n/services/n8n.service.ts b/src/api/integrations/chatbot/n8n/services/n8n.service.ts index 34f1e27e..2d37dca0 100644 --- a/src/api/integrations/chatbot/n8n/services/n8n.service.ts +++ b/src/api/integrations/chatbot/n8n/services/n8n.service.ts @@ -1,14 +1,12 @@ -import { InstanceDto } from '@api/dto/instance.dto'; import { PrismaRepository } from '@api/repository/repository.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { ConfigService, HttpServer } from '@config/env.config'; import { IntegrationSession, N8n, N8nSetting } from '@prisma/client'; -import { sendTelemetry } from '@utils/sendTelemetry'; import axios from 'axios'; import { BaseChatbotService } from '../../base-chatbot.service'; import { OpenaiService } from '../../openai/services/openai.service'; -import { N8nDto } from '../dto/n8n.dto'; + export class N8nService extends BaseChatbotService { private openaiService: OpenaiService; @@ -29,92 +27,6 @@ export class N8nService extends BaseChatbotService { return 'n8n'; } - /** - * Create a new N8n bot for the given instance. - */ - public async createBot(instanceId: string, data: N8nDto) { - try { - return await this.prismaRepository.n8n.create({ - data: { - enabled: data.enabled ?? true, - description: data.description, - webhookUrl: data.webhookUrl, - basicAuthUser: data.basicAuthUser, - basicAuthPass: data.basicAuthPass, - instanceId, - }, - }); - } catch (error) { - this.logger.error(error); - throw error; - } - } - - /** - * Find all N8n bots for the given instance. - */ - public async findBots(instanceId: string) { - try { - return await this.prismaRepository.n8n.findMany({ where: { instanceId } }); - } catch (error) { - this.logger.error(error); - throw error; - } - } - - /** - * Fetch a specific N8n bot by ID and instance. - */ - public async fetchBot(instanceId: string, n8nId: string) { - try { - const bot = await this.prismaRepository.n8n.findFirst({ where: { id: n8nId } }); - if (!bot || bot.instanceId !== instanceId) throw new Error('N8n bot not found'); - return bot; - } catch (error) { - this.logger.error(error); - throw error; - } - } - - /** - * Update a specific N8n bot. - */ - public async updateBot(instanceId: string, n8nId: string, data: N8nDto) { - try { - await this.fetchBot(instanceId, n8nId); - return await this.prismaRepository.n8n.update({ - where: { id: n8nId }, - data: { - enabled: data.enabled, - description: data.description, - webhookUrl: data.webhookUrl, - basicAuthUser: data.basicAuthUser, - basicAuthPass: data.basicAuthPass, - }, - }); - } catch (error) { - this.logger.error(error); - throw error; - } - } - - /** - * Delete a specific N8n bot. - */ - public async deleteBot(instanceId: string, n8nId: string) { - try { - await this.fetchBot(instanceId, n8nId); - return await this.prismaRepository.n8n.delete({ where: { id: n8nId } }); - } catch (error) { - this.logger.error(error); - throw error; - } - } - - public async createNewSession(instance: InstanceDto, data: any) { - return super.createNewSession(instance, data, 'n8n'); - } - protected async sendMessageToBot( instance: any, session: IntegrationSession, @@ -126,6 +38,11 @@ export class N8nService extends BaseChatbotService { msg?: any, ) { try { + if (!session) { + this.logger.error('Session is null in sendMessageToBot'); + return; + } + const endpoint: string = n8n.webhookUrl; const payload: any = { chatInput: content, @@ -142,15 +59,12 @@ export class N8nService extends BaseChatbotService { if (this.isAudioMessage(content) && msg) { try { this.logger.debug(`[N8n] Downloading audio for Whisper transcription`); - const transcription = await this.openaiService.speechToText(msg); + const transcription = await this.openaiService.speechToText(msg, instance); if (transcription) { payload.chatInput = transcription; - } else { - payload.chatInput = '[Audio message could not be transcribed]'; } } catch (err) { this.logger.error(`[N8n] Failed to transcribe audio: ${err}`); - payload.chatInput = '[Audio message could not be transcribed]'; } } @@ -161,7 +75,10 @@ export class N8nService extends BaseChatbotService { } const response = await axios.post(endpoint, payload, { headers }); const message = response?.data?.output || response?.data?.answer; + + // Use base class method instead of custom implementation await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + await this.prismaRepository.integrationSession.update({ where: { id: session.id, @@ -176,277 +93,4 @@ export class N8nService extends BaseChatbotService { return; } } - - protected async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: N8nSetting) { - const linkRegex = /(!?)\[(.*?)\]\((.*?)\)/g; - let textBuffer = ''; - let lastIndex = 0; - let match: RegExpExecArray | null; - - while ((match = linkRegex.exec(message)) !== null) { - const [fullMatch, exclamation, altText, url] = match; - const mediaType = this.getMediaType(url); - const beforeText = message.slice(lastIndex, match.index).trim(); - - if (beforeText) { - textBuffer += beforeText; - } - - if (mediaType) { - const splitMessages = settings.splitMessages ?? false; - const timePerChar = settings.timePerChar ?? 0; - const minDelay = 1000; - const maxDelay = 20000; - - if (textBuffer.trim()) { - if (splitMessages) { - const multipleMessages = textBuffer.trim().split('\n\n'); - for (let index = 0; index < multipleMessages.length; index++) { - const message = multipleMessages[index]; - const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } else { - const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: textBuffer, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } - - textBuffer = ''; - - if (mediaType === 'image') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - caption: exclamation === '!' ? undefined : altText, - mediatype: 'image', - media: url, - }); - } else if (mediaType === 'video') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - caption: exclamation === '!' ? undefined : altText, - mediatype: 'video', - media: url, - }); - } else if (mediaType === 'audio') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'audio', - media: url, - }); - } else if (mediaType === 'document') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - caption: exclamation === '!' ? undefined : altText, - mediatype: 'document', - media: url, - fileName: altText || 'file', - }); - } - } else { - textBuffer += `[${altText}](${url})`; - } - - lastIndex = match.index + fullMatch.length; - } - - const remainingText = message.slice(lastIndex).trim(); - if (remainingText) { - textBuffer += remainingText; - } - - if (textBuffer.trim()) { - const splitMessages = settings.splitMessages ?? false; - const timePerChar = settings.timePerChar ?? 0; - const minDelay = 1000; - const maxDelay = 20000; - - if (splitMessages) { - const multipleMessages = textBuffer.trim().split('\n\n'); - for (let index = 0; index < multipleMessages.length; index++) { - const message = multipleMessages[index]; - const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } else { - const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: textBuffer, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === 'WHATSAPP_BAILEYS') { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } - } - - protected async initNewSession( - instance: any, - remoteJid: string, - n8n: N8n, - settings: N8nSetting, - session: IntegrationSession, - content: string, - pushName?: string, - msg?: any, - ) { - try { - await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName || '', content, msg); - } catch (error) { - this.logger.error(error); - return; - } - } - - public async process( - instance: any, - remoteJid: string, - n8n: N8n, - session: IntegrationSession, - settings: N8nSetting, - content: string, - pushName?: string, - msg?: any, - ) { - try { - // Handle keyword finish - if (settings?.keywordFinish?.includes(content.toLowerCase())) { - if (settings?.keepOpen) { - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'closed', - }, - }); - } else { - await this.prismaRepository.integrationSession.delete({ - where: { - id: session.id, - }, - }); - } - - return; - } - - // If session is new or doesn't exist - if (!session) { - const data = { - remoteJid, - pushName, - botId: n8n.id, - }; - - const createSession = await this.createNewSession( - { instanceName: instance.instanceName, instanceId: instance.instanceId }, - data, - ); - - await this.initNewSession(instance, remoteJid, n8n, settings, createSession.session, content, pushName, msg); - - await sendTelemetry('/n8n/session/start'); - return; - } - - // If session exists but is paused - if (session.status === 'paused') { - return; - } - - // Regular message for ongoing session - await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName || '', content, msg); - } catch (error) { - this.logger.error(error); - return; - } - } } diff --git a/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts b/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts index ccb55ae1..b1382ac3 100644 --- a/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts +++ b/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts @@ -1,59 +1,116 @@ import { JSONSchema7 } from 'json-schema'; +import { v4 } from 'uuid'; + +const isNotEmpty = (...propertyNames: string[]): JSONSchema7 => { + const properties = {}; + propertyNames.forEach( + (property) => + (properties[property] = { + minLength: 1, + description: `The "${property}" cannot be empty`, + }), + ); + return { + if: { + propertyNames: { + enum: [...propertyNames], + }, + }, + then: { properties }, + }; +}; export const n8nSchema: JSONSchema7 = { + $id: v4(), type: 'object', properties: { enabled: { type: 'boolean' }, description: { type: 'string' }, - webhookUrl: { type: 'string', minLength: 1 }, + webhookUrl: { type: 'string' }, basicAuthUser: { type: 'string' }, - basicAuthPass: { type: 'string' }, - }, - required: ['enabled', 'webhookUrl'], -}; - -export const n8nMessageSchema: JSONSchema7 = { - type: 'object', - properties: { - chatInput: { type: 'string', minLength: 1 }, - sessionId: { type: 'string', minLength: 1 }, - }, - required: ['chatInput', 'sessionId'], -}; - -export const n8nSettingSchema: JSONSchema7 = { - type: 'object', - properties: { - expire: { type: 'number' }, + basicAuthPassword: { type: 'string' }, + triggerType: { type: 'string', enum: ['all', 'keyword', 'none', 'advanced'] }, + triggerOperator: { type: 'string', enum: ['equals', 'contains', 'startsWith', 'endsWith', 'regex'] }, + triggerValue: { type: 'string' }, + expire: { type: 'integer' }, keywordFinish: { type: 'string' }, - delayMessage: { type: 'number' }, + delayMessage: { type: 'integer' }, unknownMessage: { type: 'string' }, listeningFromMe: { type: 'boolean' }, stopBotFromMe: { type: 'boolean' }, keepOpen: { type: 'boolean' }, - debounceTime: { type: 'number' }, - n8nIdFallback: { type: 'string' }, + debounceTime: { type: 'integer' }, ignoreJids: { type: 'array', items: { type: 'string' } }, splitMessages: { type: 'boolean' }, - timePerChar: { type: 'number' }, + timePerChar: { type: 'integer' }, }, - required: [], + required: ['enabled', 'webhookUrl', 'triggerType'], + ...isNotEmpty('enabled', 'webhookUrl', 'triggerType'), }; export const n8nStatusSchema: JSONSchema7 = { + $id: v4(), type: 'object', properties: { remoteJid: { type: 'string' }, - status: { type: 'string', enum: ['opened', 'closed', 'delete', 'paused'] }, + status: { type: 'string', enum: ['opened', 'closed', 'paused', 'delete'] }, }, required: ['remoteJid', 'status'], + ...isNotEmpty('remoteJid', 'status'), +}; + +export const n8nSettingSchema: JSONSchema7 = { + $id: v4(), + type: 'object', + properties: { + expire: { type: 'integer' }, + keywordFinish: { type: 'string' }, + delayMessage: { type: 'integer' }, + unknownMessage: { type: 'string' }, + listeningFromMe: { type: 'boolean' }, + stopBotFromMe: { type: 'boolean' }, + keepOpen: { type: 'boolean' }, + debounceTime: { type: 'integer' }, + ignoreJids: { type: 'array', items: { type: 'string' } }, + botIdFallback: { type: 'string' }, + splitMessages: { type: 'boolean' }, + timePerChar: { type: 'integer' }, + }, + required: [ + 'expire', + 'keywordFinish', + 'delayMessage', + 'unknownMessage', + 'listeningFromMe', + 'stopBotFromMe', + 'keepOpen', + 'debounceTime', + 'ignoreJids', + 'splitMessages', + 'timePerChar', + ], + ...isNotEmpty( + 'expire', + 'keywordFinish', + 'delayMessage', + 'unknownMessage', + 'listeningFromMe', + 'stopBotFromMe', + 'keepOpen', + 'debounceTime', + 'ignoreJids', + 'splitMessages', + 'timePerChar', + ), }; export const n8nIgnoreJidSchema: JSONSchema7 = { + $id: v4(), type: 'object', properties: { remoteJid: { type: 'string' }, action: { type: 'string', enum: ['add', 'remove'] }, }, required: ['remoteJid', 'action'], + ...isNotEmpty('remoteJid', 'action'), };