diff --git a/src/api/controllers/event.controller.ts b/src/api/controllers/event.controller.ts index 45c389c5..d1281fda 100644 --- a/src/api/controllers/event.controller.ts +++ b/src/api/controllers/event.controller.ts @@ -1,5 +1,5 @@ import { PrismaRepository } from '@api/repository/repository.service'; -import { websocketController } from '@api/server.module'; +import { rabbitmqController, sqsController, webhookController, websocketController } from '@api/server.module'; import { WAMonitoringService } from '@api/services/monitor.service'; import { Server } from 'http'; @@ -61,6 +61,12 @@ export class EventController { public init(httpServer: Server): void { // websocket websocketController.init(httpServer); + + // rabbitmq + rabbitmqController.init(); + + // sqs + sqsController.init(); } public async emit({ @@ -68,28 +74,76 @@ export class EventController { origin, event, data, + serverUrl, + dateTime, + sender, + apiKey, + local, }: { instanceName: string; origin: string; event: string; data: Object; + serverUrl: string; + dateTime: string; + sender: string; + apiKey?: string; + local?: boolean; }): Promise { - // websocket - await websocketController.emit({ + const emitData = { instanceName, origin, event, data, - }); + serverUrl, + dateTime, + sender, + apiKey, + local, + }; + // websocket + await websocketController.emit(emitData); + + // rabbitmq + await rabbitmqController.emit(emitData); + + // sqs + await sqsController.emit(emitData); + + // webhook + await webhookController.emit(emitData); } - public async set(instanceName: string, data: any): Promise { + public async setInstance(instanceName: string, data: any): Promise { // websocket - await websocketController.set(instanceName, data); - } + if (data.websocketEnabled) + await websocketController.set(instanceName, { + enabled: true, + events: data.websocketEvents, + }); - public async get(instanceName: string): Promise { - // websocket - await websocketController.get(instanceName); + // rabbitmq + if (data.rabbitmqEnabled) + await rabbitmqController.set(instanceName, { + enabled: true, + events: data.rabbitmqEvents, + }); + + // sqs + if (data.sqsEnabled) + await sqsController.set(instanceName, { + enabled: true, + events: data.sqsEvents, + }); + + // webhook + if (data.webhookEnabled) + await webhookController.set(instanceName, { + enabled: true, + events: data.webhookEvents, + url: data.webhookUrl, + webhookBase64: data.webhookBase64, + webhookByEvents: data.webhookByEvents, + }); } } diff --git a/src/api/controllers/instance.controller.ts b/src/api/controllers/instance.controller.ts index fe468d6d..ed22a3ea 100644 --- a/src/api/controllers/instance.controller.ts +++ b/src/api/controllers/instance.controller.ts @@ -1,22 +1,17 @@ import { InstanceDto, SetPresenceDto } from '@api/dto/instance.dto'; import { ChatwootService } from '@api/integrations/chatbot/chatwoot/services/chatwoot.service'; -import { RabbitmqService } from '@api/integrations/event/rabbitmq/services/rabbitmq.service'; -import { SqsService } from '@api/integrations/event/sqs/services/sqs.service'; -import { WebsocketController } from '@api/integrations/event/websocket/controllers/websocket.controller'; import { ProviderFiles } from '@api/provider/sessions'; import { PrismaRepository } from '@api/repository/repository.service'; -import { AuthService } from '@api/services/auth.service'; +import { eventController } from '@api/server.module'; import { CacheService } from '@api/services/cache.service'; import { BaileysStartupService } from '@api/services/channels/whatsapp.baileys.service'; import { BusinessStartupService } from '@api/services/channels/whatsapp.business.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { SettingsService } from '@api/services/settings.service'; -import { WebhookService } from '@api/services/webhook.service'; import { Events, Integration, wa } from '@api/types/wa.types'; import { Auth, Chatwoot, ConfigService, HttpServer, WaBusiness } from '@config/env.config'; import { Logger } from '@config/logger.config'; import { BadRequestException, InternalServerErrorException, UnauthorizedException } from '@exceptions'; -import { JsonValue } from '@prisma/client/runtime/library'; import { delay } from 'baileys'; import { isArray, isURL } from 'class-validator'; import EventEmitter2 from 'eventemitter2'; @@ -30,13 +25,8 @@ export class InstanceController { private readonly configService: ConfigService, private readonly prismaRepository: PrismaRepository, private readonly eventEmitter: EventEmitter2, - private readonly authService: AuthService, - private readonly webhookService: WebhookService, private readonly chatwootService: ChatwootService, private readonly settingsService: SettingsService, - private readonly websocketController: WebsocketController, - private readonly rabbitmqService: RabbitmqService, - private readonly sqsService: SqsService, private readonly proxyService: ProxyController, private readonly cache: CacheService, private readonly chatwootCache: CacheService, @@ -46,58 +36,14 @@ export class InstanceController { private readonly logger = new Logger('InstanceController'); - public async createInstance({ - instanceName, - qrcode, - number, - integration, - token, - rejectCall, - msgCall, - groupsIgnore, - alwaysOnline, - readMessages, - readStatus, - syncFullHistory, - proxyHost, - proxyPort, - proxyProtocol, - proxyUsername, - proxyPassword, - webhookUrl, - webhookByEvents, - webhookBase64, - webhookEvents, - websocketEnabled, - websocketEvents, - rabbitmqEnabled, - rabbitmqEvents, - sqsEnabled, - sqsEvents, - chatwootAccountId, - chatwootToken, - chatwootUrl, - chatwootSignMsg, - chatwootReopenConversation, - chatwootConversationPending, - chatwootImportContacts, - chatwootNameInbox, - chatwootMergeBrazilContacts, - chatwootImportMessages, - chatwootDaysLimitImportMessages, - chatwootOrganization, - chatwootLogo, - businessId, - }: InstanceDto) { + public async createInstance(instanceData: InstanceDto) { try { - // if (token) await this.authService.checkDuplicateToken(token); - - if (!token && integration === Integration.WHATSAPP_BUSINESS) { + if (!instanceData.token && instanceData.integration === Integration.WHATSAPP_BUSINESS) { throw new BadRequestException('token is required'); } let instance: BaileysStartupService | BusinessStartupService; - if (integration === Integration.WHATSAPP_BUSINESS) { + if (instanceData.integration === Integration.WHATSAPP_BUSINESS) { instance = new BusinessStartupService( this.configService, this.eventEmitter, @@ -123,210 +69,45 @@ export class InstanceController { let hash: string; - if (!token) hash = v4().toUpperCase(); - else hash = token; + if (!instanceData.token) hash = v4().toUpperCase(); + else hash = instanceData.token; - await this.waMonitor.saveInstance({ instanceId, integration, instanceName, hash, number, businessId }); + await this.waMonitor.saveInstance({ + instanceId, + integration: instanceData.integration, + instanceName: instanceData.instanceName, + hash, + number: instanceData.number, + businessId: instanceData.businessId, + }); instance.setInstance({ - instanceName, + instanceName: instanceData.instanceName, instanceId, - integration, + integration: instanceData.integration, token: hash, - number, - businessId, + number: instanceData.number, + businessId: instanceData.businessId, }); instance.sendDataWebhook(Events.INSTANCE_CREATE, { - instanceName, + instanceName: instanceData.instanceName, instanceId: instanceId, }); this.waMonitor.waInstances[instance.instanceName] = instance; this.waMonitor.delInstanceTime(instance.instanceName); - let getWebhookEvents: string[]; + // set events + eventController.setInstance(instance.instanceName, instanceData); - if (webhookUrl) { - if (!isURL(webhookUrl, { require_tld: false })) { - throw new BadRequestException('Invalid "url" property in webhook'); - } - - try { - let newEvents: string[] = []; - if (webhookEvents.length === 0) { - newEvents = [ - 'APPLICATION_STARTUP', - 'QRCODE_UPDATED', - 'MESSAGES_SET', - 'MESSAGES_UPSERT', - 'MESSAGES_EDITED', - 'MESSAGES_UPDATE', - 'MESSAGES_DELETE', - 'SEND_MESSAGE', - 'CONTACTS_SET', - 'CONTACTS_UPSERT', - 'CONTACTS_UPDATE', - 'PRESENCE_UPDATE', - 'CHATS_SET', - 'CHATS_UPSERT', - 'CHATS_UPDATE', - 'CHATS_DELETE', - 'GROUPS_UPSERT', - 'GROUP_UPDATE', - 'GROUP_PARTICIPANTS_UPDATE', - 'CONNECTION_UPDATE', - 'LABELS_EDIT', - 'LABELS_ASSOCIATION', - 'CALL', - 'TYPEBOT_START', - 'TYPEBOT_CHANGE_STATUS', - ]; - } else { - newEvents = webhookEvents; - } - this.webhookService.create(instance, { - enabled: true, - url: webhookUrl, - events: newEvents, - webhookByEvents, - webhookBase64, - }); - - const webhookEventsJson: JsonValue = (await this.webhookService.find(instance)).events; - - getWebhookEvents = Array.isArray(webhookEventsJson) ? webhookEventsJson.map((event) => String(event)) : []; - } catch (error) { - this.logger.log(error); - } - } - - let getWebsocketEvents: string[]; - - if (websocketEnabled) { - try { - await this.websocketController.set(instance.instanceName, { - enabled: true, - events: websocketEvents, - }); - - const websocketEventsJson: JsonValue = (await this.websocketController.get(instance.instanceName)).events; - - getWebsocketEvents = Array.isArray(websocketEventsJson) - ? websocketEventsJson.map((event) => String(event)) - : []; - } catch (error) { - this.logger.log(error); - } - } - - let getRabbitmqEvents: string[]; - - if (rabbitmqEnabled) { - try { - let newEvents: string[] = []; - if (rabbitmqEvents.length === 0) { - newEvents = [ - 'APPLICATION_STARTUP', - 'QRCODE_UPDATED', - 'MESSAGES_SET', - 'MESSAGES_UPSERT', - 'MESSAGES_EDITED', - 'MESSAGES_UPDATE', - 'MESSAGES_DELETE', - 'SEND_MESSAGE', - 'CONTACTS_SET', - 'CONTACTS_UPSERT', - 'CONTACTS_UPDATE', - 'PRESENCE_UPDATE', - 'CHATS_SET', - 'CHATS_UPSERT', - 'CHATS_UPDATE', - 'CHATS_DELETE', - 'GROUPS_UPSERT', - 'GROUP_UPDATE', - 'GROUP_PARTICIPANTS_UPDATE', - 'CONNECTION_UPDATE', - 'LABELS_EDIT', - 'LABELS_ASSOCIATION', - 'CALL', - 'TYPEBOT_START', - 'TYPEBOT_CHANGE_STATUS', - ]; - } else { - newEvents = rabbitmqEvents; - } - this.rabbitmqService.create(instance, { - enabled: true, - events: newEvents, - }); - - const rabbitmqEventsJson: JsonValue = (await this.rabbitmqService.find(instance)).events; - - getRabbitmqEvents = Array.isArray(rabbitmqEventsJson) ? rabbitmqEventsJson.map((event) => String(event)) : []; - } catch (error) { - this.logger.log(error); - } - } - - let getSqsEvents: string[]; - - if (sqsEnabled) { - try { - let newEvents: string[] = []; - if (sqsEvents.length === 0) { - newEvents = [ - 'APPLICATION_STARTUP', - 'QRCODE_UPDATED', - 'MESSAGES_SET', - 'MESSAGES_UPSERT', - 'MESSAGES_EDITED', - 'MESSAGES_UPDATE', - 'MESSAGES_DELETE', - 'SEND_MESSAGE', - 'CONTACTS_SET', - 'CONTACTS_UPSERT', - 'CONTACTS_UPDATE', - 'PRESENCE_UPDATE', - 'CHATS_SET', - 'CHATS_UPSERT', - 'CHATS_UPDATE', - 'CHATS_DELETE', - 'GROUPS_UPSERT', - 'GROUP_UPDATE', - 'GROUP_PARTICIPANTS_UPDATE', - 'CONNECTION_UPDATE', - 'LABELS_EDIT', - 'LABELS_ASSOCIATION', - 'CALL', - 'TYPEBOT_START', - 'TYPEBOT_CHANGE_STATUS', - ]; - } else { - newEvents = sqsEvents; - } - this.sqsService.create(instance, { - enabled: true, - events: newEvents, - }); - - const sqsEventsJson: JsonValue = (await this.sqsService.find(instance)).events; - - getSqsEvents = Array.isArray(sqsEventsJson) ? sqsEventsJson.map((event) => String(event)) : []; - - // sqsEvents = (await this.sqsService.find(instance)).events; - } catch (error) { - this.logger.log(error); - } - } - - if (proxyHost && proxyPort && proxyProtocol) { + if (instanceData.proxyHost && instanceData.proxyPort && instanceData.proxyProtocol) { const testProxy = await this.proxyService.testProxy({ - host: proxyHost, - port: proxyPort, - protocol: proxyProtocol, - username: proxyUsername, - password: proxyPassword, + host: instanceData.proxyHost, + port: instanceData.proxyPort, + protocol: instanceData.proxyProtocol, + username: instanceData.proxyUsername, + password: instanceData.proxyPassword, }); if (!testProxy) { throw new BadRequestException('Invalid proxy'); @@ -334,22 +115,22 @@ export class InstanceController { await this.proxyService.createProxy(instance, { enabled: true, - host: proxyHost, - port: proxyPort, - protocol: proxyProtocol, - username: proxyUsername, - password: proxyPassword, + host: instanceData.proxyHost, + port: instanceData.proxyPort, + protocol: instanceData.proxyProtocol, + username: instanceData.proxyUsername, + password: instanceData.proxyPassword, }); } const settings: wa.LocalSettings = { - rejectCall: rejectCall === true, - msgCall: msgCall || '', - groupsIgnore: groupsIgnore === true, - alwaysOnline: alwaysOnline === true, - readMessages: readMessages === true, - readStatus: readStatus === true, - syncFullHistory: syncFullHistory === true, + rejectCall: instanceData.rejectCall === true, + msgCall: instanceData.msgCall || '', + groupsIgnore: instanceData.groupsIgnore === true, + alwaysOnline: instanceData.alwaysOnline === true, + readMessages: instanceData.readMessages === true, + readStatus: instanceData.readStatus === true, + syncFullHistory: instanceData.syncFullHistory === true, }; await this.settingsService.create(instance, settings); @@ -357,8 +138,8 @@ export class InstanceController { let webhookWaBusiness = null, accessTokenWaBusiness = ''; - if (integration === Integration.WHATSAPP_BUSINESS) { - if (!number) { + if (instanceData.integration === Integration.WHATSAPP_BUSINESS) { + if (!instanceData.number) { throw new BadRequestException('number is required'); } const urlServer = this.configService.get('SERVER').URL; @@ -366,11 +147,11 @@ export class InstanceController { accessTokenWaBusiness = this.configService.get('WA_BUSINESS').TOKEN_WEBHOOK; } - if (!chatwootAccountId || !chatwootToken || !chatwootUrl) { + if (!instanceData.chatwootAccountId || !instanceData.chatwootToken || !instanceData.chatwootUrl) { let getQrcode: wa.QrCode; - if (qrcode && integration === Integration.WHATSAPP_BAILEYS) { - await instance.connectToWhatsapp(number); + if (instanceData.qrcode && instanceData.integration === Integration.WHATSAPP_BAILEYS) { + await instance.connectToWhatsapp(instanceData.number); await delay(5000); getQrcode = instance.qrCode; } @@ -379,29 +160,29 @@ export class InstanceController { instance: { instanceName: instance.instanceName, instanceId: instanceId, - integration: integration, + integration: instanceData.integration, webhookWaBusiness, accessTokenWaBusiness, status: 'created', }, hash, webhook: { - webhookUrl, - webhookByEvents, - webhookBase64, - events: getWebhookEvents, + webhookUrl: instanceData.webhookUrl, + webhookByEvents: instanceData.webhookByEvents, + webhookBase64: instanceData.webhookBase64, + // events: getWebhookEvents, }, websocket: { - enabled: websocketEnabled, - events: getWebsocketEvents, + enabled: instanceData.websocketEnabled, + // events: getWebsocketEvents, }, rabbitmq: { - enabled: rabbitmqEnabled, - events: getRabbitmqEvents, + enabled: instanceData.rabbitmqEnabled, + // events: getRabbitmqEvents, }, sqs: { - enabled: sqsEnabled, - events: getSqsEvents, + enabled: instanceData.sqsEnabled, + // events: getSqsEvents, }, settings, qrcode: getQrcode, @@ -413,31 +194,31 @@ export class InstanceController { if (!this.configService.get('CHATWOOT').ENABLED) throw new BadRequestException('Chatwoot is not enabled'); - if (!chatwootAccountId) { + if (!instanceData.chatwootAccountId) { throw new BadRequestException('accountId is required'); } - if (!chatwootToken) { + if (!instanceData.chatwootToken) { throw new BadRequestException('token is required'); } - if (!chatwootUrl) { + if (!instanceData.chatwootUrl) { throw new BadRequestException('url is required'); } - if (!isURL(chatwootUrl, { require_tld: false })) { + if (!isURL(instanceData.chatwootUrl, { require_tld: false })) { throw new BadRequestException('Invalid "url" property in chatwoot'); } - if (chatwootSignMsg !== true && chatwootSignMsg !== false) { + if (instanceData.chatwootSignMsg !== true && instanceData.chatwootSignMsg !== false) { throw new BadRequestException('signMsg is required'); } - if (chatwootReopenConversation !== true && chatwootReopenConversation !== false) { + if (instanceData.chatwootReopenConversation !== true && instanceData.chatwootReopenConversation !== false) { throw new BadRequestException('reopenConversation is required'); } - if (chatwootConversationPending !== true && chatwootConversationPending !== false) { + if (instanceData.chatwootConversationPending !== true && instanceData.chatwootConversationPending !== false) { throw new BadRequestException('conversationPending is required'); } @@ -446,20 +227,20 @@ export class InstanceController { try { this.chatwootService.create(instance, { enabled: true, - accountId: chatwootAccountId, - token: chatwootToken, - url: chatwootUrl, - signMsg: chatwootSignMsg || false, - nameInbox: chatwootNameInbox ?? instance.instanceName.split('-cwId-')[0], - number, - reopenConversation: chatwootReopenConversation || false, - conversationPending: chatwootConversationPending || false, - importContacts: chatwootImportContacts ?? true, - mergeBrazilContacts: chatwootMergeBrazilContacts ?? false, - importMessages: chatwootImportMessages ?? true, - daysLimitImportMessages: chatwootDaysLimitImportMessages ?? 60, - organization: chatwootOrganization, - logo: chatwootLogo, + accountId: instanceData.chatwootAccountId, + token: instanceData.chatwootToken, + url: instanceData.chatwootUrl, + signMsg: instanceData.chatwootSignMsg || false, + nameInbox: instanceData.chatwootNameInbox ?? instance.instanceName.split('-cwId-')[0], + number: instanceData.number, + reopenConversation: instanceData.chatwootReopenConversation || false, + conversationPending: instanceData.chatwootConversationPending || false, + importContacts: instanceData.chatwootImportContacts ?? true, + mergeBrazilContacts: instanceData.chatwootMergeBrazilContacts ?? false, + importMessages: instanceData.chatwootImportMessages ?? true, + daysLimitImportMessages: instanceData.chatwootDaysLimitImportMessages ?? 60, + organization: instanceData.chatwootOrganization, + logo: instanceData.chatwootLogo, autoCreate: true, }); } catch (error) { @@ -470,45 +251,45 @@ export class InstanceController { instance: { instanceName: instance.instanceName, instanceId: instanceId, - integration: integration, + integration: instanceData.integration, webhookWaBusiness, accessTokenWaBusiness, status: 'created', }, hash, webhook: { - webhookUrl, - webhookByEvents, - webhookBase64, - events: getWebhookEvents, + webhookUrl: instanceData.webhookUrl, + webhookByEvents: instanceData.webhookByEvents, + webhookBase64: instanceData.webhookBase64, + // events: getWebhookEvents, }, websocket: { - enabled: websocketEnabled, - events: getWebsocketEvents, + enabled: instanceData.websocketEnabled, + // events: getWebsocketEvents, }, rabbitmq: { - enabled: rabbitmqEnabled, - events: getRabbitmqEvents, + enabled: instanceData.rabbitmqEnabled, + // events: getRabbitmqEvents, }, sqs: { - enabled: sqsEnabled, - events: getSqsEvents, + enabled: instanceData.sqsEnabled, + // events: getSqsEvents, }, settings, chatwoot: { enabled: true, - accountId: chatwootAccountId, - token: chatwootToken, - url: chatwootUrl, - signMsg: chatwootSignMsg || false, - reopenConversation: chatwootReopenConversation || false, - conversationPending: chatwootConversationPending || false, - mergeBrazilContacts: chatwootMergeBrazilContacts ?? false, - importContacts: chatwootImportContacts ?? true, - importMessages: chatwootImportMessages ?? true, - daysLimitImportMessages: chatwootDaysLimitImportMessages || 60, - number, - nameInbox: chatwootNameInbox ?? instance.instanceName, + accountId: instanceData.chatwootAccountId, + token: instanceData.chatwootToken, + url: instanceData.chatwootUrl, + signMsg: instanceData.chatwootSignMsg || false, + reopenConversation: instanceData.chatwootReopenConversation || false, + conversationPending: instanceData.chatwootConversationPending || false, + mergeBrazilContacts: instanceData.chatwootMergeBrazilContacts ?? false, + importContacts: instanceData.chatwootImportContacts ?? true, + importMessages: instanceData.chatwootImportMessages ?? true, + daysLimitImportMessages: instanceData.chatwootDaysLimitImportMessages || 60, + number: instanceData.number, + nameInbox: instanceData.chatwootNameInbox ?? instance.instanceName, webhookUrl: `${urlServer}/chatwoot/webhook/${encodeURIComponent(instance.instanceName)}`, }, }; @@ -650,7 +431,6 @@ export class InstanceController { } try { const waInstances = this.waMonitor.waInstances[instanceName]; - waInstances?.removeRabbitmqQueues(); if (this.configService.get('CHATWOOT').ENABLED) waInstances?.clearCacheChatwoot(); if (instance.state === 'connecting') { diff --git a/src/api/controllers/webhook.controller.ts b/src/api/controllers/webhook.controller.ts deleted file mode 100644 index 036c70dc..00000000 --- a/src/api/controllers/webhook.controller.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { InstanceDto } from '@api/dto/instance.dto'; -import { WebhookDto } from '@api/dto/webhook.dto'; -import { WAMonitoringService } from '@api/services/monitor.service'; -import { WebhookService } from '@api/services/webhook.service'; -import { BadRequestException } from '@exceptions'; -import { isURL } from 'class-validator'; - -export class WebhookController { - constructor(private readonly webhookService: WebhookService, private readonly waMonitor: WAMonitoringService) {} - - public async createWebhook(instance: InstanceDto, data: WebhookDto) { - if (!isURL(data.url, { require_tld: false })) { - throw new BadRequestException('Invalid "url" property'); - } - - data.enabled = data.enabled ?? true; - - if (!data.enabled) { - data.url = ''; - data.events = []; - } else if (data.events.length === 0) { - data.events = [ - 'APPLICATION_STARTUP', - 'QRCODE_UPDATED', - 'MESSAGES_SET', - 'MESSAGES_UPSERT', - 'MESSAGES_EDITED', - 'MESSAGES_UPDATE', - 'MESSAGES_DELETE', - 'SEND_MESSAGE', - 'CONTACTS_SET', - 'CONTACTS_UPSERT', - 'CONTACTS_UPDATE', - 'PRESENCE_UPDATE', - 'CHATS_SET', - 'CHATS_UPSERT', - 'CHATS_UPDATE', - 'CHATS_DELETE', - 'GROUPS_UPSERT', - 'GROUP_UPDATE', - 'GROUP_PARTICIPANTS_UPDATE', - 'CONNECTION_UPDATE', - 'LABELS_EDIT', - 'LABELS_ASSOCIATION', - 'CALL', - 'TYPEBOT_START', - 'TYPEBOT_CHANGE_STATUS', - ]; - } - - return this.webhookService.create(instance, data); - } - - public async findWebhook(instance: InstanceDto) { - return this.webhookService.find(instance); - } - - public async receiveWebhook(data: any) { - this.webhookService.receiveWebhook(data); - - return { - message: 'Webhook received', - }; - } -} diff --git a/src/api/dto/instance.dto.ts b/src/api/dto/instance.dto.ts index 86a3f67e..cb31f4c5 100644 --- a/src/api/dto/instance.dto.ts +++ b/src/api/dto/instance.dto.ts @@ -1,16 +1,16 @@ import { WAPresence } from 'baileys'; -export class InstanceDto { +import { IntegrationDto } from './integration.dto'; + +export class InstanceDto extends IntegrationDto { instanceName: string; instanceId?: string; qrcode?: boolean; + businessId?: string; number?: string; integration?: string; token?: string; - webhookUrl?: string; - webhookByEvents?: boolean; - webhookBase64?: boolean; - webhookEvents?: string[]; + // settings rejectCall?: boolean; msgCall?: string; groupsIgnore?: boolean; @@ -18,31 +18,12 @@ export class InstanceDto { readMessages?: boolean; readStatus?: boolean; syncFullHistory?: boolean; - chatwootAccountId?: string; - chatwootToken?: string; - chatwootUrl?: string; - chatwootSignMsg?: boolean; - chatwootReopenConversation?: boolean; - chatwootConversationPending?: boolean; - chatwootMergeBrazilContacts?: boolean; - chatwootImportContacts?: boolean; - chatwootImportMessages?: boolean; - chatwootDaysLimitImportMessages?: number; - chatwootNameInbox?: string; - chatwootOrganization?: string; - chatwootLogo?: string; - websocketEnabled?: boolean; - websocketEvents?: string[]; - rabbitmqEnabled?: boolean; - rabbitmqEvents?: string[]; - sqsEnabled?: boolean; - sqsEvents?: string[]; + // proxy proxyHost?: string; proxyPort?: string; proxyProtocol?: string; proxyUsername?: string; proxyPassword?: string; - businessId?: string; } export class SetPresenceDto { diff --git a/src/api/dto/integration.dto.ts b/src/api/dto/integration.dto.ts new file mode 100644 index 00000000..4df9a2c0 --- /dev/null +++ b/src/api/dto/integration.dto.ts @@ -0,0 +1,28 @@ +import { RabbitMQInstanceMixin } from '@api/integrations/event/rabbitmq/dto/rabbitmq.dto'; +import { SQSInstanceMixin } from '@api/integrations/event/sqs/dto/sqs.dto'; +import { WebhookInstanceMixin } from '@api/integrations/event/webhook/dto/webhook.dto'; +import { WebsocketInstanceMixin } from '@api/integrations/event/websocket/dto/websocket.dto'; + +export type Constructor = new (...args: any[]) => T; + +function ChatwootInstanceMixin(Base: TBase) { + return class extends Base { + chatwootAccountId?: string; + chatwootToken?: string; + chatwootUrl?: string; + chatwootSignMsg?: boolean; + chatwootReopenConversation?: boolean; + chatwootConversationPending?: boolean; + chatwootMergeBrazilContacts?: boolean; + chatwootImportContacts?: boolean; + chatwootImportMessages?: boolean; + chatwootDaysLimitImportMessages?: number; + chatwootNameInbox?: string; + chatwootOrganization?: string; + chatwootLogo?: string; + }; +} + +export class IntegrationDto extends WebhookInstanceMixin( + WebsocketInstanceMixin(RabbitMQInstanceMixin(SQSInstanceMixin(ChatwootInstanceMixin(class {})))), +) {} diff --git a/src/api/dto/webhook.dto.ts b/src/api/dto/webhook.dto.ts deleted file mode 100644 index e0e6b722..00000000 --- a/src/api/dto/webhook.dto.ts +++ /dev/null @@ -1,7 +0,0 @@ -export class WebhookDto { - enabled?: boolean; - url?: string; - events?: string[]; - webhookByEvents?: boolean; - webhookBase64?: boolean; -} diff --git a/src/api/guards/instance.guard.ts b/src/api/guards/instance.guard.ts index 29c320ec..874fa07f 100644 --- a/src/api/guards/instance.guard.ts +++ b/src/api/guards/instance.guard.ts @@ -50,8 +50,6 @@ export async function instanceLoggedGuard(req: Request, _: Response, next: NextF } if (waMonitor.waInstances[instance.instanceName]) { - waMonitor.waInstances[instance.instanceName]?.removeRabbitmqQueues(); - waMonitor.waInstances[instance.instanceName]?.removeSqsQueues(); delete waMonitor.waInstances[instance.instanceName]; } } diff --git a/src/api/integrations/event/rabbitmq/controllers/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/controllers/rabbitmq.controller.ts index 582e1ddc..8e2f799b 100644 --- a/src/api/integrations/event/rabbitmq/controllers/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/controllers/rabbitmq.controller.ts @@ -1,53 +1,271 @@ -import { InstanceDto } from '@api/dto/instance.dto'; +import { EventController } from '@api/controllers/event.controller'; import { RabbitmqDto } from '@api/integrations/event/rabbitmq/dto/rabbitmq.dto'; -import { RabbitmqService } from '@api/integrations/event/rabbitmq/services/rabbitmq.service'; -import { configService, Rabbitmq } from '@config/env.config'; -import { BadRequestException } from '@exceptions'; +import { PrismaRepository } from '@api/repository/repository.service'; +import { WAMonitoringService } from '@api/services/monitor.service'; +import { wa } from '@api/types/wa.types'; +import { configService, Log, Rabbitmq } from '@config/env.config'; +import { Logger } from '@config/logger.config'; +import { NotFoundException } from '@exceptions'; +import * as amqp from 'amqplib/callback_api'; -export class RabbitmqController { - constructor(private readonly rabbitmqService: RabbitmqService) {} +export class RabbitmqController extends EventController { + public amqpChannel: amqp.Channel | null = null; + private readonly logger = new Logger(RabbitmqController.name); + constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { + super(prismaRepository, waMonitor); + } - public async createRabbitmq(instance: InstanceDto, data: RabbitmqDto) { - if (!configService.get('RABBITMQ').ENABLED) throw new BadRequestException('Rabbitmq is disabled'); + public init(): void { + if (!configService.get('RABBITMQ')?.ENABLED) { + return; + } + new Promise((resolve, reject) => { + const uri = configService.get('RABBITMQ').URI; + amqp.connect(uri, (error, connection) => { + if (error) { + reject(error); + return; + } + + connection.createChannel((channelError, channel) => { + if (channelError) { + reject(channelError); + return; + } + + const exchangeName = 'evolution_exchange'; + + channel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + this.amqpChannel = channel; + + this.logger.info('AMQP initialized'); + resolve(); + }); + }); + }).then(() => { + if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues(); + }); + } + + private set channel(channel: amqp.Channel) { + this.amqpChannel = channel; + } + + public get channel(): amqp.Channel { + return this.amqpChannel; + } + + public async set(instanceName: string, data: RabbitmqDto): Promise { if (!data.enabled) { data.events = []; + } else { + if (0 === data.events.length) { + data.events = this.events; + } } - if (data.events.length === 0) { - data.events = [ - 'APPLICATION_STARTUP', - 'QRCODE_UPDATED', - 'MESSAGES_SET', - 'MESSAGES_UPSERT', - 'MESSAGES_EDITED', - 'MESSAGES_UPDATE', - 'MESSAGES_DELETE', - 'SEND_MESSAGE', - 'CONTACTS_SET', - 'CONTACTS_UPSERT', - 'CONTACTS_UPDATE', - 'PRESENCE_UPDATE', - 'CHATS_SET', - 'CHATS_UPSERT', - 'CHATS_UPDATE', - 'CHATS_DELETE', - 'GROUPS_UPSERT', - 'GROUP_UPDATE', - 'GROUP_PARTICIPANTS_UPDATE', - 'CONNECTION_UPDATE', - 'LABELS_EDIT', - 'LABELS_ASSOCIATION', - 'CALL', - 'TYPEBOT_START', - 'TYPEBOT_CHANGE_STATUS', - ]; - } + try { + await this.get(instanceName); - return this.rabbitmqService.create(instance, data); + return this.prisma.rabbitmq.update({ + where: { + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + data, + }); + } catch (err) { + return this.prisma.rabbitmq.create({ + data: { + enabled: data.enabled, + events: data.events, + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + }); + } } - public async findRabbitmq(instance: InstanceDto) { - return this.rabbitmqService.find(instance); + public async get(instanceName: string): Promise { + if (undefined === this.monitor.waInstances[instanceName]) { + throw new NotFoundException('Instance not found'); + } + + const data = await this.prisma.rabbitmq.findUnique({ + where: { + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + }); + + if (!data) { + throw new NotFoundException('Rabbitmq not found'); + } + + return data; + } + + public async emit({ + instanceName, + origin, + event, + data, + serverUrl, + dateTime, + sender, + apiKey, + }: { + instanceName: string; + origin: string; + event: string; + data: Object; + serverUrl: string; + dateTime: string; + sender: string; + apiKey?: string; + }): Promise { + if (!configService.get('RABBITMQ')?.ENABLED) { + return; + } + + const instanceRabbitmq = await this.get(instanceName); + const rabbitmqLocal = instanceRabbitmq.events; + const rabbitmqGlobal = configService.get('RABBITMQ').GLOBAL_ENABLED; + const rabbitmqEvents = configService.get('RABBITMQ').EVENTS; + const we = event.replace(/[.-]/gm, '_').toUpperCase(); + const logEnabled = configService.get('LOG').LEVEL.includes('WEBHOOKS'); + + const message = { + event, + instance: instanceName, + data, + server_url: serverUrl, + date_time: dateTime, + sender, + apikey: apiKey, + }; + + if (instanceRabbitmq.enabled && this.amqpChannel) { + if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { + const exchangeName = instanceName ?? 'evolution_exchange'; + + let retry = 0; + + while (retry < 3) { + try { + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + const eventName = event.replace(/_/g, '.').toLowerCase(); + + const queueName = `${instanceName}.${eventName}`; + + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); + + await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + + if (logEnabled) { + const logData = { + local: `${origin}.sendData-RabbitMQ`, + ...message, + }; + + this.logger.log(logData); + } + break; + } catch (error) { + retry++; + } + } + } + } + + if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { + const exchangeName = 'evolution_exchange'; + + let retry = 0; + + while (retry < 3) { + try { + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + const queueName = event; + + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + await this.amqpChannel.bindQueue(queueName, exchangeName, event); + + await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + + if (logEnabled) { + const logData = { + local: `${origin}.sendData-RabbitMQ-Global`, + ...message, + }; + + this.logger.log(logData); + } + + break; + } catch (error) { + retry++; + } + } + } + } + + private async initGlobalQueues(): Promise { + this.logger.info('Initializing global queues'); + const events = configService.get('RABBITMQ').EVENTS; + + if (!events) { + this.logger.warn('No events to initialize on AMQP'); + return; + } + + const eventKeys = Object.keys(events); + + eventKeys.forEach((event) => { + if (events[event] === false) return; + + const queueName = `${event.replace(/_/g, '.').toLowerCase()}`; + const exchangeName = 'evolution_exchange'; + + this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + this.amqpChannel.bindQueue(queueName, exchangeName, event); + }); } } diff --git a/src/api/integrations/event/rabbitmq/dto/rabbitmq.dto.ts b/src/api/integrations/event/rabbitmq/dto/rabbitmq.dto.ts index 9bfd5b42..3766f287 100644 --- a/src/api/integrations/event/rabbitmq/dto/rabbitmq.dto.ts +++ b/src/api/integrations/event/rabbitmq/dto/rabbitmq.dto.ts @@ -1,4 +1,13 @@ +import { Constructor } from '@api/dto/integration.dto'; + export class RabbitmqDto { enabled: boolean; events?: string[]; } + +export function RabbitMQInstanceMixin(Base: TBase) { + return class extends Base { + rabbitmqEnabled?: boolean; + rabbitmqEvents?: string[]; + }; +} diff --git a/src/api/integrations/event/rabbitmq/libs/amqp.server.ts b/src/api/integrations/event/rabbitmq/libs/amqp.server.ts deleted file mode 100644 index 583d4715..00000000 --- a/src/api/integrations/event/rabbitmq/libs/amqp.server.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { configService, Rabbitmq } from '@config/env.config'; -import { Logger } from '@config/logger.config'; -import { JsonValue } from '@prisma/client/runtime/library'; -import * as amqp from 'amqplib/callback_api'; - -const logger = new Logger('AMQP'); - -let amqpChannel: amqp.Channel | null = null; - -export const initAMQP = () => { - return new Promise((resolve, reject) => { - const uri = configService.get('RABBITMQ').URI; - amqp.connect(uri, (error, connection) => { - if (error) { - reject(error); - return; - } - - connection.createChannel((channelError, channel) => { - if (channelError) { - reject(channelError); - return; - } - - const exchangeName = configService.get('RABBITMQ').EXCHANGE_NAME || 'evolution_exchange'; - - channel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - amqpChannel = channel; - - logger.info('AMQP initialized'); - resolve(); - }); - }); - }); -}; - -export const getAMQP = (): amqp.Channel | null => { - return amqpChannel; -}; - -export const initGlobalQueues = () => { - logger.info('Initializing global queues'); - const events = configService.get('RABBITMQ').EVENTS; - - if (!events) { - logger.warn('No events to initialize on AMQP'); - return; - } - - const eventKeys = Object.keys(events); - - eventKeys.forEach((event) => { - if (events[event] === false) return; - - const queueName = `${event.replace(/_/g, '.').toLowerCase()}`; - const amqp = getAMQP(); - const exchangeName = configService.get('RABBITMQ').EXCHANGE_NAME || 'evolution_exchange'; - - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - amqp.bindQueue(queueName, exchangeName, event); - }); -}; - -export const initQueues = (instanceName: string, events: string[]) => { - if (!events || !events.length) return; - - const queues = events.map((event) => { - return `${event.replace(/_/g, '.').toLowerCase()}`; - }); - - queues.forEach((event) => { - const amqp = getAMQP(); - const exchangeName = instanceName ?? 'evolution_exchange'; - - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const queueName = `${instanceName}.${event}`; - - amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - amqp.bindQueue(queueName, exchangeName, event); - }); -}; - -export const removeQueues = (instanceName: string, events: JsonValue) => { - const eventsArray = Array.isArray(events) ? events.map((event) => String(event)) : []; - - if (!events || !eventsArray.length) return; - - const channel = getAMQP(); - - const queues = eventsArray.map((event) => { - return `${event.replace(/_/g, '.').toLowerCase()}`; - }); - - const exchangeName = instanceName ?? 'evolution_exchange'; - - queues.forEach((event) => { - const amqp = getAMQP(); - - amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const queueName = `${instanceName}.${event}`; - - amqp.deleteQueue(queueName); - }); - - channel.deleteExchange(exchangeName); -}; diff --git a/src/api/integrations/event/rabbitmq/routes/rabbitmq.router.ts b/src/api/integrations/event/rabbitmq/routes/rabbitmq.router.ts index d3590c1d..8a11975e 100644 --- a/src/api/integrations/event/rabbitmq/routes/rabbitmq.router.ts +++ b/src/api/integrations/event/rabbitmq/routes/rabbitmq.router.ts @@ -15,7 +15,7 @@ export class RabbitmqRouter extends RouterBroker { request: req, schema: rabbitmqSchema, ClassRef: RabbitmqDto, - execute: (instance, data) => rabbitmqController.createRabbitmq(instance, data), + execute: (instance, data) => rabbitmqController.set(instance.instanceName, data), }); res.status(HttpStatus.CREATED).json(response); @@ -25,7 +25,7 @@ export class RabbitmqRouter extends RouterBroker { request: req, schema: instanceSchema, ClassRef: InstanceDto, - execute: (instance) => rabbitmqController.findRabbitmq(instance), + execute: (instance) => rabbitmqController.get(instance.instanceName), }); res.status(HttpStatus.OK).json(response); diff --git a/src/api/integrations/event/rabbitmq/services/rabbitmq.service.ts b/src/api/integrations/event/rabbitmq/services/rabbitmq.service.ts deleted file mode 100644 index 371aaafb..00000000 --- a/src/api/integrations/event/rabbitmq/services/rabbitmq.service.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { InstanceDto } from '@api/dto/instance.dto'; -import { RabbitmqDto } from '@api/integrations/event/rabbitmq/dto/rabbitmq.dto'; -import { initQueues } from '@api/integrations/event/rabbitmq/libs/amqp.server'; -import { WAMonitoringService } from '@api/services/monitor.service'; -import { Logger } from '@config/logger.config'; -import { Rabbitmq } from '@prisma/client'; - -export class RabbitmqService { - constructor(private readonly waMonitor: WAMonitoringService) {} - - private readonly logger = new Logger('RabbitmqService'); - - public create(instance: InstanceDto, data: RabbitmqDto) { - this.waMonitor.waInstances[instance.instanceName].setRabbitmq(data); - - initQueues(instance.instanceName, data.events); - return { rabbitmq: { ...instance, rabbitmq: data } }; - } - - public async find(instance: InstanceDto): Promise { - try { - const result = await this.waMonitor.waInstances[instance.instanceName].findRabbitmq(); - - if (Object.keys(result).length === 0) { - throw new Error('Rabbitmq not found'); - } - - return result; - } catch (error) { - return null; - } - } -} diff --git a/src/api/integrations/event/sqs/controllers/sqs.controller.ts b/src/api/integrations/event/sqs/controllers/sqs.controller.ts index e31c436b..eb7a8d40 100644 --- a/src/api/integrations/event/sqs/controllers/sqs.controller.ts +++ b/src/api/integrations/event/sqs/controllers/sqs.controller.ts @@ -1,53 +1,245 @@ -import { InstanceDto } from '@api/dto/instance.dto'; +import { EventController } from '@api/controllers/event.controller'; import { SqsDto } from '@api/integrations/event/sqs/dto/sqs.dto'; -import { SqsService } from '@api/integrations/event/sqs/services/sqs.service'; -import { configService, Sqs } from '@config/env.config'; -import { BadRequestException } from '@exceptions'; +import { PrismaRepository } from '@api/repository/repository.service'; +import { WAMonitoringService } from '@api/services/monitor.service'; +import { wa } from '@api/types/wa.types'; +import { SQS } from '@aws-sdk/client-sqs'; +import { configService, Log, Sqs } from '@config/env.config'; +import { Logger } from '@config/logger.config'; +import { NotFoundException } from '@exceptions'; -export class SqsController { - constructor(private readonly sqsService: SqsService) {} +export class SqsController extends EventController { + private sqs: SQS; + private readonly logger = new Logger(SqsController.name); - public async createSqs(instance: InstanceDto, data: SqsDto) { - if (!configService.get('SQS').ENABLED) throw new BadRequestException('Sqs is disabled'); + constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { + super(prismaRepository, waMonitor); + } + public init(): void { + if (!configService.get('SQS')?.ENABLED) { + return; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + new Promise((resolve, reject) => { + const awsConfig = configService.get('SQS'); + this.sqs = new SQS({ + credentials: { + accessKeyId: awsConfig.ACCESS_KEY_ID, + secretAccessKey: awsConfig.SECRET_ACCESS_KEY, + }, + + region: awsConfig.REGION, + }); + + this.logger.info('SQS initialized'); + resolve(); + }); + } + + private set channel(sqs: SQS) { + this.sqs = sqs; + } + + public get channel(): SQS { + return this.sqs; + } + + public async set(instanceName: string, data: SqsDto): Promise { if (!data.enabled) { data.events = []; + } else { + if (0 === data.events.length) { + data.events = this.events; + } } - if (data.events.length === 0) { - data.events = [ - 'APPLICATION_STARTUP', - 'QRCODE_UPDATED', - 'MESSAGES_SET', - 'MESSAGES_UPSERT', - 'MESSAGES_EDITED', - 'MESSAGES_UPDATE', - 'MESSAGES_DELETE', - 'SEND_MESSAGE', - 'CONTACTS_SET', - 'CONTACTS_UPSERT', - 'CONTACTS_UPDATE', - 'PRESENCE_UPDATE', - 'CHATS_SET', - 'CHATS_UPSERT', - 'CHATS_UPDATE', - 'CHATS_DELETE', - 'GROUPS_UPSERT', - 'GROUP_UPDATE', - 'GROUP_PARTICIPANTS_UPDATE', - 'CONNECTION_UPDATE', - 'LABELS_EDIT', - 'LABELS_ASSOCIATION', - 'CALL', - 'TYPEBOT_START', - 'TYPEBOT_CHANGE_STATUS', - ]; - } + try { + await this.get(instanceName); - return this.sqsService.create(instance, data); + return this.prisma.sqs.update({ + where: { + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + data, + }); + } catch (err) { + return this.prisma.sqs.create({ + data: { + enabled: data.enabled, + events: data.events, + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + }); + } } - public async findSqs(instance: InstanceDto) { - return this.sqsService.find(instance); + public async get(instanceName: string): Promise { + if (undefined === this.monitor.waInstances[instanceName]) { + throw new NotFoundException('Instance not found'); + } + + const data = await this.prisma.sqs.findUnique({ + where: { + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + }); + + if (!data) { + throw new NotFoundException('Sqs not found'); + } + + return data; + } + + public async emit({ + instanceName, + origin, + event, + data, + serverUrl, + dateTime, + sender, + apiKey, + }: { + instanceName: string; + origin: string; + event: string; + data: Object; + serverUrl: string; + dateTime: string; + sender: string; + apiKey?: string; + }): Promise { + if (!configService.get('SQS')?.ENABLED) { + return; + } + + const instanceSqs = await this.get(instanceName); + const sqsLocal = instanceSqs.events; + const we = event.replace(/[.-]/gm, '_').toUpperCase(); + + if (instanceSqs.enabled) { + if (this.sqs) { + if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) { + const eventFormatted = `${event.replace('.', '_').toLowerCase()}`; + + const queueName = `${instanceName}_${eventFormatted}.fifo`; + + const sqsConfig = configService.get('SQS'); + + const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`; + + const message = { + event, + instance: instanceName, + data, + server_url: serverUrl, + date_time: dateTime, + sender, + apikey: apiKey, + }; + + const params = { + MessageBody: JSON.stringify(message), + MessageGroupId: 'evolution', + MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`, + QueueUrl: sqsUrl, + }; + + this.sqs.sendMessage(params, (err) => { + if (err) { + this.logger.error({ + local: `${origin}.sendData-SQS`, + message: err?.message, + hostName: err?.hostname, + code: err?.code, + stack: err?.stack, + name: err?.name, + url: queueName, + server_url: serverUrl, + }); + } else { + if (configService.get('LOG').LEVEL.includes('WEBHOOKS')) { + const logData = { + local: `${origin}.sendData-SQS`, + ...message, + }; + + this.logger.log(logData); + } + } + }); + } + } + } + } + + public async initQueues(instanceName: string, events: string[]) { + if (!events || !events.length) return; + + const queues = events.map((event) => { + return `${event.replace(/_/g, '_').toLowerCase()}`; + }); + + queues.forEach((event) => { + const queueName = `${instanceName}_${event}.fifo`; + + this.sqs.createQueue( + { + QueueName: queueName, + Attributes: { + FifoQueue: 'true', + }, + }, + (err, data) => { + if (err) { + this.logger.error(`Error creating queue ${queueName}: ${err.message}`); + } else { + this.logger.info(`Queue ${queueName} created: ${data.QueueUrl}`); + } + }, + ); + }); + } + + public async removeQueues(instanceName: string, events: any) { + const eventsArray = Array.isArray(events) ? events.map((event) => String(event)) : []; + if (!events || !eventsArray.length) return; + + const queues = eventsArray.map((event) => { + return `${event.replace(/_/g, '_').toLowerCase()}`; + }); + + queues.forEach((event) => { + const queueName = `${instanceName}_${event}.fifo`; + + this.sqs.getQueueUrl( + { + QueueName: queueName, + }, + (err, data) => { + if (err) { + this.logger.error(`Error getting queue URL for ${queueName}: ${err.message}`); + } else { + const queueUrl = data.QueueUrl; + + this.sqs.deleteQueue( + { + QueueUrl: queueUrl, + }, + (deleteErr) => { + if (deleteErr) { + this.logger.error(`Error deleting queue ${queueName}: ${deleteErr.message}`); + } else { + this.logger.info(`Queue ${queueName} deleted`); + } + }, + ); + } + }, + ); + }); } } diff --git a/src/api/integrations/event/sqs/dto/sqs.dto.ts b/src/api/integrations/event/sqs/dto/sqs.dto.ts index 9b8aeedd..daca23fd 100644 --- a/src/api/integrations/event/sqs/dto/sqs.dto.ts +++ b/src/api/integrations/event/sqs/dto/sqs.dto.ts @@ -1,4 +1,13 @@ +import { Constructor } from '@api/dto/integration.dto'; + export class SqsDto { enabled: boolean; events?: string[]; } + +export function SQSInstanceMixin(Base: TBase) { + return class extends Base { + sqsEnabled?: boolean; + sqsEvents?: string[]; + }; +} diff --git a/src/api/integrations/event/sqs/libs/sqs.server.ts b/src/api/integrations/event/sqs/libs/sqs.server.ts deleted file mode 100644 index 5ac34786..00000000 --- a/src/api/integrations/event/sqs/libs/sqs.server.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { SQS } from '@aws-sdk/client-sqs'; -import { configService, Sqs } from '@config/env.config'; -import { Logger } from '@config/logger.config'; -import { JsonValue } from '@prisma/client/runtime/library'; - -const logger = new Logger('SQS'); - -let sqs: SQS; - -export const initSQS = () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - return new Promise((resolve, reject) => { - const awsConfig = configService.get('SQS'); - sqs = new SQS({ - credentials: { - accessKeyId: awsConfig.ACCESS_KEY_ID, - secretAccessKey: awsConfig.SECRET_ACCESS_KEY, - }, - - region: awsConfig.REGION, - }); - - logger.info('SQS initialized'); - resolve(); - }); -}; - -export const getSQS = (): SQS => { - return sqs; -}; - -export const initQueues = (instanceName: string, events: string[]) => { - if (!events || !events.length) return; - - const queues = events.map((event) => { - return `${event.replace(/_/g, '_').toLowerCase()}`; - }); - - const sqs = getSQS(); - - queues.forEach((event) => { - const queueName = `${instanceName}_${event}.fifo`; - - sqs.createQueue( - { - QueueName: queueName, - Attributes: { - FifoQueue: 'true', - }, - }, - (err, data) => { - if (err) { - logger.error(`Error creating queue ${queueName}: ${err.message}`); - } else { - logger.info(`Queue ${queueName} created: ${data.QueueUrl}`); - } - }, - ); - }); -}; - -export const removeQueues = (instanceName: string, events: JsonValue) => { - const eventsArray = Array.isArray(events) ? events.map((event) => String(event)) : []; - if (!events || !eventsArray.length) return; - - const sqs = getSQS(); - - const queues = eventsArray.map((event) => { - return `${event.replace(/_/g, '_').toLowerCase()}`; - }); - - queues.forEach((event) => { - const queueName = `${instanceName}_${event}.fifo`; - - sqs.getQueueUrl( - { - QueueName: queueName, - }, - (err, data) => { - if (err) { - logger.error(`Error getting queue URL for ${queueName}: ${err.message}`); - } else { - const queueUrl = data.QueueUrl; - - sqs.deleteQueue( - { - QueueUrl: queueUrl, - }, - (deleteErr) => { - if (deleteErr) { - logger.error(`Error deleting queue ${queueName}: ${deleteErr.message}`); - } else { - logger.info(`Queue ${queueName} deleted`); - } - }, - ); - } - }, - ); - }); -}; diff --git a/src/api/integrations/event/sqs/routes/sqs.router.ts b/src/api/integrations/event/sqs/routes/sqs.router.ts index d73aa138..9a811c66 100644 --- a/src/api/integrations/event/sqs/routes/sqs.router.ts +++ b/src/api/integrations/event/sqs/routes/sqs.router.ts @@ -15,7 +15,7 @@ export class SqsRouter extends RouterBroker { request: req, schema: sqsSchema, ClassRef: SqsDto, - execute: (instance, data) => sqsController.createSqs(instance, data), + execute: (instance, data) => sqsController.set(instance.instanceName, data), }); res.status(HttpStatus.CREATED).json(response); @@ -25,7 +25,7 @@ export class SqsRouter extends RouterBroker { request: req, schema: instanceSchema, ClassRef: InstanceDto, - execute: (instance) => sqsController.findSqs(instance), + execute: (instance) => sqsController.get(instance.instanceName), }); res.status(HttpStatus.OK).json(response); diff --git a/src/api/integrations/event/sqs/services/sqs.service.ts b/src/api/integrations/event/sqs/services/sqs.service.ts deleted file mode 100644 index 7327df6c..00000000 --- a/src/api/integrations/event/sqs/services/sqs.service.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { InstanceDto } from '@api/dto/instance.dto'; -import { SqsDto } from '@api/integrations/event/sqs/dto/sqs.dto'; -import { initQueues } from '@api/integrations/event/sqs/libs/sqs.server'; -import { WAMonitoringService } from '@api/services/monitor.service'; -import { Logger } from '@config/logger.config'; -import { Sqs } from '@prisma/client'; - -export class SqsService { - constructor(private readonly waMonitor: WAMonitoringService) {} - - private readonly logger = new Logger('SqsService'); - - public create(instance: InstanceDto, data: SqsDto) { - this.waMonitor.waInstances[instance.instanceName].setSqs(data); - - initQueues(instance.instanceName, data.events); - return { sqs: { ...instance, sqs: data } }; - } - - public async find(instance: InstanceDto): Promise { - try { - const result = await this.waMonitor.waInstances[instance.instanceName].findSqs(); - - if (Object.keys(result).length === 0) { - throw new Error('Sqs not found'); - } - - return result; - } catch (error) { - return null; - } - } -} diff --git a/src/api/integrations/event/webhook/controllers/webhook.controller.ts b/src/api/integrations/event/webhook/controllers/webhook.controller.ts new file mode 100644 index 00000000..be3d3664 --- /dev/null +++ b/src/api/integrations/event/webhook/controllers/webhook.controller.ts @@ -0,0 +1,252 @@ +import { EventController } from '@api/controllers/event.controller'; +import { PrismaRepository } from '@api/repository/repository.service'; +import { WAMonitoringService } from '@api/services/monitor.service'; +import { wa } from '@api/types/wa.types'; +import { configService, Log, Webhook, Websocket } from '@config/env.config'; +import { Logger } from '@config/logger.config'; +import { BadRequestException, NotFoundException } from '@exceptions'; +import axios from 'axios'; +import { isURL } from 'class-validator'; + +import { WebhookDto } from '../dto/webhook.dto'; + +export class WebhookController extends EventController { + private readonly logger = new Logger(WebhookController.name); + + constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { + super(prismaRepository, waMonitor); + } + + public async set(instanceName: string, data: WebhookDto): Promise { + if (!isURL(data.url, { require_tld: false })) { + throw new BadRequestException('Invalid "url" property'); + } + + if (!data.enabled) { + data.events = []; + } else { + if (0 === data.events.length) { + data.events = this.events; + } + } + + try { + await this.get(instanceName); + + return this.prisma.webhook.update({ + where: { + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + data, + }); + } catch (err) { + return this.prisma.webhook.create({ + data: { + enabled: data.enabled, + events: data.events, + instanceId: this.monitor.waInstances[instanceName].instanceId, + url: data.url, + webhookBase64: data.webhookBase64, + webhookByEvents: data.webhookByEvents, + }, + }); + } + } + + public async get(instanceName: string): Promise { + if (undefined === this.monitor.waInstances[instanceName]) { + throw new NotFoundException('Instance not found'); + } + + const data = await this.prisma.webhook.findUnique({ + where: { + instanceId: this.monitor.waInstances[instanceName].instanceId, + }, + }); + + if (!data) { + throw new NotFoundException('Webhook not found'); + } + + return data; + } + + public async emit({ + instanceName, + origin, + event, + data, + serverUrl, + dateTime, + sender, + apiKey, + local, + }: { + instanceName: string; + origin: string; + event: string; + data: Object; + serverUrl: string; + dateTime: string; + sender: string; + apiKey?: string; + local?: boolean; + }): Promise { + if (!configService.get('WEBSOCKET')?.ENABLED) { + return; + } + + const instanceWebhook = await this.get(instanceName); + const webhookGlobal = configService.get('WEBHOOK'); + const webhookLocal = instanceWebhook.events; + const we = event.replace(/[.-]/gm, '_').toUpperCase(); + const transformedWe = we.replace(/_/gm, '-').toLowerCase(); + const enabledLog = configService.get('LOG').LEVEL.includes('WEBHOOKS'); + + const webhookData = { + event, + instance: instanceName, + data, + destination: instanceWebhook.url, + date_time: dateTime, + sender, + server_url: serverUrl, + apikey: apiKey, + }; + if (local) { + if (Array.isArray(webhookLocal) && webhookLocal.includes(we)) { + let baseURL: string; + + if (instanceWebhook.webhookByEvents) { + baseURL = `${instanceWebhook.url}/${transformedWe}`; + } else { + baseURL = instanceWebhook.url; + } + + if (enabledLog) { + const logData = { + local: `${origin}.sendData-Webhook`, + url: baseURL, + ...webhookData, + }; + + this.logger.log(logData); + } + + try { + if (instanceWebhook.enabled && isURL(instanceWebhook.url, { require_tld: false })) { + const httpService = axios.create({ baseURL }); + + await httpService.post('', webhookData); + } + } catch (error) { + this.logger.error({ + local: `${origin}.sendData-Webhook`, + message: error?.message, + hostName: error?.hostname, + syscall: error?.syscall, + code: error?.code, + error: error?.errno, + stack: error?.stack, + name: error?.name, + url: baseURL, + server_url: serverUrl, + }); + } + } + } + + if (webhookGlobal.GLOBAL?.ENABLED) { + if (webhookGlobal.EVENTS[we]) { + const globalWebhook = configService.get('WEBHOOK').GLOBAL; + + let globalURL; + + if (webhookGlobal.GLOBAL.WEBHOOK_BY_EVENTS) { + globalURL = `${globalWebhook.URL}/${transformedWe}`; + } else { + globalURL = globalWebhook.URL; + } + + if (enabledLog) { + const logData = { + local: `${origin}.sendData-Webhook-Global`, + url: globalURL, + ...webhookData, + }; + + this.logger.log(logData); + } + + try { + if (globalWebhook && globalWebhook?.ENABLED && isURL(globalURL)) { + const httpService = axios.create({ baseURL: globalURL }); + + await httpService.post('', webhookData); + } + } catch (error) { + this.logger.error({ + local: `${origin}.sendData-Webhook-Global`, + message: error?.message, + hostName: error?.hostname, + syscall: error?.syscall, + code: error?.code, + error: error?.errno, + stack: error?.stack, + name: error?.name, + url: globalURL, + server_url: serverUrl, + }); + } + } + } + } + + public async receiveWebhook(data: any) { + if (data.object === 'whatsapp_business_account') { + if (data.entry[0]?.changes[0]?.field === 'message_template_status_update') { + const template = await this.prismaRepository.template.findFirst({ + where: { templateId: `${data.entry[0].changes[0].value.message_template_id}` }, + }); + + if (!template) { + console.log('template not found'); + return; + } + + const { webhookUrl } = template; + + await axios.post(webhookUrl, data.entry[0].changes[0].value, { + headers: { + 'Content-Type': 'application/json', + }, + }); + return; + } + + data.entry?.forEach(async (entry: any) => { + const numberId = entry.changes[0].value.metadata.phone_number_id; + + if (!numberId) { + this.logger.error('WebhookService -> receiveWebhook -> numberId not found'); + return; + } + + const instance = await this.prismaRepository.instance.findFirst({ + where: { number: numberId }, + }); + + if (!instance) { + this.logger.error('WebhookService -> receiveWebhook -> instance not found'); + return; + } + + await this.waMonitor.waInstances[instance.name].connectToWhatsapp(data); + + return; + }); + } + + return; + } +} diff --git a/src/api/integrations/event/webhook/dto/webhook.dto.ts b/src/api/integrations/event/webhook/dto/webhook.dto.ts new file mode 100644 index 00000000..6d1d7eea --- /dev/null +++ b/src/api/integrations/event/webhook/dto/webhook.dto.ts @@ -0,0 +1,18 @@ +import { Constructor } from '@api/dto/integration.dto'; + +export class WebhookDto { + enabled?: boolean; + url?: string; + events?: string[]; + webhookByEvents?: boolean; + webhookBase64?: boolean; +} + +export function WebhookInstanceMixin(Base: TBase) { + return class extends Base { + webhookUrl?: string; + webhookByEvents?: boolean; + webhookBase64?: boolean; + webhookEvents?: string[]; + }; +} diff --git a/src/api/routes/webhook.router.ts b/src/api/integrations/event/webhook/routes/webhook.router.ts similarity index 80% rename from src/api/routes/webhook.router.ts rename to src/api/integrations/event/webhook/routes/webhook.router.ts index c17befa6..ca9a90bb 100644 --- a/src/api/routes/webhook.router.ts +++ b/src/api/integrations/event/webhook/routes/webhook.router.ts @@ -1,12 +1,12 @@ import { RouterBroker } from '@api/abstract/abstract.router'; import { InstanceDto } from '@api/dto/instance.dto'; -import { WebhookDto } from '@api/dto/webhook.dto'; +import { HttpStatus } from '@api/routes/index.router'; import { webhookController } from '@api/server.module'; import { ConfigService } from '@config/env.config'; import { instanceSchema, webhookSchema } from '@validate/validate.schema'; import { RequestHandler, Router } from 'express'; -import { HttpStatus } from './index.router'; +import { WebhookDto } from '../dto/webhook.dto'; export class WebhookRouter extends RouterBroker { constructor(readonly configService: ConfigService, ...guards: RequestHandler[]) { @@ -17,7 +17,7 @@ export class WebhookRouter extends RouterBroker { request: req, schema: webhookSchema, ClassRef: WebhookDto, - execute: (instance, data) => webhookController.createWebhook(instance, data), + execute: (instance, data) => webhookController.set(instance.instanceName, data), }); res.status(HttpStatus.CREATED).json(response); @@ -27,7 +27,7 @@ export class WebhookRouter extends RouterBroker { request: req, schema: instanceSchema, ClassRef: InstanceDto, - execute: (instance) => webhookController.findWebhook(instance), + execute: (instance) => webhookController.get(instance.instanceName), }); res.status(HttpStatus.OK).json(response); diff --git a/src/api/integrations/event/webhook/validate/websocket.schema.ts b/src/api/integrations/event/webhook/validate/websocket.schema.ts new file mode 100644 index 00000000..8a7678c1 --- /dev/null +++ b/src/api/integrations/event/webhook/validate/websocket.schema.ts @@ -0,0 +1,65 @@ +import { JSONSchema7 } from 'json-schema'; +import { v4 } from 'uuid'; + +const isNotEmpty = (...propertyNames: string[]): JSONSchema7 => { + const properties = {}; + propertyNames.forEach( + (property) => + (properties[property] = { + minLength: 1, + description: `The "${property}" cannot be empty`, + }), + ); + return { + if: { + propertyNames: { + enum: [...propertyNames], + }, + }, + then: { properties }, + }; +}; + +export const websocketSchema: JSONSchema7 = { + $id: v4(), + type: 'object', + properties: { + enabled: { type: 'boolean', enum: [true, false] }, + events: { + type: 'array', + minItems: 0, + items: { + type: 'string', + enum: [ + 'APPLICATION_STARTUP', + 'QRCODE_UPDATED', + 'MESSAGES_SET', + 'MESSAGES_UPSERT', + 'MESSAGES_EDITED', + 'MESSAGES_UPDATE', + 'MESSAGES_DELETE', + 'SEND_MESSAGE', + 'CONTACTS_SET', + 'CONTACTS_UPSERT', + 'CONTACTS_UPDATE', + 'PRESENCE_UPDATE', + 'CHATS_SET', + 'CHATS_UPSERT', + 'CHATS_UPDATE', + 'CHATS_DELETE', + 'GROUPS_UPSERT', + 'GROUP_UPDATE', + 'GROUP_PARTICIPANTS_UPDATE', + 'CONNECTION_UPDATE', + 'LABELS_EDIT', + 'LABELS_ASSOCIATION', + 'CALL', + 'TYPEBOT_START', + 'TYPEBOT_CHANGE_STATUS', + ], + }, + }, + }, + required: ['enabled'], + ...isNotEmpty('enabled'), +}; diff --git a/src/api/integrations/event/websocket/controllers/websocket.controller.ts b/src/api/integrations/event/websocket/controllers/websocket.controller.ts index 563a7302..0b05519c 100644 --- a/src/api/integrations/event/websocket/controllers/websocket.controller.ts +++ b/src/api/integrations/event/websocket/controllers/websocket.controller.ts @@ -3,7 +3,7 @@ import { WebsocketDto } from '@api/integrations/event/websocket/dto/websocket.dt import { PrismaRepository } from '@api/repository/repository.service'; import { WAMonitoringService } from '@api/services/monitor.service'; import { wa } from '@api/types/wa.types'; -import { configService, Cors, HttpServer, Log, Websocket } from '@config/env.config'; +import { configService, Cors, Log, Websocket } from '@config/env.config'; import { Logger } from '@config/logger.config'; import { NotFoundException } from '@exceptions'; import { Server } from 'http'; @@ -109,11 +109,19 @@ export class WebsocketController extends EventController { origin, event, data, + serverUrl, + dateTime, + sender, + apiKey, }: { instanceName: string; origin: string; event: string; data: Object; + serverUrl: string; + dateTime: string; + sender: string; + apiKey?: string; }): Promise { if (!configService.get('WEBSOCKET')?.ENABLED) { return; @@ -121,14 +129,14 @@ export class WebsocketController extends EventController { const configEv = event.replace(/[.-]/gm, '_').toUpperCase(); const logEnabled = configService.get('LOG').LEVEL.includes('WEBSOCKET'); - const serverUrl = configService.get('SERVER').URL; - const date = new Date(Date.now() - new Date().getTimezoneOffset() * 60000).toISOString(); const message = { event, - instanceName, + instance: instanceName, data, - serverUrl, - date, + server_url: serverUrl, + date_time: dateTime, + sender, + apikey: apiKey, }; if (configService.get('WEBSOCKET')?.GLOBAL_EVENTS) { diff --git a/src/api/integrations/event/websocket/dto/websocket.dto.ts b/src/api/integrations/event/websocket/dto/websocket.dto.ts index 27f6d785..cead7b84 100644 --- a/src/api/integrations/event/websocket/dto/websocket.dto.ts +++ b/src/api/integrations/event/websocket/dto/websocket.dto.ts @@ -1,4 +1,13 @@ +import { Constructor } from '@api/dto/integration.dto'; + export class WebsocketDto { enabled: boolean; events?: string[]; } + +export function WebsocketInstanceMixin(Base: TBase) { + return class extends Base { + websocketEnabled?: boolean; + websocketEvents?: string[]; + }; +} diff --git a/src/api/routes/event.router.ts b/src/api/routes/event.router.ts index 80da82db..88ce6306 100644 --- a/src/api/routes/event.router.ts +++ b/src/api/routes/event.router.ts @@ -1,10 +1,9 @@ import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/routes/rabbitmq.router'; import { SqsRouter } from '@api/integrations/event/sqs/routes/sqs.router'; +import { WebhookRouter } from '@api/integrations/event/webhook/routes/webhook.router'; import { WebsocketRouter } from '@api/integrations/event/websocket/routes/websocket.router'; import { Router } from 'express'; -import { WebhookRouter } from './webhook.router'; - export class EventRouter { public readonly router: Router; diff --git a/src/api/server.module.ts b/src/api/server.module.ts index 3e5fff38..8e375b7b 100644 --- a/src/api/server.module.ts +++ b/src/api/server.module.ts @@ -12,7 +12,6 @@ import { ProxyController } from './controllers/proxy.controller'; import { SendMessageController } from './controllers/sendMessage.controller'; import { SettingsController } from './controllers/settings.controller'; import { TemplateController } from './controllers/template.controller'; -import { WebhookController } from './controllers/webhook.controller'; import { ChatwootController } from './integrations/chatbot/chatwoot/controllers/chatwoot.controller'; import { ChatwootService } from './integrations/chatbot/chatwoot/services/chatwoot.service'; import { DifyController } from './integrations/chatbot/dify/controllers/dify.controller'; @@ -22,21 +21,18 @@ import { OpenaiService } from './integrations/chatbot/openai/services/openai.ser import { TypebotController } from './integrations/chatbot/typebot/controllers/typebot.controller'; import { TypebotService } from './integrations/chatbot/typebot/services/typebot.service'; import { RabbitmqController } from './integrations/event/rabbitmq/controllers/rabbitmq.controller'; -import { RabbitmqService } from './integrations/event/rabbitmq/services/rabbitmq.service'; import { SqsController } from './integrations/event/sqs/controllers/sqs.controller'; -import { SqsService } from './integrations/event/sqs/services/sqs.service'; +import { WebhookController } from './integrations/event/webhook/controllers/webhook.controller'; import { WebsocketController } from './integrations/event/websocket/controllers/websocket.controller'; import { S3Controller } from './integrations/storage/s3/controllers/s3.controller'; import { S3Service } from './integrations/storage/s3/services/s3.service'; import { ProviderFiles } from './provider/sessions'; import { PrismaRepository } from './repository/repository.service'; -import { AuthService } from './services/auth.service'; import { CacheService } from './services/cache.service'; import { WAMonitoringService } from './services/monitor.service'; import { ProxyService } from './services/proxy.service'; import { SettingsService } from './services/settings.service'; import { TemplateService } from './services/template.service'; -import { WebhookService } from './services/webhook.service'; const logger = new Logger('WA MODULE'); @@ -65,8 +61,6 @@ export const waMonitor = new WAMonitoringService( baileysCache, ); -const authService = new AuthService(prismaRepository); - const typebotService = new TypebotService(waMonitor, configService, prismaRepository); export const typebotController = new TypebotController(typebotService); @@ -79,25 +73,19 @@ export const difyController = new DifyController(difyService); const s3Service = new S3Service(prismaRepository); export const s3Controller = new S3Controller(s3Service); -const webhookService = new WebhookService(waMonitor, prismaRepository); -export const webhookController = new WebhookController(webhookService, waMonitor); - const templateService = new TemplateService(waMonitor, prismaRepository, configService); export const templateController = new TemplateController(templateService); export const eventController = new EventController(prismaRepository, waMonitor); export const websocketController = new WebsocketController(prismaRepository, waMonitor); +export const rabbitmqController = new RabbitmqController(prismaRepository, waMonitor); +export const sqsController = new SqsController(prismaRepository, waMonitor); +export const webhookController = new WebhookController(prismaRepository, waMonitor); const proxyService = new ProxyService(waMonitor); export const proxyController = new ProxyController(proxyService, waMonitor); -const rabbitmqService = new RabbitmqService(waMonitor); -export const rabbitmqController = new RabbitmqController(rabbitmqService); - -const sqsService = new SqsService(waMonitor); -export const sqsController = new SqsController(sqsService); - const chatwootService = new ChatwootService(waMonitor, configService, prismaRepository, chatwootCache); export const chatwootController = new ChatwootController(chatwootService, configService, prismaRepository); @@ -109,13 +97,8 @@ export const instanceController = new InstanceController( configService, prismaRepository, eventEmitter, - authService, - webhookService, chatwootService, settingsService, - websocketController, - rabbitmqService, - sqsService, proxyController, cache, chatwootCache, diff --git a/src/api/services/channel.service.ts b/src/api/services/channel.service.ts index b3291b92..7acc7438 100644 --- a/src/api/services/channel.service.ts +++ b/src/api/services/channel.service.ts @@ -1,27 +1,20 @@ import { InstanceDto } from '@api/dto/instance.dto'; import { ProxyDto } from '@api/dto/proxy.dto'; import { SettingsDto } from '@api/dto/settings.dto'; -import { WebhookDto } from '@api/dto/webhook.dto'; import { ChatwootDto } from '@api/integrations/chatbot/chatwoot/dto/chatwoot.dto'; import { ChatwootService } from '@api/integrations/chatbot/chatwoot/services/chatwoot.service'; import { DifyService } from '@api/integrations/chatbot/dify/services/dify.service'; import { OpenaiService } from '@api/integrations/chatbot/openai/services/openai.service'; import { TypebotService } from '@api/integrations/chatbot/typebot/services/typebot.service'; -import { RabbitmqDto } from '@api/integrations/event/rabbitmq/dto/rabbitmq.dto'; -import { getAMQP, removeQueues } from '@api/integrations/event/rabbitmq/libs/amqp.server'; -import { SqsDto } from '@api/integrations/event/sqs/dto/sqs.dto'; -import { getSQS, removeQueues as removeQueuesSQS } from '@api/integrations/event/sqs/libs/sqs.server'; import { PrismaRepository, Query } from '@api/repository/repository.service'; import { eventController, waMonitor } from '@api/server.module'; import { Events, wa } from '@api/types/wa.types'; -import { Auth, Chatwoot, ConfigService, HttpServer, Log, Rabbitmq, Sqs, Webhook } from '@config/env.config'; +import { Auth, Chatwoot, ConfigService, HttpServer } from '@config/env.config'; import { Logger } from '@config/logger.config'; import { ROOT_DIR } from '@config/path.config'; import { NotFoundException } from '@exceptions'; import { Contact, Message } from '@prisma/client'; -import axios from 'axios'; import { WASocket } from 'baileys'; -import { isURL } from 'class-validator'; import EventEmitter2 from 'eventemitter2'; import { join } from 'path'; import { v4 } from 'uuid'; @@ -40,10 +33,7 @@ export class ChannelStartupService { public client: WASocket; public readonly instance: wa.Instance = {}; - public readonly localWebhook: wa.LocalWebHook = {}; public readonly localChatwoot: wa.LocalChatwoot = {}; - public readonly localRabbitmq: wa.LocalRabbitmq = {}; - public readonly localSqs: wa.LocalSqs = {}; public readonly localProxy: wa.LocalProxy = {}; public readonly localSettings: wa.LocalSettings = {}; public readonly storePath = join(ROOT_DIR, 'store'); @@ -215,73 +205,6 @@ export class ChannelStartupService { }; } - public async loadWebhook() { - const data = await this.prismaRepository.webhook.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - this.localWebhook.url = data?.url; - this.localWebhook.enabled = data?.enabled; - this.localWebhook.events = data?.events; - this.localWebhook.webhookByEvents = data?.webhookByEvents; - this.localWebhook.webhookBase64 = data?.webhookBase64; - } - - public async setWebhook(data: WebhookDto) { - const findWebhook = await this.prismaRepository.webhook.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - if (findWebhook) { - await this.prismaRepository.webhook.update({ - where: { - instanceId: this.instanceId, - }, - data: { - url: data.url, - enabled: data.enabled, - events: data.events, - webhookByEvents: data.webhookByEvents, - webhookBase64: data.webhookBase64, - }, - }); - - Object.assign(this.localWebhook, data); - return; - } - await this.prismaRepository.webhook.create({ - data: { - url: data.url, - enabled: data.enabled, - events: data.events, - webhookByEvents: data.webhookByEvents, - webhookBase64: data.webhookBase64, - instanceId: this.instanceId, - }, - }); - - Object.assign(this.localWebhook, data); - return; - } - - public async findWebhook() { - const data = await this.prismaRepository.webhook.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - if (!data) { - throw new NotFoundException('Webhook not found'); - } - - return data; - } - public async loadChatwoot() { if (!this.configService.get('CHATWOOT').ENABLED) { return; @@ -422,136 +345,6 @@ export class ChannelStartupService { } } - public async loadRabbitmq() { - const data = await this.prismaRepository.rabbitmq.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - this.localRabbitmq.enabled = data?.enabled; - this.localRabbitmq.events = data?.events; - } - - public async setRabbitmq(data: RabbitmqDto) { - const findRabbitmq = await this.prismaRepository.rabbitmq.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - if (findRabbitmq) { - await this.prismaRepository.rabbitmq.update({ - where: { - instanceId: this.instanceId, - }, - data: { - enabled: data.enabled, - events: data.events, - }, - }); - - Object.assign(this.localRabbitmq, data); - return; - } - - await this.prismaRepository.rabbitmq.create({ - data: { - enabled: data.enabled, - events: data.events, - instanceId: this.instanceId, - }, - }); - - Object.assign(this.localRabbitmq, data); - return; - } - - public async findRabbitmq() { - const data = await this.prismaRepository.rabbitmq.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - if (!data) { - throw new NotFoundException('Rabbitmq not found'); - } - - return data; - } - - public async removeRabbitmqQueues() { - if (this.localRabbitmq.enabled) { - removeQueues(this.instanceName, this.localRabbitmq.events); - } - } - - public async loadSqs() { - const data = await this.prismaRepository.sqs.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - this.localSqs.enabled = data?.enabled; - this.localSqs.events = data?.events; - } - - public async setSqs(data: SqsDto) { - const findSqs = await this.prismaRepository.sqs.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - if (findSqs) { - await this.prismaRepository.sqs.update({ - where: { - instanceId: this.instanceId, - }, - data: { - enabled: data.enabled, - events: data.events, - }, - }); - - Object.assign(this.localSqs, data); - return; - } - - await this.prismaRepository.sqs.create({ - data: { - enabled: data.enabled, - events: data.events, - instanceId: this.instanceId, - }, - }); - - Object.assign(this.localSqs, data); - return; - } - - public async findSqs() { - const data = await this.prismaRepository.sqs.findUnique({ - where: { - instanceId: this.instanceId, - }, - }); - - if (!data) { - throw new NotFoundException('Sqs not found'); - } - - return data; - } - - public async removeSqsQueues() { - if (this.localSqs.enabled) { - removeQueuesSQS(this.instanceName, this.localSqs.events); - } - } - public async loadProxy() { const data = await this.prismaRepository.proxy.findUnique({ where: { @@ -598,16 +391,7 @@ export class ChannelStartupService { } public async sendDataWebhook(event: Events, data: T, local = true) { - const webhookGlobal = this.configService.get('WEBHOOK'); - const webhookLocal = this.localWebhook.events; - const rabbitmqLocal = this.localRabbitmq.events; - const sqsLocal = this.localSqs.events; const serverUrl = this.configService.get('SERVER').URL; - const rabbitmqEnabled = this.configService.get('RABBITMQ').ENABLED; - const rabbitmqGlobal = this.configService.get('RABBITMQ').GLOBAL_ENABLED; - const rabbitmqEvents = this.configService.get('RABBITMQ').EVENTS; - const we = event.replace(/[.-]/gm, '_').toUpperCase(); - const transformedWe = we.replace(/_/gm, '-').toLowerCase(); const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds const localISOTime = new Date(Date.now() - tzoffset).toISOString(); const now = localISOTime; @@ -616,360 +400,17 @@ export class ChannelStartupService { const instanceApikey = this.token || 'Apikey not found'; - if (rabbitmqEnabled) { - const amqp = getAMQP(); - if (this.localRabbitmq.enabled && amqp) { - if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { - const exchangeName = this.instanceName ?? 'evolution_exchange'; - - let retry = 0; - - while (retry < 3) { - try { - await amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const eventName = event.replace(/_/g, '.').toLowerCase(); - - const queueName = `${this.instanceName}.${eventName}`; - - await amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - await amqp.bindQueue(queueName, exchangeName, eventName); - - const message = { - event, - instance: this.instance.name, - data, - server_url: serverUrl, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - message['apikey'] = instanceApikey; - } - - await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); - - if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: ChannelStartupService.name + '.sendData-RabbitMQ', - event, - instance: this.instance.name, - data, - server_url: serverUrl, - apikey: (expose && instanceApikey) || null, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - logData['apikey'] = instanceApikey; - } - - this.logger.log(logData); - } - break; - } catch (error) { - retry++; - } - } - } - } - - if (rabbitmqGlobal && rabbitmqEvents[we] && amqp) { - const exchangeName = 'evolution_exchange'; - - let retry = 0; - - while (retry < 3) { - try { - await amqp.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const queueName = event; - - await amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - await amqp.bindQueue(queueName, exchangeName, event); - - const message = { - event, - instance: this.instance.name, - data, - server_url: serverUrl, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - message['apikey'] = instanceApikey; - } - await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); - - if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: ChannelStartupService.name + '.sendData-RabbitMQ-Global', - event, - instance: this.instance.name, - data, - server_url: serverUrl, - apikey: (expose && instanceApikey) || null, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - logData['apikey'] = instanceApikey; - } - - this.logger.log(logData); - } - - break; - } catch (error) { - retry++; - } - } - } - } - - if (this.localSqs.enabled) { - const sqs = getSQS(); - - if (sqs) { - if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) { - const eventFormatted = `${event.replace('.', '_').toLowerCase()}`; - - const queueName = `${this.instanceName}_${eventFormatted}.fifo`; - - const sqsConfig = this.configService.get('SQS'); - - const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`; - - const message = { - event, - instance: this.instance.name, - data, - server_url: serverUrl, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - message['apikey'] = instanceApikey; - } - - const params = { - MessageBody: JSON.stringify(message), - MessageGroupId: 'evolution', - MessageDeduplicationId: `${this.instanceName}_${eventFormatted}_${Date.now()}`, - QueueUrl: sqsUrl, - }; - - sqs.sendMessage(params, (err, data) => { - if (err) { - this.logger.error({ - local: ChannelStartupService.name + '.sendData-SQS', - message: err?.message, - hostName: err?.hostname, - code: err?.code, - stack: err?.stack, - name: err?.name, - url: queueName, - server_url: serverUrl, - }); - } else { - if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: ChannelStartupService.name + '.sendData-SQS', - event, - instance: this.instance.name, - data, - server_url: serverUrl, - apikey: (expose && instanceApikey) || null, - date_time: now, - sender: this.wuid, - }; - - if (expose && instanceApikey) { - logData['apikey'] = instanceApikey; - } - - this.logger.log(logData); - } - } - }); - } - } - } - await eventController.emit({ instanceName: this.instance.name, origin: ChannelStartupService.name, event, - data: { - ...data, - sender: this.wuid, - apikey: (expose && instanceApikey) || null, - }, + data, + serverUrl, + dateTime: now, + sender: this.wuid, + apiKey: expose && instanceApikey ? instanceApikey : null, + local, }); - - const globalApiKey = this.configService.get('AUTHENTICATION').API_KEY.KEY; - - if (local) { - if (Array.isArray(webhookLocal) && webhookLocal.includes(we)) { - let baseURL: string; - - if (this.localWebhook.webhookByEvents) { - baseURL = `${this.localWebhook.url}/${transformedWe}`; - } else { - baseURL = this.localWebhook.url; - } - - if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: ChannelStartupService.name + '.sendDataWebhook-local', - url: baseURL, - event, - instance: this.instance.name, - data, - destination: this.localWebhook.url, - date_time: now, - sender: this.wuid, - server_url: serverUrl, - apikey: (expose && instanceApikey) || null, - }; - - if (expose && instanceApikey) { - logData['apikey'] = instanceApikey; - } - - this.logger.log(logData); - } - - try { - if (this.localWebhook.enabled && isURL(this.localWebhook.url, { require_tld: false })) { - const httpService = axios.create({ baseURL }); - const postData = { - event, - instance: this.instance.name, - data, - destination: this.localWebhook.url, - date_time: now, - sender: this.wuid, - server_url: serverUrl, - }; - - if (expose && instanceApikey) { - postData['apikey'] = instanceApikey; - } - - await httpService.post('', postData); - } - } catch (error) { - this.logger.error({ - local: ChannelStartupService.name + '.sendDataWebhook-local', - message: error?.message, - hostName: error?.hostname, - syscall: error?.syscall, - code: error?.code, - error: error?.errno, - stack: error?.stack, - name: error?.name, - url: baseURL, - server_url: serverUrl, - }); - } - } - } - - if (webhookGlobal.GLOBAL?.ENABLED) { - if (webhookGlobal.EVENTS[we]) { - const globalWebhook = this.configService.get('WEBHOOK').GLOBAL; - - let globalURL; - - if (webhookGlobal.GLOBAL.WEBHOOK_BY_EVENTS) { - globalURL = `${globalWebhook.URL}/${transformedWe}`; - } else { - globalURL = globalWebhook.URL; - } - - const localUrl = this.localWebhook.url; - - if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: ChannelStartupService.name + '.sendDataWebhook-global', - url: globalURL, - event, - instance: this.instance.name, - data, - destination: localUrl, - date_time: now, - sender: this.wuid, - server_url: serverUrl, - }; - - if (expose && globalApiKey) { - logData['apikey'] = globalApiKey; - } - - this.logger.log(logData); - } - - try { - if (globalWebhook && globalWebhook?.ENABLED && isURL(globalURL)) { - const httpService = axios.create({ baseURL: globalURL }); - const postData = { - event, - instance: this.instance.name, - data, - destination: localUrl, - date_time: now, - sender: this.wuid, - server_url: serverUrl, - }; - - if (expose && globalApiKey) { - postData['apikey'] = globalApiKey; - } - - await httpService.post('', postData); - } - } catch (error) { - this.logger.error({ - local: ChannelStartupService.name + '.sendDataWebhook-global', - message: error?.message, - hostName: error?.hostname, - syscall: error?.syscall, - code: error?.code, - error: error?.errno, - stack: error?.stack, - name: error?.name, - url: globalURL, - server_url: serverUrl, - }); - } - } - } } // Check if the number is MX or AR diff --git a/src/api/services/channels/whatsapp.baileys.service.ts b/src/api/services/channels/whatsapp.baileys.service.ts index 735c2314..7be72ae8 100644 --- a/src/api/services/channels/whatsapp.baileys.service.ts +++ b/src/api/services/channels/whatsapp.baileys.service.ts @@ -654,11 +654,8 @@ export class BaileysStartupService extends ChannelStartupService { public async connectToWhatsapp(number?: string): Promise { try { - this.loadWebhook(); this.loadChatwoot(); this.loadSettings(); - this.loadRabbitmq(); - this.loadSqs(); this.loadProxy(); return await this.createClient(number); @@ -1184,7 +1181,7 @@ export class BaileysStartupService extends ChannelStartupService { } } - if (isMedia && !this.configService.get('S3').ENABLE && this.localWebhook.webhookBase64 === true) { + if (isMedia && !this.configService.get('S3').ENABLE) { const buffer = await downloadMediaMessage( { key: received.key, message: received?.message }, 'buffer', diff --git a/src/api/services/channels/whatsapp.business.service.ts b/src/api/services/channels/whatsapp.business.service.ts index 5a739586..885708d9 100644 --- a/src/api/services/channels/whatsapp.business.service.ts +++ b/src/api/services/channels/whatsapp.business.service.ts @@ -129,10 +129,7 @@ export class BusinessStartupService extends ChannelStartupService { const content = data.entry[0].changes[0].value; try { - this.loadWebhook(); this.loadChatwoot(); - this.loadRabbitmq(); - this.loadSqs(); this.eventHandler(content); diff --git a/src/api/services/monitor.service.ts b/src/api/services/monitor.service.ts index 493b70a7..be3f821d 100644 --- a/src/api/services/monitor.service.ts +++ b/src/api/services/monitor.service.ts @@ -1,8 +1,7 @@ import { InstanceDto } from '@api/dto/instance.dto'; import { ProviderFiles } from '@api/provider/sessions'; import { PrismaRepository } from '@api/repository/repository.service'; -import { websocketController } from '@api/server.module'; -import { Integration } from '@api/types/wa.types'; +import { Events, Integration } from '@api/types/wa.types'; import { CacheConf, Chatwoot, ConfigService, Database, DelInstance, ProviderSession } from '@config/env.config'; import { Logger } from '@config/logger.config'; import { INSTANCE_DIR, STORE_DIR } from '@config/path.config'; @@ -52,10 +51,8 @@ export class WAMonitoringService { this.waInstances[instance]?.client?.ws?.close(); this.waInstances[instance]?.client?.end(undefined); } - this.waInstances[instance]?.removeRabbitmqQueues(); this.eventEmitter.emit('remove.instance', instance, 'inner'); } else { - this.waInstances[instance]?.removeRabbitmqQueues(); this.eventEmitter.emit('remove.instance', instance, 'inner'); } } @@ -340,12 +337,7 @@ export class WAMonitoringService { private removeInstance() { this.eventEmitter.on('remove.instance', async (instanceName: string) => { try { - await websocketController.emit({ - instanceName, - origin: WAMonitoringService.name, - event: 'remove.instance', - data: null, - }); + await this.waInstances[instanceName]?.sendDataWebhook(Events.REMOVE_INSTANCE, null); this.cleaningUp(instanceName); this.cleaningStoreData(instanceName); @@ -361,12 +353,7 @@ export class WAMonitoringService { }); this.eventEmitter.on('logout.instance', async (instanceName: string) => { try { - await websocketController.emit({ - instanceName, - origin: WAMonitoringService.name, - event: 'logout.instance', - data: null, - }); + await this.waInstances[instanceName]?.sendDataWebhook(Events.LOGOUT_INSTANCE, null); if (this.configService.get('CHATWOOT').ENABLED) { this.waInstances[instanceName]?.clearCacheChatwoot(); diff --git a/src/api/services/webhook.service.ts b/src/api/services/webhook.service.ts deleted file mode 100644 index d8f10932..00000000 --- a/src/api/services/webhook.service.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { InstanceDto } from '@api/dto/instance.dto'; -import { WebhookDto } from '@api/dto/webhook.dto'; -import { PrismaRepository } from '@api/repository/repository.service'; -import { Logger } from '@config/logger.config'; -import { Webhook } from '@prisma/client'; -import axios from 'axios'; - -import { WAMonitoringService } from './monitor.service'; - -export class WebhookService { - constructor(private readonly waMonitor: WAMonitoringService, public readonly prismaRepository: PrismaRepository) {} - - private readonly logger = new Logger('WebhookService'); - - public create(instance: InstanceDto, data: WebhookDto) { - this.waMonitor.waInstances[instance.instanceName].setWebhook(data); - - return { webhook: { ...instance, webhook: data } }; - } - - public async find(instance: InstanceDto): Promise { - try { - const result = await this.waMonitor.waInstances[instance.instanceName].findWebhook(); - - if (Object.keys(result).length === 0) { - throw new Error('Webhook not found'); - } - - return result; - } catch (error) { - return null; - } - } - - public async receiveWebhook(data: any) { - if (data.object === 'whatsapp_business_account') { - if (data.entry[0]?.changes[0]?.field === 'message_template_status_update') { - const template = await this.prismaRepository.template.findFirst({ - where: { templateId: `${data.entry[0].changes[0].value.message_template_id}` }, - }); - - if (!template) { - console.log('template not found'); - return; - } - - const { webhookUrl } = template; - - await axios.post(webhookUrl, data.entry[0].changes[0].value, { - headers: { - 'Content-Type': 'application/json', - }, - }); - return; - } - - data.entry?.forEach(async (entry: any) => { - const numberId = entry.changes[0].value.metadata.phone_number_id; - - if (!numberId) { - this.logger.error('WebhookService -> receiveWebhook -> numberId not found'); - return; - } - - const instance = await this.prismaRepository.instance.findFirst({ - where: { number: numberId }, - }); - - if (!instance) { - this.logger.error('WebhookService -> receiveWebhook -> instance not found'); - return; - } - - await this.waMonitor.waInstances[instance.name].connectToWhatsapp(data); - - return; - }); - } - - return; - } -} diff --git a/src/api/types/wa.types.ts b/src/api/types/wa.types.ts index 93187bb6..7004592f 100644 --- a/src/api/types/wa.types.ts +++ b/src/api/types/wa.types.ts @@ -33,6 +33,8 @@ export enum Events { LABELS_ASSOCIATION = 'labels.association', CREDS_UPDATE = 'creds.update', MESSAGING_HISTORY_SET = 'messaging-history.set', + REMOVE_INSTANCE = 'remove.instance', + LOGOUT_INSTANCE = 'logout.instance', } export declare namespace wa { diff --git a/src/main.ts b/src/main.ts index 27da0bbe..62346983 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,10 +1,8 @@ -import { initAMQP, initGlobalQueues } from '@api/integrations/event/rabbitmq/libs/amqp.server'; -import { initSQS } from '@api/integrations/event/sqs/libs/sqs.server'; import { ProviderFiles } from '@api/provider/sessions'; import { PrismaRepository } from '@api/repository/repository.service'; import { HttpStatus, router } from '@api/routes/index.router'; import { eventController, waMonitor } from '@api/server.module'; -import { Auth, configService, Cors, HttpServer, ProviderSession, Rabbitmq, Sqs, Webhook } from '@config/env.config'; +import { Auth, configService, Cors, HttpServer, ProviderSession, Webhook } from '@config/env.config'; import { onUnexpectedError } from '@config/error.config'; import { Logger } from '@config/logger.config'; import { ROOT_DIR } from '@config/path.config'; @@ -147,14 +145,6 @@ async function bootstrap() { initWA(); - if (configService.get('RABBITMQ')?.ENABLED) { - initAMQP().then(() => { - if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) initGlobalQueues(); - }); - } - - if (configService.get('SQS')?.ENABLED) initSQS(); - onUnexpectedError(); } diff --git a/src/validate/chatbot.schema.ts b/src/validate/chatbot.schema.ts new file mode 100644 index 00000000..d2b5b7fe --- /dev/null +++ b/src/validate/chatbot.schema.ts @@ -0,0 +1,4 @@ +export * from '@api/integrations/chatbot/chatwoot/validate/chatwoot.schema'; +export * from '@api/integrations/chatbot/dify/validate/dify.schema'; +export * from '@api/integrations/chatbot/openai/validate/openai.schema'; +export * from '@api/integrations/chatbot/typebot/validate/typebot.schema'; diff --git a/src/validate/event.schema.ts b/src/validate/event.schema.ts new file mode 100644 index 00000000..d4f691ce --- /dev/null +++ b/src/validate/event.schema.ts @@ -0,0 +1,4 @@ +export * from './webhook.schema'; +export * from './websocket.schema'; +export * from '@api/integrations/event/rabbitmq/validate/rabbitmq.schema'; +export * from '@api/integrations/event/sqs/validate/sqs.schema'; diff --git a/src/validate/validate.schema.ts b/src/validate/validate.schema.ts index 64fae31c..fb17e3d5 100644 --- a/src/validate/validate.schema.ts +++ b/src/validate/validate.schema.ts @@ -1,5 +1,7 @@ // Integrations Schema export * from './chat.schema'; +export * from './chatbot.schema'; +export * from './event.schema'; export * from './group.schema'; export * from './instance.schema'; export * from './label.schema'; @@ -7,11 +9,3 @@ export * from './message.schema'; export * from './proxy.schema'; export * from './settings.schema'; export * from './template.schema'; -export * from './webhook.schema'; -export * from './websocket.schema'; -export * from '@api/integrations/chatbot/chatwoot/validate/chatwoot.schema'; -export * from '@api/integrations/chatbot/dify/validate/dify.schema'; -export * from '@api/integrations/chatbot/openai/validate/openai.schema'; -export * from '@api/integrations/event/rabbitmq/validate/rabbitmq.schema'; -export * from '@api/integrations/event/sqs/validate/sqs.schema'; -export * from '@api/integrations/chatbot/typebot/validate/typebot.schema';