From de8e4a0ca398eb4ef297a8a93b821fb33c810560 Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Sat, 3 Aug 2024 14:04:39 -0300 Subject: [PATCH] fix(dify agent integration): Updated dify agent integration to use streaming response mode and handle message data in chunks. Modified CHANGELOG.md and src/api/integrations/dify/services/dify.service.ts. This commit addresses an issue with the dify agent integration by updating it to use a streaming response mode. This allows for handling message data in smaller chunks, improving performance and reducing memory usage. The commit also includes updates to the CHANGELOG.md file and the dify.service.ts file, where the actual changes were implemented. --- CHANGELOG.md | 1 + .../dify/services/dify.service.ts | 141 ++++++++++++------ 2 files changed, 100 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfc07620..ba250b05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Session is now individual per instance and remoteJid * Credentials verify on manager login * Added description column on typebot, dify and openai +* Fixed dify agent integration # 2.0.6-rc (2024-08-02 19:23) diff --git a/src/api/integrations/dify/services/dify.service.ts b/src/api/integrations/dify/services/dify.service.ts index ad0568ed..bfdc4f0b 100644 --- a/src/api/integrations/dify/services/dify.service.ts +++ b/src/api/integrations/dify/services/dify.service.ts @@ -1,5 +1,6 @@ import { Dify, DifySession, DifySetting, Message } from '@prisma/client'; import axios from 'axios'; +import { Readable } from 'stream'; import { ConfigService, S3 } from '../../../../config/env.config'; import { Logger } from '../../../../config/logger.config'; @@ -1161,7 +1162,7 @@ export class DifyService { const payload = { inputs: {}, query: content, - response_mode: 'blocking', + response_mode: 'streaming', conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, user: remoteJid, }; @@ -1174,33 +1175,60 @@ export class DifyService { headers: { Authorization: `Bearer ${dify.apiKey}`, }, + responseType: 'stream', }); - await instance.client.sendPresenceUpdate('paused', remoteJid); + let completeMessage = ''; - const message = response?.data?.answer; + const stream = response.data; + const reader = new Readable().wrap(stream); - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); + reader.on('data', (chunk) => { + const data = chunk.toString(); - await this.prismaRepository.difySession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, + try { + const event = JSON.parse(data); + if (event.event === 'agent_message') { + completeMessage += event.answer; + + console.log('completeMessage:', completeMessage); + } + } catch (error) { + console.error('Error parsing stream data:', error); + } }); - sendTelemetry('/message/sendText'); + reader.on('end', async () => { + await instance.client.sendPresenceUpdate('paused', remoteJid); + + const message = response?.data?.answer; + + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: message, + }, + false, + ); + + await this.prismaRepository.difySession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: response?.data?.conversation_id, + }, + }); + + sendTelemetry('/message/sendText'); + }); + + reader.on('error', (error) => { + console.error('Error reading stream:', error); + }); return; } @@ -1466,7 +1494,7 @@ export class DifyService { const payload = { inputs: {}, query: content, - response_mode: 'blocking', + response_mode: 'streaming', conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId, user: remoteJid, }; @@ -1479,33 +1507,62 @@ export class DifyService { headers: { Authorization: `Bearer ${dify.apiKey}`, }, + responseType: 'stream', }); - await instance.client.sendPresenceUpdate('paused', remoteJid); + let completeMessage = ''; - const message = response?.data?.answer; + const stream = response.data; + const reader = new Readable().wrap(stream); - await instance.textMessage( - { - number: remoteJid.split('@')[0], - delay: settings?.delayMessage || 1000, - text: message, - }, - false, - ); + reader.on('data', (chunk) => { + const data = chunk.toString(); + const lines = data.split('\n'); - await this.prismaRepository.difySession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: response?.data?.conversation_id, - }, + lines.forEach((line) => { + if (line.startsWith('data: ')) { + const jsonString = line.substring(6); + try { + const event = JSON.parse(jsonString); + if (event.event === 'agent_message') { + completeMessage += event.answer; + } + } catch (error) { + console.error('Error parsing stream data:', error); + } + } + }); }); - sendTelemetry('/message/sendText'); + reader.on('end', async () => { + await instance.client.sendPresenceUpdate('paused', remoteJid); + + await instance.textMessage( + { + number: remoteJid.split('@')[0], + delay: settings?.delayMessage || 1000, + text: completeMessage, + }, + false, + ); + + await this.prismaRepository.difySession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: response?.data?.conversation_id, + }, + }); + + sendTelemetry('/message/sendText'); + }); + + reader.on('error', (error) => { + console.error('Error reading stream:', error); + }); return; }