feat(chatbot): implement base chatbot structure and enhance integration capabilities

- Introduced a base structure for chatbot integrations, including BaseChatbotController and BaseChatbotService.
- Added common DTOs for chatbot settings and data to streamline integration processes.
- Updated existing chatbot controllers (Dify, Evoai, N8n) to extend from the new base classes, improving code reusability and maintainability.
- Enhanced media message handling across integrations, including audio transcription capabilities using OpenAI's Whisper API.
- Refactored service methods to accommodate new message structures and improve error handling.
This commit is contained in:
Guilherme Gomes
2025-05-17 16:22:13 -03:00
parent d3ee370bdc
commit 69b4f1aa02
13 changed files with 2152 additions and 3229 deletions

View File

@@ -1,22 +1,29 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Logger } from '@config/logger.config';
import { ConfigService } from '@config/env.config';
import { IntegrationSession, N8n, N8nSetting } from '@prisma/client';
import { sendTelemetry } from '@utils/sendTelemetry';
import axios from 'axios';
import { downloadMediaMessage } from 'baileys';
import { BaseChatbotService } from '../../base-chatbot.service';
import { N8nDto } from '../dto/n8n.dto';
export class N8nService {
private readonly logger = new Logger('N8nService');
private readonly waMonitor: WAMonitoringService;
export class N8nService extends BaseChatbotService<N8n, N8nSetting> {
constructor(
waMonitor: WAMonitoringService,
private readonly prismaRepository: PrismaRepository,
prismaRepository: PrismaRepository,
configService: ConfigService,
) {
this.waMonitor = waMonitor;
super(waMonitor, prismaRepository, 'N8nService', configService);
}
/**
* Return the bot type for N8n
*/
protected getBotType(): string {
return 'n8n';
}
/**
@@ -122,40 +129,10 @@ export class N8nService {
}
public async createNewSession(instance: InstanceDto, data: any) {
try {
const session = await this.prismaRepository.integrationSession.create({
data: {
remoteJid: data.remoteJid,
pushName: data.pushName,
sessionId: data.remoteJid,
status: 'opened',
awaitUser: false,
botId: data.botId,
instanceId: instance.instanceId,
type: 'n8n',
},
});
return { session };
} catch (error) {
this.logger.error(error);
return;
}
return super.createNewSession(instance, data, 'n8n');
}
private isImageMessage(content: string) {
return content.includes('imageMessage');
}
private isJSON(str: string): boolean {
try {
JSON.parse(str);
return true;
} catch (e) {
return false;
}
}
private async sendMessageToBot(
protected async sendMessageToBot(
instance: any,
session: IntegrationSession,
settings: N8nSetting,
@@ -163,6 +140,7 @@ export class N8nService {
remoteJid: string,
pushName: string,
content: string,
msg?: any,
) {
try {
const endpoint: string = n8n.webhookUrl;
@@ -170,6 +148,24 @@ export class N8nService {
chatInput: content,
sessionId: session.sessionId,
};
// Handle audio messages
if (this.isAudioMessage(content) && msg) {
try {
this.logger.debug(`[N8n] Downloading audio for Whisper transcription`);
const mediaBuffer = await downloadMediaMessage({ key: msg.key, message: msg.message }, 'buffer', {});
const transcribedText = await this.speechToText(mediaBuffer);
if (transcribedText) {
payload.chatInput = transcribedText;
} else {
payload.chatInput = '[Audio message could not be transcribed]';
}
} catch (err) {
this.logger.error(`[N8n] Failed to transcribe audio: ${err}`);
payload.chatInput = '[Audio message could not be transcribed]';
}
}
const headers: Record<string, string> = {};
if (n8n.basicAuthUser && n8n.basicAuthPass) {
const auth = Buffer.from(`${n8n.basicAuthUser}:${n8n.basicAuthPass}`).toString('base64');
@@ -193,45 +189,39 @@ export class N8nService {
}
}
private async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: N8nSetting) {
protected async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: N8nSetting) {
const linkRegex = /(!?)\[(.*?)\]\((.*?)\)/g;
let textBuffer = '';
let lastIndex = 0;
let match: RegExpExecArray | null;
const getMediaType = (url: string): string | null => {
const extension = url.split('.').pop()?.toLowerCase();
const imageExtensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp'];
const audioExtensions = ['mp3', 'wav', 'aac', 'ogg'];
const videoExtensions = ['mp4', 'avi', 'mkv', 'mov'];
const documentExtensions = ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt'];
if (imageExtensions.includes(extension || '')) return 'image';
if (audioExtensions.includes(extension || '')) return 'audio';
if (videoExtensions.includes(extension || '')) return 'video';
if (documentExtensions.includes(extension || '')) return 'document';
return null;
};
while ((match = linkRegex.exec(message)) !== null) {
const [altText, url] = match;
const mediaType = getMediaType(url);
const [fullMatch, exclamation, altText, url] = match;
const mediaType = this.getMediaType(url);
const beforeText = message.slice(lastIndex, match.index);
if (beforeText) {
textBuffer += beforeText;
}
if (mediaType) {
const splitMessages = settings.splitMessages ?? false;
const timePerChar = settings.timePerChar ?? 0;
const minDelay = 1000;
const maxDelay = 20000;
if (textBuffer.trim()) {
if (splitMessages) {
const multipleMessages = textBuffer.trim().split('\n\n');
for (let index = 0; index < multipleMessages.length; index++) {
const message = multipleMessages[index];
const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
@@ -245,67 +235,103 @@ export class N8nService {
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
} else {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: textBuffer.trim(),
},
false,
);
const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: textBuffer,
},
false,
);
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
textBuffer = '';
}
if (mediaType === 'audio') {
await instance.audioWhatsapp({
textBuffer = '';
if (mediaType === 'image') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
audio: url,
caption: altText,
caption: exclamation === '!' ? undefined : altText,
mediatype: 'image',
media: url,
});
} else if (mediaType === 'video') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
caption: exclamation === '!' ? undefined : altText,
mediatype: 'video',
media: url,
});
} else if (mediaType === 'audio') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
mediatype: 'audio',
media: url,
});
} else if (mediaType === 'document') {
await instance.mediaMessage({
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
caption: exclamation === '!' ? undefined : altText,
mediatype: 'document',
media: url,
fileName: altText || 'file',
});
} else {
await instance.mediaMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
mediatype: mediaType,
media: url,
caption: altText,
},
null,
false,
);
}
} else {
textBuffer += `[${altText}](${url})`;
}
lastIndex = linkRegex.lastIndex;
lastIndex = match.index + fullMatch.length;
}
if (lastIndex < message.length) {
const remainingText = message.slice(lastIndex);
if (remainingText.trim()) {
textBuffer += remainingText;
}
const remainingText = message.slice(lastIndex);
if (remainingText) {
textBuffer += remainingText;
}
const splitMessages = settings.splitMessages ?? false;
const timePerChar = settings.timePerChar ?? 0;
const minDelay = 1000;
const maxDelay = 20000;
if (textBuffer.trim()) {
const splitMessages = settings.splitMessages ?? false;
const timePerChar = settings.timePerChar ?? 0;
const minDelay = 1000;
const maxDelay = 20000;
if (splitMessages) {
const multipleMessages = textBuffer.trim().split('\n\n');
for (let index = 0; index < multipleMessages.length; index++) {
const message = multipleMessages[index];
const delay = Math.min(Math.max(message.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
@@ -319,25 +345,41 @@ export class N8nService {
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
} else {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: textBuffer.trim(),
},
false,
);
const delay = Math.min(Math.max(textBuffer.length * timePerChar, minDelay), maxDelay);
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
await new Promise<void>((resolve) => {
setTimeout(async () => {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: textBuffer,
},
false,
);
resolve();
}, delay);
});
if (instance.integration === 'WHATSAPP_BAILEYS') {
await instance.client.sendPresenceUpdate('paused', remoteJid);
}
}
}
sendTelemetry('/message/sendText');
}
private async initNewSession(
protected async initNewSession(
instance: any,
remoteJid: string,
n8n: N8n,
@@ -345,88 +387,97 @@ export class N8nService {
session: IntegrationSession,
content: string,
pushName?: string,
msg?: any,
) {
const data = await this.createNewSession(instance, {
remoteJid,
pushName,
botId: n8n.id,
});
if (data.session) {
session = data.session;
}
await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName, content);
return;
}
public async processN8n(
instance: any,
remoteJid: string,
n8n: N8n,
session: IntegrationSession,
settings: N8nSetting,
content: string,
pushName?: string,
) {
if (session && session.status !== 'opened') {
try {
await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName || '', content, msg);
} catch (error) {
this.logger.error(error);
return;
}
if (session && settings.expire && settings.expire > 0) {
const now = Date.now();
const sessionUpdatedAt = new Date(session.updatedAt).getTime();
const diff = now - sessionUpdatedAt;
const diffInMinutes = Math.floor(diff / 1000 / 60);
if (diffInMinutes > settings.expire) {
if (settings.keepOpen) {
}
public async process(
instance: any,
remoteJid: string,
n8n: N8n,
session: IntegrationSession,
settings: N8nSetting,
content: string,
pushName?: string,
msg?: any,
) {
try {
// Handle keyword finish
if (settings?.keywordFinish?.includes(content.toLowerCase())) {
if (settings?.keepOpen) {
await this.prismaRepository.integrationSession.update({
where: { id: session.id },
data: { status: 'closed' },
where: {
id: session.id,
},
data: {
status: 'closed',
},
});
} else {
await this.prismaRepository.integrationSession.deleteMany({
where: { botId: n8n.id, remoteJid: remoteJid },
await this.prismaRepository.integrationSession.delete({
where: {
id: session.id,
},
});
}
await this.initNewSession(instance, remoteJid, n8n, settings, session, content, pushName);
return;
}
}
if (!session) {
await this.initNewSession(instance, remoteJid, n8n, settings, session, content, pushName);
return;
}
await this.prismaRepository.integrationSession.update({
where: { id: session.id },
data: { status: 'opened', awaitUser: false },
});
if (!content) {
if (settings.unknownMessage) {
this.waMonitor.waInstances[instance.instanceName].textMessage(
{
number: remoteJid.split('@')[0],
delay: settings.delayMessage || 1000,
text: settings.unknownMessage,
},
false,
// If session is new or doesn't exist
if (!session) {
const data = {
remoteJid,
pushName,
botId: n8n.id,
};
const createSession = await this.createNewSession(
{ instanceName: instance.instanceName, instanceId: instance.instanceId },
data,
);
sendTelemetry('/message/sendText');
await this.initNewSession(
instance,
remoteJid,
n8n,
settings,
createSession.session,
content,
pushName,
msg,
);
await sendTelemetry('/n8n/session/start');
return;
}
return;
}
if (settings.keywordFinish && content.toLowerCase() === settings.keywordFinish.toLowerCase()) {
if (settings.keepOpen) {
// If session exists but is paused
if (session.status === 'paused') {
await this.prismaRepository.integrationSession.update({
where: { id: session.id },
data: { status: 'closed' },
});
} else {
await this.prismaRepository.integrationSession.deleteMany({
where: { botId: n8n.id, remoteJid: remoteJid },
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
},
});
return;
}
// Regular message for ongoing session
await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName || '', content, msg);
} catch (error) {
this.logger.error(error);
return;
}
await this.sendMessageToBot(instance, session, settings, n8n, remoteJid, pushName, content);
return;
}
}