From 1bf76ecb94d90095d3ab7a6daf1f7189e01de6d5 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Wed, 21 Aug 2024 14:48:59 -0300 Subject: [PATCH] refactor: dify services --- .../chatbot/dify/services/dify.service.ts | 781 +++--------------- 1 file changed, 104 insertions(+), 677 deletions(-) diff --git a/src/api/integrations/chatbot/dify/services/dify.service.ts b/src/api/integrations/chatbot/dify/services/dify.service.ts index f59fd673..c467af7f 100644 --- a/src/api/integrations/chatbot/dify/services/dify.service.ts +++ b/src/api/integrations/chatbot/dify/services/dify.service.ts @@ -41,24 +41,15 @@ export class DifyService { return content.includes('imageMessage'); } - private async initNewSession( + private async sendMessageToBot( instance: any, - remoteJid: string, - dify: Dify, - settings: DifySetting, session: IntegrationSession, + settings: DifySetting, + dify: Dify, + remoteJid: string, + pushName: string, content: string, - pushName?: string, ) { - const data = await this.createNewSession(instance, { - remoteJid, - botId: dify.id, - }); - - if (data.session) { - session = data.session; - } - let endpoint: string = dify.apiUrl; if (dify.botType === 'chatBot') { @@ -104,66 +95,7 @@ export class DifyService { const message = response?.data?.answer; - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, - }); - - sendTelemetry('/message/sendText'); - - return; + await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); } if (dify.botType === 'textGenerator') { @@ -209,66 +141,7 @@ export class DifyService { const message = response?.data?.answer; - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, - }); - - sendTelemetry('/message/sendText'); - - return; + await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); } if (dify.botType === 'agent') { @@ -334,64 +207,7 @@ export class DifyService { const message = response?.data?.answer; - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: conversationId, - }, - }); - - sendTelemetry('/message/sendText'); + await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); }); reader.on('error', (error) => { @@ -443,73 +259,104 @@ export class DifyService { const message = response?.data?.data.outputs.text; - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - if (settings.keepOpen) { - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'closed', - }, - }); - } else { - await this.prismaRepository.integrationSession.delete({ - where: { - id: session.id, - }, - }); - } - - sendTelemetry('/message/sendText'); + await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); return; } + } + + private async sendMessageWhatsApp( + instance: any, + remoteJid: string, + message: string, + session: IntegrationSession, + settings: DifySetting, + ) { + const regex = /!?\[(.*?)\]\((.*?)\)/g; + + const result = []; + let lastIndex = 0; + + let match; + while ((match = regex.exec(message)) !== null) { + if (match.index > lastIndex) { + result.push({ text: message.slice(lastIndex, match.index).trim() }); + } + + result.push({ caption: match[1], url: match[2] }); + + lastIndex = regex.lastIndex; + } + + if (lastIndex < message.length) { + result.push({ text: message.slice(lastIndex).trim() }); + } + + for (const item of result) { + if (item.text) { + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: item.text, + }, + false, + ); + } + + if (item.url) { + await instance.mediaMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + mediatype: 'image', + media: item.url, + caption: item.caption, + }, + false, + ); + } + } + + if (settings.keepOpen) { + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'closed', + }, + }); + } else { + await this.prismaRepository.integrationSession.delete({ + where: { + id: session.id, + }, + }); + } + + sendTelemetry('/message/sendText'); + } + + private async initNewSession( + instance: any, + remoteJid: string, + dify: Dify, + settings: DifySetting, + session: IntegrationSession, + content: string, + pushName?: string, + ) { + const data = await this.createNewSession(instance, { + remoteJid, + botId: dify.id, + }); + + if (data.session) { + session = data.session; + } + + await this.sendMessageToBot(instance, session, settings, dify, remoteJid, pushName, content); return; } @@ -612,427 +459,7 @@ export class DifyService { return; } - let endpoint: string = dify.apiUrl; - - if (dify.botType === 'chatBot') { - endpoint += '/chat-messages'; - const payload: any = { - inputs: { - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - query: content, - response_mode: 'blocking', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], - }, - ]; - payload.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - }); - - await instance.client.sendPresenceUpdate('paused', remoteJid); - - const message = response?.data?.answer; - - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, - }); - - sendTelemetry('/message/sendText'); - - return; - } - - if (dify.botType === 'textGenerator') { - endpoint += '/completion-messages'; - const payload: any = { - inputs: { - query: content, - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - response_mode: 'blocking', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], - }, - ]; - payload.inputs.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - }); - - await instance.client.sendPresenceUpdate('paused', remoteJid); - - const message = response?.data?.answer; - - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, - }); - - sendTelemetry('/message/sendText'); - - return; - } - - if (dify.botType === 'agent') { - endpoint += '/chat-messages'; - const payload: any = { - inputs: { - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - query: content, - response_mode: 'streaming', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], - }, - ]; - payload.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - responseType: 'stream', - }); - - let completeMessage = ''; - let conversationId; - - const stream = response.data; - const reader = new Readable().wrap(stream); - - reader.on('data', (chunk) => { - const data = chunk.toString(); - const lines = data.split('\n'); - - lines.forEach((line) => { - if (line.startsWith('data: ')) { - const jsonString = line.substring(6); - try { - const event = JSON.parse(jsonString); - if (event.event === 'agent_message') { - completeMessage += event.answer; - conversationId = conversationId ?? event?.conversation_id; - } - } catch (error) { - console.error('Error parsing stream data:', error); - } - } - }); - }); - - reader.on('end', async () => { - await instance.client.sendPresenceUpdate('paused', remoteJid); - - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: completeMessage, - }, - false, - ); - - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: conversationId, - }, - }); - - sendTelemetry('/message/sendText'); - }); - - reader.on('error', (error) => { - console.error('Error reading stream:', error); - }); - - return; - } - - if (dify.botType === 'workflow') { - endpoint += '/workflows/run'; - const payload: any = { - inputs: { - query: content, - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - response_mode: 'blocking', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], - }, - ]; - payload.inputs.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - }); - - await instance.client.sendPresenceUpdate('paused', remoteJid); - - const message = response?.data?.data.outputs.text; - - const regex = /!?\[(.*?)\]\((.*?)\)/g; - - const result = []; - let lastIndex = 0; - - let match; - while ((match = regex.exec(message)) !== null) { - if (match.index > lastIndex) { - result.push({ text: message.slice(lastIndex, match.index).trim() }); - } - - result.push({ caption: match[1], url: match[2] }); - - lastIndex = regex.lastIndex; - } - - if (lastIndex < message.length) { - result.push({ text: message.slice(lastIndex).trim() }); - } - - for (const item of result) { - if (item.text) { - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: item.text, - }, - false, - ); - } - - if (item.url) { - await instance.mediaMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - mediatype: 'image', - media: item.url, - caption: item.caption, - }, - false, - ); - } - } - - if (settings.keepOpen) { - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'closed', - }, - }); - } else { - await this.prismaRepository.integrationSession.delete({ - where: { - id: session.id, - }, - }); - } - - sendTelemetry('/message/sendText'); - - return; - } + await this.sendMessageToBot(instance, session, settings, dify, remoteJid, pushName, content); return; }