diff --git a/CHANGELOG.md b/CHANGELOG.md index e6f4e431..8f838f78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ ### Fixed * Refactor integrations structure for modular system +* Fixed dify agent integration # 2.0.10 (2024-08-16 16:23) diff --git a/src/api/integrations/chatbot/dify/services/dify.service.ts b/src/api/integrations/chatbot/dify/services/dify.service.ts index c467af7f..7af70408 100644 --- a/src/api/integrations/chatbot/dify/services/dify.service.ts +++ b/src/api/integrations/chatbot/dify/services/dify.service.ts @@ -41,6 +41,15 @@ export class DifyService { return content.includes('imageMessage'); } + private isJSON(str: string): boolean { + try { + JSON.parse(str); + return true; + } catch (e) { + return false; + } + } + private async sendMessageToBot( instance: any, session: IntegrationSession, @@ -50,228 +59,279 @@ export class DifyService { pushName: string, content: string, ) { - let endpoint: string = dify.apiUrl; + try { + let endpoint: string = dify.apiUrl; - if (dify.botType === 'chatBot') { - endpoint += '/chat-messages'; - const payload: any = { - inputs: { - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - query: content, - response_mode: 'blocking', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], + if (dify.botType === 'chatBot') { + endpoint += '/chat-messages'; + const payload: any = { + inputs: { + remoteJid: remoteJid, + pushName: pushName, + instanceName: instance.instanceName, + serverUrl: this.configService.get('SERVER').URL, + apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, }, - ]; - payload.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - }); - - await instance.client.sendPresenceUpdate('paused', remoteJid); - - const message = response?.data?.answer; - - await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); - } - - if (dify.botType === 'textGenerator') { - endpoint += '/completion-messages'; - const payload: any = { - inputs: { query: content, - pushName: pushName, - remoteJid: remoteJid, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - response_mode: 'blocking', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; + response_mode: 'blocking', + conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, + user: remoteJid, + }; - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); + if (this.isImageMessage(content)) { + const contentSplit = content.split('|'); - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], - }, - ]; - payload.inputs.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - }); - - await instance.client.sendPresenceUpdate('paused', remoteJid); - - const message = response?.data?.answer; - - await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); - } - - if (dify.botType === 'agent') { - endpoint += '/chat-messages'; - const payload: any = { - inputs: { - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - query: content, - response_mode: 'streaming', - conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], - }, - ]; - payload.query = contentSplit[2] || content; - } - - await instance.client.presenceSubscribe(remoteJid); - - await instance.client.sendPresenceUpdate('composing', remoteJid); - - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - responseType: 'stream', - }); - - let conversationId; - - const stream = response.data; - const reader = new Readable().wrap(stream); - - reader.on('data', (chunk) => { - const data = chunk.toString(); - - try { - const event = JSON.parse(data); - if (event.event === 'agent_message') { - conversationId = conversationId ?? event?.conversation_id; - } - } catch (error) { - console.error('Error parsing stream data:', error); + payload.files = [ + { + type: 'image', + transfer_method: 'remote_url', + url: contentSplit[1].split('?')[0], + }, + ]; + payload.query = contentSplit[2] || content; } - }); - reader.on('end', async () => { + await instance.client.presenceSubscribe(remoteJid); + + await instance.client.sendPresenceUpdate('composing', remoteJid); + + const response = await axios.post(endpoint, payload, { + headers: { + Authorization: `Bearer ${dify.apiKey}`, + }, + }); + await instance.client.sendPresenceUpdate('paused', remoteJid); const message = response?.data?.answer; + const conversationId = response?.data?.conversation_id; - await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); - }); + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); - reader.on('error', (error) => { - console.error('Error reading stream:', error); - }); - - return; - } - - if (dify.botType === 'workflow') { - endpoint += '/workflows/run'; - const payload: any = { - inputs: { - query: content, - remoteJid: remoteJid, - pushName: pushName, - instanceName: instance.instanceName, - serverUrl: this.configService.get('SERVER').URL, - apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, - }, - response_mode: 'blocking', - user: remoteJid, - }; - - if (this.isImageMessage(content)) { - const contentSplit = content.split('|'); - - payload.files = [ - { - type: 'image', - transfer_method: 'remote_url', - url: contentSplit[1].split('?')[0], + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, }, - ]; - payload.inputs.query = contentSplit[2] || content; + data: { + status: 'opened', + awaitUser: true, + sessionId: session.sessionId === remoteJid ? conversationId : session.sessionId, + }, + }); } - await instance.client.presenceSubscribe(remoteJid); + if (dify.botType === 'textGenerator') { + endpoint += '/completion-messages'; + const payload: any = { + inputs: { + query: content, + pushName: pushName, + remoteJid: remoteJid, + instanceName: instance.instanceName, + serverUrl: this.configService.get('SERVER').URL, + apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, + }, + response_mode: 'blocking', + conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, + user: remoteJid, + }; - await instance.client.sendPresenceUpdate('composing', remoteJid); + if (this.isImageMessage(content)) { + const contentSplit = content.split('|'); - const response = await axios.post(endpoint, payload, { - headers: { - Authorization: `Bearer ${dify.apiKey}`, - }, - }); + payload.files = [ + { + type: 'image', + transfer_method: 'remote_url', + url: contentSplit[1].split('?')[0], + }, + ]; + payload.inputs.query = contentSplit[2] || content; + } - await instance.client.sendPresenceUpdate('paused', remoteJid); + await instance.client.presenceSubscribe(remoteJid); - const message = response?.data?.data.outputs.text; + await instance.client.sendPresenceUpdate('composing', remoteJid); - await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings); + const response = await axios.post(endpoint, payload, { + headers: { + Authorization: `Bearer ${dify.apiKey}`, + }, + }); + await instance.client.sendPresenceUpdate('paused', remoteJid); + + const message = response?.data?.answer; + const conversationId = response?.data?.conversation_id; + + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: session.sessionId === remoteJid ? conversationId : session.sessionId, + }, + }); + } + + if (dify.botType === 'agent') { + endpoint += '/chat-messages'; + const payload: any = { + inputs: { + remoteJid: remoteJid, + pushName: pushName, + instanceName: instance.instanceName, + serverUrl: this.configService.get('SERVER').URL, + apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, + }, + query: content, + response_mode: 'streaming', + conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, + user: remoteJid, + }; + + if (this.isImageMessage(content)) { + const contentSplit = content.split('|'); + + payload.files = [ + { + type: 'image', + transfer_method: 'remote_url', + url: contentSplit[1].split('?')[0], + }, + ]; + payload.query = contentSplit[2] || content; + } + + await instance.client.presenceSubscribe(remoteJid); + + await instance.client.sendPresenceUpdate('composing', remoteJid); + + const response = await axios.post(endpoint, payload, { + headers: { + Authorization: `Bearer ${dify.apiKey}`, + }, + responseType: 'stream', + }); + + let conversationId; + let answer = ''; + + const stream = response.data; + const reader = new Readable().wrap(stream); + + reader.on('data', (chunk) => { + const data = chunk.toString(); + + try { + const cleanedData = data.replace(/^data:\s*/, ''); + + const event = JSON.parse(cleanedData); + + if (event?.event === 'agent_message') { + console.log('event:', event); + conversationId = conversationId ?? event?.conversation_id; + answer += event?.answer; + } + } catch (error) { + console.error('Error parsing stream data:', error); + } + }); + + reader.on('end', async () => { + await instance.client.sendPresenceUpdate('paused', remoteJid); + + const message = answer; + + console.log('message:', answer); + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: conversationId, + }, + }); + }); + + reader.on('error', (error) => { + console.error('Error reading stream:', error); + }); + + return; + } + + if (dify.botType === 'workflow') { + endpoint += '/workflows/run'; + const payload: any = { + inputs: { + query: content, + remoteJid: remoteJid, + pushName: pushName, + instanceName: instance.instanceName, + serverUrl: this.configService.get('SERVER').URL, + apiKey: this.configService.get('AUTHENTICATION').API_KEY.KEY, + }, + response_mode: 'blocking', + user: remoteJid, + }; + + if (this.isImageMessage(content)) { + const contentSplit = content.split('|'); + + payload.files = [ + { + type: 'image', + transfer_method: 'remote_url', + url: contentSplit[1].split('?')[0], + }, + ]; + payload.inputs.query = contentSplit[2] || content; + } + + await instance.client.presenceSubscribe(remoteJid); + + await instance.client.sendPresenceUpdate('composing', remoteJid); + + const response = await axios.post(endpoint, payload, { + headers: { + Authorization: `Bearer ${dify.apiKey}`, + }, + }); + + await instance.client.sendPresenceUpdate('paused', remoteJid); + + const message = response?.data?.data.outputs.text; + + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + }, + }); + + return; + } + } catch (error) { + this.logger.error(error.response?.data || error); return; } } - private async sendMessageWhatsApp( - instance: any, - remoteJid: string, - message: string, - session: IntegrationSession, - settings: DifySetting, - ) { + private async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: DifySetting) { const regex = /!?\[(.*?)\]\((.*?)\)/g; const result = []; @@ -318,23 +378,6 @@ export class DifyService { } } - if (settings.keepOpen) { - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'closed', - }, - }); - } else { - await this.prismaRepository.integrationSession.delete({ - where: { - id: session.id, - }, - }); - } - sendTelemetry('/message/sendText'); }