Merge branch 'ev2' into v2.0.0

This commit is contained in:
Stênio Aníbal
2024-08-23 10:56:17 -03:00
42 changed files with 3330 additions and 869 deletions

View File

@@ -36,11 +36,7 @@ export class InstanceController {
public async createInstance(instanceData: InstanceDto) {
try {
if (!instanceData.token && instanceData.integration === Integration.WHATSAPP_BUSINESS) {
throw new BadRequestException('token is required');
}
const instance = channelController.init(instanceData.integration, {
const instance = channelController.init(instanceData, {
configService: this.configService,
eventEmitter: this.eventEmitter,
prismaRepository: this.prismaRepository,

View File

@@ -1,4 +1,11 @@
import { proto, WAPresence, WAPrivacyOnlineValue, WAPrivacyValue, WAReadReceiptsValue } from 'baileys';
import {
proto,
WAPresence,
WAPrivacyGroupAddValue,
WAPrivacyOnlineValue,
WAPrivacyValue,
WAReadReceiptsValue,
} from 'baileys';
export class OnWhatsAppDto {
constructor(
@@ -84,7 +91,7 @@ export class PrivacySettingDto {
status: WAPrivacyValue;
online: WAPrivacyOnlineValue;
last: WAPrivacyValue;
groupadd: WAPrivacyValue;
groupadd: WAPrivacyGroupAddValue;
}
export class DeleteMessage {

View File

@@ -1,12 +1,16 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { ProviderFiles } from '@api/provider/sessions';
import { PrismaRepository } from '@api/repository/repository.service';
import { CacheService } from '@api/services/cache.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Integration } from '@api/types/wa.types';
import { ConfigService } from '@config/env.config';
import { BadRequestException } from '@exceptions';
import EventEmitter2 from 'eventemitter2';
import { BaileysStartupService } from './whatsapp/baileys/whatsapp.baileys.service';
import { BusinessStartupService } from './whatsapp/business/whatsapp.business.service';
import { EvolutionStartupService } from './evolution/evolution.channel.service';
import { BusinessStartupService } from './meta/whatsapp.business.service';
import { BaileysStartupService } from './whatsapp/whatsapp.baileys.service';
type ChannelDataType = {
configService: ConfigService;
@@ -18,9 +22,41 @@ type ChannelDataType = {
providerFiles: ProviderFiles;
};
export interface ChannelControllerInterface {
receiveWebhook(data: any): Promise<any>;
}
export class ChannelController {
public init(integration: string, data: ChannelDataType) {
if (integration === Integration.WHATSAPP_BUSINESS) {
public prismaRepository: PrismaRepository;
public waMonitor: WAMonitoringService;
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
this.prisma = prismaRepository;
this.monitor = waMonitor;
}
public set prisma(prisma: PrismaRepository) {
this.prismaRepository = prisma;
}
public get prisma() {
return this.prismaRepository;
}
public set monitor(waMonitor: WAMonitoringService) {
this.waMonitor = waMonitor;
}
public get monitor() {
return this.waMonitor;
}
public init(instanceData: InstanceDto, data: ChannelDataType) {
if (!instanceData.token && instanceData.integration === Integration.WHATSAPP_BUSINESS) {
throw new BadRequestException('token is required');
}
if (instanceData.integration === Integration.WHATSAPP_BUSINESS) {
return new BusinessStartupService(
data.configService,
data.eventEmitter,
@@ -32,6 +68,18 @@ export class ChannelController {
);
}
if (instanceData.integration === Integration.EVOLUTION) {
return new EvolutionStartupService(
data.configService,
data.eventEmitter,
data.prismaRepository,
data.cache,
data.chatwootCache,
data.baileysCache,
data.providerFiles,
);
}
return new BaileysStartupService(
data.configService,
data.eventEmitter,

View File

@@ -0,0 +1,15 @@
import { Router } from 'express';
import { EvolutionRouter } from './evolution/evolution.router';
import { MetaRouter } from './meta/meta.router';
export class ChannelRouter {
public readonly router: Router;
constructor(configService: any) {
this.router = Router();
this.router.use('/', new EvolutionRouter(configService).router);
this.router.use('/', new MetaRouter(configService).router);
}
}

View File

@@ -0,0 +1,475 @@
import { Options, SendAudioDto, SendMediaDto, SendTextDto } from '@api/dto/sendMessage.dto';
import { ProviderFiles } from '@api/provider/sessions';
import { PrismaRepository } from '@api/repository/repository.service';
import { chatbotController } from '@api/server.module';
import { CacheService } from '@api/services/cache.service';
import { ChannelStartupService } from '@api/services/channel.service';
import { Events, wa } from '@api/types/wa.types';
import { Chatwoot, ConfigService, Openai } from '@config/env.config';
import { BadRequestException, InternalServerErrorException } from '@exceptions';
import EventEmitter2 from 'eventemitter2';
export class EvolutionStartupService extends ChannelStartupService {
constructor(
public readonly configService: ConfigService,
public readonly eventEmitter: EventEmitter2,
public readonly prismaRepository: PrismaRepository,
public readonly cache: CacheService,
public readonly chatwootCache: CacheService,
public readonly baileysCache: CacheService,
private readonly providerFiles: ProviderFiles,
) {
super(configService, eventEmitter, prismaRepository, chatwootCache);
this.client = null;
}
public client: any;
public stateConnection: wa.StateConnection = { state: 'open' };
public phoneNumber: string;
public mobile: boolean;
public get connectionStatus() {
return this.stateConnection;
}
public async closeClient() {
this.stateConnection = { state: 'close' };
}
public get qrCode(): wa.QrCode {
return {
pairingCode: this.instance.qrcode?.pairingCode,
code: this.instance.qrcode?.code,
base64: this.instance.qrcode?.base64,
count: this.instance.qrcode?.count,
};
}
public async logoutInstance() {
await this.closeClient();
}
public async profilePicture(number: string) {
const jid = this.createJid(number);
return {
wuid: jid,
profilePictureUrl: null,
};
}
public async getProfileName() {
return null;
}
public async profilePictureUrl() {
return null;
}
public async getProfileStatus() {
return null;
}
public async connectToWhatsapp(data?: any): Promise<any> {
if (!data) return;
try {
this.loadChatwoot();
this.eventHandler(data);
} catch (error) {
this.logger.error(error);
throw new InternalServerErrorException(error?.toString());
}
}
protected async eventHandler(received: any) {
try {
let messageRaw: any;
if (received.message) {
const key = {
id: received.key.id,
remoteJid: received.key.remoteJid,
fromMe: received.key.fromMe,
};
messageRaw = {
key,
pushName: received.pushName,
message: received.message,
messageType: received.messageType,
messageTimestamp: Math.round(new Date().getTime() / 1000),
source: 'unknown',
instanceId: this.instanceId,
};
if (this.configService.get<Openai>('OPENAI').ENABLED) {
const openAiDefaultSettings = await this.prismaRepository.openaiSetting.findFirst({
where: {
instanceId: this.instanceId,
},
include: {
OpenaiCreds: true,
},
});
if (
openAiDefaultSettings &&
openAiDefaultSettings.openaiCredsId &&
openAiDefaultSettings.speechToText &&
received?.message?.audioMessage
) {
messageRaw.message.speechToText = await this.openaiService.speechToText(
openAiDefaultSettings.OpenaiCreds,
received,
this.client.updateMediaMessage,
);
}
}
this.logger.log(messageRaw);
this.sendDataWebhook(Events.MESSAGES_UPSERT, messageRaw);
await chatbotController.emit({
instance: { instanceName: this.instance.name, instanceId: this.instanceId },
remoteJid: messageRaw.key.remoteJid,
msg: messageRaw,
pushName: messageRaw.pushName,
});
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot.enabled) {
const chatwootSentMessage = await this.chatwootService.eventWhatsapp(
Events.MESSAGES_UPSERT,
{ instanceName: this.instance.name, instanceId: this.instanceId },
messageRaw,
);
if (chatwootSentMessage?.id) {
messageRaw.chatwootMessageId = chatwootSentMessage.id;
messageRaw.chatwootInboxId = chatwootSentMessage.id;
messageRaw.chatwootConversationId = chatwootSentMessage.id;
}
}
await this.prismaRepository.message.create({
data: messageRaw,
});
const contact = await this.prismaRepository.contact.findFirst({
where: { instanceId: this.instanceId, remoteJid: key.remoteJid },
});
const contactRaw: any = {
remoteJid: messageRaw.key.remoteJid,
pushName: received.pushName,
instanceId: this.instanceId,
};
if (contactRaw.remoteJid === 'status@broadcast') {
return;
}
if (contact) {
const contactRaw: any = {
remoteJid: messageRaw.key.remoteJid,
pushName: received.pushName,
instanceId: this.instanceId,
};
this.sendDataWebhook(Events.CONTACTS_UPDATE, contactRaw);
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot.enabled) {
await this.chatwootService.eventWhatsapp(
Events.CONTACTS_UPDATE,
{ instanceName: this.instance.name, instanceId: this.instanceId },
contactRaw,
);
}
await this.prismaRepository.contact.updateMany({
where: { remoteJid: contact.remoteJid },
data: contactRaw,
});
return;
}
this.sendDataWebhook(Events.CONTACTS_UPSERT, contactRaw);
this.prismaRepository.contact.create({
data: contactRaw,
});
}
} catch (error) {
this.logger.error(error);
}
}
protected async sendMessageWithTyping(number: string, message: any, options?: Options, isIntegration = false) {
try {
let quoted: any;
let webhookUrl: any;
if (options?.quoted) {
const m = options?.quoted;
const msg = m?.key;
if (!msg) {
throw 'Message not found';
}
quoted = msg;
}
if (options?.webhookUrl) {
webhookUrl = options.webhookUrl;
}
const messageRaw: any = {
key: { fromMe: true, id: 'ID', remoteJid: this.createJid(number) },
message: {
...message,
quoted,
},
messageType: 'conversation',
messageTimestamp: Math.round(new Date().getTime() / 1000),
webhookUrl,
source: 'unknown',
instanceId: this.instanceId,
};
this.logger.log(messageRaw);
this.sendDataWebhook(Events.SEND_MESSAGE, messageRaw);
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot.enabled && !isIntegration) {
this.chatwootService.eventWhatsapp(
Events.SEND_MESSAGE,
{ instanceName: this.instance.name, instanceId: this.instanceId },
messageRaw,
);
}
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot.enabled && isIntegration)
await chatbotController.emit({
instance: { instanceName: this.instance.name, instanceId: this.instanceId },
remoteJid: messageRaw.key.remoteJid,
msg: messageRaw,
pushName: messageRaw.pushName,
});
await this.prismaRepository.message.create({
data: messageRaw,
});
return messageRaw;
} catch (error) {
this.logger.error(error);
throw new BadRequestException(error.toString());
}
}
public async textMessage(data: SendTextDto, isIntegration = false) {
const res = await this.sendMessageWithTyping(
data.number,
{
conversation: data.text,
},
{
delay: data?.delay,
presence: 'composing',
quoted: data?.quoted,
linkPreview: data?.linkPreview,
mentionsEveryOne: data?.mentionsEveryOne,
mentioned: data?.mentioned,
},
isIntegration,
);
return res;
}
public async mediaMessage(data: SendMediaDto, isIntegration = false) {
const message = data;
return await this.sendMessageWithTyping(
data.number,
{ ...message },
{
delay: data?.delay,
presence: 'composing',
quoted: data?.quoted,
linkPreview: data?.linkPreview,
mentionsEveryOne: data?.mentionsEveryOne,
mentioned: data?.mentioned,
},
isIntegration,
);
}
public async audioWhatsapp(data: SendAudioDto, isIntegration = false) {
const message = data;
return await this.sendMessageWithTyping(
data.number,
{ ...message },
{
delay: data?.delay,
presence: 'composing',
quoted: data?.quoted,
linkPreview: data?.linkPreview,
mentionsEveryOne: data?.mentionsEveryOne,
mentioned: data?.mentioned,
},
isIntegration,
);
}
public async buttonMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async locationMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async listMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async templateMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async contactMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async reactionMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async getBase64FromMediaMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async deleteMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async mediaSticker() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async pollMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async statusMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async reloadConnection() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async whatsappNumber() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async markMessageAsRead() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async archiveChat() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async markChatUnread() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async fetchProfile() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async sendPresence() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async setPresence() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async fetchPrivacySettings() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updatePrivacySettings() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async fetchBusinessProfile() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateProfileName() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateProfileStatus() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateProfilePicture() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async removeProfilePicture() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async blockUser() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateMessage() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async createGroup() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateGroupPicture() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateGroupSubject() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateGroupDescription() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async findGroup() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async fetchAllGroups() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async inviteCode() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async inviteInfo() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async sendInvite() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async acceptInviteCode() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async revokeInviteCode() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async findParticipants() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateGParticipant() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async updateGSetting() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async toggleEphemeral() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async leaveGroup() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async fetchLabels() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async handleLabel() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async receiveMobileCode() {
throw new BadRequestException('Method not available on Evolution Channel');
}
public async fakeCall() {
throw new BadRequestException('Method not available on Evolution Channel');
}
}

View File

@@ -0,0 +1,39 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Logger } from '@config/logger.config';
import { ChannelController, ChannelControllerInterface } from '../channel.controller';
export class EvolutionController extends ChannelController implements ChannelControllerInterface {
private readonly logger = new Logger(EvolutionController.name);
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor);
}
integrationEnabled: boolean;
public async receiveWebhook(data: any) {
const numberId = data.numberId;
if (!numberId) {
this.logger.error('WebhookService -> receiveWebhookEvolution -> numberId not found');
return;
}
const instance = await this.prismaRepository.instance.findFirst({
where: { number: numberId },
});
if (!instance) {
this.logger.error('WebhookService -> receiveWebhook -> instance not found');
return;
}
await this.waMonitor.waInstances[instance.name].connectToWhatsapp(data);
return {
status: 'success',
};
}
}

View File

@@ -0,0 +1,18 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { evolutionController } from '@api/server.module';
import { ConfigService } from '@config/env.config';
import { Router } from 'express';
export class EvolutionRouter extends RouterBroker {
constructor(readonly configService: ConfigService) {
super();
this.router.post(this.routerPath('webhook/evolution', false), async (req, res) => {
const { body } = req;
const response = await evolutionController.receiveWebhook(body);
return res.status(200).json(response);
});
}
public readonly router: Router = Router();
}

View File

@@ -0,0 +1,72 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Logger } from '@config/logger.config';
import axios from 'axios';
import { ChannelController, ChannelControllerInterface } from '../channel.controller';
export class MetaController extends ChannelController implements ChannelControllerInterface {
private readonly logger = new Logger(MetaController.name);
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor);
}
integrationEnabled: boolean;
public async receiveWebhook(data: any) {
if (data.object === 'whatsapp_business_account') {
if (data.entry[0]?.changes[0]?.field === 'message_template_status_update') {
const template = await this.prismaRepository.template.findFirst({
where: { templateId: `${data.entry[0].changes[0].value.message_template_id}` },
});
if (!template) {
console.log('template not found');
return;
}
const { webhookUrl } = template;
await axios.post(webhookUrl, data.entry[0].changes[0].value, {
headers: {
'Content-Type': 'application/json',
},
});
return;
}
data.entry?.forEach(async (entry: any) => {
const numberId = entry.changes[0].value.metadata.phone_number_id;
if (!numberId) {
this.logger.error('WebhookService -> receiveWebhookMeta -> numberId not found');
return {
status: 'success',
};
}
const instance = await this.prismaRepository.instance.findFirst({
where: { number: numberId },
});
if (!instance) {
this.logger.error('WebhookService -> receiveWebhookMeta -> instance not found');
return {
status: 'success',
};
}
await this.waMonitor.waInstances[instance.name].connectToWhatsapp(data);
return {
status: 'success',
};
});
}
return {
status: 'success',
};
}
}

View File

@@ -0,0 +1,24 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { metaController } from '@api/server.module';
import { ConfigService, WaBusiness } from '@config/env.config';
import { Router } from 'express';
export class MetaRouter extends RouterBroker {
constructor(readonly configService: ConfigService) {
super();
this.router
.get(this.routerPath('webhook/meta', false), async (req, res) => {
if (req.query['hub.verify_token'] === configService.get<WaBusiness>('WA_BUSINESS').TOKEN_WEBHOOK)
res.send(req.query['hub.challenge']);
else res.send('Error, wrong validation token');
})
.post(this.routerPath('webhook/meta', false), async (req, res) => {
const { body } = req;
const response = await metaController.receiveWebhook(body);
return res.status(200).json(response);
});
}
public readonly router: Router = Router();
}

View File

@@ -85,17 +85,10 @@ export class BusinessStartupService extends ChannelStartupService {
public async profilePicture(number: string) {
const jid = this.createJid(number);
try {
return {
wuid: jid,
profilePictureUrl: await this.client.profilePictureUrl(jid, 'image'),
};
} catch (error) {
return {
wuid: jid,
profilePictureUrl: null,
};
}
return {
wuid: jid,
profilePictureUrl: null,
};
}
public async getProfileName() {

View File

@@ -92,10 +92,10 @@ import makeWASocket, {
getContentType,
getDevice,
GroupMetadata,
GroupParticipant,
// GroupParticipant,
isJidBroadcast,
isJidGroup,
// isJidNewsletter,
isJidNewsletter,
isJidUser,
makeCacheableSignalKeyStore,
MessageUpsertType,
@@ -144,7 +144,6 @@ export class BaileysStartupService extends ChannelStartupService {
) {
super(configService, eventEmitter, prismaRepository, chatwootCache);
this.instance.qrcode = { count: 0 };
// this.recoveringMessages();
this.authStateProvider = new AuthStateProvider(this.providerFiles);
}
@@ -159,58 +158,6 @@ export class BaileysStartupService extends ChannelStartupService {
public phoneNumber: string;
// private async recoveringMessages() {
// const cacheConf = this.configService.get<CacheConf>('CACHE');
// if ((cacheConf?.REDIS?.ENABLED && cacheConf?.REDIS?.URI !== '') || cacheConf?.LOCAL?.ENABLED) {
// this.logger.info('Recovering messages lost from cache');
// setInterval(async () => {
// this.baileysCache.keys().then((keys) => {
// keys.forEach(async (key) => {
// const data = await this.baileysCache.get(key.split(':')[2]);
// let message: any;
// let retry: number;
// if (!data?.message) {
// message = data;
// retry = 0;
// } else {
// message = data.message;
// retry = data.retry;
// }
// if (message.messageStubParameters && message.messageStubParameters[0] === 'Message absent from node') {
// retry = retry + 1;
// this.logger.info(`Message absent from node, retrying to send, key: ${key.split(':')[2]} retry: ${retry}`);
// if (message.messageStubParameters[1]) {
// await this.client.sendMessageAck(JSON.parse(message.messageStubParameters[1], BufferJSON.reviver));
// }
// this.baileysCache.set(key.split(':')[2], { message, retry });
// if (retry >= 100) {
// this.logger.warn(`Message absent from node, retry limit reached, key: ${key.split(':')[2]}`);
// this.baileysCache.delete(key.split(':')[2]);
// return;
// }
// }
// });
// });
// // 15 minutes
// }, 15 * 60 * 1000);
// }
// }
// private async forceUpdateGroupMetadataCache() {
// this.logger.verbose('Force update group metadata cache');
// const groups = await this.fetchAllGroups({ getParticipants: 'false' });
// for (const group of groups) {
// await this.updateGroupMetadataCache(group.id);
// }
// }
public get connectionStatus() {
return this.stateConnection;
}
@@ -418,17 +365,6 @@ export class BaileysStartupService extends ChannelStartupService {
}
}
// if (connection === 'connecting') {
// if (this.configService.get<Database>('DATABASE').ENABLED) {
// await this.prismaRepository.instance.update({
// where: { id: this.instanceId },
// data: {
// connectionStatus: 'connecting',
// },
// });
// }
// }
if (connection === 'open') {
this.instance.wuid = this.client.user.id.replace(/:\d+/, '');
this.instance.profilePictureUrl = (await this.profilePicture(this.instance.wuid)).profilePictureUrl;
@@ -625,8 +561,8 @@ export class BaileysStartupService extends ChannelStartupService {
shouldIgnoreJid: (jid) => {
const isGroupJid = this.localSettings.groupsIgnore && isJidGroup(jid);
const isBroadcast = !this.localSettings.readStatus && isJidBroadcast(jid);
// const isNewsletter = !isJidNewsletter(jid);
const isNewsletter = jid && jid.includes('newsletter');
const isNewsletter = isJidNewsletter(jid);
// const isNewsletter = jid && jid.includes('newsletter');
return isGroupJid || isBroadcast || isNewsletter;
},
@@ -634,7 +570,7 @@ export class BaileysStartupService extends ChannelStartupService {
shouldSyncHistoryMessage: (msg: proto.Message.IHistorySyncNotification) => {
return this.historySyncNotification(msg);
},
// cachedGroupMetadata: this.getGroupMetadataCache,
cachedGroupMetadata: this.getGroupMetadataCache,
userDevicesCache: this.userDevicesCache,
transactionOpts: { maxCommitRetries: 10, delayBetweenTriesMs: 3000 },
};
@@ -798,6 +734,10 @@ export class BaileysStartupService extends ChannelStartupService {
const findParticipant = await this.chatwootService.findContact(instance, contact.remoteJid.split('@')[0]);
if (!findParticipant) {
return;
}
this.chatwootService.updateContact(instance, findParticipant.id, {
name: contact.pushName,
avatar_url: contact.profilePicUrl,
@@ -1024,15 +964,15 @@ export class BaileysStartupService extends ChannelStartupService {
if (received.message?.conversation || received.message?.extendedTextMessage?.text) {
const text = received.message?.conversation || received.message?.extendedTextMessage?.text;
if (text == 'requestPlaceholder' && !requestId) {
// const messageId = await this.client.requestPlaceholderResend(received.key);
// console.log('requested placeholder resync, id=', messageId);
const messageId = await this.client.requestPlaceholderResend(received.key);
console.log('requested placeholder resync, id=', messageId);
} else if (requestId) {
console.log('Message received from phone, id=', requestId, received);
}
if (text == 'onDemandHistSync') {
// const messageId = await this.client.fetchMessageHistory(50, received.key, received.messageTimestamp!);
// console.log('requested on-demand sync, id=', messageId);
const messageId = await this.client.fetchMessageHistory(50, received.key, received.messageTimestamp!);
console.log('requested on-demand sync, id=', messageId);
}
}
@@ -1092,7 +1032,7 @@ export class BaileysStartupService extends ChannelStartupService {
pushName: received.pushName,
message: { ...received.message },
contextInfo: contentMsg?.contextInfo,
messageType: getContentType(received.message),
messageType: getContentType(received.message) || 'unknown',
messageTimestamp: received.messageTimestamp as number,
instanceId: this.instanceId,
source: getDevice(received.key.id),
@@ -1737,7 +1677,7 @@ export class BaileysStartupService extends ChannelStartupService {
quoted: any,
messageId?: string,
ephemeralExpiration?: number,
participants?: GroupParticipant[],
// participants?: GroupParticipant[],
) {
const option: any = {
quoted,
@@ -1745,10 +1685,10 @@ export class BaileysStartupService extends ChannelStartupService {
if (isJidGroup(sender)) {
option.useCachedGroupMetadata = true;
if (participants)
option.cachedGroupMetadata = async () => {
return { participants: participants as GroupParticipant[] };
};
// if (participants)
// option.cachedGroupMetadata = async () => {
// return { participants: participants as GroupParticipant[] };
// };
}
if (ephemeralExpiration) option.ephemeralExpiration = ephemeralExpiration;

View File

@@ -3,6 +3,7 @@ import { PrismaRepository } from '@api/repository/repository.service';
import {
difyController,
eventManager,
flowiseController,
genericController,
openaiController,
typebotController,
@@ -89,6 +90,9 @@ export class ChatbotController {
pushName,
isIntegration,
};
// generic
await genericController.emit(emitData);
// typebot
await typebotController.emit(emitData);
@@ -98,8 +102,8 @@ export class ChatbotController {
// dify
await difyController.emit(emitData);
// generic
await genericController.emit(emitData);
// flowise
await flowiseController.emit(emitData);
}
public async setInstance(instanceName: string, data: InstanceDto): Promise<any> {

View File

@@ -4,6 +4,7 @@ import { OpenaiRouter } from '@api/integrations/chatbot/openai/routes/openai.rou
import { TypebotRouter } from '@api/integrations/chatbot/typebot/routes/typebot.router';
import { Router } from 'express';
import { FlowiseRouter } from './flowise/routes/flowise.router';
import { GenericRouter } from './generic/routes/generic.router';
export class ChatbotRouter {
@@ -12,10 +13,11 @@ export class ChatbotRouter {
constructor(...guards: any[]) {
this.router = Router();
this.router.use('/generic', new GenericRouter(...guards).router);
this.router.use('/chatwoot', new ChatwootRouter(...guards).router);
this.router.use('/typebot', new TypebotRouter(...guards).router);
this.router.use('/openai', new OpenaiRouter(...guards).router);
this.router.use('/dify', new DifyRouter(...guards).router);
this.router.use('/generic', new GenericRouter(...guards).router);
this.router.use('/flowise', new FlowiseRouter(...guards).router);
}
}

View File

@@ -1,5 +1,6 @@
export * from '@api/integrations/chatbot/chatwoot/validate/chatwoot.schema';
export * from '@api/integrations/chatbot/dify/validate/dify.schema';
export * from '@api/integrations/chatbot/flowise/validate/flowise.schema';
export * from '@api/integrations/chatbot/generic/validate/generic.schema';
export * from '@api/integrations/chatbot/openai/validate/openai.schema';
export * from '@api/integrations/chatbot/typebot/validate/typebot.schema';

View File

@@ -1,6 +1,7 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Integration } from '@api/types/wa.types';
import { Auth, ConfigService, HttpServer } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { Dify, DifySetting, IntegrationSession } from '@prisma/client';
@@ -41,6 +42,15 @@ export class DifyService {
return content.includes('imageMessage');
}
private isJSON(str: string): boolean {
try {
JSON.parse(str);
return true;
} catch (e) {
return false;
}
}
private async sendMessageToBot(
instance: any,
session: IntegrationSession,
@@ -50,228 +60,287 @@ export class DifyService {
pushName: string,
content: string,
) {
let endpoint: string = dify.apiUrl;
try {
let endpoint: string = dify.apiUrl;
if (dify.botType === 'chatBot') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
query: content,
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
if (dify.botType === 'chatBot') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
];
payload.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
}
if (dify.botType === 'textGenerator') {
endpoint += '/completion-messages';
const payload: any = {
inputs: {
query: content,
pushName: pushName,
remoteJid: remoteJid,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.inputs.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
}
if (dify.botType === 'agent') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
query: content,
response_mode: 'streaming',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
responseType: 'stream',
});
let conversationId;
const stream = response.data;
const reader = new Readable().wrap(stream);
reader.on('data', (chunk) => {
const data = chunk.toString();
try {
const event = JSON.parse(data);
if (event.event === 'agent_message') {
conversationId = conversationId ?? event?.conversation_id;
}
} catch (error) {
console.error('Error parsing stream data:', error);
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.query = contentSplit[2] || content;
}
});
reader.on('end', async () => {
await instance.client.sendPresenceUpdate('paused', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
const conversationId = response?.data?.conversation_id;
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
});
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
reader.on('error', (error) => {
console.error('Error reading stream:', error);
});
return;
}
if (dify.botType === 'workflow') {
endpoint += '/workflows/run';
const payload: any = {
inputs: {
query: content,
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
];
payload.inputs.query = contentSplit[2] || content;
data: {
status: 'opened',
awaitUser: true,
sessionId: session.sessionId === remoteJid ? conversationId : session.sessionId,
},
});
}
await instance.client.presenceSubscribe(remoteJid);
if (dify.botType === 'textGenerator') {
endpoint += '/completion-messages';
const payload: any = {
inputs: {
query: content,
pushName: pushName,
remoteJid: remoteJid,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
await instance.client.sendPresenceUpdate('composing', remoteJid);
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.inputs.query = contentSplit[2] || content;
}
await instance.client.sendPresenceUpdate('paused', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
const message = response?.data?.data.outputs.text;
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
await this.sendMessageWhatsApp(instance, remoteJid, message, session, settings);
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;
const conversationId = response?.data?.conversation_id;
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
sessionId: session.sessionId === remoteJid ? conversationId : session.sessionId,
},
});
}
if (dify.botType === 'agent') {
endpoint += '/chat-messages';
const payload: any = {
inputs: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
query: content,
response_mode: 'streaming',
conversation_id: session.sessionId === remoteJid ? undefined : session.sessionId,
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.query = contentSplit[2] || content;
}
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
responseType: 'stream',
});
let conversationId;
let answer = '';
const stream = response.data;
const reader = new Readable().wrap(stream);
reader.on('data', (chunk) => {
const data = chunk.toString();
try {
const cleanedData = data.replace(/^data:\s*/, '');
const event = JSON.parse(cleanedData);
if (event?.event === 'agent_message') {
console.log('event:', event);
conversationId = conversationId ?? event?.conversation_id;
answer += event?.answer;
}
} catch (error) {
console.error('Error parsing stream data:', error);
}
});
reader.on('end', async () => {
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = answer;
console.log('message:', answer);
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
sessionId: conversationId,
},
});
});
reader.on('error', (error) => {
console.error('Error reading stream:', error);
});
return;
}
if (dify.botType === 'workflow') {
endpoint += '/workflows/run';
const payload: any = {
inputs: {
query: content,
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
response_mode: 'blocking',
user: remoteJid,
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.files = [
{
type: 'image',
transfer_method: 'remote_url',
url: contentSplit[1].split('?')[0],
},
];
payload.inputs.query = contentSplit[2] || content;
}
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
const response = await axios.post(endpoint, payload, {
headers: {
Authorization: `Bearer ${dify.apiKey}`,
},
});
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.data.outputs.text;
await this.sendMessageWhatsApp(instance, remoteJid, message, settings);
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
},
});
return;
}
} catch (error) {
this.logger.error(error.response?.data || error);
return;
}
}
private async sendMessageWhatsApp(
instance: any,
remoteJid: string,
message: string,
session: IntegrationSession,
settings: DifySetting,
) {
private async sendMessageWhatsApp(instance: any, remoteJid: string, message: string, settings: DifySetting) {
const regex = /!?\[(.*?)\]\((.*?)\)/g;
const result = [];
@@ -318,23 +387,6 @@ export class DifyService {
}
}
if (settings.keepOpen) {
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'closed',
},
});
} else {
await this.prismaRepository.integrationSession.delete({
where: {
id: session.id,
},
});
}
sendTelemetry('/message/sendText');
}

View File

@@ -0,0 +1,797 @@
import { IgnoreJidDto } from '@api/dto/chatbot.dto';
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Logger } from '@config/logger.config';
import { getConversationMessage } from '@utils/getConversationMessage';
import { ChatbotController, ChatbotControllerInterface, EmitData } from '../../chatbot.controller';
import { FlowiseDto } from '../dto/flowise.dto';
import { FlowiseService } from '../services/flowise.service';
export class FlowiseController extends ChatbotController implements ChatbotControllerInterface {
constructor(
private readonly flowiseService: FlowiseService,
prismaRepository: PrismaRepository,
waMonitor: WAMonitoringService,
) {
super(prismaRepository, waMonitor);
this.botRepository = this.prismaRepository.flowise;
this.settingsRepository = this.prismaRepository.flowiseSetting;
this.sessionRepository = this.prismaRepository.integrationSession;
}
public readonly logger = new Logger(FlowiseController.name);
integrationEnabled: boolean;
botRepository: any;
settingsRepository: any;
sessionRepository: any;
userMessageDebounce: { [key: string]: { message: string; timeoutId: NodeJS.Timeout } } = {};
// Bots
public async createBot(instance: InstanceDto, data: FlowiseDto) {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
if (
!data.expire ||
!data.keywordFinish ||
!data.delayMessage ||
!data.unknownMessage ||
!data.listeningFromMe ||
!data.stopBotFromMe ||
!data.keepOpen ||
!data.debounceTime ||
!data.ignoreJids
) {
const defaultSettingCheck = await this.settingsRepository.findFirst({
where: {
instanceId: instanceId,
},
});
if (!data.expire) data.expire = defaultSettingCheck?.expire || 0;
if (!data.keywordFinish) data.keywordFinish = defaultSettingCheck?.keywordFinish || '';
if (!data.delayMessage) data.delayMessage = defaultSettingCheck?.delayMessage || 1000;
if (!data.unknownMessage) data.unknownMessage = defaultSettingCheck?.unknownMessage || '';
if (!data.listeningFromMe) data.listeningFromMe = defaultSettingCheck?.listeningFromMe || false;
if (!data.stopBotFromMe) data.stopBotFromMe = defaultSettingCheck?.stopBotFromMe || false;
if (!data.keepOpen) data.keepOpen = defaultSettingCheck?.keepOpen || false;
if (!data.debounceTime) data.debounceTime = defaultSettingCheck?.debounceTime || 0;
if (!data.ignoreJids) data.ignoreJids = defaultSettingCheck?.ignoreJids || [];
if (!defaultSettingCheck) {
await this.settings(instance, {
expire: data.expire,
keywordFinish: data.keywordFinish,
delayMessage: data.delayMessage,
unknownMessage: data.unknownMessage,
listeningFromMe: data.listeningFromMe,
stopBotFromMe: data.stopBotFromMe,
keepOpen: data.keepOpen,
debounceTime: data.debounceTime,
ignoreJids: data.ignoreJids,
});
}
}
const checkTriggerAll = await this.botRepository.findFirst({
where: {
enabled: true,
triggerType: 'all',
instanceId: instanceId,
},
});
if (checkTriggerAll && data.triggerType === 'all') {
throw new Error('You already have a Flowise with an "All" trigger, you cannot have more bots while it is active');
}
const checkDuplicate = await this.botRepository.findFirst({
where: {
instanceId: instanceId,
apiUrl: data.apiUrl,
apiKey: data.apiKey,
},
});
if (checkDuplicate) {
throw new Error('Flowise already exists');
}
if (data.triggerType === 'keyword') {
if (!data.triggerOperator || !data.triggerValue) {
throw new Error('Trigger operator and value are required');
}
const checkDuplicate = await this.botRepository.findFirst({
where: {
triggerOperator: data.triggerOperator,
triggerValue: data.triggerValue,
instanceId: instanceId,
},
});
if (checkDuplicate) {
throw new Error('Trigger already exists');
}
}
if (data.triggerType === 'advanced') {
if (!data.triggerValue) {
throw new Error('Trigger value is required');
}
const checkDuplicate = await this.botRepository.findFirst({
where: {
triggerValue: data.triggerValue,
instanceId: instanceId,
},
});
if (checkDuplicate) {
throw new Error('Trigger already exists');
}
}
try {
const bot = await this.botRepository.create({
data: {
enabled: data.enabled,
description: data.description,
apiUrl: data.apiUrl,
apiKey: data.apiKey,
expire: data.expire,
keywordFinish: data.keywordFinish,
delayMessage: data.delayMessage,
unknownMessage: data.unknownMessage,
listeningFromMe: data.listeningFromMe,
stopBotFromMe: data.stopBotFromMe,
keepOpen: data.keepOpen,
debounceTime: data.debounceTime,
instanceId: instanceId,
triggerType: data.triggerType,
triggerOperator: data.triggerOperator,
triggerValue: data.triggerValue,
ignoreJids: data.ignoreJids,
},
});
return bot;
} catch (error) {
this.logger.error(error);
throw new Error('Error creating bot');
}
}
public async findBot(instance: InstanceDto) {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bots = await this.botRepository.findMany({
where: {
instanceId: instanceId,
},
});
if (!bots.length) {
return null;
}
return bots;
}
public async fetchBot(instance: InstanceDto, botId: string) {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bot = await this.botRepository.findFirst({
where: {
id: botId,
},
});
if (!bot) {
throw new Error('Bot not found');
}
if (bot.instanceId !== instanceId) {
throw new Error('Bot not found');
}
return bot;
}
public async updateBot(instance: InstanceDto, botId: string, data: FlowiseDto) {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bot = await this.botRepository.findFirst({
where: {
id: botId,
},
});
if (!bot) {
throw new Error('Bot not found');
}
if (bot.instanceId !== instanceId) {
throw new Error('Bot not found');
}
if (data.triggerType === 'all') {
const checkTriggerAll = await this.botRepository.findFirst({
where: {
enabled: true,
triggerType: 'all',
id: {
not: botId,
},
instanceId: instanceId,
},
});
if (checkTriggerAll) {
throw new Error('You already have a bot with an "All" trigger, you cannot have more bots while it is active');
}
}
const checkDuplicate = await this.botRepository.findFirst({
where: {
id: {
not: botId,
},
instanceId: instanceId,
apiUrl: data.apiUrl,
apiKey: data.apiKey,
},
});
if (checkDuplicate) {
throw new Error('Bot already exists');
}
if (data.triggerType === 'keyword') {
if (!data.triggerOperator || !data.triggerValue) {
throw new Error('Trigger operator and value are required');
}
const checkDuplicate = await this.botRepository.findFirst({
where: {
triggerOperator: data.triggerOperator,
triggerValue: data.triggerValue,
id: { not: botId },
instanceId: instanceId,
},
});
if (checkDuplicate) {
throw new Error('Trigger already exists');
}
}
if (data.triggerType === 'advanced') {
if (!data.triggerValue) {
throw new Error('Trigger value is required');
}
const checkDuplicate = await this.botRepository.findFirst({
where: {
triggerValue: data.triggerValue,
id: { not: botId },
instanceId: instanceId,
},
});
if (checkDuplicate) {
throw new Error('Trigger already exists');
}
}
try {
const bot = await this.botRepository.update({
where: {
id: botId,
},
data: {
enabled: data.enabled,
apiUrl: data.apiUrl,
apiKey: data.apiKey,
expire: data.expire,
keywordFinish: data.keywordFinish,
delayMessage: data.delayMessage,
unknownMessage: data.unknownMessage,
listeningFromMe: data.listeningFromMe,
stopBotFromMe: data.stopBotFromMe,
keepOpen: data.keepOpen,
debounceTime: data.debounceTime,
instanceId: instanceId,
triggerType: data.triggerType,
triggerOperator: data.triggerOperator,
triggerValue: data.triggerValue,
ignoreJids: data.ignoreJids,
},
});
return bot;
} catch (error) {
this.logger.error(error);
throw new Error('Error updating bot');
}
}
public async deleteBot(instance: InstanceDto, botId: string) {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bot = await this.botRepository.findFirst({
where: {
id: botId,
},
});
if (!bot) {
throw new Error('Bot not found');
}
if (bot.instanceId !== instanceId) {
throw new Error('Bot not found');
}
try {
await this.prismaRepository.integrationSession.deleteMany({
where: {
botId: botId,
},
});
await this.botRepository.delete({
where: {
id: botId,
},
});
return { bot: { id: botId } };
} catch (error) {
this.logger.error(error);
throw new Error('Error deleting bot');
}
}
// Settings
public async settings(instance: InstanceDto, data: any) {
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const settings = await this.settingsRepository.findFirst({
where: {
instanceId: instanceId,
},
});
if (settings) {
const updateSettings = await this.settingsRepository.update({
where: {
id: settings.id,
},
data: {
expire: data.expire,
keywordFinish: data.keywordFinish,
delayMessage: data.delayMessage,
unknownMessage: data.unknownMessage,
listeningFromMe: data.listeningFromMe,
stopBotFromMe: data.stopBotFromMe,
keepOpen: data.keepOpen,
debounceTime: data.debounceTime,
flowiseIdFallback: data.flowiseIdFallback,
ignoreJids: data.ignoreJids,
},
});
return {
expire: updateSettings.expire,
keywordFinish: updateSettings.keywordFinish,
delayMessage: updateSettings.delayMessage,
unknownMessage: updateSettings.unknownMessage,
listeningFromMe: updateSettings.listeningFromMe,
stopBotFromMe: updateSettings.stopBotFromMe,
keepOpen: updateSettings.keepOpen,
debounceTime: updateSettings.debounceTime,
flowiseIdFallback: updateSettings.flowiseIdFallback,
ignoreJids: updateSettings.ignoreJids,
};
}
const newSetttings = await this.settingsRepository.create({
data: {
expire: data.expire,
keywordFinish: data.keywordFinish,
delayMessage: data.delayMessage,
unknownMessage: data.unknownMessage,
listeningFromMe: data.listeningFromMe,
stopBotFromMe: data.stopBotFromMe,
keepOpen: data.keepOpen,
debounceTime: data.debounceTime,
flowiseIdFallback: data.flowiseIdFallback,
ignoreJids: data.ignoreJids,
instanceId: instanceId,
},
});
return {
expire: newSetttings.expire,
keywordFinish: newSetttings.keywordFinish,
delayMessage: newSetttings.delayMessage,
unknownMessage: newSetttings.unknownMessage,
listeningFromMe: newSetttings.listeningFromMe,
stopBotFromMe: newSetttings.stopBotFromMe,
keepOpen: newSetttings.keepOpen,
debounceTime: newSetttings.debounceTime,
flowiseIdFallback: newSetttings.flowiseIdFallback,
ignoreJids: newSetttings.ignoreJids,
};
} catch (error) {
this.logger.error(error);
throw new Error('Error setting default settings');
}
}
public async fetchSettings(instance: InstanceDto) {
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const settings = await this.settingsRepository.findFirst({
where: {
instanceId: instanceId,
},
include: {
Fallback: true,
},
});
if (!settings) {
return {
expire: 0,
keywordFinish: '',
delayMessage: 0,
unknownMessage: '',
listeningFromMe: false,
stopBotFromMe: false,
keepOpen: false,
ignoreJids: [],
flowiseIdFallback: '',
fallback: null,
};
}
return {
expire: settings.expire,
keywordFinish: settings.keywordFinish,
delayMessage: settings.delayMessage,
unknownMessage: settings.unknownMessage,
listeningFromMe: settings.listeningFromMe,
stopBotFromMe: settings.stopBotFromMe,
keepOpen: settings.keepOpen,
ignoreJids: settings.ignoreJids,
flowiseIdFallback: settings.flowiseIdFallback,
fallback: settings.Fallback,
};
} catch (error) {
this.logger.error(error);
throw new Error('Error fetching default settings');
}
}
// Sessions
public async changeStatus(instance: InstanceDto, data: any) {
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const defaultSettingCheck = await this.settingsRepository.findFirst({
where: {
instanceId,
},
});
const remoteJid = data.remoteJid;
const status = data.status;
if (status === 'delete') {
await this.sessionRepository.deleteMany({
where: {
remoteJid: remoteJid,
botId: { not: null },
},
});
return { bot: { remoteJid: remoteJid, status: status } };
}
if (status === 'closed') {
if (defaultSettingCheck?.keepOpen) {
await this.sessionRepository.updateMany({
where: {
remoteJid: remoteJid,
botId: { not: null },
},
data: {
status: 'closed',
},
});
} else {
await this.sessionRepository.deleteMany({
where: {
remoteJid: remoteJid,
botId: { not: null },
},
});
}
return { bot: { ...instance, bot: { remoteJid: remoteJid, status: status } } };
} else {
const session = await this.sessionRepository.updateMany({
where: {
instanceId: instanceId,
remoteJid: remoteJid,
botId: { not: null },
},
data: {
status: status,
},
});
const botData = {
remoteJid: remoteJid,
status: status,
session,
};
return { bot: { ...instance, bot: botData } };
}
} catch (error) {
this.logger.error(error);
throw new Error('Error changing status');
}
}
public async fetchSessions(instance: InstanceDto, botId: string, remoteJid?: string) {
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const bot = await this.botRepository.findFirst({
where: {
id: botId,
},
});
if (bot && bot.instanceId !== instanceId) {
throw new Error('Dify not found');
}
return await this.sessionRepository.findMany({
where: {
instanceId: instanceId,
remoteJid,
botId: bot ? botId : { not: null },
},
});
} catch (error) {
this.logger.error(error);
throw new Error('Error fetching sessions');
}
}
public async ignoreJid(instance: InstanceDto, data: IgnoreJidDto) {
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
where: {
name: instance.instanceName,
},
})
.then((instance) => instance.id);
const settings = await this.settingsRepository.findFirst({
where: {
instanceId: instanceId,
},
});
if (!settings) {
throw new Error('Settings not found');
}
let ignoreJids: any = settings?.ignoreJids || [];
if (data.action === 'add') {
if (ignoreJids.includes(data.remoteJid)) return { ignoreJids: ignoreJids };
ignoreJids.push(data.remoteJid);
} else {
ignoreJids = ignoreJids.filter((jid) => jid !== data.remoteJid);
}
const updateSettings = await this.settingsRepository.update({
where: {
id: settings.id,
},
data: {
ignoreJids: ignoreJids,
},
});
return {
ignoreJids: updateSettings.ignoreJids,
};
} catch (error) {
this.logger.error(error);
throw new Error('Error setting default settings');
}
}
// Emit
public async emit({ instance, remoteJid, msg }: EmitData) {
try {
const settings = await this.settingsRepository.findFirst({
where: {
instanceId: instance.instanceId,
},
});
if (this.checkIgnoreJids(settings?.ignoreJids, remoteJid)) return;
const session = await this.getSession(remoteJid, instance);
const content = getConversationMessage(msg);
const findBot = await this.findBotTrigger(
this.botRepository,
this.settingsRepository,
content,
instance,
session,
);
if (!findBot) return;
let expire = findBot.expire;
let keywordFinish = findBot.keywordFinish;
let delayMessage = findBot.delayMessage;
let unknownMessage = findBot.unknownMessage;
let listeningFromMe = findBot.listeningFromMe;
let stopBotFromMe = findBot.stopBotFromMe;
let keepOpen = findBot.keepOpen;
let debounceTime = findBot.debounceTime;
if (
!expire ||
!keywordFinish ||
!delayMessage ||
!unknownMessage ||
!listeningFromMe ||
!stopBotFromMe ||
!keepOpen ||
!debounceTime
) {
if (!expire) expire = settings.expire;
if (!keywordFinish) keywordFinish = settings.keywordFinish;
if (!delayMessage) delayMessage = settings.delayMessage;
if (!unknownMessage) unknownMessage = settings.unknownMessage;
if (!listeningFromMe) listeningFromMe = settings.listeningFromMe;
if (!stopBotFromMe) stopBotFromMe = settings.stopBotFromMe;
if (!keepOpen) keepOpen = settings.keepOpen;
if (!debounceTime) debounceTime = settings.debounceTime;
}
const key = msg.key as {
id: string;
remoteJid: string;
fromMe: boolean;
participant: string;
};
if (stopBotFromMe && key.fromMe && session) {
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'paused',
},
});
return;
}
if (!listeningFromMe && key.fromMe) {
return;
}
if (debounceTime && debounceTime > 0) {
this.processDebounce(this.userMessageDebounce, content, remoteJid, debounceTime, async (debouncedContent) => {
await this.flowiseService.processBot(
this.waMonitor.waInstances[instance.instanceName],
remoteJid,
findBot,
session,
settings,
debouncedContent,
msg?.pushName,
);
});
} else {
await this.flowiseService.processBot(
this.waMonitor.waInstances[instance.instanceName],
remoteJid,
findBot,
session,
settings,
content,
msg?.pushName,
);
}
return;
} catch (error) {
this.logger.error(error);
return;
}
}
}

View File

@@ -0,0 +1,33 @@
import { TriggerOperator, TriggerType } from '@prisma/client';
export class FlowiseDto {
enabled?: boolean;
description?: string;
apiUrl?: string;
apiKey?: string;
expire?: number;
keywordFinish?: string;
delayMessage?: number;
unknownMessage?: string;
listeningFromMe?: boolean;
stopBotFromMe?: boolean;
keepOpen?: boolean;
debounceTime?: number;
triggerType?: TriggerType;
triggerOperator?: TriggerOperator;
triggerValue?: string;
ignoreJids?: any;
}
export class FlowiseSettingDto {
expire?: number;
keywordFinish?: string;
delayMessage?: number;
unknownMessage?: string;
listeningFromMe?: boolean;
stopBotFromMe?: boolean;
keepOpen?: boolean;
debounceTime?: number;
flowiseIdFallback?: string;
ignoreJids?: any;
}

View File

@@ -0,0 +1,124 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { IgnoreJidDto } from '@api/dto/chatbot.dto';
import { InstanceDto } from '@api/dto/instance.dto';
import { HttpStatus } from '@api/routes/index.router';
import { flowiseController } from '@api/server.module';
import { instanceSchema } from '@validate/instance.schema';
import { RequestHandler, Router } from 'express';
import { FlowiseDto, FlowiseSettingDto } from '../dto/flowise.dto';
import {
flowiseIgnoreJidSchema,
flowiseSchema,
flowiseSettingSchema,
flowiseStatusSchema,
} from '../validate/flowise.schema';
export class FlowiseRouter extends RouterBroker {
constructor(...guards: RequestHandler[]) {
super();
this.router
.post(this.routerPath('create'), ...guards, async (req, res) => {
const response = await this.dataValidate<FlowiseDto>({
request: req,
schema: flowiseSchema,
ClassRef: FlowiseDto,
execute: (instance, data) => flowiseController.createBot(instance, data),
});
res.status(HttpStatus.CREATED).json(response);
})
.get(this.routerPath('find'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => flowiseController.findBot(instance),
});
res.status(HttpStatus.OK).json(response);
})
.get(this.routerPath('fetch/:flowiseId'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => flowiseController.fetchBot(instance, req.params.flowiseId),
});
res.status(HttpStatus.OK).json(response);
})
.put(this.routerPath('update/:flowiseId'), ...guards, async (req, res) => {
const response = await this.dataValidate<FlowiseDto>({
request: req,
schema: flowiseSchema,
ClassRef: FlowiseDto,
execute: (instance, data) => flowiseController.updateBot(instance, req.params.flowiseId, data),
});
res.status(HttpStatus.OK).json(response);
})
.delete(this.routerPath('delete/:flowiseId'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => flowiseController.deleteBot(instance, req.params.flowiseId),
});
res.status(HttpStatus.OK).json(response);
})
.post(this.routerPath('settings'), ...guards, async (req, res) => {
const response = await this.dataValidate<FlowiseSettingDto>({
request: req,
schema: flowiseSettingSchema,
ClassRef: FlowiseSettingDto,
execute: (instance, data) => flowiseController.settings(instance, data),
});
res.status(HttpStatus.OK).json(response);
})
.get(this.routerPath('fetchSettings'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => flowiseController.fetchSettings(instance),
});
res.status(HttpStatus.OK).json(response);
})
.post(this.routerPath('changeStatus'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: flowiseStatusSchema,
ClassRef: InstanceDto,
execute: (instance, data) => flowiseController.changeStatus(instance, data),
});
res.status(HttpStatus.OK).json(response);
})
.get(this.routerPath('fetchSessions/:flowiseId'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => flowiseController.fetchSessions(instance, req.params.flowiseId),
});
res.status(HttpStatus.OK).json(response);
})
.post(this.routerPath('ignoreJid'), ...guards, async (req, res) => {
const response = await this.dataValidate<IgnoreJidDto>({
request: req,
schema: flowiseIgnoreJidSchema,
ClassRef: IgnoreJidDto,
execute: (instance, data) => flowiseController.ignoreJid(instance, data),
});
res.status(HttpStatus.OK).json(response);
});
}
public readonly router: Router = Router();
}

View File

@@ -0,0 +1,309 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Integration } from '@api/types/wa.types';
import { Auth, ConfigService, HttpServer } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { Flowise, FlowiseSetting, IntegrationSession } from '@prisma/client';
import { sendTelemetry } from '@utils/sendTelemetry';
import axios from 'axios';
export class FlowiseService {
constructor(
private readonly waMonitor: WAMonitoringService,
private readonly configService: ConfigService,
private readonly prismaRepository: PrismaRepository,
) {}
private readonly logger = new Logger('FlowiseService');
public async createNewSession(instance: InstanceDto, data: any) {
try {
const session = await this.prismaRepository.integrationSession.create({
data: {
remoteJid: data.remoteJid,
sessionId: data.remoteJid,
status: 'opened',
awaitUser: false,
botId: data.botId,
instanceId: instance.instanceId,
},
});
return { session };
} catch (error) {
this.logger.error(error);
return;
}
}
private isImageMessage(content: string) {
return content.includes('imageMessage');
}
private async sendMessageToBot(
instance: any,
session: IntegrationSession,
bot: Flowise,
remoteJid: string,
pushName: string,
content: string,
) {
const payload: any = {
question: content,
overrideConfig: {
sessionId: remoteJid,
vars: {
remoteJid: remoteJid,
pushName: pushName,
instanceName: instance.instanceName,
serverUrl: this.configService.get<HttpServer>('SERVER').URL,
apiKey: this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY,
},
},
};
if (this.isImageMessage(content)) {
const contentSplit = content.split('|');
payload.uploads = [
{
data: contentSplit[1].split('?')[0],
type: 'url',
name: 'Flowise.png',
mime: 'image/png',
},
];
payload.question = contentSplit[2] || content;
}
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
let headers: any = {
'Content-Type': 'application/json',
};
if (bot.apiKey) {
headers = {
...headers,
Authorization: `Bearer ${bot.apiKey}`,
};
}
const endpoint = bot.apiUrl;
if (!endpoint) return null;
const response = await axios.post(endpoint, payload, {
headers,
});
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.text;
return message;
}
private async sendMessageWhatsApp(
instance: any,
remoteJid: string,
session: IntegrationSession,
settings: FlowiseSetting,
message: string,
) {
const regex = /!?\[(.*?)\]\((.*?)\)/g;
const result = [];
let lastIndex = 0;
let match;
while ((match = regex.exec(message)) !== null) {
if (match.index > lastIndex) {
result.push({ text: message.slice(lastIndex, match.index).trim() });
}
result.push({ caption: match[1], url: match[2] });
lastIndex = regex.lastIndex;
}
if (lastIndex < message.length) {
result.push({ text: message.slice(lastIndex).trim() });
}
for (const item of result) {
if (item.text) {
await instance.textMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
text: item.text,
},
false,
);
}
if (item.url) {
await instance.mediaMessage(
{
number: remoteJid.split('@')[0],
delay: settings?.delayMessage || 1000,
mediatype: 'image',
media: item.url,
caption: item.caption,
},
false,
);
}
}
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: true,
},
});
sendTelemetry('/message/sendText');
return;
}
private async initNewSession(
instance: any,
remoteJid: string,
bot: Flowise,
settings: FlowiseSetting,
session: IntegrationSession,
content: string,
pushName?: string,
) {
const data = await this.createNewSession(instance, {
remoteJid,
botId: bot.id,
});
if (data.session) {
session = data.session;
}
const message = await this.sendMessageToBot(instance, session, bot, remoteJid, pushName, content);
await this.sendMessageWhatsApp(instance, remoteJid, session, settings, message);
return;
}
public async processBot(
instance: any,
remoteJid: string,
bot: Flowise,
session: IntegrationSession,
settings: FlowiseSetting,
content: string,
pushName?: string,
) {
if (session && session.status !== 'opened') {
return;
}
if (session && settings.expire && settings.expire > 0) {
const now = Date.now();
const sessionUpdatedAt = new Date(session.updatedAt).getTime();
const diff = now - sessionUpdatedAt;
const diffInMinutes = Math.floor(diff / 1000 / 60);
if (diffInMinutes > settings.expire) {
if (settings.keepOpen) {
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'closed',
},
});
} else {
await this.prismaRepository.integrationSession.deleteMany({
where: {
botId: bot.id,
remoteJid: remoteJid,
},
});
}
await this.initNewSession(instance, remoteJid, bot, settings, session, content, pushName);
return;
}
}
if (!session) {
await this.initNewSession(instance, remoteJid, bot, settings, session, content, pushName);
return;
}
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'opened',
awaitUser: false,
},
});
if (!content) {
if (settings.unknownMessage) {
this.waMonitor.waInstances[instance.instanceName].textMessage(
{
number: remoteJid.split('@')[0],
delay: settings.delayMessage || 1000,
text: settings.unknownMessage,
},
false,
);
sendTelemetry('/message/sendText');
}
return;
}
if (settings.keywordFinish && content.toLowerCase() === settings.keywordFinish.toLowerCase()) {
if (settings.keepOpen) {
await this.prismaRepository.integrationSession.update({
where: {
id: session.id,
},
data: {
status: 'closed',
},
});
} else {
await this.prismaRepository.integrationSession.deleteMany({
where: {
botId: bot.id,
remoteJid: remoteJid,
},
});
}
return;
}
const message = await this.sendMessageToBot(instance, session, bot, remoteJid, pushName, content);
await this.sendMessageWhatsApp(instance, remoteJid, session, settings, message);
return;
}
}

View File

@@ -0,0 +1,107 @@
import { JSONSchema7 } from 'json-schema';
import { v4 } from 'uuid';
const isNotEmpty = (...propertyNames: string[]): JSONSchema7 => {
const properties = {};
propertyNames.forEach(
(property) =>
(properties[property] = {
minLength: 1,
description: `The "${property}" cannot be empty`,
}),
);
return {
if: {
propertyNames: {
enum: [...propertyNames],
},
},
then: { properties },
};
};
export const flowiseSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
enabled: { type: 'boolean' },
description: { type: 'string' },
apiUrl: { type: 'string' },
apiKey: { type: 'string' },
triggerType: { type: 'string', enum: ['all', 'keyword', 'none', 'advanced'] },
triggerOperator: { type: 'string', enum: ['equals', 'contains', 'startsWith', 'endsWith', 'regex'] },
triggerValue: { type: 'string' },
expire: { type: 'integer' },
keywordFinish: { type: 'string' },
delayMessage: { type: 'integer' },
unknownMessage: { type: 'string' },
listeningFromMe: { type: 'boolean' },
stopBotFromMe: { type: 'boolean' },
keepOpen: { type: 'boolean' },
debounceTime: { type: 'integer' },
ignoreJids: { type: 'array', items: { type: 'string' } },
},
required: ['enabled', 'apiUrl', 'triggerType'],
...isNotEmpty('enabled', 'apiUrl', 'triggerType'),
};
export const flowiseStatusSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
remoteJid: { type: 'string' },
status: { type: 'string', enum: ['opened', 'closed', 'paused', 'delete'] },
},
required: ['remoteJid', 'status'],
...isNotEmpty('remoteJid', 'status'),
};
export const flowiseSettingSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
expire: { type: 'integer' },
keywordFinish: { type: 'string' },
delayMessage: { type: 'integer' },
unknownMessage: { type: 'string' },
listeningFromMe: { type: 'boolean' },
stopBotFromMe: { type: 'boolean' },
keepOpen: { type: 'boolean' },
debounceTime: { type: 'integer' },
ignoreJids: { type: 'array', items: { type: 'string' } },
botIdFallback: { type: 'string' },
},
required: [
'expire',
'keywordFinish',
'delayMessage',
'unknownMessage',
'listeningFromMe',
'stopBotFromMe',
'keepOpen',
'debounceTime',
'ignoreJids',
],
...isNotEmpty(
'expire',
'keywordFinish',
'delayMessage',
'unknownMessage',
'listeningFromMe',
'stopBotFromMe',
'keepOpen',
'debounceTime',
'ignoreJids',
),
};
export const flowiseIgnoreJidSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
remoteJid: { type: 'string' },
action: { type: 'string', enum: ['add', 'remove'] },
},
required: ['remoteJid', 'action'],
...isNotEmpty('remoteJid', 'action'),
};

View File

@@ -3,7 +3,6 @@ import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Logger } from '@config/logger.config';
import { BadRequestException } from '@exceptions';
import { getConversationMessage } from '@utils/getConversationMessage';
import { ChatbotController, ChatbotControllerInterface, EmitData } from '../../chatbot.controller';
@@ -524,8 +523,6 @@ export class GenericController extends ChatbotController implements ChatbotContr
// Sessions
public async changeStatus(instance: InstanceDto, data: any) {
if (!this.integrationEnabled) throw new BadRequestException('Dify is disabled');
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
@@ -603,8 +600,6 @@ export class GenericController extends ChatbotController implements ChatbotContr
}
public async fetchSessions(instance: InstanceDto, botId: string, remoteJid?: string) {
if (!this.integrationEnabled) throw new BadRequestException('Dify is disabled');
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
@@ -638,8 +633,6 @@ export class GenericController extends ChatbotController implements ChatbotContr
}
public async ignoreJid(instance: InstanceDto, data: IgnoreJidDto) {
if (!this.integrationEnabled) throw new BadRequestException('Dify is disabled');
try {
const instanceId = await this.prismaRepository.instance
.findFirst({
@@ -689,8 +682,6 @@ export class GenericController extends ChatbotController implements ChatbotContr
// Emit
public async emit({ instance, remoteJid, msg }: EmitData) {
if (!this.integrationEnabled) return;
try {
const settings = await this.settingsRepository.findFirst({
where: {

View File

@@ -1,6 +1,7 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Integration } from '@api/types/wa.types';
import { Auth, ConfigService, HttpServer } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { GenericBot, GenericSetting, IntegrationSession } from '@prisma/client';
@@ -73,9 +74,10 @@ export class GenericService {
payload.query = contentSplit[2] || content;
}
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
let headers: any = {
'Content-Type': 'application/json',
@@ -92,7 +94,8 @@ export class GenericService {
headers,
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data?.answer;

View File

@@ -1,6 +1,7 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { Integration } from '@api/types/wa.types';
import { ConfigService, Language } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { IntegrationSession, OpenaiBot, OpenaiCreds, OpenaiSetting } from '@prisma/client';
@@ -73,9 +74,10 @@ export class OpenaiService {
const messages: any[] = [...messagesSystem, ...messagesAssistant, ...messagesUser, messageData];
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
const completions = await this.client.chat.completions.create({
model: openaiBot.model,
@@ -83,7 +85,8 @@ export class OpenaiService {
max_tokens: openaiBot.maxTokens,
});
await instance.client.sendPresenceUpdate('paused', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = completions.choices[0].message.content;
@@ -131,13 +134,15 @@ export class OpenaiService {
assistant_id: openaiBot.assistantId,
});
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS) {
await instance.client.presenceSubscribe(remoteJid);
await instance.client.sendPresenceUpdate('composing', remoteJid);
}
const response = await this.getAIResponse(threadId, runAssistant.id, openaiBot.functionUrl, remoteJid, pushName);
await instance.client.sendPresenceUpdate('paused', remoteJid);
if (instance.integration === Integration.WHATSAPP_BAILEYS)
await instance.client.sendPresenceUpdate('paused', remoteJid);
const message = response?.data[0].content[0].text.value;

View File

@@ -166,51 +166,4 @@ export class WebhookController extends EventController implements EventControlle
}
}
}
public async receiveWebhook(data: any) {
if (data.object === 'whatsapp_business_account') {
if (data.entry[0]?.changes[0]?.field === 'message_template_status_update') {
const template = await this.prisma.template.findFirst({
where: { templateId: `${data.entry[0].changes[0].value.message_template_id}` },
});
if (!template) {
console.log('template not found');
return;
}
const { webhookUrl } = template;
await axios.post(webhookUrl, data.entry[0].changes[0].value, {
headers: {
'Content-Type': 'application/json',
},
});
return;
}
for (const entry of data.entry) {
const numberId = entry.changes[0].value.metadata.phone_number_id;
if (!numberId) {
this.logger.error('WebhookService -> receiveWebhook -> numberId not found');
continue;
}
const instance = await this.prisma.instance.findFirst({
where: { number: numberId },
});
if (!instance) {
this.logger.error('WebhookService -> receiveWebhook -> instance not found');
continue;
}
await this.monitor.waInstances[instance.name].connectToWhatsapp(data);
}
}
}
}

View File

@@ -30,17 +30,6 @@ export class WebhookRouter extends RouterBroker {
});
res.status(HttpStatus.OK).json(response);
})
.get('meta', async (req, res) => {
if (req.query['hub.verify_token'] === configService.get<WaBusiness>('WA_BUSINESS').TOKEN_WEBHOOK)
res.send(req.query['hub.challenge']);
else res.send('Error, wrong validation token');
})
.post('meta', async (req, res) => {
const { body } = req;
const response = await eventManager.webhook.receiveWebhook(body);
return res.status(200).json(response);
});
}

View File

@@ -1,6 +1,7 @@
import { authGuard } from '@api/guards/auth.guard';
import { instanceExistsGuard, instanceLoggedGuard } from '@api/guards/instance.guard';
import Telemetry from '@api/guards/telemetry.guard';
import { ChannelRouter } from '@api/integrations/channel/channel.router';
import { ChatbotRouter } from '@api/integrations/chatbot/chatbot.router';
import { EventRouter } from '@api/integrations/event/event.router';
import { StorageRouter } from '@api/integrations/storage/storage.router';
@@ -84,6 +85,7 @@ router
.use('/settings', new SettingsRouter(...guards).router)
.use('/proxy', new ProxyRouter(...guards).router)
.use('/label', new LabelRouter(...guards).router)
.use('', new ChannelRouter(configService).router)
.use('', new EventRouter(configService, ...guards).router)
.use('', new ChatbotRouter(...guards).router)
.use('', new StorageRouter(...guards).router);

View File

@@ -12,11 +12,15 @@ import { SendMessageController } from './controllers/sendMessage.controller';
import { SettingsController } from './controllers/settings.controller';
import { TemplateController } from './controllers/template.controller';
import { ChannelController } from './integrations/channel/channel.controller';
import { EvolutionController } from './integrations/channel/evolution/evolution.controller';
import { MetaController } from './integrations/channel/meta/meta.controller';
import { ChatbotController } from './integrations/chatbot/chatbot.controller';
import { ChatwootController } from './integrations/chatbot/chatwoot/controllers/chatwoot.controller';
import { ChatwootService } from './integrations/chatbot/chatwoot/services/chatwoot.service';
import { DifyController } from './integrations/chatbot/dify/controllers/dify.controller';
import { DifyService } from './integrations/chatbot/dify/services/dify.service';
import { FlowiseController } from './integrations/chatbot/flowise/controllers/flowise.controller';
import { FlowiseService } from './integrations/chatbot/flowise/services/flowise.service';
import { GenericController } from './integrations/chatbot/generic/controllers/generic.controller';
import { GenericService } from './integrations/chatbot/generic/services/generic.service';
import { OpenaiController } from './integrations/chatbot/openai/controllers/openai.controller';
@@ -96,7 +100,11 @@ export const labelController = new LabelController(waMonitor);
export const eventManager = new EventManager(prismaRepository, waMonitor);
export const chatbotController = new ChatbotController(prismaRepository, waMonitor);
export const channelController = new ChannelController();
export const channelController = new ChannelController(prismaRepository, waMonitor);
// channels
export const evolutionController = new EvolutionController(prismaRepository, waMonitor);
export const metaController = new MetaController(prismaRepository, waMonitor);
// chatbots
const typebotService = new TypebotService(waMonitor, configService, prismaRepository);
@@ -111,4 +119,7 @@ export const difyController = new DifyController(difyService, prismaRepository,
const genericService = new GenericService(waMonitor, configService, prismaRepository);
export const genericController = new GenericController(genericService, prismaRepository, waMonitor);
const flowiseService = new FlowiseService(waMonitor, configService, prismaRepository);
export const flowiseController = new FlowiseController(flowiseService, prismaRepository, waMonitor);
logger.info('Module - ON');

View File

@@ -1,6 +1,4 @@
import { InstanceDto } from '@api/dto/instance.dto';
import { BaileysStartupService } from '@api/integrations/channel/whatsapp/baileys/whatsapp.baileys.service';
import { BusinessStartupService } from '@api/integrations/channel/whatsapp/business/whatsapp.business.service';
import { ProviderFiles } from '@api/provider/sessions';
import { PrismaRepository } from '@api/repository/repository.service';
import { channelController } from '@api/server.module';
@@ -37,7 +35,7 @@ export class WAMonitoringService {
private readonly redis: Partial<CacheConf> = {};
private readonly logger = new Logger('WAMonitoringService');
public readonly waInstances: Record<string, BaileysStartupService | BusinessStartupService> = {};
public readonly waInstances: Record<string, any> = {};
private readonly providerSession = Object.freeze(this.configService.get<ProviderSession>('PROVIDER'));
@@ -196,7 +194,7 @@ export class WAMonitoringService {
data: {
id: data.instanceId,
name: data.instanceName,
connectionStatus: data.integration && data.integration === Integration.WHATSAPP_BUSINESS ? 'open' : 'close',
connectionStatus: data.integration && data.integration === Integration.WHATSAPP_BAILEYS ? 'close' : 'open',
number: data.number,
integration: data.integration || Integration.WHATSAPP_BAILEYS,
token: data.hash,
@@ -210,7 +208,7 @@ export class WAMonitoringService {
}
private async setInstance(instanceData: InstanceDto) {
const instance = channelController.init(instanceData.integration, {
const instance = channelController.init(instanceData, {
configService: this.configService,
eventEmitter: this.eventEmitter,
prismaRepository: this.prismaRepository,

View File

@@ -134,4 +134,5 @@ export const MessageSubtype = [
export const Integration = {
WHATSAPP_BUSINESS: 'WHATSAPP-BUSINESS',
WHATSAPP_BAILEYS: 'WHATSAPP-BAILEYS',
EVOLUTION: 'EVOLUTION',
};