diff --git a/.env.example b/.env.example index 52644bea..73a3b40d 100644 --- a/.env.example +++ b/.env.example @@ -190,6 +190,60 @@ PUSHER_EVENTS_CALL=true PUSHER_EVENTS_TYPEBOT_START=false PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS=false +# Kafka - Environment variables +KAFKA_ENABLED=false +KAFKA_CLIENT_ID=evolution-api +KAFKA_BROKERS=localhost:9092 +KAFKA_CONNECTION_TIMEOUT=3000 +KAFKA_REQUEST_TIMEOUT=30000 +# Global events - By enabling this variable, events from all instances are sent to global Kafka topics. +KAFKA_GLOBAL_ENABLED=false +KAFKA_CONSUMER_GROUP_ID=evolution-api-consumers +KAFKA_TOPIC_PREFIX=evolution +KAFKA_NUM_PARTITIONS=1 +KAFKA_REPLICATION_FACTOR=1 +KAFKA_AUTO_CREATE_TOPICS=false +# Choose the events you want to send to Kafka +KAFKA_EVENTS_APPLICATION_STARTUP=false +KAFKA_EVENTS_INSTANCE_CREATE=false +KAFKA_EVENTS_INSTANCE_DELETE=false +KAFKA_EVENTS_QRCODE_UPDATED=false +KAFKA_EVENTS_MESSAGES_SET=false +KAFKA_EVENTS_MESSAGES_UPSERT=false +KAFKA_EVENTS_MESSAGES_EDITED=false +KAFKA_EVENTS_MESSAGES_UPDATE=false +KAFKA_EVENTS_MESSAGES_DELETE=false +KAFKA_EVENTS_SEND_MESSAGE=false +KAFKA_EVENTS_SEND_MESSAGE_UPDATE=false +KAFKA_EVENTS_CONTACTS_SET=false +KAFKA_EVENTS_CONTACTS_UPSERT=false +KAFKA_EVENTS_CONTACTS_UPDATE=false +KAFKA_EVENTS_PRESENCE_UPDATE=false +KAFKA_EVENTS_CHATS_SET=false +KAFKA_EVENTS_CHATS_UPSERT=false +KAFKA_EVENTS_CHATS_UPDATE=false +KAFKA_EVENTS_CHATS_DELETE=false +KAFKA_EVENTS_GROUPS_UPSERT=false +KAFKA_EVENTS_GROUPS_UPDATE=false +KAFKA_EVENTS_GROUP_PARTICIPANTS_UPDATE=false +KAFKA_EVENTS_CONNECTION_UPDATE=false +KAFKA_EVENTS_LABELS_EDIT=false +KAFKA_EVENTS_LABELS_ASSOCIATION=false +KAFKA_EVENTS_CALL=false +KAFKA_EVENTS_TYPEBOT_START=false +KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS=false +# SASL Authentication (optional) +KAFKA_SASL_ENABLED=false +KAFKA_SASL_MECHANISM=plain +KAFKA_SASL_USERNAME= +KAFKA_SASL_PASSWORD= +# SSL Configuration (optional) +KAFKA_SSL_ENABLED=false +KAFKA_SSL_REJECT_UNAUTHORIZED=true +KAFKA_SSL_CA= +KAFKA_SSL_KEY= +KAFKA_SSL_CERT= + # WhatsApp Business API - Environment variables # Token used to validate the webhook on the Facebook APP WA_BUSINESS_TOKEN_WEBHOOK=evolution diff --git a/README.md b/README.md index 5f486a15..eb7e638c 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,9 @@ Evolution API supports various integrations to enhance its functionality. Below - [RabbitMQ](https://www.rabbitmq.com/): - Receive events from the Evolution API via RabbitMQ. +- [Apache Kafka](https://kafka.apache.org/): + - Receive events from the Evolution API via Apache Kafka for real-time event streaming and processing. + - [Amazon SQS](https://aws.amazon.com/pt/sqs/): - Receive events from the Evolution API via Amazon SQS. diff --git a/package-lock.json b/package-lock.json index d898321d..f1f483e3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -39,6 +39,7 @@ "json-schema": "^0.4.0", "jsonschema": "^1.4.1", "jsonwebtoken": "^9.0.2", + "kafkajs": "^2.2.4", "link-preview-js": "^3.0.13", "long": "^5.2.3", "mediainfo.js": "^0.3.4", @@ -10718,6 +10719,15 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "5.5.1", "resolved": "https://registry.npmjs.org/keyv/-/keyv-5.5.1.tgz", diff --git a/package.json b/package.json index b3771546..6168e11c 100644 --- a/package.json +++ b/package.json @@ -95,6 +95,7 @@ "json-schema": "^0.4.0", "jsonschema": "^1.4.1", "jsonwebtoken": "^9.0.2", + "kafkajs": "^2.2.4", "link-preview-js": "^3.0.13", "long": "^5.2.3", "mediainfo.js": "^0.3.4", diff --git a/prisma/mysql-migrations/20250918183910_add_kafka_integration/migration.sql b/prisma/mysql-migrations/20250918183910_add_kafka_integration/migration.sql new file mode 100644 index 00000000..c3a089bd --- /dev/null +++ b/prisma/mysql-migrations/20250918183910_add_kafka_integration/migration.sql @@ -0,0 +1,231 @@ +/* + Warnings: + + - You are about to alter the column `createdAt` on the `Chat` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Chat` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Chatwoot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Chatwoot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Contact` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Contact` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Dify` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Dify` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `DifySetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `DifySetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Evoai` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Evoai` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `EvoaiSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `EvoaiSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `EvolutionBot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `EvolutionBot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `EvolutionBotSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `EvolutionBotSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Flowise` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Flowise` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `FlowiseSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `FlowiseSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `disconnectionAt` on the `Instance` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Instance` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Instance` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `IntegrationSession` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `IntegrationSession` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to drop the column `lid` on the `IsOnWhatsapp` table. All the data in the column will be lost. + - You are about to alter the column `createdAt` on the `IsOnWhatsapp` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `IsOnWhatsapp` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Label` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Label` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Media` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `N8n` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `N8n` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `N8nSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `N8nSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Nats` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Nats` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `OpenaiBot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `OpenaiBot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `OpenaiCreds` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `OpenaiCreds` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `OpenaiSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `OpenaiSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Proxy` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Proxy` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Pusher` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Pusher` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Rabbitmq` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Rabbitmq` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Session` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Setting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Setting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Sqs` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Sqs` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Template` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Template` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to drop the column `splitMessages` on the `Typebot` table. All the data in the column will be lost. + - You are about to drop the column `timePerChar` on the `Typebot` table. All the data in the column will be lost. + - You are about to alter the column `createdAt` on the `Typebot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Typebot` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to drop the column `splitMessages` on the `TypebotSetting` table. All the data in the column will be lost. + - You are about to drop the column `timePerChar` on the `TypebotSetting` table. All the data in the column will be lost. + - You are about to alter the column `createdAt` on the `TypebotSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `TypebotSetting` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Webhook` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Webhook` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `createdAt` on the `Websocket` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + - You are about to alter the column `updatedAt` on the `Websocket` table. The data in that column could be lost. The data in that column will be cast from `Timestamp(0)` to `Timestamp`. + +*/ +-- DropIndex +DROP INDEX `unique_remote_instance` ON `Chat`; + +-- AlterTable +ALTER TABLE `Chat` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NULL; + +-- AlterTable +ALTER TABLE `Chatwoot` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Contact` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NULL; + +-- AlterTable +ALTER TABLE `Dify` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `DifySetting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Evoai` MODIFY `triggerType` ENUM('all', 'keyword', 'none', 'advanced') NULL, + MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `EvoaiSetting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `EvolutionBot` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `EvolutionBotSetting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Flowise` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `FlowiseSetting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Instance` MODIFY `disconnectionAt` TIMESTAMP NULL, + MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NULL; + +-- AlterTable +ALTER TABLE `IntegrationSession` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `IsOnWhatsapp` DROP COLUMN `lid`, + MODIFY `createdAt` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Label` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Media` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP; + +-- AlterTable +ALTER TABLE `N8n` MODIFY `triggerType` ENUM('all', 'keyword', 'none', 'advanced') NULL, + MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `N8nSetting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Nats` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `OpenaiBot` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `OpenaiCreds` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `OpenaiSetting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Proxy` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Pusher` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Rabbitmq` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Session` MODIFY `createdAt` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- AlterTable +ALTER TABLE `Setting` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Sqs` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Template` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Typebot` DROP COLUMN `splitMessages`, + DROP COLUMN `timePerChar`, + MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NULL; + +-- AlterTable +ALTER TABLE `TypebotSetting` DROP COLUMN `splitMessages`, + DROP COLUMN `timePerChar`, + MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Webhook` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- AlterTable +ALTER TABLE `Websocket` MODIFY `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + MODIFY `updatedAt` TIMESTAMP NOT NULL; + +-- CreateTable +CREATE TABLE `Kafka` ( + `id` VARCHAR(191) NOT NULL, + `enabled` BOOLEAN NOT NULL DEFAULT false, + `events` JSON NOT NULL, + `createdAt` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, + `updatedAt` TIMESTAMP NOT NULL, + `instanceId` VARCHAR(191) NOT NULL, + + UNIQUE INDEX `Kafka_instanceId_key`(`instanceId`), + PRIMARY KEY (`id`) +) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +-- AddForeignKey +ALTER TABLE `Kafka` ADD CONSTRAINT `Kafka_instanceId_fkey` FOREIGN KEY (`instanceId`) REFERENCES `Instance`(`id`) ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/mysql-schema.prisma b/prisma/mysql-schema.prisma index 70efea47..71b5a743 100644 --- a/prisma/mysql-schema.prisma +++ b/prisma/mysql-schema.prisma @@ -88,6 +88,7 @@ model Instance { Rabbitmq Rabbitmq? Nats Nats? Sqs Sqs? + Kafka Kafka? Websocket Websocket? Typebot Typebot[] Session Session? @@ -105,8 +106,11 @@ model Instance { EvolutionBotSetting EvolutionBotSetting? Flowise Flowise[] FlowiseSetting FlowiseSetting? - Pusher Pusher? N8n N8n[] + N8nSetting N8nSetting? + Evoai Evoai[] + EvoaiSetting EvoaiSetting? + Pusher Pusher? } model Session { @@ -309,6 +313,16 @@ model Sqs { instanceId String @unique } +model Kafka { + id String @id @default(cuid()) + enabled Boolean @default(false) + events Json @db.Json + createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String @unique +} + model Websocket { id String @id @default(cuid()) enabled Boolean @default(false) @@ -647,7 +661,7 @@ model IsOnWhatsapp { model N8n { id String @id @default(cuid()) - enabled Boolean @default(true) @db.TinyInt(1) + enabled Boolean @default(true) @db.TinyInt() description String? @db.VarChar(255) webhookUrl String? @db.VarChar(255) basicAuthUser String? @db.VarChar(255) @@ -666,7 +680,7 @@ model N8n { triggerType TriggerType? triggerOperator TriggerOperator? triggerValue String? - createdAt DateTime? @default(now()) @db.Timestamp + createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) instanceId String @@ -686,7 +700,7 @@ model N8nSetting { ignoreJids Json? splitMessages Boolean? @default(false) timePerChar Int? @default(50) @db.Int - createdAt DateTime? @default(now()) @db.Timestamp + createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp Fallback N8n? @relation(fields: [n8nIdFallback], references: [id]) n8nIdFallback String? @db.VarChar(100) @@ -696,7 +710,7 @@ model N8nSetting { model Evoai { id String @id @default(cuid()) - enabled Boolean @default(true) @db.TinyInt(1) + enabled Boolean @default(true) @db.TinyInt() description String? @db.VarChar(255) agentUrl String? @db.VarChar(255) apiKey String? @db.VarChar(255) @@ -714,7 +728,7 @@ model Evoai { triggerType TriggerType? triggerOperator TriggerOperator? triggerValue String? - createdAt DateTime? @default(now()) @db.Timestamp + createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) instanceId String @@ -734,7 +748,7 @@ model EvoaiSetting { ignoreJids Json? splitMessages Boolean? @default(false) timePerChar Int? @default(50) @db.Int - createdAt DateTime? @default(now()) @db.Timestamp + createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp Fallback Evoai? @relation(fields: [evoaiIdFallback], references: [id]) evoaiIdFallback String? @db.VarChar(100) diff --git a/prisma/postgresql-migrations/20250918182355_add_kafka_integration/migration.sql b/prisma/postgresql-migrations/20250918182355_add_kafka_integration/migration.sql new file mode 100644 index 00000000..16ef2d7c --- /dev/null +++ b/prisma/postgresql-migrations/20250918182355_add_kafka_integration/migration.sql @@ -0,0 +1,17 @@ +-- CreateTable +CREATE TABLE "public"."Kafka" ( + "id" TEXT NOT NULL, + "enabled" BOOLEAN NOT NULL DEFAULT false, + "events" JSONB NOT NULL, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP NOT NULL, + "instanceId" TEXT NOT NULL, + + CONSTRAINT "Kafka_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "Kafka_instanceId_key" ON "public"."Kafka"("instanceId"); + +-- AddForeignKey +ALTER TABLE "public"."Kafka" ADD CONSTRAINT "Kafka_instanceId_fkey" FOREIGN KEY ("instanceId") REFERENCES "public"."Instance"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/postgresql-schema.prisma b/prisma/postgresql-schema.prisma index 221d69d9..e14f8d49 100644 --- a/prisma/postgresql-schema.prisma +++ b/prisma/postgresql-schema.prisma @@ -88,6 +88,7 @@ model Instance { Rabbitmq Rabbitmq? Nats Nats? Sqs Sqs? + Kafka Kafka? Websocket Websocket? Typebot Typebot[] Session Session? @@ -312,6 +313,16 @@ model Sqs { instanceId String @unique } +model Kafka { + id String @id @default(cuid()) + enabled Boolean @default(false) @db.Boolean + events Json @db.JsonB + createdAt DateTime? @default(now()) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String @unique +} + model Websocket { id String @id @default(cuid()) enabled Boolean @default(false) @db.Boolean diff --git a/prisma/psql_bouncer-schema.prisma b/prisma/psql_bouncer-schema.prisma index 4754f165..a3f4dbe9 100644 --- a/prisma/psql_bouncer-schema.prisma +++ b/prisma/psql_bouncer-schema.prisma @@ -89,6 +89,7 @@ model Instance { Rabbitmq Rabbitmq? Nats Nats? Sqs Sqs? + Kafka Kafka? Websocket Websocket? Typebot Typebot[] Session Session? @@ -313,6 +314,16 @@ model Sqs { instanceId String @unique } +model Kafka { + id String @id @default(cuid()) + enabled Boolean @default(false) @db.Boolean + events Json @db.JsonB + createdAt DateTime? @default(now()) @db.Timestamp + updatedAt DateTime @updatedAt @db.Timestamp + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String @unique +} + model Websocket { id String @id @default(cuid()) enabled Boolean @default(false) @db.Boolean diff --git a/src/api/integrations/event/event.dto.ts b/src/api/integrations/event/event.dto.ts index 84426764..b5292993 100644 --- a/src/api/integrations/event/event.dto.ts +++ b/src/api/integrations/event/event.dto.ts @@ -40,6 +40,11 @@ export class EventDto { useTLS?: boolean; events?: string[]; }; + + kafka?: { + enabled?: boolean; + events?: string[]; + }; } export function EventInstanceMixin(Base: TBase) { @@ -82,5 +87,10 @@ export function EventInstanceMixin(Base: TBase) { useTLS?: boolean; events?: string[]; }; + + kafka?: { + enabled?: boolean; + events?: string[]; + }; }; } diff --git a/src/api/integrations/event/event.manager.ts b/src/api/integrations/event/event.manager.ts index fe3256c9..90547932 100644 --- a/src/api/integrations/event/event.manager.ts +++ b/src/api/integrations/event/event.manager.ts @@ -1,3 +1,4 @@ +import { KafkaController } from '@api/integrations/event/kafka/kafka.controller'; import { NatsController } from '@api/integrations/event/nats/nats.controller'; import { PusherController } from '@api/integrations/event/pusher/pusher.controller'; import { RabbitmqController } from '@api/integrations/event/rabbitmq/rabbitmq.controller'; @@ -17,6 +18,7 @@ export class EventManager { private natsController: NatsController; private sqsController: SqsController; private pusherController: PusherController; + private kafkaController: KafkaController; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { this.prisma = prismaRepository; @@ -28,6 +30,7 @@ export class EventManager { this.nats = new NatsController(prismaRepository, waMonitor); this.sqs = new SqsController(prismaRepository, waMonitor); this.pusher = new PusherController(prismaRepository, waMonitor); + this.kafka = new KafkaController(prismaRepository, waMonitor); } public set prisma(prisma: PrismaRepository) { @@ -93,12 +96,20 @@ export class EventManager { return this.pusherController; } + public set kafka(kafka: KafkaController) { + this.kafkaController = kafka; + } + public get kafka() { + return this.kafkaController; + } + public init(httpServer: Server): void { this.websocket.init(httpServer); this.rabbitmq.init(); this.nats.init(); this.sqs.init(); this.pusher.init(); + this.kafka.init(); } public async emit(eventData: { @@ -119,42 +130,47 @@ export class EventManager { await this.sqs.emit(eventData); await this.webhook.emit(eventData); await this.pusher.emit(eventData); + await this.kafka.emit(eventData); } public async setInstance(instanceName: string, data: any): Promise { - if (data.websocket) + if (data.websocket) { await this.websocket.set(instanceName, { websocket: { enabled: true, events: data.websocket?.events, }, }); + } - if (data.rabbitmq) + if (data.rabbitmq) { await this.rabbitmq.set(instanceName, { rabbitmq: { enabled: true, events: data.rabbitmq?.events, }, }); + } - if (data.nats) + if (data.nats) { await this.nats.set(instanceName, { nats: { enabled: true, events: data.nats?.events, }, }); + } - if (data.sqs) + if (data.sqs) { await this.sqs.set(instanceName, { sqs: { enabled: true, events: data.sqs?.events, }, }); + } - if (data.webhook) + if (data.webhook) { await this.webhook.set(instanceName, { webhook: { enabled: true, @@ -165,8 +181,9 @@ export class EventManager { byEvents: data.webhook?.byEvents, }, }); + } - if (data.pusher) + if (data.pusher) { await this.pusher.set(instanceName, { pusher: { enabled: true, @@ -178,5 +195,15 @@ export class EventManager { useTLS: data.pusher?.useTLS, }, }); + } + + if (data.kafka) { + await this.kafka.set(instanceName, { + kafka: { + enabled: true, + events: data.kafka?.events, + }, + }); + } } } diff --git a/src/api/integrations/event/event.router.ts b/src/api/integrations/event/event.router.ts index 49a6ec60..f80907bc 100644 --- a/src/api/integrations/event/event.router.ts +++ b/src/api/integrations/event/event.router.ts @@ -1,3 +1,4 @@ +import { KafkaRouter } from '@api/integrations/event/kafka/kafka.router'; import { NatsRouter } from '@api/integrations/event/nats/nats.router'; import { PusherRouter } from '@api/integrations/event/pusher/pusher.router'; import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/rabbitmq.router'; @@ -18,5 +19,6 @@ export class EventRouter { this.router.use('/nats', new NatsRouter(...guards).router); this.router.use('/pusher', new PusherRouter(...guards).router); this.router.use('/sqs', new SqsRouter(...guards).router); + this.router.use('/kafka', new KafkaRouter(...guards).router); } } diff --git a/src/api/integrations/event/event.schema.ts b/src/api/integrations/event/event.schema.ts index 464ee02b..812bbd53 100644 --- a/src/api/integrations/event/event.schema.ts +++ b/src/api/integrations/event/event.schema.ts @@ -22,6 +22,9 @@ export const eventSchema: JSONSchema7 = { sqs: { $ref: '#/$defs/event', }, + kafka: { + $ref: '#/$defs/event', + }, }, $defs: { event: { diff --git a/src/api/integrations/event/kafka/kafka.controller.ts b/src/api/integrations/event/kafka/kafka.controller.ts new file mode 100644 index 00000000..416bcb13 --- /dev/null +++ b/src/api/integrations/event/kafka/kafka.controller.ts @@ -0,0 +1,414 @@ +import { PrismaRepository } from '@api/repository/repository.service'; +import { WAMonitoringService } from '@api/services/monitor.service'; +import { configService, Kafka, Log } from '@config/env.config'; +import { Logger } from '@config/logger.config'; +import { Consumer, ConsumerConfig, Kafka as KafkaJS, KafkaConfig, Producer, ProducerConfig } from 'kafkajs'; + +import { EmitData, EventController, EventControllerInterface } from '../event.controller'; + +export class KafkaController extends EventController implements EventControllerInterface { + private kafkaClient: KafkaJS | null = null; + private producer: Producer | null = null; + private consumer: Consumer | null = null; + private readonly logger = new Logger('KafkaController'); + private reconnectAttempts = 0; + private maxReconnectAttempts = 10; + private reconnectDelay = 5000; // 5 seconds + private isReconnecting = false; + + constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { + super(prismaRepository, waMonitor, configService.get('KAFKA')?.ENABLED, 'kafka'); + } + + public async init(): Promise { + if (!this.status) { + return; + } + + await this.connect(); + } + + private async connect(): Promise { + try { + const kafkaConfig = configService.get('KAFKA'); + + const clientConfig: KafkaConfig = { + clientId: kafkaConfig.CLIENT_ID || 'evolution-api', + brokers: kafkaConfig.BROKERS || ['localhost:9092'], + connectionTimeout: kafkaConfig.CONNECTION_TIMEOUT || 3000, + requestTimeout: kafkaConfig.REQUEST_TIMEOUT || 30000, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }; + + // Add SASL authentication if configured + if (kafkaConfig.SASL?.ENABLED) { + clientConfig.sasl = { + mechanism: (kafkaConfig.SASL.MECHANISM as any) || 'plain', + username: kafkaConfig.SASL.USERNAME, + password: kafkaConfig.SASL.PASSWORD, + }; + } + + // Add SSL configuration if enabled + if (kafkaConfig.SSL?.ENABLED) { + clientConfig.ssl = { + rejectUnauthorized: kafkaConfig.SSL.REJECT_UNAUTHORIZED !== false, + ca: kafkaConfig.SSL.CA ? [kafkaConfig.SSL.CA] : undefined, + key: kafkaConfig.SSL.KEY, + cert: kafkaConfig.SSL.CERT, + }; + } + + this.kafkaClient = new KafkaJS(clientConfig); + + // Initialize producer + const producerConfig: ProducerConfig = { + maxInFlightRequests: 1, + idempotent: true, + transactionTimeout: 30000, + }; + + this.producer = this.kafkaClient.producer(producerConfig); + await this.producer.connect(); + + // Initialize consumer for global events if enabled + if (kafkaConfig.GLOBAL_ENABLED) { + await this.initGlobalConsumer(); + } + + this.reconnectAttempts = 0; + this.isReconnecting = false; + + this.logger.info('Kafka initialized successfully'); + + // Create topics if they don't exist + if (kafkaConfig.AUTO_CREATE_TOPICS) { + await this.createTopics(); + } + } catch (error) { + this.logger.error({ + local: 'KafkaController.connect', + message: 'Failed to connect to Kafka', + error: error.message || error, + }); + this.scheduleReconnect(); + throw error; + } + } + + private async initGlobalConsumer(): Promise { + try { + const kafkaConfig = configService.get('KAFKA'); + + const consumerConfig: ConsumerConfig = { + groupId: kafkaConfig.CONSUMER_GROUP_ID || 'evolution-api-consumers', + sessionTimeout: 30000, + heartbeatInterval: 3000, + }; + + this.consumer = this.kafkaClient.consumer(consumerConfig); + await this.consumer.connect(); + + // Subscribe to global topics + const events = kafkaConfig.EVENTS; + if (events) { + const eventKeys = Object.keys(events).filter((event) => events[event]); + + for (const event of eventKeys) { + const topicName = this.getTopicName(event, true); + await this.consumer.subscribe({ topic: topicName }); + } + + // Start consuming messages + await this.consumer.run({ + eachMessage: async ({ topic, message }) => { + try { + const data = JSON.parse(message.value?.toString() || '{}'); + this.logger.debug(`Received message from topic ${topic}: ${JSON.stringify(data)}`); + + // Process the message here if needed + // This is where you can add custom message processing logic + } catch (error) { + this.logger.error(`Error processing message from topic ${topic}: ${error}`); + } + }, + }); + + this.logger.info('Global Kafka consumer initialized'); + } + } catch (error) { + this.logger.error(`Failed to initialize global Kafka consumer: ${error}`); + } + } + + private async createTopics(): Promise { + try { + const kafkaConfig = configService.get('KAFKA'); + const admin = this.kafkaClient.admin(); + await admin.connect(); + + const topics = []; + + // Create global topics if enabled + if (kafkaConfig.GLOBAL_ENABLED && kafkaConfig.EVENTS) { + const eventKeys = Object.keys(kafkaConfig.EVENTS).filter((event) => kafkaConfig.EVENTS[event]); + + for (const event of eventKeys) { + const topicName = this.getTopicName(event, true); + topics.push({ + topic: topicName, + numPartitions: kafkaConfig.NUM_PARTITIONS || 1, + replicationFactor: kafkaConfig.REPLICATION_FACTOR || 1, + }); + } + } + + if (topics.length > 0) { + await admin.createTopics({ + topics, + waitForLeaders: true, + }); + + this.logger.info(`Created ${topics.length} Kafka topics`); + } + + await admin.disconnect(); + } catch (error) { + this.logger.error(`Failed to create Kafka topics: ${error}`); + } + } + + private getTopicName(event: string, isGlobal: boolean = false, instanceName?: string): string { + const kafkaConfig = configService.get('KAFKA'); + const prefix = kafkaConfig.TOPIC_PREFIX || 'evolution'; + + if (isGlobal) { + return `${prefix}.global.${event.toLowerCase().replace(/_/g, '.')}`; + } else { + return `${prefix}.${instanceName}.${event.toLowerCase().replace(/_/g, '.')}`; + } + } + + private handleConnectionLoss(): void { + if (this.isReconnecting) { + return; + } + + this.cleanup(); + this.scheduleReconnect(); + } + + private scheduleReconnect(): void { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + this.logger.error( + `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, + ); + return; + } + + if (this.isReconnecting) { + return; + } + + this.isReconnecting = true; + this.reconnectAttempts++; + + const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); + + this.logger.info( + `Scheduling Kafka reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, + ); + + setTimeout(async () => { + try { + this.logger.info( + `Attempting to reconnect to Kafka (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, + ); + await this.connect(); + this.logger.info('Successfully reconnected to Kafka'); + } catch (error) { + this.logger.error({ + local: 'KafkaController.scheduleReconnect', + message: `Reconnection attempt ${this.reconnectAttempts} failed`, + error: error.message || error, + }); + this.isReconnecting = false; + this.scheduleReconnect(); + } + }, delay); + } + + private async ensureConnection(): Promise { + if (!this.producer) { + this.logger.warn('Kafka producer is not available, attempting to reconnect...'); + if (!this.isReconnecting) { + this.scheduleReconnect(); + } + return false; + } + return true; + } + + public async emit({ + instanceName, + origin, + event, + data, + serverUrl, + dateTime, + sender, + apiKey, + integration, + }: EmitData): Promise { + if (integration && !integration.includes('kafka')) { + return; + } + + if (!this.status) { + return; + } + + if (!(await this.ensureConnection())) { + this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No Kafka connection`); + return; + } + + const instanceKafka = await this.get(instanceName); + const kafkaLocal = instanceKafka?.events; + const kafkaGlobal = configService.get('KAFKA').GLOBAL_ENABLED; + const kafkaEvents = configService.get('KAFKA').EVENTS; + const we = event.replace(/[.-]/gm, '_').toUpperCase(); + const logEnabled = configService.get('LOG').LEVEL.includes('WEBHOOKS'); + + const message = { + event, + instance: instanceName, + data, + server_url: serverUrl, + date_time: dateTime, + sender, + apikey: apiKey, + timestamp: Date.now(), + }; + + const messageValue = JSON.stringify(message); + + // Instance-specific events + if (instanceKafka?.enabled && this.producer && Array.isArray(kafkaLocal) && kafkaLocal.includes(we)) { + const topicName = this.getTopicName(event, false, instanceName); + + let retry = 0; + while (retry < 3) { + try { + await this.producer.send({ + topic: topicName, + messages: [ + { + key: instanceName, + value: messageValue, + headers: { + event, + instance: instanceName, + origin, + timestamp: dateTime, + }, + }, + ], + }); + + if (logEnabled) { + const logData = { + local: `${origin}.sendData-Kafka`, + ...message, + }; + this.logger.log(logData); + } + + break; + } catch (error) { + this.logger.error({ + local: 'KafkaController.emit', + message: `Error publishing local Kafka message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); + retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } + } + } + } + + // Global events + if (kafkaGlobal && kafkaEvents[we] && this.producer) { + const topicName = this.getTopicName(event, true); + + let retry = 0; + while (retry < 3) { + try { + await this.producer.send({ + topic: topicName, + messages: [ + { + key: `${instanceName}-${event}`, + value: messageValue, + headers: { + event, + instance: instanceName, + origin, + timestamp: dateTime, + }, + }, + ], + }); + + if (logEnabled) { + const logData = { + local: `${origin}.sendData-Kafka-Global`, + ...message, + }; + this.logger.log(logData); + } + + break; + } catch (error) { + this.logger.error({ + local: 'KafkaController.emit', + message: `Error publishing global Kafka message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); + retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } + } + } + } + } + + public async cleanup(): Promise { + try { + if (this.consumer) { + await this.consumer.disconnect(); + this.consumer = null; + } + if (this.producer) { + await this.producer.disconnect(); + this.producer = null; + } + this.kafkaClient = null; + } catch (error) { + this.logger.warn({ + local: 'KafkaController.cleanup', + message: 'Error during cleanup', + error: error.message || error, + }); + this.producer = null; + this.consumer = null; + this.kafkaClient = null; + } + } +} diff --git a/src/api/integrations/event/kafka/kafka.router.ts b/src/api/integrations/event/kafka/kafka.router.ts new file mode 100644 index 00000000..eb846508 --- /dev/null +++ b/src/api/integrations/event/kafka/kafka.router.ts @@ -0,0 +1,36 @@ +import { RouterBroker } from '@api/abstract/abstract.router'; +import { InstanceDto } from '@api/dto/instance.dto'; +import { EventDto } from '@api/integrations/event/event.dto'; +import { HttpStatus } from '@api/routes/index.router'; +import { eventManager } from '@api/server.module'; +import { eventSchema, instanceSchema } from '@validate/validate.schema'; +import { RequestHandler, Router } from 'express'; + +export class KafkaRouter extends RouterBroker { + constructor(...guards: RequestHandler[]) { + super(); + this.router + .post(this.routerPath('set'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: eventSchema, + ClassRef: EventDto, + execute: (instance, data) => eventManager.kafka.set(instance.instanceName, data), + }); + + res.status(HttpStatus.CREATED).json(response); + }) + .get(this.routerPath('find'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: instanceSchema, + ClassRef: InstanceDto, + execute: (instance) => eventManager.kafka.get(instance.instanceName), + }); + + res.status(HttpStatus.OK).json(response); + }); + } + + public readonly router: Router = Router(); +} diff --git a/src/api/integrations/event/kafka/kafka.schema.ts b/src/api/integrations/event/kafka/kafka.schema.ts new file mode 100644 index 00000000..3decb27e --- /dev/null +++ b/src/api/integrations/event/kafka/kafka.schema.ts @@ -0,0 +1,21 @@ +import { JSONSchema7 } from 'json-schema'; +import { v4 } from 'uuid'; + +import { EventController } from '../event.controller'; + +export const kafkaSchema: JSONSchema7 = { + $id: v4(), + type: 'object', + properties: { + enabled: { type: 'boolean', enum: [true, false] }, + events: { + type: 'array', + minItems: 0, + items: { + type: 'string', + enum: EventController.events, + }, + }, + }, + required: ['enabled'], +}; diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 66e0b000..7c4e382e 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -153,6 +153,34 @@ export type Sqs = { }; }; +export type Kafka = { + ENABLED: boolean; + CLIENT_ID: string; + BROKERS: string[]; + CONNECTION_TIMEOUT: number; + REQUEST_TIMEOUT: number; + GLOBAL_ENABLED: boolean; + CONSUMER_GROUP_ID: string; + TOPIC_PREFIX: string; + NUM_PARTITIONS: number; + REPLICATION_FACTOR: number; + AUTO_CREATE_TOPICS: boolean; + EVENTS: EventsRabbitmq; + SASL?: { + ENABLED: boolean; + MECHANISM: string; + USERNAME: string; + PASSWORD: string; + }; + SSL?: { + ENABLED: boolean; + REJECT_UNAUTHORIZED: boolean; + CA?: string; + KEY?: string; + CERT?: string; + }; +}; + export type Websocket = { ENABLED: boolean; GLOBAL_EVENTS: boolean; @@ -372,6 +400,7 @@ export interface Env { RABBITMQ: Rabbitmq; NATS: Nats; SQS: Sqs; + KAFKA: Kafka; WEBSOCKET: Websocket; WA_BUSINESS: WaBusiness; LOG: Log; @@ -587,6 +616,68 @@ export class ConfigService { TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true', }, }, + KAFKA: { + ENABLED: process.env?.KAFKA_ENABLED === 'true', + CLIENT_ID: process.env?.KAFKA_CLIENT_ID || 'evolution-api', + BROKERS: process.env?.KAFKA_BROKERS?.split(',') || ['localhost:9092'], + CONNECTION_TIMEOUT: Number.parseInt(process.env?.KAFKA_CONNECTION_TIMEOUT || '3000'), + REQUEST_TIMEOUT: Number.parseInt(process.env?.KAFKA_REQUEST_TIMEOUT || '30000'), + GLOBAL_ENABLED: process.env?.KAFKA_GLOBAL_ENABLED === 'true', + CONSUMER_GROUP_ID: process.env?.KAFKA_CONSUMER_GROUP_ID || 'evolution-api-consumers', + TOPIC_PREFIX: process.env?.KAFKA_TOPIC_PREFIX || 'evolution', + NUM_PARTITIONS: Number.parseInt(process.env?.KAFKA_NUM_PARTITIONS || '1'), + REPLICATION_FACTOR: Number.parseInt(process.env?.KAFKA_REPLICATION_FACTOR || '1'), + AUTO_CREATE_TOPICS: process.env?.KAFKA_AUTO_CREATE_TOPICS === 'true', + EVENTS: { + APPLICATION_STARTUP: process.env?.KAFKA_EVENTS_APPLICATION_STARTUP === 'true', + INSTANCE_CREATE: process.env?.KAFKA_EVENTS_INSTANCE_CREATE === 'true', + INSTANCE_DELETE: process.env?.KAFKA_EVENTS_INSTANCE_DELETE === 'true', + QRCODE_UPDATED: process.env?.KAFKA_EVENTS_QRCODE_UPDATED === 'true', + MESSAGES_SET: process.env?.KAFKA_EVENTS_MESSAGES_SET === 'true', + MESSAGES_UPSERT: process.env?.KAFKA_EVENTS_MESSAGES_UPSERT === 'true', + MESSAGES_EDITED: process.env?.KAFKA_EVENTS_MESSAGES_EDITED === 'true', + MESSAGES_UPDATE: process.env?.KAFKA_EVENTS_MESSAGES_UPDATE === 'true', + MESSAGES_DELETE: process.env?.KAFKA_EVENTS_MESSAGES_DELETE === 'true', + SEND_MESSAGE: process.env?.KAFKA_EVENTS_SEND_MESSAGE === 'true', + SEND_MESSAGE_UPDATE: process.env?.KAFKA_EVENTS_SEND_MESSAGE_UPDATE === 'true', + CONTACTS_SET: process.env?.KAFKA_EVENTS_CONTACTS_SET === 'true', + CONTACTS_UPSERT: process.env?.KAFKA_EVENTS_CONTACTS_UPSERT === 'true', + CONTACTS_UPDATE: process.env?.KAFKA_EVENTS_CONTACTS_UPDATE === 'true', + PRESENCE_UPDATE: process.env?.KAFKA_EVENTS_PRESENCE_UPDATE === 'true', + CHATS_SET: process.env?.KAFKA_EVENTS_CHATS_SET === 'true', + CHATS_UPSERT: process.env?.KAFKA_EVENTS_CHATS_UPSERT === 'true', + CHATS_UPDATE: process.env?.KAFKA_EVENTS_CHATS_UPDATE === 'true', + CHATS_DELETE: process.env?.KAFKA_EVENTS_CHATS_DELETE === 'true', + CONNECTION_UPDATE: process.env?.KAFKA_EVENTS_CONNECTION_UPDATE === 'true', + LABELS_EDIT: process.env?.KAFKA_EVENTS_LABELS_EDIT === 'true', + LABELS_ASSOCIATION: process.env?.KAFKA_EVENTS_LABELS_ASSOCIATION === 'true', + GROUPS_UPSERT: process.env?.KAFKA_EVENTS_GROUPS_UPSERT === 'true', + GROUP_UPDATE: process.env?.KAFKA_EVENTS_GROUPS_UPDATE === 'true', + GROUP_PARTICIPANTS_UPDATE: process.env?.KAFKA_EVENTS_GROUP_PARTICIPANTS_UPDATE === 'true', + CALL: process.env?.KAFKA_EVENTS_CALL === 'true', + TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true', + TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + }, + SASL: + process.env?.KAFKA_SASL_ENABLED === 'true' + ? { + ENABLED: true, + MECHANISM: process.env?.KAFKA_SASL_MECHANISM || 'plain', + USERNAME: process.env?.KAFKA_SASL_USERNAME || '', + PASSWORD: process.env?.KAFKA_SASL_PASSWORD || '', + } + : undefined, + SSL: + process.env?.KAFKA_SSL_ENABLED === 'true' + ? { + ENABLED: true, + REJECT_UNAUTHORIZED: process.env?.KAFKA_SSL_REJECT_UNAUTHORIZED !== 'false', + CA: process.env?.KAFKA_SSL_CA, + KEY: process.env?.KAFKA_SSL_KEY, + CERT: process.env?.KAFKA_SSL_CERT, + } + : undefined, + }, WEBSOCKET: { ENABLED: process.env?.WEBSOCKET_ENABLED === 'true', GLOBAL_EVENTS: process.env?.WEBSOCKET_GLOBAL_EVENTS === 'true',