diff --git a/prisma/mysql-schema.prisma b/prisma/mysql-schema.prisma index 99d81a0b..74a232b2 100644 --- a/prisma/mysql-schema.prisma +++ b/prisma/mysql-schema.prisma @@ -106,6 +106,7 @@ model Instance { Flowise Flowise[] FlowiseSetting FlowiseSetting? Pusher Pusher? + N8n N8n[] } model Session { @@ -643,3 +644,52 @@ model IsOnWhatsapp { createdAt DateTime @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp } + +model N8n { + id String @id @default(cuid()) + enabled Boolean @default(true) @db.Boolean + description String? @db.VarChar(255) + webhookUrl String? @db.VarChar(255) + basicAuthUser String? @db.VarChar(255) + basicAuthPass String? @db.VarChar(255) + expire Int? @default(0) @db.Integer + keywordFinish String? @db.VarChar(100) + delayMessage Int? @db.Integer + unknownMessage String? @db.VarChar(100) + listeningFromMe Boolean? @default(false) @db.Boolean + stopBotFromMe Boolean? @default(false) @db.Boolean + keepOpen Boolean? @default(false) @db.Boolean + debounceTime Int? @db.Integer + ignoreJids Json? + splitMessages Boolean? @default(false) @db.Boolean + timePerChar Int? @default(50) @db.Integer + triggerType TriggerType? + triggerOperator TriggerOperator? + triggerValue String? + createdAt DateTime? @default(now()) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String + N8nSetting N8nSetting[] +} + +model N8nSetting { + id String @id @default(cuid()) + expire Int? @default(0) @db.Integer + keywordFinish String? @db.VarChar(100) + delayMessage Int? @db.Integer + unknownMessage String? @db.VarChar(100) + listeningFromMe Boolean? @default(false) @db.Boolean + stopBotFromMe Boolean? @default(false) @db.Boolean + keepOpen Boolean? @default(false) @db.Boolean + debounceTime Int? @db.Integer + ignoreJids Json? + splitMessages Boolean? @default(false) @db.Boolean + timePerChar Int? @default(50) @db.Integer + createdAt DateTime? @default(now()) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Fallback N8n? @relation(fields: [n8nIdFallback], references: [id]) + n8nIdFallback String? @db.VarChar(100) + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String @unique +} diff --git a/prisma/postgresql-migrations/20250514232744_add_n8n_table/migration.sql b/prisma/postgresql-migrations/20250514232744_add_n8n_table/migration.sql new file mode 100644 index 00000000..18a0d23d --- /dev/null +++ b/prisma/postgresql-migrations/20250514232744_add_n8n_table/migration.sql @@ -0,0 +1,62 @@ +-- CreateTable +CREATE TABLE "N8n" ( + "id" TEXT NOT NULL, + "enabled" BOOLEAN NOT NULL DEFAULT true, + "description" VARCHAR(255), + "webhookUrl" VARCHAR(255), + "basicAuthUser" VARCHAR(255), + "basicAuthPass" VARCHAR(255), + "expire" INTEGER DEFAULT 0, + "keywordFinish" VARCHAR(100), + "delayMessage" INTEGER, + "unknownMessage" VARCHAR(100), + "listeningFromMe" BOOLEAN DEFAULT false, + "stopBotFromMe" BOOLEAN DEFAULT false, + "keepOpen" BOOLEAN DEFAULT false, + "debounceTime" INTEGER, + "ignoreJids" JSONB, + "splitMessages" BOOLEAN DEFAULT false, + "timePerChar" INTEGER DEFAULT 50, + "triggerType" "TriggerType", + "triggerOperator" "TriggerOperator", + "triggerValue" TEXT, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP NOT NULL, + "instanceId" TEXT NOT NULL, + + CONSTRAINT "N8n_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "N8nSetting" ( + "id" TEXT NOT NULL, + "expire" INTEGER DEFAULT 0, + "keywordFinish" VARCHAR(100), + "delayMessage" INTEGER, + "unknownMessage" VARCHAR(100), + "listeningFromMe" BOOLEAN DEFAULT false, + "stopBotFromMe" BOOLEAN DEFAULT false, + "keepOpen" BOOLEAN DEFAULT false, + "debounceTime" INTEGER, + "ignoreJids" JSONB, + "splitMessages" BOOLEAN DEFAULT false, + "timePerChar" INTEGER DEFAULT 50, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP NOT NULL, + "n8nIdFallback" VARCHAR(100), + "instanceId" TEXT NOT NULL, + + CONSTRAINT "N8nSetting_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "N8nSetting_instanceId_key" ON "N8nSetting"("instanceId"); + +-- AddForeignKey +ALTER TABLE "N8n" ADD CONSTRAINT "N8n_instanceId_fkey" FOREIGN KEY ("instanceId") REFERENCES "Instance"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "N8nSetting" ADD CONSTRAINT "N8nSetting_n8nIdFallback_fkey" FOREIGN KEY ("n8nIdFallback") REFERENCES "N8n"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "N8nSetting" ADD CONSTRAINT "N8nSetting_instanceId_fkey" FOREIGN KEY ("instanceId") REFERENCES "Instance"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/postgresql-schema.prisma b/prisma/postgresql-schema.prisma index 5394348d..4bd1b417 100644 --- a/prisma/postgresql-schema.prisma +++ b/prisma/postgresql-schema.prisma @@ -106,6 +106,8 @@ model Instance { Flowise Flowise[] FlowiseSetting FlowiseSetting? Pusher Pusher? + N8n N8n[] + N8nSetting N8nSetting[] } model Session { @@ -643,3 +645,52 @@ model IsOnWhatsapp { createdAt DateTime @default(now()) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp } + +model N8n { + id String @id @default(cuid()) + enabled Boolean @default(true) @db.Boolean + description String? @db.VarChar(255) + webhookUrl String? @db.VarChar(255) + basicAuthUser String? @db.VarChar(255) + basicAuthPass String? @db.VarChar(255) + expire Int? @default(0) @db.Integer + keywordFinish String? @db.VarChar(100) + delayMessage Int? @db.Integer + unknownMessage String? @db.VarChar(100) + listeningFromMe Boolean? @default(false) @db.Boolean + stopBotFromMe Boolean? @default(false) @db.Boolean + keepOpen Boolean? @default(false) @db.Boolean + debounceTime Int? @db.Integer + ignoreJids Json? + splitMessages Boolean? @default(false) @db.Boolean + timePerChar Int? @default(50) @db.Integer + triggerType TriggerType? + triggerOperator TriggerOperator? + triggerValue String? + createdAt DateTime? @default(now()) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String + N8nSetting N8nSetting[] +} + +model N8nSetting { + id String @id @default(cuid()) + expire Int? @default(0) @db.Integer + keywordFinish String? @db.VarChar(100) + delayMessage Int? @db.Integer + unknownMessage String? @db.VarChar(100) + listeningFromMe Boolean? @default(false) @db.Boolean + stopBotFromMe Boolean? @default(false) @db.Boolean + keepOpen Boolean? @default(false) @db.Boolean + debounceTime Int? @db.Integer + ignoreJids Json? + splitMessages Boolean? @default(false) @db.Boolean + timePerChar Int? @default(50) @db.Integer + createdAt DateTime? @default(now()) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Fallback N8n? @relation(fields: [n8nIdFallback], references: [id]) + n8nIdFallback String? @db.VarChar(100) + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String @unique +} diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 22c610c5..58115e07 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -134,7 +134,6 @@ import { randomBytes } from 'crypto'; import EventEmitter2 from 'eventemitter2'; import ffmpeg from 'fluent-ffmpeg'; import FormData from 'form-data'; -import { readFileSync } from 'fs'; import Long from 'long'; import mimeTypes from 'mime-types'; import NodeCache from 'node-cache'; @@ -230,10 +229,10 @@ export class BaileysStartupService extends ChannelStartupService { private authStateProvider: AuthStateProvider; private readonly msgRetryCounterCache: CacheStore = new NodeCache(); - private readonly userDevicesCache: CacheStore = new NodeCache({ - stdTTL: 300000, - useClones: false - }); + private readonly userDevicesCache: CacheStore = new NodeCache({ + stdTTL: 300000, + useClones: false, + }); private endSession = false; private logBaileys = this.configService.get('LOG').BAILEYS; diff --git a/src/api/integrations/chatbot/chatbot.controller.ts b/src/api/integrations/chatbot/chatbot.controller.ts index f99b4fae..f09de5d0 100644 --- a/src/api/integrations/chatbot/chatbot.controller.ts +++ b/src/api/integrations/chatbot/chatbot.controller.ts @@ -4,6 +4,7 @@ import { difyController, evolutionBotController, flowiseController, + n8nController, openaiController, typebotController, } from '@api/server.module'; @@ -97,6 +98,8 @@ export class ChatbotController { await difyController.emit(emitData); + await n8nController.emit(emitData); + await flowiseController.emit(emitData); } diff --git a/src/api/integrations/chatbot/chatbot.router.ts b/src/api/integrations/chatbot/chatbot.router.ts index 19fc74b7..32d2add0 100644 --- a/src/api/integrations/chatbot/chatbot.router.ts +++ b/src/api/integrations/chatbot/chatbot.router.ts @@ -6,6 +6,7 @@ import { Router } from 'express'; import { EvolutionBotRouter } from './evolutionBot/routes/evolutionBot.router'; import { FlowiseRouter } from './flowise/routes/flowise.router'; +import { N8nRouter } from './n8n/routes/n8n.router'; export class ChatbotRouter { public readonly router: Router; @@ -19,5 +20,6 @@ export class ChatbotRouter { this.router.use('/openai', new OpenaiRouter(...guards).router); this.router.use('/dify', new DifyRouter(...guards).router); this.router.use('/flowise', new FlowiseRouter(...guards).router); + this.router.use('/n8n', new N8nRouter(...guards).router); } } diff --git a/src/api/integrations/chatbot/chatbot.schema.ts b/src/api/integrations/chatbot/chatbot.schema.ts index efc2388f..b85dc887 100644 --- a/src/api/integrations/chatbot/chatbot.schema.ts +++ b/src/api/integrations/chatbot/chatbot.schema.ts @@ -4,3 +4,4 @@ export * from '@api/integrations/chatbot/evolutionBot/validate/evolutionBot.sche export * from '@api/integrations/chatbot/flowise/validate/flowise.schema'; export * from '@api/integrations/chatbot/openai/validate/openai.schema'; export * from '@api/integrations/chatbot/typebot/validate/typebot.schema'; +export * from '@api/integrations/chatbot/n8n/validate/n8n.schema'; diff --git a/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts b/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts new file mode 100644 index 00000000..a58a386d --- /dev/null +++ b/src/api/integrations/chatbot/n8n/controllers/n8n.controller.ts @@ -0,0 +1,888 @@ +import { IgnoreJidDto } from '@api/dto/chatbot.dto'; +import { InstanceDto } from '@api/dto/instance.dto'; +import { N8nDto } from '@api/integrations/chatbot/n8n/dto/n8n.dto'; +import { N8nService } from '@api/integrations/chatbot/n8n/services/n8n.service'; +import { PrismaRepository } from '@api/repository/repository.service'; +import { WAMonitoringService } from '@api/services/monitor.service'; +import { configService } from '@config/env.config'; +import { Logger } from '@config/logger.config'; +import { BadRequestException } from '@exceptions'; +import { N8n as N8nModel } from '@prisma/client'; +import { getConversationMessage } from '@utils/getConversationMessage'; + +import { ChatbotController, ChatbotControllerInterface, EmitData } from '../../chatbot.controller'; + +export class N8nController extends ChatbotController implements ChatbotControllerInterface { + constructor( + private readonly n8nService: N8nService, + prismaRepository: PrismaRepository, + waMonitor: WAMonitoringService, + ) { + super(prismaRepository, waMonitor); + + this.botRepository = this.prismaRepository.n8n; + this.settingsRepository = this.prismaRepository.n8nSetting; + this.sessionRepository = this.prismaRepository.integrationSession; + } + + public readonly logger = new Logger('N8nController'); + + integrationEnabled = configService.get('N8N').ENABLED; + botRepository: any; + settingsRepository: any; + sessionRepository: any; + userMessageDebounce: { [key: string]: { message: string; timeoutId: NodeJS.Timeout } } = {}; + + // Bots + public async createBot(instance: InstanceDto, data: N8nDto) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + if ( + !data.expire || + !data.keywordFinish || + !data.delayMessage || + !data.unknownMessage || + !data.listeningFromMe || + !data.stopBotFromMe || + !data.keepOpen || + !data.debounceTime || + !data.ignoreJids || + !data.splitMessages || + !data.timePerChar + ) { + const defaultSettingCheck = await this.settingsRepository.findFirst({ + where: { + instanceId: instanceId, + }, + }); + + if (data.expire === undefined || data.expire === null) data.expire = defaultSettingCheck.expire; + if (data.keywordFinish === undefined || data.keywordFinish === null) + data.keywordFinish = defaultSettingCheck.keywordFinish; + if (data.delayMessage === undefined || data.delayMessage === null) + data.delayMessage = defaultSettingCheck.delayMessage; + if (data.unknownMessage === undefined || data.unknownMessage === null) + data.unknownMessage = defaultSettingCheck.unknownMessage; + if (data.listeningFromMe === undefined || data.listeningFromMe === null) + data.listeningFromMe = defaultSettingCheck.listeningFromMe; + if (data.stopBotFromMe === undefined || data.stopBotFromMe === null) + data.stopBotFromMe = defaultSettingCheck.stopBotFromMe; + if (data.keepOpen === undefined || data.keepOpen === null) data.keepOpen = defaultSettingCheck.keepOpen; + if (data.debounceTime === undefined || data.debounceTime === null) + data.debounceTime = defaultSettingCheck.debounceTime; + if (data.ignoreJids === undefined || data.ignoreJids === null) data.ignoreJids = defaultSettingCheck.ignoreJids; + if (data.splitMessages === undefined || data.splitMessages === null) + data.splitMessages = defaultSettingCheck?.splitMessages ?? false; + if (data.timePerChar === undefined || data.timePerChar === null) + data.timePerChar = defaultSettingCheck?.timePerChar ?? 0; + + if (!defaultSettingCheck) { + await this.settings(instance, { + expire: data.expire, + keywordFinish: data.keywordFinish, + delayMessage: data.delayMessage, + unknownMessage: data.unknownMessage, + listeningFromMe: data.listeningFromMe, + stopBotFromMe: data.stopBotFromMe, + keepOpen: data.keepOpen, + debounceTime: data.debounceTime, + ignoreJids: data.ignoreJids, + splitMessages: data.splitMessages, + timePerChar: data.timePerChar, + }); + } + } + + const checkTriggerAll = await this.botRepository.findFirst({ + where: { + enabled: true, + triggerType: 'all', + instanceId: instanceId, + }, + }); + + if (checkTriggerAll && data.triggerType === 'all') { + throw new Error('You already have an n8n with an "All" trigger, you cannot have more bots while it is active'); + } + + const checkDuplicate = await this.botRepository.findFirst({ + where: { + instanceId: instanceId, + webhookUrl: data.webhookUrl, + basicAuthUser: data.basicAuthUser, + basicAuthPass: data.basicAuthPass, + }, + }); + + if (checkDuplicate) { + throw new Error('N8n already exists'); + } + + if (data.triggerType === 'keyword') { + if (!data.triggerOperator || !data.triggerValue) { + throw new Error('Trigger operator and value are required'); + } + + const checkDuplicate = await this.botRepository.findFirst({ + where: { + triggerOperator: data.triggerOperator, + triggerValue: data.triggerValue, + instanceId: instanceId, + }, + }); + + if (checkDuplicate) { + throw new Error('Trigger already exists'); + } + } + + if (data.triggerType === 'advanced') { + if (!data.triggerValue) { + throw new Error('Trigger value is required'); + } + + const checkDuplicate = await this.botRepository.findFirst({ + where: { + triggerValue: data.triggerValue, + instanceId: instanceId, + }, + }); + + if (checkDuplicate) { + throw new Error('Trigger already exists'); + } + } + + try { + const bot = await this.botRepository.create({ + data: { + enabled: data?.enabled, + description: data.description, + webhookUrl: data.webhookUrl, + basicAuthUser: data.basicAuthUser, + basicAuthPass: data.basicAuthPass, + expire: data.expire, + keywordFinish: data.keywordFinish, + delayMessage: data.delayMessage, + unknownMessage: data.unknownMessage, + listeningFromMe: data.listeningFromMe, + stopBotFromMe: data.stopBotFromMe, + keepOpen: data.keepOpen, + debounceTime: data.debounceTime, + instanceId: instanceId, + triggerType: data.triggerType, + triggerOperator: data.triggerOperator, + triggerValue: data.triggerValue, + ignoreJids: data.ignoreJids, + splitMessages: data.splitMessages, + timePerChar: data.timePerChar, + }, + }); + + return bot; + } catch (error) { + this.logger.error(error); + throw new Error('Error creating n8n'); + } + } + + public async findBot(instance: InstanceDto) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const bots = await this.botRepository.findMany({ + where: { + instanceId: instanceId, + }, + }); + + if (!bots.length) { + return null; + } + + return bots; + } + + public async fetchBot(instance: InstanceDto, botId: string) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const bot = await this.botRepository.findFirst({ + where: { + id: botId, + }, + }); + + if (!bot) { + throw new Error('N8n not found'); + } + + if (bot.instanceId !== instanceId) { + throw new Error('N8n not found'); + } + + return bot; + } + + public async updateBot(instance: InstanceDto, botId: string, data: N8nDto) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const bot = await this.botRepository.findFirst({ + where: { + id: botId, + }, + }); + + if (!bot) { + throw new Error('N8n not found'); + } + + if (bot.instanceId !== instanceId) { + throw new Error('N8n not found'); + } + + if (data.triggerType === 'all') { + const checkTriggerAll = await this.botRepository.findFirst({ + where: { + enabled: true, + triggerType: 'all', + id: { + not: botId, + }, + instanceId: instanceId, + }, + }); + + if (checkTriggerAll) { + throw new Error('You already have an n8n with an "All" trigger, you cannot have more bots while it is active'); + } + } + + const checkDuplicate = await this.botRepository.findFirst({ + where: { + id: { + not: botId, + }, + instanceId: instanceId, + webhookUrl: data.webhookUrl, + basicAuthUser: data.basicAuthUser, + basicAuthPass: data.basicAuthPass, + }, + }); + + if (checkDuplicate) { + throw new Error('N8n already exists'); + } + + if (data.triggerType === 'keyword') { + if (!data.triggerOperator || !data.triggerValue) { + throw new Error('Trigger operator and value are required'); + } + + const checkDuplicate = await this.botRepository.findFirst({ + where: { + triggerOperator: data.triggerOperator, + triggerValue: data.triggerValue, + id: { not: botId }, + instanceId: instanceId, + }, + }); + + if (checkDuplicate) { + throw new Error('Trigger already exists'); + } + } + + if (data.triggerType === 'advanced') { + if (!data.triggerValue) { + throw new Error('Trigger value is required'); + } + + const checkDuplicate = await this.botRepository.findFirst({ + where: { + triggerValue: data.triggerValue, + id: { not: botId }, + instanceId: instanceId, + }, + }); + + if (checkDuplicate) { + throw new Error('Trigger already exists'); + } + } + + try { + const bot = await this.botRepository.update({ + where: { + id: botId, + }, + data: { + enabled: data?.enabled, + description: data.description, + webhookUrl: data.webhookUrl, + basicAuthUser: data.basicAuthUser, + basicAuthPass: data.basicAuthPass, + expire: data.expire, + keywordFinish: data.keywordFinish, + delayMessage: data.delayMessage, + unknownMessage: data.unknownMessage, + listeningFromMe: data.listeningFromMe, + stopBotFromMe: data.stopBotFromMe, + keepOpen: data.keepOpen, + debounceTime: data.debounceTime, + instanceId: instanceId, + triggerType: data.triggerType, + triggerOperator: data.triggerOperator, + triggerValue: data.triggerValue, + ignoreJids: data.ignoreJids, + splitMessages: data.splitMessages, + timePerChar: data.timePerChar, + }, + }); + + return bot; + } catch (error) { + this.logger.error(error); + throw new Error('Error updating n8n'); + } + } + + public async deleteBot(instance: InstanceDto, botId: string) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const bot = await this.botRepository.findFirst({ + where: { + id: botId, + }, + }); + + if (!bot) { + throw new Error('N8n not found'); + } + + if (bot.instanceId !== instanceId) { + throw new Error('N8n not found'); + } + try { + await this.prismaRepository.integrationSession.deleteMany({ + where: { + botId: botId, + }, + }); + + await this.botRepository.delete({ + where: { + id: botId, + }, + }); + + return { bot: { id: botId } }; + } catch (error) { + this.logger.error(error); + throw new Error('Error deleting n8n bot'); + } + } + + // Settings + public async settings(instance: InstanceDto, data: any) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + try { + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const settings = await this.settingsRepository.findFirst({ + where: { + instanceId: instanceId, + }, + }); + + if (settings) { + const updateSettings = await this.settingsRepository.update({ + where: { + id: settings.id, + }, + data: { + expire: data.expire, + keywordFinish: data.keywordFinish, + delayMessage: data.delayMessage, + unknownMessage: data.unknownMessage, + listeningFromMe: data.listeningFromMe, + stopBotFromMe: data.stopBotFromMe, + keepOpen: data.keepOpen, + debounceTime: data.debounceTime, + n8nIdFallback: data.n8nIdFallback, + ignoreJids: data.ignoreJids, + splitMessages: data.splitMessages, + timePerChar: data.timePerChar, + }, + }); + + return { + expire: updateSettings.expire, + keywordFinish: updateSettings.keywordFinish, + delayMessage: updateSettings.delayMessage, + unknownMessage: updateSettings.unknownMessage, + listeningFromMe: updateSettings.listeningFromMe, + stopBotFromMe: updateSettings.stopBotFromMe, + keepOpen: updateSettings.keepOpen, + debounceTime: updateSettings.debounceTime, + n8nIdFallback: updateSettings.n8nIdFallback, + ignoreJids: updateSettings.ignoreJids, + splitMessages: updateSettings.splitMessages, + timePerChar: updateSettings.timePerChar, + }; + } + + const newSettings = await this.settingsRepository.create({ + data: { + expire: data.expire, + keywordFinish: data.keywordFinish, + delayMessage: data.delayMessage, + unknownMessage: data.unknownMessage, + listeningFromMe: data.listeningFromMe, + stopBotFromMe: data.stopBotFromMe, + keepOpen: data.keepOpen, + debounceTime: data.debounceTime, + n8nIdFallback: data.n8nIdFallback, + ignoreJids: data.ignoreJids, + instanceId: instanceId, + splitMessages: data.splitMessages, + timePerChar: data.timePerChar, + }, + }); + + return { + expire: newSettings.expire, + keywordFinish: newSettings.keywordFinish, + delayMessage: newSettings.delayMessage, + unknownMessage: newSettings.unknownMessage, + listeningFromMe: newSettings.listeningFromMe, + stopBotFromMe: newSettings.stopBotFromMe, + keepOpen: newSettings.keepOpen, + debounceTime: newSettings.debounceTime, + n8nIdFallback: newSettings.n8nIdFallback, + ignoreJids: newSettings.ignoreJids, + splitMessages: newSettings.splitMessages, + timePerChar: newSettings.timePerChar, + }; + } catch (error) { + this.logger.error(error); + throw new Error('Error setting default settings'); + } + } + + public async fetchSettings(instance: InstanceDto) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + try { + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const settings = await this.settingsRepository.findFirst({ + where: { + instanceId: instanceId, + }, + include: { + Fallback: true, + }, + }); + + if (!settings) { + return { + expire: 0, + keywordFinish: '', + delayMessage: 0, + unknownMessage: '', + listeningFromMe: false, + stopBotFromMe: false, + keepOpen: false, + ignoreJids: [], + splitMessages: false, + timePerChar: 0, + n8nIdFallback: '', + fallback: null, + }; + } + + return { + expire: settings.expire, + keywordFinish: settings.keywordFinish, + delayMessage: settings.delayMessage, + unknownMessage: settings.unknownMessage, + listeningFromMe: settings.listeningFromMe, + stopBotFromMe: settings.stopBotFromMe, + keepOpen: settings.keepOpen, + ignoreJids: settings.ignoreJids, + splitMessages: settings.splitMessages, + timePerChar: settings.timePerChar, + n8nIdFallback: settings.n8nIdFallback, + fallback: settings.Fallback, + }; + } catch (error) { + this.logger.error(error); + throw new Error('Error fetching default settings'); + } + } + + // Sessions + public async changeStatus(instance: InstanceDto, data: any) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + try { + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const defaultSettingCheck = await this.settingsRepository.findFirst({ + where: { + instanceId, + }, + }); + + const remoteJid = data.remoteJid; + const status = data.status; + + if (status === 'delete') { + await this.sessionRepository.deleteMany({ + where: { + remoteJid: remoteJid, + botId: { not: null }, + }, + }); + + return { bot: { remoteJid: remoteJid, status: status } }; + } + + if (status === 'closed') { + if (defaultSettingCheck?.keepOpen) { + await this.sessionRepository.updateMany({ + where: { + remoteJid: remoteJid, + botId: { not: null }, + }, + data: { + status: 'closed', + }, + }); + } else { + await this.sessionRepository.deleteMany({ + where: { + remoteJid: remoteJid, + botId: { not: null }, + }, + }); + } + + return { bot: { ...instance, bot: { remoteJid: remoteJid, status: status } } }; + } else { + const session = await this.sessionRepository.updateMany({ + where: { + instanceId: instanceId, + remoteJid: remoteJid, + botId: { not: null }, + }, + data: { + status: status, + }, + }); + + const botData = { + remoteJid: remoteJid, + status: status, + session, + }; + + return { bot: { ...instance, bot: botData } }; + } + } catch (error) { + this.logger.error(error); + throw new Error('Error changing status'); + } + } + + public async fetchSessions(instance: InstanceDto, botId: string, remoteJid?: string) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + try { + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const bot = await this.botRepository.findFirst({ + where: { + id: botId, + }, + }); + + if (bot && bot.instanceId !== instanceId) { + throw new Error('N8n not found'); + } + + return await this.sessionRepository.findMany({ + where: { + instanceId: instanceId, + remoteJid, + botId: bot ? botId : { not: null }, + type: 'n8n', + }, + }); + } catch (error) { + this.logger.error(error); + throw new Error('Error fetching sessions'); + } + } + + public async ignoreJid(instance: InstanceDto, data: IgnoreJidDto) { + if (!this.integrationEnabled) throw new BadRequestException('N8n is disabled'); + + try { + const instanceId = await this.prismaRepository.instance + .findFirst({ + where: { + name: instance.instanceName, + }, + }) + .then((instance) => instance.id); + + const settings = await this.settingsRepository.findFirst({ + where: { + instanceId: instanceId, + }, + }); + + if (!settings) { + throw new Error('Settings not found'); + } + + let ignoreJids: any = settings?.ignoreJids || []; + + if (data.action === 'add') { + if (ignoreJids.includes(data.remoteJid)) return { ignoreJids: ignoreJids }; + + ignoreJids.push(data.remoteJid); + } else { + ignoreJids = ignoreJids.filter((jid) => jid !== data.remoteJid); + } + + const updateSettings = await this.settingsRepository.update({ + where: { + id: settings.id, + }, + data: { + ignoreJids: ignoreJids, + }, + }); + + return { + ignoreJids: updateSettings.ignoreJids, + }; + } catch (error) { + this.logger.error(error); + throw new Error('Error setting default settings'); + } + } + + // Emit + public async emit({ instance, remoteJid, msg }: EmitData) { + if (!this.integrationEnabled) return; + + try { + const settings = await this.settingsRepository.findFirst({ + where: { + instanceId: instance.instanceId, + }, + }); + + if (this.checkIgnoreJids(settings?.ignoreJids, remoteJid)) return; + + const session = await this.getSession(remoteJid, instance); + + const content = getConversationMessage(msg); + + let findBot = (await this.findBotTrigger(this.botRepository, content, instance, session)) as N8nModel; + + if (!findBot) { + const fallback = await this.settingsRepository.findFirst({ + where: { + instanceId: instance.instanceId, + }, + }); + + if (fallback?.n8nIdFallback) { + const findFallback = await this.botRepository.findFirst({ + where: { + id: fallback.n8nIdFallback, + }, + }); + + findBot = findFallback; + } else { + return; + } + } + + let expire = findBot?.expire; + let keywordFinish = findBot?.keywordFinish; + let delayMessage = findBot?.delayMessage; + let unknownMessage = findBot?.unknownMessage; + let listeningFromMe = findBot?.listeningFromMe; + let stopBotFromMe = findBot?.stopBotFromMe; + let keepOpen = findBot?.keepOpen; + let debounceTime = findBot?.debounceTime; + let ignoreJids = findBot?.ignoreJids; + let splitMessages = findBot?.splitMessages; + let timePerChar = findBot?.timePerChar; + + if (expire === undefined || expire === null) expire = settings.expire; + if (keywordFinish === undefined || keywordFinish === null) keywordFinish = settings.keywordFinish; + if (delayMessage === undefined || delayMessage === null) delayMessage = settings.delayMessage; + if (unknownMessage === undefined || unknownMessage === null) unknownMessage = settings.unknownMessage; + if (listeningFromMe === undefined || listeningFromMe === null) listeningFromMe = settings.listeningFromMe; + if (stopBotFromMe === undefined || stopBotFromMe === null) stopBotFromMe = settings.stopBotFromMe; + if (keepOpen === undefined || keepOpen === null) keepOpen = settings.keepOpen; + if (debounceTime === undefined || debounceTime === null) debounceTime = settings.debounceTime; + if (ignoreJids === undefined || ignoreJids === null) ignoreJids = settings.ignoreJids; + if (splitMessages === undefined || splitMessages === null) splitMessages = settings?.splitMessages ?? false; + if (timePerChar === undefined || timePerChar === null) timePerChar = settings?.timePerChar ?? 0; + + const key = msg.key as { + id: string; + remoteJid: string; + fromMe: boolean; + participant: string; + }; + + if (stopBotFromMe && key.fromMe && session) { + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'paused', + }, + }); + return; + } + + if (!listeningFromMe && key.fromMe) { + return; + } + + if (session && !session.awaitUser) { + return; + } + + if (debounceTime && debounceTime > 0) { + this.processDebounce(this.userMessageDebounce, content, remoteJid, debounceTime, async (debouncedContent) => { + await this.n8nService.processN8n( + this.waMonitor.waInstances[instance.instanceName], + remoteJid, + findBot, + session, + { + ...settings, + expire, + keywordFinish, + delayMessage, + unknownMessage, + listeningFromMe, + stopBotFromMe, + keepOpen, + debounceTime, + ignoreJids, + splitMessages, + timePerChar, + }, + debouncedContent, + msg?.pushName, + ); + }); + } else { + await this.n8nService.processN8n( + this.waMonitor.waInstances[instance.instanceName], + remoteJid, + findBot, + session, + { + ...settings, + expire, + keywordFinish, + delayMessage, + unknownMessage, + listeningFromMe, + stopBotFromMe, + keepOpen, + debounceTime, + ignoreJids, + splitMessages, + timePerChar, + }, + content, + msg?.pushName, + ); + } + + return; + } catch (error) { + this.logger.error(error); + return; + } + } +} diff --git a/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts b/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts new file mode 100644 index 00000000..bcb7fdc2 --- /dev/null +++ b/src/api/integrations/chatbot/n8n/dto/n8n.dto.ts @@ -0,0 +1,34 @@ +import { TriggerType, TriggerOperator } from '@prisma/client'; + +export class N8nDto { + enabled?: boolean; + description?: string; + webhookUrl?: string; + basicAuthUser?: string; + basicAuthPass?: string; + + // Advanced bot properties (copied from DifyDto style) + triggerType?: TriggerType; + triggerOperator?: TriggerOperator; + triggerValue?: string; + expire?: number; + keywordFinish?: string; + delayMessage?: number; + unknownMessage?: string; + listeningFromMe?: boolean; + stopBotFromMe?: boolean; + keepOpen?: boolean; + debounceTime?: number; + ignoreJids?: string[]; + splitMessages?: boolean; + timePerChar?: number; +} + +export class N8nSettingDto { + // Add settings fields here if needed for compatibility +} + +export class N8nMessageDto { + chatInput: string; + sessionId: string; +} diff --git a/src/api/integrations/chatbot/n8n/routes/n8n.router.ts b/src/api/integrations/chatbot/n8n/routes/n8n.router.ts new file mode 100644 index 00000000..901e9ee2 --- /dev/null +++ b/src/api/integrations/chatbot/n8n/routes/n8n.router.ts @@ -0,0 +1,113 @@ +import { RouterBroker } from '@api/abstract/abstract.router'; +import { IgnoreJidDto } from '@api/dto/chatbot.dto'; +import { InstanceDto } from '@api/dto/instance.dto'; +import { N8nDto, N8nSettingDto } from '../dto/n8n.dto'; +import { HttpStatus } from '@api/routes/index.router'; +import { n8nController } from '@api/server.module'; +import { + n8nIgnoreJidSchema, + n8nSchema, + n8nSettingSchema, + n8nStatusSchema, + instanceSchema, +} from '@validate/validate.schema'; +import { RequestHandler, Router } from 'express'; + +export class N8nRouter extends RouterBroker { + constructor(...guards: RequestHandler[]) { + super(); + this.router + .post(this.routerPath('create'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: n8nSchema, + ClassRef: N8nDto, + execute: (instance, data) => n8nController.createBot(instance, data), + }); + res.status(HttpStatus.CREATED).json(response); + }) + .get(this.routerPath('find'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: instanceSchema, + ClassRef: InstanceDto, + execute: (instance) => n8nController.findBot(instance), + }); + res.status(HttpStatus.OK).json(response); + }) + .get(this.routerPath('fetch/:n8nId'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: instanceSchema, + ClassRef: InstanceDto, + execute: (instance) => n8nController.fetchBot(instance, req.params.n8nId), + }); + res.status(HttpStatus.OK).json(response); + }) + .put(this.routerPath('update/:n8nId'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: n8nSchema, + ClassRef: N8nDto, + execute: (instance, data) => n8nController.updateBot(instance, req.params.n8nId, data), + }); + res.status(HttpStatus.OK).json(response); + }) + .delete(this.routerPath('delete/:n8nId'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: instanceSchema, + ClassRef: InstanceDto, + execute: (instance) => n8nController.deleteBot(instance, req.params.n8nId), + }); + res.status(HttpStatus.OK).json(response); + }) + .post(this.routerPath('settings'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: n8nSettingSchema, + ClassRef: N8nSettingDto, + execute: (instance, data) => n8nController.settings(instance, data), + }); + res.status(HttpStatus.OK).json(response); + }) + .get(this.routerPath('fetchSettings'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: instanceSchema, + ClassRef: InstanceDto, + execute: (instance) => n8nController.fetchSettings(instance), + }); + res.status(HttpStatus.OK).json(response); + }) + .post(this.routerPath('changeStatus'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: n8nStatusSchema, + ClassRef: InstanceDto, + execute: (instance, data) => n8nController.changeStatus(instance, data), + }); + res.status(HttpStatus.OK).json(response); + }) + .get(this.routerPath('fetchSessions/:n8nId'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: instanceSchema, + ClassRef: InstanceDto, + execute: (instance) => n8nController.fetchSessions(instance, req.params.n8nId), + }); + res.status(HttpStatus.OK).json(response); + }) + .post(this.routerPath('ignoreJid'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: n8nIgnoreJidSchema, + ClassRef: IgnoreJidDto, + execute: (instance, data) => n8nController.ignoreJid(instance, data), + }); + res.status(HttpStatus.OK).json(response); + }); + } + + public readonly router: Router = Router(); +} diff --git a/src/api/integrations/chatbot/n8n/services/n8n.service.ts b/src/api/integrations/chatbot/n8n/services/n8n.service.ts new file mode 100644 index 00000000..027c0c14 --- /dev/null +++ b/src/api/integrations/chatbot/n8n/services/n8n.service.ts @@ -0,0 +1,431 @@ +import { PrismaRepository } from '@api/repository/repository.service'; +import { Logger } from '@config/logger.config'; +import { IntegrationSession, N8n, N8nSetting } from '@prisma/client'; +import { sendTelemetry } from '@utils/sendTelemetry'; +import axios from 'axios'; +import { InstanceDto } from '@api/dto/instance.dto'; +import { N8nDto } from '../dto/n8n.dto'; +import { WAMonitoringService } from '@api/services/monitor.service'; + +export class N8nService { + private readonly logger = new Logger('N8nService'); + private readonly waMonitor: WAMonitoringService; + + constructor( + waMonitor: WAMonitoringService, + private readonly prismaRepository: PrismaRepository + ) { + this.waMonitor = waMonitor; + } + + /** + * Create a new N8n bot for the given instance. + */ + public async createBot(instanceId: string, data: N8nDto) { + try { + return await this.prismaRepository.n8n.create({ + data: { + enabled: data.enabled ?? true, + description: data.description, + webhookUrl: data.webhookUrl, + basicAuthUser: data.basicAuthUser, + basicAuthPass: data.basicAuthPass, + instanceId, + }, + }); + } catch (error) { + this.logger.error(error); + throw error; + } + } + + /** + * Find all N8n bots for the given instance. + */ + public async findBots(instanceId: string) { + try { + return await this.prismaRepository.n8n.findMany({ where: { instanceId } }); + } catch (error) { + this.logger.error(error); + throw error; + } + } + + /** + * Fetch a specific N8n bot by ID and instance. + */ + public async fetchBot(instanceId: string, n8nId: string) { + try { + const bot = await this.prismaRepository.n8n.findFirst({ where: { id: n8nId } }); + if (!bot || bot.instanceId !== instanceId) throw new Error('N8n bot not found'); + return bot; + } catch (error) { + this.logger.error(error); + throw error; + } + } + + /** + * Update a specific N8n bot. + */ + public async updateBot(instanceId: string, n8nId: string, data: N8nDto) { + try { + await this.fetchBot(instanceId, n8nId); + return await this.prismaRepository.n8n.update({ + where: { id: n8nId }, + data: { + enabled: data.enabled, + description: data.description, + webhookUrl: data.webhookUrl, + basicAuthUser: data.basicAuthUser, + basicAuthPass: data.basicAuthPass, + }, + }); + } catch (error) { + this.logger.error(error); + throw error; + } + } + + /** + * Delete a specific N8n bot. + */ + public async deleteBot(instanceId: string, n8nId: string) { + try { + await this.fetchBot(instanceId, n8nId); + return await this.prismaRepository.n8n.delete({ where: { id: n8nId } }); + } catch (error) { + this.logger.error(error); + throw error; + } + } + + /** + * Send a message to the N8n bot webhook. + */ + public async sendMessage(n8nId: string, chatInput: string, sessionId: string): Promise { + try { + const bot = await this.prismaRepository.n8n.findFirst({ where: { id: n8nId, enabled: true } }); + if (!bot) throw new Error('N8n bot not found or not enabled'); + const headers: Record = {}; + if (bot.basicAuthUser && bot.basicAuthPass) { + const auth = Buffer.from(`${bot.basicAuthUser}:${bot.basicAuthPass}`).toString('base64'); + headers['Authorization'] = `Basic ${auth}`; + } + const response = await axios.post(bot.webhookUrl, { chatInput, sessionId }, { headers }); + return response.data.output; + } catch (error) { + this.logger.error(error); + throw new Error('Error sending message to n8n bot'); + } + } + + public async createNewSession(instance: InstanceDto, data: any) { + try { + const session = await this.prismaRepository.integrationSession.create({ + data: { + remoteJid: data.remoteJid, + pushName: data.pushName, + sessionId: data.remoteJid, + status: 'opened', + awaitUser: false, + botId: data.botId, + instanceId: instance.instanceId, + type: 'n8n', + }, + }); + return { session }; + } catch (error) { + this.logger.error(error); + return; + } + } + + private isImageMessage(content: string) { + return content.includes('imageMessage'); + } + + private isJSON(str: string): boolean { + try { + JSON.parse(str); + return true; + } catch (e) { + return false; + } + } + + private async sendMessageToBot( + instance: any, + session: IntegrationSession, + settings: N8nSetting, + n8n: N8n, + remoteJid: string, + pushName: string, + content: string, + ) { + try { + let endpoint: string = n8n.webhookUrl; + const payload: any = { + chatInput: content, + sessionId: session.sessionId, + }; + const headers: Record = {}; + if (n8n.basicAuthUser && n8n.basicAuthPass) { + const auth = Buffer.from(`${n8n.basicAuthUser}:${n8n.basicAuthPass}`).toString('base64'); + headers['Authorization'] = `Basic ${auth}`; + } + const response = await axios.post(endpoint, payload, { headers }); + const message = response?.data?.output || response?.data?.answer; + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + }, + }); + } catch (error) { + this.logger.error(error.response?.data || error); + return; + } + } + + private async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: N8nSetting) { + const linkRegex = /(!?)\[(.*?)\]\((.*?)\)/g; + let textBuffer = ''; + let lastIndex = 0; + let match: RegExpExecArray | null; + const getMediaType = (url: string): string | null => { + const extension = url.split('.').pop()?.toLowerCase(); + const imageExtensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp']; + const audioExtensions = ['mp3', 'wav', 'aac', 'ogg']; + const videoExtensions = ['mp4', 'avi', 'mkv', 'mov']; + const documentExtensions = ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt']; + if (imageExtensions.includes(extension || '')) return 'image'; + if (audioExtensions.includes(extension || '')) return 'audio'; + if (videoExtensions.includes(extension || '')) return 'video'; + if (documentExtensions.includes(extension || '')) return 'document'; + return null; + }; + while ((match = linkRegex.exec(message)) !== null) { + const [fullMatch, exclMark, altText, url] = match; + const mediaType = getMediaType(url); + const beforeText = message.slice(lastIndex, match.index); + if (beforeText) { + textBuffer += beforeText; + } + if (mediaType) { + const splitMessages = settings.splitMessages ?? false; + const timePerChar = settings.timePerChar ?? 0; + const minDelay = 1000; + const maxDelay = 20000; + if (textBuffer.trim()) { + if (splitMessages) { + const multipleMessages = textBuffer.trim().split('\n\n'); + for (let index = 0; index < multipleMessages.length; index++) { + const message = multipleMessages[index]; + const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay); + if (instance.integration === 'WHATSAPP_BAILEYS') { + await instance.client.presenceSubscribe(remoteJid); + await instance.client.sendPresenceUpdate('composing', remoteJid); + } + await new Promise((resolve) => { + setTimeout(async () => { + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: message, + }, + false, + ); + resolve(); + }, delay); + }); + if (instance.integration === 'WHATSAPP_BAILEYS') { + await instance.client.sendPresenceUpdate('paused', remoteJid); + } + } + } else { + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: textBuffer.trim(), + }, + false, + ); + } + textBuffer = ''; + } + if (mediaType === 'audio') { + await instance.audioWhatsapp({ + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + audio: url, + caption: altText, + }); + } else { + await instance.mediaMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + mediatype: mediaType, + media: url, + caption: altText, + }, + null, + false, + ); + } + } else { + textBuffer += `[${altText}](${url})`; + } + lastIndex = linkRegex.lastIndex; + } + if (lastIndex < message.length) { + const remainingText = message.slice(lastIndex); + if (remainingText.trim()) { + textBuffer += remainingText; + } + } + const splitMessages = settings.splitMessages ?? false; + const timePerChar = settings.timePerChar ?? 0; + const minDelay = 1000; + const maxDelay = 20000; + if (textBuffer.trim()) { + if (splitMessages) { + const multipleMessages = textBuffer.trim().split('\n\n'); + for (let index = 0; index < multipleMessages.length; index++) { + const message = multipleMessages[index]; + const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay); + if (instance.integration === 'WHATSAPP_BAILEYS') { + await instance.client.presenceSubscribe(remoteJid); + await instance.client.sendPresenceUpdate('composing', remoteJid); + } + await new Promise((resolve) => { + setTimeout(async () => { + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: message, + }, + false, + ); + resolve(); + }, delay); + }); + if (instance.integration === 'WHATSAPP_BAILEYS') { + await instance.client.sendPresenceUpdate('paused', remoteJid); + } + } + } else { + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: textBuffer.trim(), + }, + false, + ); + } + } + sendTelemetry('/message/sendText'); + } + + private async initNewSession( + instance: any, + remoteJid: string, + n8n: N8n, + settings: N8nSetting, + session: IntegrationSession, + content: string, + pushName?: string, + ) { + const data = await this.createNewSession(instance, { + remoteJid, + pushName, + botId: n8n.id, + }); + if (data.session) { + session = data.session; + } + await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName, content); + return; + } + + public async processN8n( + instance: any, + remoteJid: string, + n8n: N8n, + session: IntegrationSession, + settings: N8nSetting, + content: string, + pushName?: string, + ) { + if (session && session.status !== 'opened') { + return; + } + if (session && settings.expire && settings.expire > 0) { + const now = Date.now(); + const sessionUpdatedAt = new Date(session.updatedAt).getTime(); + const diff = now - sessionUpdatedAt; + const diffInMinutes = Math.floor(diff / 1000 / 60); + if (diffInMinutes > settings.expire) { + if (settings.keepOpen) { + await this.prismaRepository.integrationSession.update({ + where: { id: session.id }, + data: { status: 'closed' }, + }); + } else { + await this.prismaRepository.integrationSession.deleteMany({ + where: { botId: n8n.id, remoteJid: remoteJid }, + }); + } + await this.initNewSession(instance, remoteJid, n8n, settings, session, content, pushName); + return; + } + } + if (!session) { + await this.initNewSession(instance, remoteJid, n8n, settings, session, content, pushName); + return; + } + await this.prismaRepository.integrationSession.update({ + where: { id: session.id }, + data: { status: 'opened', awaitUser: false }, + }); + if (!content) { + if (settings.unknownMessage) { + this.waMonitor.waInstances[instance.instanceName].textMessage( + { + number: remoteJid.split('@')[0], + delay: settings.delayMessage || 1000, + text: settings.unknownMessage, + }, + false, + ); + + sendTelemetry('/message/sendText'); + } + return; + } + if (settings.keywordFinish && content.toLowerCase() === settings.keywordFinish.toLowerCase()) { + if (settings.keepOpen) { + await this.prismaRepository.integrationSession.update({ + where: { id: session.id }, + data: { status: 'closed' }, + }); + } else { + await this.prismaRepository.integrationSession.deleteMany({ + where: { botId: n8n.id, remoteJid: remoteJid }, + }); + } + return; + } + await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName, content); + return; + } +} diff --git a/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts b/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts new file mode 100644 index 00000000..ccb55ae1 --- /dev/null +++ b/src/api/integrations/chatbot/n8n/validate/n8n.schema.ts @@ -0,0 +1,59 @@ +import { JSONSchema7 } from 'json-schema'; + +export const n8nSchema: JSONSchema7 = { + type: 'object', + properties: { + enabled: { type: 'boolean' }, + description: { type: 'string' }, + webhookUrl: { type: 'string', minLength: 1 }, + basicAuthUser: { type: 'string' }, + basicAuthPass: { type: 'string' }, + }, + required: ['enabled', 'webhookUrl'], +}; + +export const n8nMessageSchema: JSONSchema7 = { + type: 'object', + properties: { + chatInput: { type: 'string', minLength: 1 }, + sessionId: { type: 'string', minLength: 1 }, + }, + required: ['chatInput', 'sessionId'], +}; + +export const n8nSettingSchema: JSONSchema7 = { + type: 'object', + properties: { + expire: { type: 'number' }, + keywordFinish: { type: 'string' }, + delayMessage: { type: 'number' }, + unknownMessage: { type: 'string' }, + listeningFromMe: { type: 'boolean' }, + stopBotFromMe: { type: 'boolean' }, + keepOpen: { type: 'boolean' }, + debounceTime: { type: 'number' }, + n8nIdFallback: { type: 'string' }, + ignoreJids: { type: 'array', items: { type: 'string' } }, + splitMessages: { type: 'boolean' }, + timePerChar: { type: 'number' }, + }, + required: [], +}; + +export const n8nStatusSchema: JSONSchema7 = { + type: 'object', + properties: { + remoteJid: { type: 'string' }, + status: { type: 'string', enum: ['opened', 'closed', 'delete', 'paused'] }, + }, + required: ['remoteJid', 'status'], +}; + +export const n8nIgnoreJidSchema: JSONSchema7 = { + type: 'object', + properties: { + remoteJid: { type: 'string' }, + action: { type: 'string', enum: ['add', 'remove'] }, + }, + required: ['remoteJid', 'action'], +}; diff --git a/src/api/server.module.ts b/src/api/server.module.ts index 9d8df8a6..ac4d7e91 100644 --- a/src/api/server.module.ts +++ b/src/api/server.module.ts @@ -26,6 +26,8 @@ import { EvolutionBotController } from './integrations/chatbot/evolutionBot/cont import { EvolutionBotService } from './integrations/chatbot/evolutionBot/services/evolutionBot.service'; import { FlowiseController } from './integrations/chatbot/flowise/controllers/flowise.controller'; import { FlowiseService } from './integrations/chatbot/flowise/services/flowise.service'; +import { N8nController } from './integrations/chatbot/n8n/controllers/n8n.controller'; +import { N8nService } from './integrations/chatbot/n8n/services/n8n.service'; import { OpenaiController } from './integrations/chatbot/openai/controllers/openai.controller'; import { OpenaiService } from './integrations/chatbot/openai/services/openai.service'; import { TypebotController } from './integrations/chatbot/typebot/controllers/typebot.controller'; @@ -127,4 +129,7 @@ export const evolutionBotController = new EvolutionBotController(evolutionBotSer const flowiseService = new FlowiseService(waMonitor, configService, prismaRepository); export const flowiseController = new FlowiseController(flowiseService, prismaRepository, waMonitor); +const n8nService = new N8nService(waMonitor, prismaRepository); +export const n8nController = new N8nController(n8nService, prismaRepository, waMonitor); + logger.info('Module - ON'); diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 8e683ea7..c5cac824 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -253,6 +253,7 @@ export type Chatwoot = { }; export type Openai = { ENABLED: boolean; API_KEY_GLOBAL?: string }; export type Dify = { ENABLED: boolean }; +export type N8n = { ENABLED: boolean }; export type S3 = { ACCESS_KEY: string; @@ -292,6 +293,7 @@ export interface Env { CHATWOOT: Chatwoot; OPENAI: Openai; DIFY: Dify; + N8N: N8n; CACHE: CacheConf; S3?: S3; AUTHENTICATION: Auth; @@ -587,6 +589,9 @@ export class ConfigService { DIFY: { ENABLED: process.env?.DIFY_ENABLED === 'true', }, + N8N: { + ENABLED: process.env?.N8N_ENABLED === 'true', + }, CACHE: { REDIS: { ENABLED: process.env?.CACHE_REDIS_ENABLED === 'true',