mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-07-14 01:41:24 -06:00
fix(dify agent integration): Updated dify agent integration to use streaming response mode and handle message data in chunks. Modified CHANGELOG.md and src/api/integrations/dify/services/dify.service.ts.
This commit addresses an issue with the dify agent integration by updating it to use a streaming response mode. This allows for handling message data in smaller chunks, improving performance and reducing memory usage. The commit also includes updates to the CHANGELOG.md file and the dify.service.ts file, where the actual changes were implemented.
This commit is contained in:
parent
2aa0e08ae4
commit
de8e4a0ca3
@ -8,6 +8,7 @@
|
||||
* Session is now individual per instance and remoteJid
|
||||
* Credentials verify on manager login
|
||||
* Added description column on typebot, dify and openai
|
||||
* Fixed dify agent integration
|
||||
|
||||
# 2.0.6-rc (2024-08-02 19:23)
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
import { Dify, DifySession, DifySetting, Message } from '@prisma/client';
|
||||
import axios from 'axios';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import { ConfigService, S3 } from '../../../../config/env.config';
|
||||
import { Logger } from '../../../../config/logger.config';
|
||||
@ -1161,7 +1162,7 @@ export class DifyService {
|
||||
const payload = {
|
||||
inputs: {},
|
||||
query: content,
|
||||
response_mode: 'blocking',
|
||||
response_mode: 'streaming',
|
||||
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
|
||||
user: remoteJid,
|
||||
};
|
||||
@ -1174,33 +1175,60 @@ export class DifyService {
|
||||
headers: {
|
||||
Authorization: `Bearer ${dify.apiKey}`,
|
||||
},
|
||||
responseType: 'stream',
|
||||
});
|
||||
|
||||
await instance.client.sendPresenceUpdate('paused', remoteJid);
|
||||
let completeMessage = '';
|
||||
|
||||
const message = response?.data?.answer;
|
||||
const stream = response.data;
|
||||
const reader = new Readable().wrap(stream);
|
||||
|
||||
await instance.textMessage(
|
||||
{
|
||||
number: remoteJid.split('@')[0],
|
||||
delay: settings?.delayMessage || 1000,
|
||||
text: message,
|
||||
},
|
||||
false,
|
||||
);
|
||||
reader.on('data', (chunk) => {
|
||||
const data = chunk.toString();
|
||||
|
||||
await this.prismaRepository.difySession.update({
|
||||
where: {
|
||||
id: session.id,
|
||||
},
|
||||
data: {
|
||||
status: 'opened',
|
||||
awaitUser: true,
|
||||
sessionId: response?.data?.conversation_id,
|
||||
},
|
||||
try {
|
||||
const event = JSON.parse(data);
|
||||
if (event.event === 'agent_message') {
|
||||
completeMessage += event.answer;
|
||||
|
||||
console.log('completeMessage:', completeMessage);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error parsing stream data:', error);
|
||||
}
|
||||
});
|
||||
|
||||
sendTelemetry('/message/sendText');
|
||||
reader.on('end', async () => {
|
||||
await instance.client.sendPresenceUpdate('paused', remoteJid);
|
||||
|
||||
const message = response?.data?.answer;
|
||||
|
||||
await instance.textMessage(
|
||||
{
|
||||
number: remoteJid.split('@')[0],
|
||||
delay: settings?.delayMessage || 1000,
|
||||
text: message,
|
||||
},
|
||||
false,
|
||||
);
|
||||
|
||||
await this.prismaRepository.difySession.update({
|
||||
where: {
|
||||
id: session.id,
|
||||
},
|
||||
data: {
|
||||
status: 'opened',
|
||||
awaitUser: true,
|
||||
sessionId: response?.data?.conversation_id,
|
||||
},
|
||||
});
|
||||
|
||||
sendTelemetry('/message/sendText');
|
||||
});
|
||||
|
||||
reader.on('error', (error) => {
|
||||
console.error('Error reading stream:', error);
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
@ -1466,7 +1494,7 @@ export class DifyService {
|
||||
const payload = {
|
||||
inputs: {},
|
||||
query: content,
|
||||
response_mode: 'blocking',
|
||||
response_mode: 'streaming',
|
||||
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
|
||||
user: remoteJid,
|
||||
};
|
||||
@ -1479,33 +1507,62 @@ export class DifyService {
|
||||
headers: {
|
||||
Authorization: `Bearer ${dify.apiKey}`,
|
||||
},
|
||||
responseType: 'stream',
|
||||
});
|
||||
|
||||
await instance.client.sendPresenceUpdate('paused', remoteJid);
|
||||
let completeMessage = '';
|
||||
|
||||
const message = response?.data?.answer;
|
||||
const stream = response.data;
|
||||
const reader = new Readable().wrap(stream);
|
||||
|
||||
await instance.textMessage(
|
||||
{
|
||||
number: remoteJid.split('@')[0],
|
||||
delay: settings?.delayMessage || 1000,
|
||||
text: message,
|
||||
},
|
||||
false,
|
||||
);
|
||||
reader.on('data', (chunk) => {
|
||||
const data = chunk.toString();
|
||||
const lines = data.split('\n');
|
||||
|
||||
await this.prismaRepository.difySession.update({
|
||||
where: {
|
||||
id: session.id,
|
||||
},
|
||||
data: {
|
||||
status: 'opened',
|
||||
awaitUser: true,
|
||||
sessionId: response?.data?.conversation_id,
|
||||
},
|
||||
lines.forEach((line) => {
|
||||
if (line.startsWith('data: ')) {
|
||||
const jsonString = line.substring(6);
|
||||
try {
|
||||
const event = JSON.parse(jsonString);
|
||||
if (event.event === 'agent_message') {
|
||||
completeMessage += event.answer;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error parsing stream data:', error);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
sendTelemetry('/message/sendText');
|
||||
reader.on('end', async () => {
|
||||
await instance.client.sendPresenceUpdate('paused', remoteJid);
|
||||
|
||||
await instance.textMessage(
|
||||
{
|
||||
number: remoteJid.split('@')[0],
|
||||
delay: settings?.delayMessage || 1000,
|
||||
text: completeMessage,
|
||||
},
|
||||
false,
|
||||
);
|
||||
|
||||
await this.prismaRepository.difySession.update({
|
||||
where: {
|
||||
id: session.id,
|
||||
},
|
||||
data: {
|
||||
status: 'opened',
|
||||
awaitUser: true,
|
||||
sessionId: response?.data?.conversation_id,
|
||||
},
|
||||
});
|
||||
|
||||
sendTelemetry('/message/sendText');
|
||||
});
|
||||
|
||||
reader.on('error', (error) => {
|
||||
console.error('Error reading stream:', error);
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user