---
description: Event integration patterns for Evolution API
globs:
  - "src/api/integrations/event/**/*.ts"
alwaysApply: false
---

# Evolution API Event Integration Rules

## Event Manager Pattern

### Event Manager Structure

```typescript
import { PrismaRepository } from '@api/repository/repository.service';
import { ConfigService } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { Server } from 'http';

/**
 * Owns one controller per event integration (webhook, websocket, RabbitMQ,
 * NATS, SQS, Pusher) and fans every event out to all of them.
 */
export class EventManager {
  private prismaRepository: PrismaRepository;
  private configService: ConfigService;
  private logger = new Logger('EventManager');

  // Event integrations
  private webhook: WebhookController;
  private websocket: WebsocketController;
  private rabbitmq: RabbitmqController;
  private nats: NatsController;
  private sqs: SqsController;
  private pusher: PusherController;

  /**
   * @param prismaRepository Repository handed to every integration controller.
   * @param configService Configuration service handed to every integration controller.
   * @param server Optional HTTP server; only the websocket controller receives it.
   */
  constructor(
    prismaRepository: PrismaRepository,
    configService: ConfigService,
    server?: Server,
  ) {
    this.prismaRepository = prismaRepository;
    this.configService = configService;

    // Initialize event controllers
    this.webhook = new WebhookController(prismaRepository, configService);
    this.websocket = new WebsocketController(prismaRepository, configService, server);
    this.rabbitmq = new RabbitmqController(prismaRepository, configService);
    this.nats = new NatsController(prismaRepository, configService);
    this.sqs = new SqsController(prismaRepository, configService);
    this.pusher = new PusherController(prismaRepository, configService);
  }

  /**
   * Emits one event to every integration concurrently.
   *
   * `Promise.allSettled` is used so that a rejection from one integration
   * neither stops the others nor rejects this method.
   */
  public async emit(eventData: {
    instanceName: string;
    origin: string;
    event: string;
    data: Object;
    serverUrl: string;
    dateTime: string;
    sender: string;
    apiKey?: string;
    local?: boolean;
    integration?: string[];
  }): Promise<void> {
    this.logger.log(`Emitting event ${eventData.event} for instance ${eventData.instanceName}`);

    // Emit to all configured integrations
    await Promise.allSettled([
      this.webhook.emit(eventData),
      this.websocket.emit(eventData),
      this.rabbitmq.emit(eventData),
      this.nats.emit(eventData),
      this.sqs.emit(eventData),
      this.pusher.emit(eventData),
    ]);
  }

  /**
   * Stores the configuration of each integration that is present in `data`
   * for the given instance, forcing `enabled: true` on each one.
   *
   * The settled results are discarded, so a failing `set` call is not
   * reported to the caller of this method.
   */
  public async setInstance(instanceName: string, data: any): Promise<void> {
    const promises: Promise<unknown>[] = [];

    if (data.websocket) {
      promises.push(
        this.websocket.set(instanceName, {
          websocket: {
            enabled: true,
            events: data.websocket?.events,
          },
        })
      );
    }

    if (data.rabbitmq) {
      promises.push(
        this.rabbitmq.set(instanceName, {
          rabbitmq: {
            enabled: true,
            events: data.rabbitmq?.events,
          },
        })
      );
    }

    if (data.webhook) {
      promises.push(
        this.webhook.set(instanceName, {
          webhook: {
            enabled: true,
            events: data.webhook?.events,
            url: data.webhook?.url,
            headers: data.webhook?.headers,
            base64: data.webhook?.base64,
            byEvents: data.webhook?.byEvents,
          },
        })
      );
    }

    // Set other integrations...

    await Promise.allSettled(promises);
  }
}
```

## Base Event Controller Pattern

### Abstract Event Controller

```typescript
import { PrismaRepository } from '@api/repository/repository.service';
import { ConfigService } from '@config/env.config';
import { Logger } from '@config/logger.config';

/** Payload handed to every integration's `emit`. */
export type EmitData = {
  instanceName: string;
  origin: string;
  event: string;
  data: Object;
  serverUrl: string;
  dateTime: string;
  sender: string;
  apiKey?: string;
  local?: boolean;
  integration?: string[];
};

/** Contract every event integration controller fulfils. */
export interface EventControllerInterface {
  integrationEnabled: boolean;
  emit(data: EmitData): Promise<void>;
  set(instanceName: string, data: any): Promise<any>;
}

/**
 * Base class for event integrations: holds the shared dependencies, the list
 * of known event names, and two helpers used by the concrete controllers.
 */
export abstract class EventController implements EventControllerInterface {
  protected readonly logger: Logger;
  protected readonly prismaRepository: PrismaRepository;
  protected readonly configService: ConfigService;

  public integrationEnabled: boolean = false;

  // Available events for all integrations
  public static readonly events = [
    'APPLICATION_STARTUP',
    'INSTANCE_CREATE',
    'INSTANCE_DELETE',
    'QRCODE_UPDATED',
    'CONNECTION_UPDATE',
    'STATUS_INSTANCE',
    'MESSAGES_SET',
    'MESSAGES_UPSERT',
    'MESSAGES_EDITED',
    'MESSAGES_UPDATE',
    'MESSAGES_DELETE',
    'SEND_MESSAGE',
    'CONTACTS_SET',
    'CONTACTS_UPSERT',
    'CONTACTS_UPDATE',
    'PRESENCE_UPDATE',
    'CHATS_SET',
    'CHATS_UPDATE',
    'CHATS_UPSERT',
    'CHATS_DELETE',
    'GROUPS_UPSERT',
    'GROUPS_UPDATE',
    'GROUP_PARTICIPANTS_UPDATE',
    'CALL',
    'TYPEBOT_START',
    'TYPEBOT_CHANGE_STATUS',
    'LABELS_EDIT',
    'LABELS_ASSOCIATION',
    'CREDS_UPDATE',
    'MESSAGING_HISTORY_SET',
    'REMOVE_INSTANCE',
    'LOGOUT_INSTANCE',
  ];

  /**
   * @param prismaRepository Repository used to read and update instance rows.
   * @param configService Application configuration service.
   * @param loggerName Name given to this controller's logger.
   */
  constructor(
    prismaRepository: PrismaRepository,
    configService: ConfigService,
    loggerName: string,
  ) {
    this.prismaRepository = prismaRepository;
    this.configService = configService;
    this.logger = new Logger(loggerName);
  }

  // Abstract methods to be implemented by specific integrations
  public abstract emit(data: EmitData): Promise<void>;
  public abstract set(instanceName: string, data: any): Promise<any>;

  /**
   * Decides whether an event passes the instance's event filter.
   *
   * @returns `true` when no filter is configured (missing or empty list),
   *   otherwise whether `eventName` is in `configuredEvents`.
   */
  protected shouldProcessEvent(eventName: string, configuredEvents?: string[]): boolean {
    if (!configuredEvents || configuredEvents.length === 0) {
      return true; // Process all events if none specified
    }

    return configuredEvents.includes(eventName);
  }

  /**
   * Looks the instance up by name.
   *
   * @returns Whatever `findUnique` resolves to, or `null` when the lookup
   *   throws (the error is logged, not rethrown).
   */
  protected async getInstanceConfig(instanceName: string): Promise<any> {
    try {
      const instance = await this.prismaRepository.instance.findUnique({
        where: { name: instanceName },
      });

      return instance;
    } catch (error) {
      this.logger.error(`Failed to get instance config for ${instanceName}:`, error);
      return null;
    }
  }
}
```

## Webhook Integration Pattern

### Webhook Controller Implementation

```typescript
/** Delivers events to the HTTP webhook configured on an instance. */
export class WebhookController extends EventController {
  constructor(
    prismaRepository: PrismaRepository,
    configService: ConfigService,
  ) {
    super(prismaRepository, configService, 'WebhookController');
  }

  /**
   * Builds the webhook payload and posts it.
   *
   * Returns without doing anything when the instance has no enabled webhook
   * or the event is filtered out. Errors are logged and never rethrown.
   */
  public async emit(data: EmitData): Promise<void> {
    try {
      const instance = await this.getInstanceConfig(data.instanceName);

      if (!instance?.webhook?.enabled) {
        return;
      }

      const webhookConfig = instance.webhook;

      if (!this.shouldProcessEvent(data.event, webhookConfig.events)) {
        return;
      }

      const payload = {
        event: data.event,
        instance: data.instanceName,
        data: data.data,
        timestamp: data.dateTime,
        sender: data.sender,
        server: {
          version: process.env.npm_package_version,
          url: data.serverUrl,
        },
      };

      // Encode data as base64 if configured
      if (webhookConfig.base64) {
        payload.data = Buffer.from(JSON.stringify(payload.data)).toString('base64');
      }

      // Instance-configured headers are spread last, so they override the defaults.
      const headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'Evolution-API-Webhook',
        ...webhookConfig.headers,
      };

      if (webhookConfig.byEvents) {
        // Send to event-specific endpoint
        const eventUrl = `${webhookConfig.url}/${data.event.toLowerCase()}`;
        await this.sendWebhook(eventUrl, payload, headers);
      } else {
        // Send to main webhook URL
        await this.sendWebhook(webhookConfig.url, payload, headers);
      }

      // NOTE: sendWebhook catches its own errors, so this line is reached
      // even when the delivery itself failed.
      this.logger.log(`Webhook sent for event ${data.event} to instance ${data.instanceName}`);
    } catch (error) {
      this.logger.error(`Webhook emission failed for ${data.instanceName}:`, error);
    }
  }

  /**
   * Persists `data.webhook` on the instance row.
   *
   * @returns `{ webhook }` echoing the stored configuration.
   * @throws Rethrows any error from the repository update after logging it.
   */
  public async set(instanceName: string, data: any): Promise<any> {
    try {
      const webhookData = data.webhook;

      await this.prismaRepository.instance.update({
        where: { name: instanceName },
        data: {
          webhook: webhookData,
        },
      });

      this.logger.log(`Webhook configuration set for instance ${instanceName}`);

      return { webhook: webhookData };
    } catch (error) {
      this.logger.error(`Failed to set webhook config for ${instanceName}:`, error);
      throw error;
    }
  }

  /**
   * Posts the payload with a 30 s timeout and at most 3 redirects.
   * Delivery failures are logged and swallowed; nothing is retried here.
   */
  private async sendWebhook(url: string, payload: any, headers: any): Promise<void> {
    try {
      const response = await axios.post(url, payload, {
        headers,
        timeout: 30000,
        maxRedirects: 3,
      });

      if (response.status >= 200 && response.status < 300) {
        this.logger.log(`Webhook delivered successfully to ${url}`);
      } else {
        this.logger.warn(`Webhook returned status ${response.status} for ${url}`);
      }
    } catch (error) {
      this.logger.error(`Webhook delivery failed to ${url}:`, error.message);

      // Implement retry logic here if needed
      if (error.response?.status >= 500) {
        // Server error - might be worth retrying
        this.logger.log(`Server error detected, webhook might be retried later`);
      }
    }
  }
}
```

## WebSocket Integration
Pattern

### WebSocket Controller Implementation

```typescript
import { Server as SocketIOServer } from 'socket.io';
import { Server } from 'http';

/** Pushes events to Socket.IO clients; inactive when no HTTP server is supplied. */
export class WebsocketController extends EventController {
  private io: SocketIOServer;

  /**
   * @param server Optional HTTP server. Without it no Socket.IO server is
   *   created and `emit` returns immediately.
   */
  constructor(
    prismaRepository: PrismaRepository,
    configService: ConfigService,
    server?: Server,
  ) {
    super(prismaRepository, configService, 'WebsocketController');

    if (server) {
      this.io = new SocketIOServer(server, {
        cors: {
          origin: "*",
          methods: ["GET", "POST"],
        },
      });

      this.setupSocketHandlers();
    }
  }

  /** Registers the join/leave room handlers and connection logging. */
  private setupSocketHandlers(): void {
    this.io.on('connection', (socket) => {
      this.logger.log(`WebSocket client connected: ${socket.id}`);

      socket.on('join-instance', (instanceName: string) => {
        socket.join(`instance:${instanceName}`);
        this.logger.log(`Client ${socket.id} joined instance ${instanceName}`);
      });

      socket.on('leave-instance', (instanceName: string) => {
        socket.leave(`instance:${instanceName}`);
        this.logger.log(`Client ${socket.id} left instance ${instanceName}`);
      });

      socket.on('disconnect', () => {
        this.logger.log(`WebSocket client disconnected: ${socket.id}`);
      });
    });
  }

  /**
   * Emits the event to the instance's room and to all connected clients.
   *
   * Returns without doing anything when there is no Socket.IO server, the
   * instance has websocket disabled, or the event is filtered out. Errors are
   * logged and never rethrown.
   */
  public async emit(data: EmitData): Promise<void> {
    if (!this.io) {
      return;
    }

    try {
      const instance = await this.getInstanceConfig(data.instanceName);

      if (!instance?.websocket?.enabled) {
        return;
      }

      const websocketConfig = instance.websocket;

      if (!this.shouldProcessEvent(data.event, websocketConfig.events)) {
        return;
      }

      const payload = {
        event: data.event,
        instance: data.instanceName,
        data: data.data,
        timestamp: data.dateTime,
        sender: data.sender,
      };

      // Emit to specific instance room
      this.io.to(`instance:${data.instanceName}`).emit('evolution-event', payload);

      // Also emit globally for monitoring.
      // NOTE(review): `io.emit` is not room-scoped; it reaches every connected
      // client, including clients that joined a different instance's room.
      this.io.emit('global-event', payload);

      this.logger.log(`WebSocket event ${data.event} emitted for instance ${data.instanceName}`);
    } catch (error) {
      this.logger.error(`WebSocket emission failed for ${data.instanceName}:`, error);
    }
  }

  /**
   * Persists `data.websocket` on the instance row.
   *
   * @returns `{ websocket }` echoing the stored configuration.
   * @throws Rethrows any error from the repository update after logging it.
   */
  public async set(instanceName: string, data: any): Promise<any> {
    try {
      const websocketData = data.websocket;

      await this.prismaRepository.instance.update({
        where: { name: instanceName },
        data: {
          websocket: websocketData,
        },
      });

      this.logger.log(`WebSocket configuration set for instance ${instanceName}`);

      return { websocket: websocketData };
    } catch (error) {
      this.logger.error(`Failed to set WebSocket config for ${instanceName}:`, error);
      throw error;
    }
  }
}
```

## Queue Integration Patterns

### RabbitMQ Controller Implementation

```typescript
import amqp from 'amqplib';

/** Publishes events to the `evolution-events` topic exchange. */
export class RabbitmqController extends EventController {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  constructor(
    prismaRepository: PrismaRepository,
    configService: ConfigService,
  ) {
    super(prismaRepository, configService, 'RabbitmqController');
    // Fire-and-forget: initializeConnection handles its own errors, and
    // `emit` is a no-op until the channel has been created.
    void this.initializeConnection();
  }

  /**
   * Connects and declares the exchange when RabbitMQ is enabled in the
   * configuration. Failures are logged and leave `channel` as `null`.
   */
  private async initializeConnection(): Promise<void> {
    try {
      const rabbitmqConfig = this.configService.get('RABBITMQ');

      if (!rabbitmqConfig?.ENABLED) {
        return;
      }

      this.connection = await amqp.connect(rabbitmqConfig.URI);
      this.channel = await this.connection.createChannel();

      // Declare exchange for Evolution API events
      await this.channel.assertExchange('evolution-events', 'topic', { durable: true });

      this.logger.log('RabbitMQ connection established');
    } catch (error) {
      this.logger.error('Failed to initialize RabbitMQ connection:', error);
    }
  }

  /**
   * Publishes the event with routing key
   * `evolution.<instanceName>.<event in lower case>`.
   *
   * Returns without doing anything when there is no channel, the instance has
   * RabbitMQ disabled, or the event is filtered out. Errors are logged and
   * never rethrown.
   */
  public async emit(data: EmitData): Promise<void> {
    if (!this.channel) {
      return;
    }

    try {
      const instance = await this.getInstanceConfig(data.instanceName);

      if (!instance?.rabbitmq?.enabled) {
        return;
      }

      const rabbitmqConfig = instance.rabbitmq;

      if (!this.shouldProcessEvent(data.event, rabbitmqConfig.events)) {
        return;
      }

      const payload = {
        event: data.event,
        instance: data.instanceName,
        data: data.data,
        timestamp: data.dateTime,
        sender: data.sender,
      };

      const routingKey = `evolution.${data.instanceName}.${data.event.toLowerCase()}`;

      await this.channel.publish(
        'evolution-events',
        routingKey,
        Buffer.from(JSON.stringify(payload)),
        {
          persistent: true,
          timestamp: Date.now(),
          messageId: `${data.instanceName}-${Date.now()}`,
        }
      );

      this.logger.log(`RabbitMQ message published for event ${data.event} to instance ${data.instanceName}`);
    } catch (error) {
      this.logger.error(`RabbitMQ emission failed for ${data.instanceName}:`, error);
    }
  }

  /**
   * Persists `data.rabbitmq` on the instance row.
   *
   * @returns `{ rabbitmq }` echoing the stored configuration.
   * @throws Rethrows any error from the repository update after logging it.
   */
  public async set(instanceName: string, data: any): Promise<any> {
    try {
      const rabbitmqData = data.rabbitmq;

      await this.prismaRepository.instance.update({
        where: { name: instanceName },
        data: {
          rabbitmq: rabbitmqData,
        },
      });

      this.logger.log(`RabbitMQ configuration set for instance ${instanceName}`);

      return { rabbitmq: rabbitmqData };
    } catch (error) {
      this.logger.error(`Failed to set RabbitMQ config for ${instanceName}:`, error);
      throw error;
    }
  }
}
```

### SQS Controller Implementation

```typescript
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

/** Sends events to the SQS queue configured on an instance. */
export class SqsController extends EventController {
  private sqsClient: SQSClient | null = null;

  constructor(
    prismaRepository: PrismaRepository,
    configService: ConfigService,
  ) {
    super(prismaRepository, configService, 'SqsController');
    this.initializeSQSClient();
  }

  /**
   * Creates the SQS client when SQS is enabled in the configuration.
   * Failures are logged and leave `sqsClient` as `null`.
   */
  private initializeSQSClient(): void {
    try {
      const sqsConfig = this.configService.get('SQS');

      if (!sqsConfig?.ENABLED) {
        return;
      }

      this.sqsClient = new SQSClient({
        region: sqsConfig.REGION,
        credentials: {
          accessKeyId: sqsConfig.ACCESS_KEY_ID,
          secretAccessKey: sqsConfig.SECRET_ACCESS_KEY,
        },
      });

      this.logger.log('SQS client initialized');
    } catch (error) {
      this.logger.error('Failed to initialize SQS client:', error);
    }
  }

  /**
   * Sends the event as a JSON message to the instance's queue.
   *
   * Returns without doing anything when there is no client, the instance has
   * SQS disabled, or the event is filtered out. Errors are logged and never
   * rethrown.
   */
  public async emit(data: EmitData): Promise<void> {
    if (!this.sqsClient) {
      return;
    }

    try {
      const instance = await this.getInstanceConfig(data.instanceName);

      if (!instance?.sqs?.enabled) {
        return;
      }

      const sqsConfig = instance.sqs;

      if (!this.shouldProcessEvent(data.event, sqsConfig.events)) {
        return;
      }

      const payload = {
        event: data.event,
        instance: data.instanceName,
        data: data.data,
        timestamp: data.dateTime,
        sender: data.sender,
      };

      const command = new SendMessageCommand({
        QueueUrl: sqsConfig.queueUrl,
        MessageBody: JSON.stringify(payload),
        MessageAttributes: {
          event: {
            DataType: 'String',
            StringValue: data.event,
          },
          instance: {
            DataType: 'String',
            StringValue: data.instanceName,
          },
        },
        MessageGroupId: data.instanceName, // For FIFO queues
        MessageDeduplicationId: `${data.instanceName}-${Date.now()}`, // For FIFO queues
      });

      await this.sqsClient.send(command);

      this.logger.log(`SQS message sent for event ${data.event} to instance ${data.instanceName}`);
    } catch (error) {
      this.logger.error(`SQS emission failed for ${data.instanceName}:`, error);
    }
  }

  /**
   * Persists `data.sqs` on the instance row.
   *
   * @returns `{ sqs }` echoing the stored configuration.
   * @throws Rethrows any error from the repository update after logging it.
   */
  public async set(instanceName: string, data: any): Promise<any> {
    try {
      const sqsData = data.sqs;

      await this.prismaRepository.instance.update({
        where: { name: instanceName },
        data: {
          sqs: sqsData,
        },
      });

      this.logger.log(`SQS configuration set for instance ${instanceName}`);

      return { sqs: sqsData };
    } catch (error) {
      this.logger.error(`Failed to set SQS config for ${instanceName}:`, error);
      throw error;
    }
  }
}
```

## Event DTO Pattern

### Event Configuration DTO

```typescript
import { JsonValue } from '@prisma/client/runtime/library';

/** Per-integration event configuration; every integration is optional. */
export class EventDto {
  webhook?: {
    enabled?: boolean;
    events?: string[];
    url?: string;
    headers?: JsonValue;
    byEvents?: boolean;
    base64?: boolean;
  };

  websocket?: {
    enabled?: boolean;
    events?: string[];
  };

  sqs?: {
    enabled?: boolean;
    events?: string[];
    queueUrl?: string;
  };

  rabbitmq?: {
    enabled?: boolean;
    events?: string[];
    exchange?: string;
  };

  nats?: {
    enabled?: boolean;
    events?: string[];
    subject?: string;
  };

  pusher?: {
    enabled?: boolean;
    appId?: string;
    key?: string;
    secret?: string;
    cluster?: string;
    useTLS?: boolean;
    events?: string[];
  };
}
```

## Event Router Pattern

### Event Router Structure

```typescript
import { NatsRouter } from '@api/integrations/event/nats/nats.router';
import { PusherRouter } from '@api/integrations/event/pusher/pusher.router';
import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/rabbitmq.router';
import { SqsRouter } from '@api/integrations/event/sqs/sqs.router';
import { WebhookRouter } from '@api/integrations/event/webhook/webhook.router';
import { WebsocketRouter } from '@api/integrations/event/websocket/websocket.router';
import { Router } from 'express';

/** Mounts each integration's router under its own path prefix. */
export class EventRouter {
  public readonly router: Router;

  /**
   * @param configService Passed to the webhook router only.
   * @param guards Middleware forwarded to every integration router.
   */
  constructor(configService: any, ...guards: any[]) {
    this.router = Router();

    this.router.use('/webhook', new WebhookRouter(configService, ...guards).router);
    this.router.use('/websocket', new WebsocketRouter(...guards).router);
    this.router.use('/rabbitmq', new RabbitmqRouter(...guards).router);
    this.router.use('/nats', new NatsRouter(...guards).router);
    this.router.use('/pusher', new PusherRouter(...guards).router);
    this.router.use('/sqs', new SqsRouter(...guards).router);
  }
}
```

## Event Validation Schema

### Event Configuration Validation

```typescript
import Joi from 'joi';
import { EventController } from '@api/integrations/event/event.controller';

// Optional list restricted to the event names declared on EventController.
const eventListSchema = Joi.array().items(
  Joi.string().valid(...EventController.events)
).optional();

export const webhookSchema = Joi.object({
  enabled: Joi.boolean().required(),
  // The `then` branch is built from `Joi.string()` because `Joi.required()`
  // returns a generic schema that has no `.uri()` rule.
  url: Joi.string().when('enabled', {
    is: true,
    then: Joi.string().uri({ scheme: ['http', 'https'] }).required(),
    otherwise: Joi.optional(),
  }),
  events: eventListSchema,
  headers: Joi.object().pattern(Joi.string(), Joi.string()).optional(),
  byEvents: Joi.boolean().optional().default(false),
  base64: Joi.boolean().optional().default(false),
}).required();

export const websocketSchema = Joi.object({
  enabled: Joi.boolean().required(),
  events: eventListSchema,
}).required();

export const rabbitmqSchema = Joi.object({
  enabled: Joi.boolean().required(),
  events: eventListSchema,
  exchange: Joi.string().optional().default('evolution-events'),
}).required();

export const sqsSchema = Joi.object({
  enabled: Joi.boolean().required(),
  events: eventListSchema,
  // Same reasoning as `webhookSchema.url`: `.uri()` needs a string schema.
  queueUrl: Joi.string().when('enabled', {
    is: true,
    then: Joi.string().uri().required(),
    otherwise: Joi.optional(),
  }),
}).required();

// At least one integration must be present (`.min(1)`).
export const eventSchema = Joi.object({
  webhook: webhookSchema.optional(),
  websocket: websocketSchema.optional(),
  rabbitmq: rabbitmqSchema.optional(),
  sqs: sqsSchema.optional(),
  nats: Joi.object({
    enabled: Joi.boolean().required(),
    events: eventListSchema,
    subject: Joi.string().optional().default('evolution.events'),
  }).optional(),
  pusher: Joi.object({
    enabled: Joi.boolean().required(),
    appId: Joi.string().when('enabled', { is: true, then: Joi.required() }),
    key: Joi.string().when('enabled', { is: true, then: Joi.required() }),
    secret: Joi.string().when('enabled', { is: true, then: Joi.required() }),
    cluster: Joi.string().when('enabled', { is: true, then: Joi.required() }),
    useTLS: Joi.boolean().optional().default(true),
    events: eventListSchema,
  }).optional(),
}).min(1).required();
```

## Event Testing Pattern

### Event Controller Testing

```typescript
describe('WebhookController', () => {
  let controller: WebhookController;
  let prismaRepository: jest.Mocked<PrismaRepository>;
  let configService: jest.Mocked<ConfigService>;

  beforeEach(() => {
    // Fresh mocks for every test; only the members the controller calls are stubbed.
    prismaRepository = {
      instance: {
        findUnique: jest.fn(),
        update: jest.fn(),
      },
    } as unknown as jest.Mocked<PrismaRepository>;
    configService = {
      get: jest.fn(),
    } as unknown as jest.Mocked<ConfigService>;

    controller = new WebhookController(prismaRepository, configService);
  });

  describe('emit', () => {
    it('should send webhook when enabled', async () => {
      const mockInstance = {
        webhook: {
          enabled: true,
          url: 'https://example.com/webhook',
          events: ['MESSAGES_UPSERT'],
        },
      };

      prismaRepository.instance.findUnique.mockResolvedValue(mockInstance);
      jest.spyOn(axios, 'post').mockResolvedValue({ status: 200 });

      const eventData = {
        instanceName: 'test-instance',
        event: 'MESSAGES_UPSERT',
        data: { message: 'test' },
        origin: 'test',
        serverUrl: 'http://localhost',
        dateTime: new Date().toISOString(),
        sender: 'test',
      };

      await controller.emit(eventData);

      expect(axios.post).toHaveBeenCalledWith(
        'https://example.com/webhook',
        expect.objectContaining({
          event: 'MESSAGES_UPSERT',
          instance: 'test-instance',
        }),
        expect.objectContaining({
          headers: expect.objectContaining({
            'Content-Type': 'application/json',
          }),
        })
      );
    });
  });
});
```