fix: dify agent integration

This commit is contained in:
Davidson Gomes 2024-08-22 18:15:30 -03:00
parent e9f4477d11
commit 2196f65b7a
2 changed files with 258 additions and 214 deletions

View File

@ -11,6 +11,7 @@
### Fixed
* Refactor integrations structure for modular system
* Fixed dify agent integration
# 2.0.10 (2024-08-16 16:23)

View File

@ -41,6 +41,15 @@ export class DifyService {
return content.includes('imageMessage');
}
private isJSON(str: string): boolean {
try {
JSON.parse(str);
return true;
} catch (e) {
return false;
}
}
private async sendMessageToBot(
instance: any,
session: IntegrationSession,
@ -50,228 +59,279 @@ export class DifyService {
pushName: string,
content: string,
) {
let endpoint: string = dify.apiUrl;
try {
let endpoint: string = dify.apiUrl;
if (dify.botType === 'chatBot') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
query: content,
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
if (dify.botType === 'chatBot') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
];
payload.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
}
if (dify.botType === 'textGenerator') {
endpoint += '/completion-messages';
const payload: any = {
inputs: {
query: content,
pushName: pushName,
remoteJid: remoteJid,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.inputs.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
}
if (dify.botType === 'agent') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
query: content,
response_mode: 'streaming',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
responseType: 'stream',
});
let conversationId;
const stream = response.data;
const reader = new Readable().wrap(stream);
reader.on('data', (chunk) => {
const data = chunk.toString();
try {
const event = JSON.parse(data);
if (event.event === 'agent_message') {
conversationId = conversationId ?? event?.conversation_id;
}
} catch (error) {
console.error('Error parsing stream data:', error);
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.query = contentSplit[2] || content;
}
});
reader.on('end', async () => {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
const conversationId = response?.data?.conversation_id;
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
});
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
reader.on('error', (error) => {
console.error('Error reading stream:', error);
});
return;
}
if (dify.botType === 'workflow') {
endpoint += '/workflows/run';
const payload: any = {
inputs: {
query: content,
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
];
payload.inputs.query = contentSplit[2] || content;
data: {
status: 'opened',
awaitUser: true,
sessionId: session.sessionId === remoteJid ? conversationId : session.sessionId,
},
});
}
await instance.client.presenceSubscribe(remoteJid);
if (dify.botType === 'textGenerator') {
endpoint += '/completion-messages';
const payload: any = {
inputs: {
query: content,
pushName: pushName,
remoteJid: remoteJid,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
await instance.client.sendPresenceUpdate('composing', remoteJid);
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.inputs.query = contentSplit[2] || content;
}
await instance.client.sendPresenceUpdate('paused', remoteJid);
await instance.client.presenceSubscribe(remoteJid);
const message = response?.data?.data.outputs.text;
await instance.client.sendPresenceUpdate('composing', remoteJid);
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
const conversationId = response?.data?.conversation_id;
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
sessionId: session.sessionId === remoteJid ? conversationId : session.sessionId,
},
});
}
if (dify.botType === 'agent') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
query: content,
response_mode: 'streaming',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
responseType: 'stream',
});
let conversationId;
let answer = '';
const stream = response.data;
const reader = new Readable().wrap(stream);
reader.on('data', (chunk) => {
const data = chunk.toString();
try {
const cleanedData = data.replace(/^data:\s*/, '');
const event = JSON.parse(cleanedData);
if (event?.event === 'agent_message') {
console.log('event:', event);
conversationId = conversationId ?? event?.conversation_id;
answer += event?.answer;
}
} catch (error) {
console.error('Error parsing stream data:', error);
}
});
reader.on('end', async () => {
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = answer;
console.log('message:', answer);
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
sessionId: conversationId,
},
});
});
reader.on('error', (error) => {
console.error('Error reading stream:', error);
});
return;
}
if (dify.botType === 'workflow') {
endpoint += '/workflows/run';
const payload: any = {
inputs: {
query: content,
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.inputs.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.data.outputs.text;
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
},
});
return;
}
} catch (error) {
this.logger.error(error.response?.data || error);
return;
}
}
private async sendMessageWhatsApp(
instance: any,
remoteJid: string,
message: string,
session: IntegrationSession,
settings: DifySetting,
) {
private async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: DifySetting) {
const regex = /!?\[(.*?)\]\((.*?)\)/g;
const result = [];
@ -318,23 +378,6 @@ export class DifyService {
}
}
if (settings.keepOpen) {
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'closed',
},
});
} else {
await this.prismaRepository.integrationSession.delete({
where: {
id: session.id,
},
});
}
sendTelemetry('/message/sendText');
}