diff --git a/src/api/integrations/chatbot/dify/services/dify.service.ts b/src/api/integrations/chatbot/dify/services/dify.service.ts index c60782d7..348ee70c 100644 --- a/src/api/integrations/chatbot/dify/services/dify.service.ts +++ b/src/api/integrations/chatbot/dify/services/dify.service.ts @@ -224,63 +224,43 @@ export class DifyService { headers: { Authorization: `Bearer ${dify.apiKey}`, }, - responseType: 'stream', }); let conversationId; let answer = ''; - const stream = response.data; - const reader = new Readable().wrap(stream); + const data = response.data.replaceAll('data: ', ''); - reader.on('data', (chunk) => { - const data = chunk.toString().replace(/data:\s*/g, ''); + const events = data.split('\n').filter((line) => line.trim() !== ''); - if (data.trim() === '' || !data.startsWith('{')) { - return; - } + for (const eventString of events) { + if (eventString.trim().startsWith('{')) { + const event = JSON.parse(eventString); - try { - const events = data.split('\n').filter((line) => line.trim() !== ''); - - for (const eventString of events) { - if (eventString.trim().startsWith('{')) { - const event = JSON.parse(eventString); - - if (event?.event === 'agent_message') { - console.log('event:', event); - conversationId = conversationId ?? event?.conversation_id; - answer += event?.answer; - } - } + if (event?.event === 'agent_message') { + console.log('event:', event); + conversationId = conversationId ?? event?.conversation_id; + answer += event?.answer; } - } catch (error) { - console.error('Error parsing stream data:', error); } - }); + } - reader.on('end', async () => { - if (instance.integration === Integration.WHATSAPP_BAILEYS) - await instance.client.sendPresenceUpdate('paused', remoteJid); + if (instance.integration === Integration.WHATSAPP_BAILEYS) + await instance.client.sendPresenceUpdate('paused', remoteJid); - const message = answer; + const message = answer; - await this.sendMessageWhatsApp(instance, remoteJid, message, settings); + await this.sendMessageWhatsApp(instance, remoteJid, message, settings); - await this.prismaRepository.integrationSession.update({ - where: { - id: session.id, - }, - data: { - status: 'opened', - awaitUser: true, - sessionId: conversationId, - }, - }); - }); - - reader.on('error', (error) => { - console.error('Error reading stream:', error); + await this.prismaRepository.integrationSession.update({ + where: { + id: session.id, + }, + data: { + status: 'opened', + awaitUser: true, + sessionId: conversationId, + }, }); return;