diff --git a/CHANGELOG.md b/CHANGELOG.md index dfc07620..ba250b05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Session is now individual per instance and remoteJid * Credentials verify on manager login * Added description column on typebot, dify and openai +* Fixed dify agent integration # 2.0.6-rc (2024-08-02 19:23) diff --git a/src/api/integrations/dify/services/dify.service.ts b/src/api/integrations/dify/services/dify.service.ts index ad0568ed..bfdc4f0b 100644 --- a/src/api/integrations/dify/services/dify.service.ts +++ b/src/api/integrations/dify/services/dify.service.ts @@ -1,5 +1,6 @@ import { Dify, DifySession, DifySetting, Message } from '@prisma/client'; import axios from 'axios'; +import { Readable } from 'stream'; import { ConfigService, S3 } from '../../../../config/env.config'; import { Logger } from '../../../../config/logger.config'; @@ -1161,7 +1162,7 @@ export class DifyService { const payload = { inputs: {}, query: content, - response_mode: 'blocking', + response_mode: 'streaming', conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, user: remoteJid, }; @@ -1174,33 +1175,60 @@ export class DifyService { headers: { Authorization: `Bearer ${dify.apiKey}`, }, + responseType: 'stream', }); - await instance.client.sendPresenceUpdate('paused', remoteJid); + let completeMessage = ''; - const message = response?.data?.answer; + const stream = response.data; + const reader = new Readable().wrap(stream); - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); + reader.on('data', (chunk) => { + const data = chunk.toString(); - await this.prismaRepository.difySession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, + try { + const event = JSON.parse(data); + if (event.event === 'agent_message') { + completeMessage += event.answer; + + console.log('completeMessage:', completeMessage); + } + } catch (error) { + console.error('Error parsing stream data:', error); + } }); - sendTelemetry('/message/sendText'); + reader.on('end', async () => { + await instance.client.sendPresenceUpdate('paused', remoteJid); + + const message = response?.data?.answer; + + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: message, + }, + false, + ); + + await this.prismaRepository.difySession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: response?.data?.conversation_id, + }, + }); + + sendTelemetry('/message/sendText'); + }); + + reader.on('error', (error) => { + console.error('Error reading stream:', error); + }); return; } @@ -1466,7 +1494,7 @@ export class DifyService { const payload = { inputs: {}, query: content, - response_mode: 'blocking', + response_mode: 'streaming', conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, user: remoteJid, }; @@ -1479,33 +1507,62 @@ export class DifyService { headers: { Authorization: `Bearer ${dify.apiKey}`, }, + responseType: 'stream', }); - await instance.client.sendPresenceUpdate('paused', remoteJid); + let completeMessage = ''; - const message = response?.data?.answer; + const stream = response.data; + const reader = new Readable().wrap(stream); - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); + reader.on('data', (chunk) => { + const data = chunk.toString(); + const lines = data.split('\n'); - await this.prismaRepository.difySession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, + lines.forEach((line) => { + if (line.startsWith('data: ')) { + const jsonString = line.substring(6); + try { + const event = JSON.parse(jsonString); + if (event.event === 'agent_message') { + completeMessage += event.answer; + } + } catch (error) { + console.error('Error parsing stream data:', error); + } + } + }); }); - sendTelemetry('/message/sendText'); + reader.on('end', async () => { + await instance.client.sendPresenceUpdate('paused', remoteJid); + + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: completeMessage, + }, + false, + ); + + await this.prismaRepository.difySession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: response?.data?.conversation_id, + }, + }); + + sendTelemetry('/message/sendText'); + }); + + reader.on('error', (error) => { + console.error('Error reading stream:', error); + }); return; }