Mirror of https://github.com/EvolutionAPI/evolution-api.git (synced 2025-12-20 04:12:23 -06:00)
feat(kafka): add Kafka integration for event streaming
- Introduced Kafka support in the Evolution API, allowing for real-time event streaming and processing.
- Updated environment configuration to include Kafka-related variables.
- Added KafkaController and KafkaRouter for managing Kafka events.
- Enhanced event management to support Kafka alongside existing integrations.
- Updated database schemas and migrations for Kafka integration in both MySQL and PostgreSQL.
- Documented Kafka integration in the README file.
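For context, a minimal client-side sketch of how an instance could opt in to the new Kafka events. The base URL, instance name, API key, and exact route prefix are assumptions; the payload shape follows the kafka block added to eventSchema below.

// Hypothetical usage sketch (TypeScript). Host, port, instance name, and apikey are
// placeholders; the route is assumed to mirror the other event integrations
// (e.g. /rabbitmq/set/{instance}), ending in /kafka/set/{instance}.
const baseUrl = 'http://localhost:8080';
const instance = 'my-instance';

const res = await fetch(`${baseUrl}/kafka/set/${instance}`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', apikey: 'YOUR_API_KEY' },
  body: JSON.stringify({
    kafka: {
      enabled: true,
      // Event names come from the same list used by the other integrations.
      events: ['MESSAGES_UPSERT', 'SEND_MESSAGE', 'CONNECTION_UPDATE'],
    },
  }),
});
console.log(await res.json());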
@@ -40,6 +40,11 @@ export class EventDto {
    useTLS?: boolean;
    events?: string[];
  };

  kafka?: {
    enabled?: boolean;
    events?: string[];
  };
}

export function EventInstanceMixin<TBase extends Constructor>(Base: TBase) {
@@ -82,5 +87,10 @@ export function EventInstanceMixin<TBase extends Constructor>(Base: TBase) {
      useTLS?: boolean;
      events?: string[];
    };

    kafka?: {
      enabled?: boolean;
      events?: string[];
    };
  };
}

@@ -1,3 +1,4 @@
import { KafkaController } from '@api/integrations/event/kafka/kafka.controller';
import { NatsController } from '@api/integrations/event/nats/nats.controller';
import { PusherController } from '@api/integrations/event/pusher/pusher.controller';
import { RabbitmqController } from '@api/integrations/event/rabbitmq/rabbitmq.controller';
@@ -17,6 +18,7 @@ export class EventManager {
  private natsController: NatsController;
  private sqsController: SqsController;
  private pusherController: PusherController;
  private kafkaController: KafkaController;

  constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
    this.prisma = prismaRepository;
@@ -28,6 +30,7 @@ export class EventManager {
    this.nats = new NatsController(prismaRepository, waMonitor);
    this.sqs = new SqsController(prismaRepository, waMonitor);
    this.pusher = new PusherController(prismaRepository, waMonitor);
    this.kafka = new KafkaController(prismaRepository, waMonitor);
  }

  public set prisma(prisma: PrismaRepository) {
@@ -93,12 +96,20 @@ export class EventManager {
    return this.pusherController;
  }

  public set kafka(kafka: KafkaController) {
    this.kafkaController = kafka;
  }
  public get kafka() {
    return this.kafkaController;
  }

  public init(httpServer: Server): void {
    this.websocket.init(httpServer);
    this.rabbitmq.init();
    this.nats.init();
    this.sqs.init();
    this.pusher.init();
    this.kafka.init();
  }

  public async emit(eventData: {
@@ -119,42 +130,47 @@ export class EventManager {
    await this.sqs.emit(eventData);
    await this.webhook.emit(eventData);
    await this.pusher.emit(eventData);
    await this.kafka.emit(eventData);
  }

  public async setInstance(instanceName: string, data: any): Promise<any> {
    if (data.websocket)
    if (data.websocket) {
      await this.websocket.set(instanceName, {
        websocket: {
          enabled: true,
          events: data.websocket?.events,
        },
      });
    }

    if (data.rabbitmq)
    if (data.rabbitmq) {
      await this.rabbitmq.set(instanceName, {
        rabbitmq: {
          enabled: true,
          events: data.rabbitmq?.events,
        },
      });
    }

    if (data.nats)
    if (data.nats) {
      await this.nats.set(instanceName, {
        nats: {
          enabled: true,
          events: data.nats?.events,
        },
      });
    }

    if (data.sqs)
    if (data.sqs) {
      await this.sqs.set(instanceName, {
        sqs: {
          enabled: true,
          events: data.sqs?.events,
        },
      });
    }

    if (data.webhook)
    if (data.webhook) {
      await this.webhook.set(instanceName, {
        webhook: {
          enabled: true,
@@ -165,8 +181,9 @@ export class EventManager {
          byEvents: data.webhook?.byEvents,
        },
      });
    }

    if (data.pusher)
    if (data.pusher) {
      await this.pusher.set(instanceName, {
        pusher: {
          enabled: true,
@@ -178,5 +195,15 @@ export class EventManager {
          useTLS: data.pusher?.useTLS,
        },
      });
    }

    if (data.kafka) {
      await this.kafka.set(instanceName, {
        kafka: {
          enabled: true,
          events: data.kafka?.events,
        },
      });
    }
  }
}

@@ -1,3 +1,4 @@
import { KafkaRouter } from '@api/integrations/event/kafka/kafka.router';
import { NatsRouter } from '@api/integrations/event/nats/nats.router';
import { PusherRouter } from '@api/integrations/event/pusher/pusher.router';
import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/rabbitmq.router';
@@ -18,5 +19,6 @@ export class EventRouter {
    this.router.use('/nats', new NatsRouter(...guards).router);
    this.router.use('/pusher', new PusherRouter(...guards).router);
    this.router.use('/sqs', new SqsRouter(...guards).router);
    this.router.use('/kafka', new KafkaRouter(...guards).router);
  }
}

@@ -22,6 +22,9 @@ export const eventSchema: JSONSchema7 = {
    sqs: {
      $ref: '#/$defs/event',
    },
    kafka: {
      $ref: '#/$defs/event',
    },
  },
  $defs: {
    event: {

src/api/integrations/event/kafka/kafka.controller.ts (new file, 414 lines)
@@ -0,0 +1,414 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { configService, Kafka, Log } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { Consumer, ConsumerConfig, Kafka as KafkaJS, KafkaConfig, Producer, ProducerConfig } from 'kafkajs';

import { EmitData, EventController, EventControllerInterface } from '../event.controller';

export class KafkaController extends EventController implements EventControllerInterface {
  private kafkaClient: KafkaJS | null = null;
  private producer: Producer | null = null;
  private consumer: Consumer | null = null;
  private readonly logger = new Logger('KafkaController');
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private reconnectDelay = 5000; // 5 seconds
  private isReconnecting = false;

  constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
    super(prismaRepository, waMonitor, configService.get<Kafka>('KAFKA')?.ENABLED, 'kafka');
  }

  public async init(): Promise<void> {
    if (!this.status) {
      return;
    }

    await this.connect();
  }

  private async connect(): Promise<void> {
    try {
      const kafkaConfig = configService.get<Kafka>('KAFKA');

      const clientConfig: KafkaConfig = {
        clientId: kafkaConfig.CLIENT_ID || 'evolution-api',
        brokers: kafkaConfig.BROKERS || ['localhost:9092'],
        connectionTimeout: kafkaConfig.CONNECTION_TIMEOUT || 3000,
        requestTimeout: kafkaConfig.REQUEST_TIMEOUT || 30000,
        retry: {
          initialRetryTime: 100,
          retries: 8,
        },
      };

      // Add SASL authentication if configured
      if (kafkaConfig.SASL?.ENABLED) {
        clientConfig.sasl = {
          mechanism: (kafkaConfig.SASL.MECHANISM as any) || 'plain',
          username: kafkaConfig.SASL.USERNAME,
          password: kafkaConfig.SASL.PASSWORD,
        };
      }

      // Add SSL configuration if enabled
      if (kafkaConfig.SSL?.ENABLED) {
        clientConfig.ssl = {
          rejectUnauthorized: kafkaConfig.SSL.REJECT_UNAUTHORIZED !== false,
          ca: kafkaConfig.SSL.CA ? [kafkaConfig.SSL.CA] : undefined,
          key: kafkaConfig.SSL.KEY,
          cert: kafkaConfig.SSL.CERT,
        };
      }

      this.kafkaClient = new KafkaJS(clientConfig);

      // Initialize producer
      const producerConfig: ProducerConfig = {
        maxInFlightRequests: 1,
        idempotent: true,
        transactionTimeout: 30000,
      };

      this.producer = this.kafkaClient.producer(producerConfig);
      await this.producer.connect();

      // Initialize consumer for global events if enabled
      if (kafkaConfig.GLOBAL_ENABLED) {
        await this.initGlobalConsumer();
      }

      this.reconnectAttempts = 0;
      this.isReconnecting = false;

      this.logger.info('Kafka initialized successfully');

      // Create topics if they don't exist
      if (kafkaConfig.AUTO_CREATE_TOPICS) {
        await this.createTopics();
      }
    } catch (error) {
      this.logger.error({
        local: 'KafkaController.connect',
        message: 'Failed to connect to Kafka',
        error: error.message || error,
      });
      this.scheduleReconnect();
      throw error;
    }
  }

  private async initGlobalConsumer(): Promise<void> {
    try {
      const kafkaConfig = configService.get<Kafka>('KAFKA');

      const consumerConfig: ConsumerConfig = {
        groupId: kafkaConfig.CONSUMER_GROUP_ID || 'evolution-api-consumers',
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
      };

      this.consumer = this.kafkaClient.consumer(consumerConfig);
      await this.consumer.connect();

      // Subscribe to global topics
      const events = kafkaConfig.EVENTS;
      if (events) {
        const eventKeys = Object.keys(events).filter((event) => events[event]);

        for (const event of eventKeys) {
          const topicName = this.getTopicName(event, true);
          await this.consumer.subscribe({ topic: topicName });
        }

        // Start consuming messages
        await this.consumer.run({
          eachMessage: async ({ topic, message }) => {
            try {
              const data = JSON.parse(message.value?.toString() || '{}');
              this.logger.debug(`Received message from topic ${topic}: ${JSON.stringify(data)}`);

              // Process the message here if needed
              // This is where you can add custom message processing logic
            } catch (error) {
              this.logger.error(`Error processing message from topic ${topic}: ${error}`);
            }
          },
        });

        this.logger.info('Global Kafka consumer initialized');
      }
    } catch (error) {
      this.logger.error(`Failed to initialize global Kafka consumer: ${error}`);
    }
  }

  private async createTopics(): Promise<void> {
    try {
      const kafkaConfig = configService.get<Kafka>('KAFKA');
      const admin = this.kafkaClient.admin();
      await admin.connect();

      const topics = [];

      // Create global topics if enabled
      if (kafkaConfig.GLOBAL_ENABLED && kafkaConfig.EVENTS) {
        const eventKeys = Object.keys(kafkaConfig.EVENTS).filter((event) => kafkaConfig.EVENTS[event]);

        for (const event of eventKeys) {
          const topicName = this.getTopicName(event, true);
          topics.push({
            topic: topicName,
            numPartitions: kafkaConfig.NUM_PARTITIONS || 1,
            replicationFactor: kafkaConfig.REPLICATION_FACTOR || 1,
          });
        }
      }

      if (topics.length > 0) {
        await admin.createTopics({
          topics,
          waitForLeaders: true,
        });

        this.logger.info(`Created ${topics.length} Kafka topics`);
      }

      await admin.disconnect();
    } catch (error) {
      this.logger.error(`Failed to create Kafka topics: ${error}`);
    }
  }

  private getTopicName(event: string, isGlobal: boolean = false, instanceName?: string): string {
    const kafkaConfig = configService.get<Kafka>('KAFKA');
    const prefix = kafkaConfig.TOPIC_PREFIX || 'evolution';

    if (isGlobal) {
      return `${prefix}.global.${event.toLowerCase().replace(/_/g, '.')}`;
    } else {
      return `${prefix}.${instanceName}.${event.toLowerCase().replace(/_/g, '.')}`;
    }
  }

  private handleConnectionLoss(): void {
    if (this.isReconnecting) {
      return;
    }

    this.cleanup();
    this.scheduleReconnect();
  }

  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      this.logger.error(
        `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`,
      );
      return;
    }

    if (this.isReconnecting) {
      return;
    }

    this.isReconnecting = true;
    this.reconnectAttempts++;

    const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));

    this.logger.info(
      `Scheduling Kafka reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
    );

    setTimeout(async () => {
      try {
        this.logger.info(
          `Attempting to reconnect to Kafka (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
        );
        await this.connect();
        this.logger.info('Successfully reconnected to Kafka');
      } catch (error) {
        this.logger.error({
          local: 'KafkaController.scheduleReconnect',
          message: `Reconnection attempt ${this.reconnectAttempts} failed`,
          error: error.message || error,
        });
        this.isReconnecting = false;
        this.scheduleReconnect();
      }
    }, delay);
  }

  private async ensureConnection(): Promise<boolean> {
    if (!this.producer) {
      this.logger.warn('Kafka producer is not available, attempting to reconnect...');
      if (!this.isReconnecting) {
        this.scheduleReconnect();
      }
      return false;
    }
    return true;
  }

  public async emit({
    instanceName,
    origin,
    event,
    data,
    serverUrl,
    dateTime,
    sender,
    apiKey,
    integration,
  }: EmitData): Promise<void> {
    if (integration && !integration.includes('kafka')) {
      return;
    }

    if (!this.status) {
      return;
    }

    if (!(await this.ensureConnection())) {
      this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No Kafka connection`);
      return;
    }

    const instanceKafka = await this.get(instanceName);
    const kafkaLocal = instanceKafka?.events;
    const kafkaGlobal = configService.get<Kafka>('KAFKA').GLOBAL_ENABLED;
    const kafkaEvents = configService.get<Kafka>('KAFKA').EVENTS;
    const we = event.replace(/[.-]/gm, '_').toUpperCase();
    const logEnabled = configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS');

    const message = {
      event,
      instance: instanceName,
      data,
      server_url: serverUrl,
      date_time: dateTime,
      sender,
      apikey: apiKey,
      timestamp: Date.now(),
    };

    const messageValue = JSON.stringify(message);

    // Instance-specific events
    if (instanceKafka?.enabled && this.producer && Array.isArray(kafkaLocal) && kafkaLocal.includes(we)) {
      const topicName = this.getTopicName(event, false, instanceName);

      let retry = 0;
      while (retry < 3) {
        try {
          await this.producer.send({
            topic: topicName,
            messages: [
              {
                key: instanceName,
                value: messageValue,
                headers: {
                  event,
                  instance: instanceName,
                  origin,
                  timestamp: dateTime,
                },
              },
            ],
          });

          if (logEnabled) {
            const logData = {
              local: `${origin}.sendData-Kafka`,
              ...message,
            };
            this.logger.log(logData);
          }

          break;
        } catch (error) {
          this.logger.error({
            local: 'KafkaController.emit',
            message: `Error publishing local Kafka message (attempt ${retry + 1}/3)`,
            error: error.message || error,
          });
          retry++;
          if (retry >= 3) {
            this.handleConnectionLoss();
          }
        }
      }
    }

    // Global events
    if (kafkaGlobal && kafkaEvents[we] && this.producer) {
      const topicName = this.getTopicName(event, true);

      let retry = 0;
      while (retry < 3) {
        try {
          await this.producer.send({
            topic: topicName,
            messages: [
              {
                key: `${instanceName}-${event}`,
                value: messageValue,
                headers: {
                  event,
                  instance: instanceName,
                  origin,
                  timestamp: dateTime,
                },
              },
            ],
          });

          if (logEnabled) {
            const logData = {
              local: `${origin}.sendData-Kafka-Global`,
              ...message,
            };
            this.logger.log(logData);
          }

          break;
        } catch (error) {
          this.logger.error({
            local: 'KafkaController.emit',
            message: `Error publishing global Kafka message (attempt ${retry + 1}/3)`,
            error: error.message || error,
          });
          retry++;
          if (retry >= 3) {
            this.handleConnectionLoss();
          }
        }
      }
    }
  }

  public async cleanup(): Promise<void> {
    try {
      if (this.consumer) {
        await this.consumer.disconnect();
        this.consumer = null;
      }
      if (this.producer) {
        await this.producer.disconnect();
        this.producer = null;
      }
      this.kafkaClient = null;
    } catch (error) {
      this.logger.warn({
        local: 'KafkaController.cleanup',
        message: 'Error during cleanup',
        error: error.message || error,
      });
      this.producer = null;
      this.consumer = null;
      this.kafkaClient = null;
    }
  }
}

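As a quick illustration of the topic naming scheme implemented by getTopicName above, a standalone sketch (the prefix and instance name are examples; 'evolution' is the default TOPIC_PREFIX):

// Mirrors the naming logic in KafkaController.getTopicName, for illustration only.
const topicFor = (prefix: string, event: string, instanceName?: string): string =>
  instanceName
    ? `${prefix}.${instanceName}.${event.toLowerCase().replace(/_/g, '.')}`
    : `${prefix}.global.${event.toLowerCase().replace(/_/g, '.')}`;

console.log(topicFor('evolution', 'MESSAGES_UPSERT'));                // evolution.global.messages.upsert
console.log(topicFor('evolution', 'MESSAGES_UPSERT', 'my-instance')); // evolution.my-instance.messages.upsert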
src/api/integrations/event/kafka/kafka.router.ts (new file, 36 lines)
@@ -0,0 +1,36 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { InstanceDto } from '@api/dto/instance.dto';
import { EventDto } from '@api/integrations/event/event.dto';
import { HttpStatus } from '@api/routes/index.router';
import { eventManager } from '@api/server.module';
import { eventSchema, instanceSchema } from '@validate/validate.schema';
import { RequestHandler, Router } from 'express';

export class KafkaRouter extends RouterBroker {
  constructor(...guards: RequestHandler[]) {
    super();
    this.router
      .post(this.routerPath('set'), ...guards, async (req, res) => {
        const response = await this.dataValidate<EventDto>({
          request: req,
          schema: eventSchema,
          ClassRef: EventDto,
          execute: (instance, data) => eventManager.kafka.set(instance.instanceName, data),
        });

        res.status(HttpStatus.CREATED).json(response);
      })
      .get(this.routerPath('find'), ...guards, async (req, res) => {
        const response = await this.dataValidate<InstanceDto>({
          request: req,
          schema: instanceSchema,
          ClassRef: InstanceDto,
          execute: (instance) => eventManager.kafka.get(instance.instanceName),
        });

        res.status(HttpStatus.OK).json(response);
      });
  }

  public readonly router: Router = Router();
}

src/api/integrations/event/kafka/kafka.schema.ts (new file, 21 lines)
@@ -0,0 +1,21 @@
import { JSONSchema7 } from 'json-schema';
import { v4 } from 'uuid';

import { EventController } from '../event.controller';

export const kafkaSchema: JSONSchema7 = {
  $id: v4(),
  type: 'object',
  properties: {
    enabled: { type: 'boolean', enum: [true, false] },
    events: {
      type: 'array',
      minItems: 0,
      items: {
        type: 'string',
        enum: EventController.events,
      },
    },
  },
  required: ['enabled'],
};

@@ -153,6 +153,34 @@ export type Sqs = {
  };
};

export type Kafka = {
  ENABLED: boolean;
  CLIENT_ID: string;
  BROKERS: string[];
  CONNECTION_TIMEOUT: number;
  REQUEST_TIMEOUT: number;
  GLOBAL_ENABLED: boolean;
  CONSUMER_GROUP_ID: string;
  TOPIC_PREFIX: string;
  NUM_PARTITIONS: number;
  REPLICATION_FACTOR: number;
  AUTO_CREATE_TOPICS: boolean;
  EVENTS: EventsRabbitmq;
  SASL?: {
    ENABLED: boolean;
    MECHANISM: string;
    USERNAME: string;
    PASSWORD: string;
  };
  SSL?: {
    ENABLED: boolean;
    REJECT_UNAUTHORIZED: boolean;
    CA?: string;
    KEY?: string;
    CERT?: string;
  };
};

export type Websocket = {
  ENABLED: boolean;
  GLOBAL_EVENTS: boolean;
@@ -372,6 +400,7 @@ export interface Env {
  RABBITMQ: Rabbitmq;
  NATS: Nats;
  SQS: Sqs;
  KAFKA: Kafka;
  WEBSOCKET: Websocket;
  WA_BUSINESS: WaBusiness;
  LOG: Log;
@@ -587,6 +616,68 @@ export class ConfigService {
          TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true',
        },
      },
      KAFKA: {
        ENABLED: process.env?.KAFKA_ENABLED === 'true',
        CLIENT_ID: process.env?.KAFKA_CLIENT_ID || 'evolution-api',
        BROKERS: process.env?.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
        CONNECTION_TIMEOUT: Number.parseInt(process.env?.KAFKA_CONNECTION_TIMEOUT || '3000'),
        REQUEST_TIMEOUT: Number.parseInt(process.env?.KAFKA_REQUEST_TIMEOUT || '30000'),
        GLOBAL_ENABLED: process.env?.KAFKA_GLOBAL_ENABLED === 'true',
        CONSUMER_GROUP_ID: process.env?.KAFKA_CONSUMER_GROUP_ID || 'evolution-api-consumers',
        TOPIC_PREFIX: process.env?.KAFKA_TOPIC_PREFIX || 'evolution',
        NUM_PARTITIONS: Number.parseInt(process.env?.KAFKA_NUM_PARTITIONS || '1'),
        REPLICATION_FACTOR: Number.parseInt(process.env?.KAFKA_REPLICATION_FACTOR || '1'),
        AUTO_CREATE_TOPICS: process.env?.KAFKA_AUTO_CREATE_TOPICS === 'true',
        EVENTS: {
          APPLICATION_STARTUP: process.env?.KAFKA_EVENTS_APPLICATION_STARTUP === 'true',
          INSTANCE_CREATE: process.env?.KAFKA_EVENTS_INSTANCE_CREATE === 'true',
          INSTANCE_DELETE: process.env?.KAFKA_EVENTS_INSTANCE_DELETE === 'true',
          QRCODE_UPDATED: process.env?.KAFKA_EVENTS_QRCODE_UPDATED === 'true',
          MESSAGES_SET: process.env?.KAFKA_EVENTS_MESSAGES_SET === 'true',
          MESSAGES_UPSERT: process.env?.KAFKA_EVENTS_MESSAGES_UPSERT === 'true',
          MESSAGES_EDITED: process.env?.KAFKA_EVENTS_MESSAGES_EDITED === 'true',
          MESSAGES_UPDATE: process.env?.KAFKA_EVENTS_MESSAGES_UPDATE === 'true',
          MESSAGES_DELETE: process.env?.KAFKA_EVENTS_MESSAGES_DELETE === 'true',
          SEND_MESSAGE: process.env?.KAFKA_EVENTS_SEND_MESSAGE === 'true',
          SEND_MESSAGE_UPDATE: process.env?.KAFKA_EVENTS_SEND_MESSAGE_UPDATE === 'true',
          CONTACTS_SET: process.env?.KAFKA_EVENTS_CONTACTS_SET === 'true',
          CONTACTS_UPSERT: process.env?.KAFKA_EVENTS_CONTACTS_UPSERT === 'true',
          CONTACTS_UPDATE: process.env?.KAFKA_EVENTS_CONTACTS_UPDATE === 'true',
          PRESENCE_UPDATE: process.env?.KAFKA_EVENTS_PRESENCE_UPDATE === 'true',
          CHATS_SET: process.env?.KAFKA_EVENTS_CHATS_SET === 'true',
          CHATS_UPSERT: process.env?.KAFKA_EVENTS_CHATS_UPSERT === 'true',
          CHATS_UPDATE: process.env?.KAFKA_EVENTS_CHATS_UPDATE === 'true',
          CHATS_DELETE: process.env?.KAFKA_EVENTS_CHATS_DELETE === 'true',
          CONNECTION_UPDATE: process.env?.KAFKA_EVENTS_CONNECTION_UPDATE === 'true',
          LABELS_EDIT: process.env?.KAFKA_EVENTS_LABELS_EDIT === 'true',
          LABELS_ASSOCIATION: process.env?.KAFKA_EVENTS_LABELS_ASSOCIATION === 'true',
          GROUPS_UPSERT: process.env?.KAFKA_EVENTS_GROUPS_UPSERT === 'true',
          GROUP_UPDATE: process.env?.KAFKA_EVENTS_GROUPS_UPDATE === 'true',
          GROUP_PARTICIPANTS_UPDATE: process.env?.KAFKA_EVENTS_GROUP_PARTICIPANTS_UPDATE === 'true',
          CALL: process.env?.KAFKA_EVENTS_CALL === 'true',
          TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true',
          TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
        },
        SASL:
          process.env?.KAFKA_SASL_ENABLED === 'true'
            ? {
                ENABLED: true,
                MECHANISM: process.env?.KAFKA_SASL_MECHANISM || 'plain',
                USERNAME: process.env?.KAFKA_SASL_USERNAME || '',
                PASSWORD: process.env?.KAFKA_SASL_PASSWORD || '',
              }
            : undefined,
        SSL:
          process.env?.KAFKA_SSL_ENABLED === 'true'
            ? {
                ENABLED: true,
                REJECT_UNAUTHORIZED: process.env?.KAFKA_SSL_REJECT_UNAUTHORIZED !== 'false',
                CA: process.env?.KAFKA_SSL_CA,
                KEY: process.env?.KAFKA_SSL_KEY,
                CERT: process.env?.KAFKA_SSL_CERT,
              }
            : undefined,
      },
      WEBSOCKET: {
        ENABLED: process.env?.WEBSOCKET_ENABLED === 'true',
        GLOBAL_EVENTS: process.env?.WEBSOCKET_GLOBAL_EVENTS === 'true',
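For reference, a sketch of the environment variables consumed by the KAFKA block above, expressed as a TypeScript record (every variable name appears in the ConfigService mapping in this diff; the values are placeholders):

// Illustrative only: placeholder values for the Kafka-related environment variables.
const exampleKafkaEnv: Record<string, string> = {
  KAFKA_ENABLED: 'true',
  KAFKA_CLIENT_ID: 'evolution-api',
  KAFKA_BROKERS: 'broker1:9092,broker2:9092', // comma-separated, split(',') in the config
  KAFKA_GLOBAL_ENABLED: 'true',
  KAFKA_CONSUMER_GROUP_ID: 'evolution-api-consumers',
  KAFKA_TOPIC_PREFIX: 'evolution',
  KAFKA_NUM_PARTITIONS: '1',
  KAFKA_REPLICATION_FACTOR: '1',
  KAFKA_AUTO_CREATE_TOPICS: 'true',
  KAFKA_EVENTS_MESSAGES_UPSERT: 'true',
  KAFKA_EVENTS_SEND_MESSAGE: 'true',
  KAFKA_SASL_ENABLED: 'false',
  KAFKA_SSL_ENABLED: 'false',
};

// e.g. in a local test setup, before the config service reads process.env:
Object.assign(process.env, exampleKafkaEnv);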