diff --git a/src/api/integrations/chatbot/dify/controllers/dify.controller.ts b/src/api/integrations/chatbot/dify/controllers/dify.controller.ts index 5d2c9b5f..ebbf2b0d 100644 --- a/src/api/integrations/chatbot/dify/controllers/dify.controller.ts +++ b/src/api/integrations/chatbot/dify/controllers/dify.controller.ts @@ -80,7 +80,7 @@ export class DifyController extends BaseChatbotController { } } - // Bots + // Override createBot to add Dify-specific validation public async createBot(instance: InstanceDto, data: DifyDto) { if (!this.integrationEnabled) throw new BadRequestException('Dify is disabled'); @@ -92,7 +92,7 @@ export class DifyController extends BaseChatbotController { }) .then((instance) => instance.id); - // Check for Dify-specific duplicate + // Dify-specific duplicate check const checkDuplicate = await this.botRepository.findFirst({ where: { instanceId: instanceId, @@ -106,62 +106,10 @@ export class DifyController extends BaseChatbotController { throw new Error('Dify already exists'); } - // Let the base class handle the rest of the bot creation process + // Let the base class handle the rest return super.createBot(instance, data); } - public async findBot(instance: InstanceDto) { - if (!this.integrationEnabled) throw new BadRequestException('Dify is disabled'); - - const instanceId = await this.prismaRepository.instance - .findFirst({ - where: { - name: instance.instanceName, - }, - }) - .then((instance) => instance.id); - - const bots = await this.botRepository.findMany({ - where: { - instanceId: instanceId, - }, - }); - - if (!bots.length) { - return null; - } - - return bots; - } - - public async fetchBot(instance: InstanceDto, botId: string) { - if (!this.integrationEnabled) throw new BadRequestException('Dify is disabled'); - - const instanceId = await this.prismaRepository.instance - .findFirst({ - where: { - name: instance.instanceName, - }, - }) - .then((instance) => instance.id); - - const bot = await this.botRepository.findFirst({ - where: { - id: botId, - }, - }); - - if (!bot) { - throw new Error('Dify not found'); - } - - if (bot.instanceId !== instanceId) { - throw new Error('Dify not found'); - } - - return bot; - } - // Process Dify-specific bot logic protected async processBot( instance: any, @@ -173,6 +121,6 @@ export class DifyController extends BaseChatbotController { pushName?: string, msg?: any, ) { - this.difyService.process(instance, remoteJid, bot, session, settings, content, pushName, msg); + await this.difyService.process(instance, remoteJid, bot, session, settings, content, pushName, msg); } } diff --git a/src/api/integrations/chatbot/dify/dto/dify.dto.ts b/src/api/integrations/chatbot/dify/dto/dify.dto.ts index 07e7b265..9e436705 100644 --- a/src/api/integrations/chatbot/dify/dto/dify.dto.ts +++ b/src/api/integrations/chatbot/dify/dto/dify.dto.ts @@ -3,12 +3,11 @@ import { $Enums } from '@prisma/client'; import { BaseChatbotDto, BaseChatbotSettingDto } from '../../base-chatbot.dto'; export class DifyDto extends BaseChatbotDto { - // Dify specific fields botType?: $Enums.DifyBotType; apiUrl?: string; apiKey?: string; } export class DifySettingDto extends BaseChatbotSettingDto { - // Dify specific fields + difyIdFallback?: string; } diff --git a/src/api/integrations/chatbot/dify/services/dify.service.ts b/src/api/integrations/chatbot/dify/services/dify.service.ts index 4a9ef583..773efe49 100644 --- a/src/api/integrations/chatbot/dify/services/dify.service.ts +++ b/src/api/integrations/chatbot/dify/services/dify.service.ts @@ -1,10 +1,8 @@ -import { InstanceDto } from '@api/dto/instance.dto'; import { PrismaRepository } from '@api/repository/repository.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { Integration } from '@api/types/wa.types'; import { ConfigService, HttpServer } from '@config/env.config'; import { Dify, DifySetting, IntegrationSession } from '@prisma/client'; -import { sendTelemetry } from '@utils/sendTelemetry'; import axios from 'axios'; import { BaseChatbotService } from '../../base-chatbot.service'; @@ -15,8 +13,8 @@ export class DifyService extends BaseChatbotService { constructor( waMonitor: WAMonitoringService, - configService: ConfigService, prismaRepository: PrismaRepository, + configService: ConfigService, openaiService: OpenaiService, ) { super(waMonitor, prismaRepository, 'DifyService', configService); @@ -30,10 +28,6 @@ export class DifyService extends BaseChatbotService { return 'dify'; } - public async createNewSession(instance: InstanceDto, data: any) { - return super.createNewSession(instance, data, 'dify'); - } - protected async sendMessageToBot( instance: any, session: IntegrationSession, @@ -43,10 +37,29 @@ export class DifyService extends BaseChatbotService { pushName: string, content: string, msg?: any, - ) { + ): Promise { try { let endpoint: string = dify.apiUrl; + if (!endpoint) { + this.logger.error('No Dify endpoint defined'); + return; + } + + // Handle audio messages - transcribe using OpenAI Whisper + let processedContent = content; + if (this.isAudioMessage(content) && msg) { + try { + this.logger.debug(`[Dify] Downloading audio for Whisper transcription`); + const transcription = await this.openaiService.speechToText(msg, instance); + if (transcription) { + processedContent = `[audio] ${transcription}`; + } + } catch (err) { + this.logger.error(`[Dify] Failed to transcribe audio: ${err}`); + } + } + if (dify.botType === 'chatBot') { endpoint += '/chat-messages'; const payload: any = { @@ -57,7 +70,7 @@ export class DifyService extends BaseChatbotService { serverUrl: this.configService.get('SERVER').URL, apiKey: instance.token, }, - query: content, + query: processedContent, response_mode: 'blocking', conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, user: remoteJid, @@ -66,7 +79,6 @@ export class DifyService extends BaseChatbotService { // Handle image messages if (this.isImageMessage(content)) { const contentSplit = content.split('|'); - payload.files = [ { type: 'image', @@ -77,22 +89,6 @@ export class DifyService extends BaseChatbotService { payload.query = contentSplit[2] || content; } - // Handle audio messages - if (this.isAudioMessage(content) && msg) { - try { - this.logger.debug(`[Dify] Downloading audio for Whisper transcription`); - const transcription = await this.openaiService.speechToText(msg); - if (transcription) { - payload.query = transcription; - } else { - payload.query = '[Audio message could not be transcribed]'; - } - } catch (err) { - this.logger.error(`[Dify] Failed to transcribe audio: ${err}`); - payload.query = '[Audio message could not be transcribed]'; - } - } - if (instance.integration === Integration.WHATSAPP_BAILEYS) { await instance.client.presenceSubscribe(remoteJid); await instance.client.sendPresenceUpdate('composing', remoteJid); @@ -110,7 +106,9 @@ export class DifyService extends BaseChatbotService { const message = response?.data?.answer; const conversationId = response?.data?.conversation_id; - await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + if (message) { + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + } await this.prismaRepository.integrationSession.update({ where: { @@ -128,7 +126,7 @@ export class DifyService extends BaseChatbotService { endpoint += '/completion-messages'; const payload: any = { inputs: { - query: content, + query: processedContent, pushName: pushName, remoteJid: remoteJid, instanceName: instance.instanceName, @@ -143,7 +141,6 @@ export class DifyService extends BaseChatbotService { // Handle image messages if (this.isImageMessage(content)) { const contentSplit = content.split('|'); - payload.files = [ { type: 'image', @@ -154,22 +151,6 @@ export class DifyService extends BaseChatbotService { payload.inputs.query = contentSplit[2] || content; } - // Handle audio messages - if (this.isAudioMessage(content) && msg) { - try { - this.logger.debug(`[Dify] Downloading audio for Whisper transcription`); - const transcription = await this.openaiService.speechToText(msg); - if (transcription) { - payload.inputs.query = transcription; - } else { - payload.inputs.query = '[Audio message could not be transcribed]'; - } - } catch (err) { - this.logger.error(`[Dify] Failed to transcribe audio: ${err}`); - payload.inputs.query = '[Audio message could not be transcribed]'; - } - } - if (instance.integration === Integration.WHATSAPP_BAILEYS) { await instance.client.presenceSubscribe(remoteJid); await instance.client.sendPresenceUpdate('composing', remoteJid); @@ -187,7 +168,9 @@ export class DifyService extends BaseChatbotService { const message = response?.data?.answer; const conversationId = response?.data?.conversation_id; - await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + if (message) { + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + } await this.prismaRepository.integrationSession.update({ where: { @@ -211,7 +194,7 @@ export class DifyService extends BaseChatbotService { serverUrl: this.configService.get('SERVER').URL, apiKey: instance.token, }, - query: content, + query: processedContent, response_mode: 'streaming', conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, user: remoteJid, @@ -220,7 +203,6 @@ export class DifyService extends BaseChatbotService { // Handle image messages if (this.isImageMessage(content)) { const contentSplit = content.split('|'); - payload.files = [ { type: 'image', @@ -231,22 +213,6 @@ export class DifyService extends BaseChatbotService { payload.query = contentSplit[2] || content; } - // Handle audio messages - if (this.isAudioMessage(content) && msg) { - try { - this.logger.debug(`[Dify] Downloading audio for Whisper transcription`); - const transcription = await this.openaiService.speechToText(msg); - if (transcription) { - payload.query = transcription; - } else { - payload.query = '[Audio message could not be transcribed]'; - } - } catch (err) { - this.logger.error(`[Dify] Failed to transcribe audio: ${err}`); - payload.query = '[Audio message could not be transcribed]'; - } - } - if (instance.integration === Integration.WHATSAPP_BAILEYS) { await instance.client.presenceSubscribe(remoteJid); await instance.client.sendPresenceUpdate('composing', remoteJid); @@ -262,7 +228,6 @@ export class DifyService extends BaseChatbotService { let answer = ''; const data = response.data.replaceAll('data: ', ''); - const events = data.split('\n').filter((line) => line.trim() !== ''); for (const eventString of events) { @@ -280,7 +245,9 @@ export class DifyService extends BaseChatbotService { if (instance.integration === Integration.WHATSAPP_BAILEYS) await instance.client.sendPresenceUpdate('paused', remoteJid); - await this.sendMessageWhatsApp(instance, remoteJid, answer, settings); + if (answer) { + await this.sendMessageWhatsApp(instance, remoteJid, answer, settings); + } await this.prismaRepository.integrationSession.update({ where: { @@ -298,288 +265,4 @@ export class DifyService extends BaseChatbotService { return; } } - - protected async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: DifySetting) { - const linkRegex = /(!?)\[(.*?)\]\((.*?)\)/g; - let textBuffer = ''; - let lastIndex = 0; - let match: RegExpExecArray | null; - - while ((match = linkRegex.exec(message)) !== null) { - const [fullMatch, exclamation, altText, url] = match; - const mediaType = this.getMediaType(url); - const beforeText = message.slice(lastIndex, match.index); - - if (beforeText) { - textBuffer += beforeText; - } - - if (mediaType) { - const splitMessages = settings.splitMessages ?? false; - const timePerChar = settings.timePerChar ?? 0; - const minDelay = 1000; - const maxDelay = 20000; - - if (textBuffer.trim()) { - if (splitMessages) { - const multipleMessages = textBuffer.trim().split('\n\n'); - for (let index = 0; index < multipleMessages.length; index++) { - const message = multipleMessages[index]; - const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } else { - const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: textBuffer, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } - - textBuffer = ''; - - if (mediaType === 'image') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - caption: exclamation === '!' ? undefined : altText, - mediatype: 'image', - media: url, - }); - } else if (mediaType === 'video') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - caption: exclamation === '!' ? undefined : altText, - mediatype: 'video', - media: url, - }); - } else if (mediaType === 'audio') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'audio', - media: url, - }); - } else if (mediaType === 'document') { - await instance.mediaMessage({ - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - caption: exclamation === '!' ? undefined : altText, - mediatype: 'document', - media: url, - fileName: altText || 'file', - }); - } - } else { - textBuffer += `[${altText}](${url})`; - } - - lastIndex = match.index + fullMatch.length; - } - - const remainingText = message.slice(lastIndex); - if (remainingText) { - textBuffer += remainingText; - } - - if (textBuffer.trim()) { - const splitMessages = settings.splitMessages ?? false; - const timePerChar = settings.timePerChar ?? 0; - const minDelay = 1000; - const maxDelay = 20000; - - if (splitMessages) { - const multipleMessages = textBuffer.trim().split('\n\n'); - for (let index = 0; index < multipleMessages.length; index++) { - const message = multipleMessages[index]; - const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } else { - const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.presenceSubscribe(remoteJid); - await instance.client.sendPresenceUpdate('composing', remoteJid); - } - - await new Promise((resolve) => { - setTimeout(async () => { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: textBuffer, - }, - false, - ); - resolve(); - }, delay); - }); - - if (instance.integration === Integration.WHATSAPP_BAILEYS) { - await instance.client.sendPresenceUpdate('paused', remoteJid); - } - } - } - } - - protected async initNewSession( - instance: any, - remoteJid: string, - dify: Dify, - settings: DifySetting, - session: IntegrationSession, - content: string, - pushName?: string, - msg?: any, - ) { - try { - await this.sendMessageToBot(instance, session, settings, dify, remoteJid, pushName || '', content, msg); - } catch (error) { - this.logger.error(error); - return; - } - } - - public async process( - instance: any, - remoteJid: string, - dify: Dify, - session: IntegrationSession, - settings: DifySetting, - content: string, - pushName?: string, - msg?: any, - ) { - try { - // Handle keyword finish - if (settings?.keywordFinish?.includes(content.toLowerCase())) { - if (settings?.keepOpen) { - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'closed', - }, - }); - } else { - await this.prismaRepository.integrationSession.delete({ - where: { - id: session.id, - }, - }); - } - - await sendTelemetry('/dify/session/finish'); - return; - } - - // If session is new or doesn't exist - if (!session) { - const data = { - remoteJid, - pushName, - botId: dify.id, - }; - - const createSession = await this.createNewSession( - { instanceName: instance.instanceName, instanceId: instance.instanceId }, - data, - ); - - await this.initNewSession(instance, remoteJid, dify, settings, createSession.session, content, pushName, msg); - - await sendTelemetry('/dify/session/start'); - return; - } - - // If session exists but is paused - if (session.status === 'paused') { - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - }, - }); - - return; - } - - // Regular message for ongoing session - await this.sendMessageToBot(instance, session, settings, dify, remoteJid, pushName || '', content, msg); - } catch (error) { - this.logger.error(error); - return; - } - } }