feat: pusher event integration

This commit is contained in:
Davidson Gomes
2024-10-11 07:01:38 -03:00
parent f38f3e5ba5
commit 29e2cfaf96
19 changed files with 540 additions and 29 deletions

View File

@@ -7,7 +7,7 @@ export type EmitData = {
instanceName: string;
origin: string;
event: string;
data: Object;
data: any;
serverUrl: string;
dateTime: string;
sender: string;
@@ -22,7 +22,7 @@ export interface EventControllerInterface {
}
export class EventController {
private prismaRepository: PrismaRepository;
public prismaRepository: PrismaRepository;
private waMonitor: WAMonitoringService;
private integrationStatus: boolean;
private integrationName: string;

View File

@@ -25,6 +25,16 @@ export class EventDto {
enabled?: boolean;
events?: string[];
};
pusher?: {
enabled?: boolean;
appId?: string;
key?: string;
secret?: string;
cluster?: string;
useTLS?: boolean;
events?: string[];
};
}
export function EventInstanceMixin<TBase extends Constructor>(Base: TBase) {
@@ -52,5 +62,15 @@ export function EventInstanceMixin<TBase extends Constructor>(Base: TBase) {
enabled?: boolean;
events?: string[];
};
pusher?: {
enabled?: boolean;
appId?: string;
key?: string;
secret?: string;
cluster?: string;
useTLS?: boolean;
events?: string[];
};
};
}

View File

@@ -1,3 +1,4 @@
import { PusherController } from '@api/integrations/event/pusher/pusher.controller';
import { RabbitmqController } from '@api/integrations/event/rabbitmq/rabbitmq.controller';
import { SqsController } from '@api/integrations/event/sqs/sqs.controller';
import { WebhookController } from '@api/integrations/event/webhook/webhook.controller';
@@ -13,6 +14,7 @@ export class EventManager {
private webhookController: WebhookController;
private rabbitmqController: RabbitmqController;
private sqsController: SqsController;
private pusherController: PusherController;
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
this.prisma = prismaRepository;
@@ -22,6 +24,7 @@ export class EventManager {
this.webhook = new WebhookController(prismaRepository, waMonitor);
this.rabbitmq = new RabbitmqController(prismaRepository, waMonitor);
this.sqs = new SqsController(prismaRepository, waMonitor);
this.pusher = new PusherController(prismaRepository, waMonitor);
}
public set prisma(prisma: PrismaRepository) {
@@ -72,10 +75,18 @@ export class EventManager {
return this.sqsController;
}
public set pusher(pusher: PusherController) {
this.pusherController = pusher;
}
public get pusher() {
return this.pusherController;
}
public init(httpServer: Server): void {
this.websocket.init(httpServer);
this.rabbitmq.init();
this.sqs.init();
this.pusher.init();
}
public async emit(eventData: {
@@ -93,6 +104,7 @@ export class EventManager {
await this.rabbitmq.emit(eventData);
await this.sqs.emit(eventData);
await this.webhook.emit(eventData);
await this.pusher.emit(eventData);
}
public async setInstance(instanceName: string, data: any): Promise<any> {
@@ -131,5 +143,18 @@ export class EventManager {
byEvents: data.webhook?.byEvents,
},
});
if (data.pusher)
await this.pusher.set(instanceName, {
pusher: {
enabled: true,
events: data.pusher?.events,
appId: data.pusher?.appId,
key: data.pusher?.key,
secret: data.pusher?.secret,
cluster: data.pusher?.cluster,
useTLS: data.pusher?.useTLS,
},
});
}
}

View File

@@ -1,3 +1,4 @@
import { PusherRouter } from '@api/integrations/event/pusher/pusher.router';
import { RabbitmqRouter } from '@api/integrations/event/rabbitmq/rabbitmq.router';
import { SqsRouter } from '@api/integrations/event/sqs/sqs.router';
import { WebhookRouter } from '@api/integrations/event/webhook/webhook.router';
@@ -13,6 +14,7 @@ export class EventRouter {
this.router.use('/webhook', new WebhookRouter(configService, ...guards).router);
this.router.use('/websocket', new WebsocketRouter(...guards).router);
this.router.use('/rabbitmq', new RabbitmqRouter(...guards).router);
this.router.use('/pusher', new PusherRouter(...guards).router);
this.router.use('/sqs', new SqsRouter(...guards).router);
}
}

View File

@@ -3,6 +3,7 @@ import { v4 } from 'uuid';
import { EventController } from './event.controller';
export * from '@api/integrations/event/pusher/pusher.schema';
export * from '@api/integrations/event/webhook/webhook.schema';
export const eventSchema: JSONSchema7 = {

View File

@@ -0,0 +1,209 @@
import { EventDto } from '@api/integrations/event/event.dto';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { wa } from '@api/types/wa.types';
import { configService, Log, Pusher as ConfigPusher } from '@config/env.config';
import { Logger } from '@config/logger.config';
import Pusher from 'pusher';
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
export class PusherController extends EventController implements EventControllerInterface {
private readonly logger = new Logger('PusherController');
private pusherClients: { [instanceName: string]: Pusher } = {};
private globalPusherClient: Pusher | null = null;
private pusherConfig: ConfigPusher = configService.get<ConfigPusher>('PUSHER');
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
super(prismaRepository, waMonitor, configService.get<ConfigPusher>('PUSHER')?.ENABLED, 'pusher');
this.init();
}
public async init(): Promise<void> {
if (!this.status) {
return;
}
if (this.pusherConfig.GLOBAL?.ENABLED) {
const { APP_ID, KEY, SECRET, CLUSTER, USE_TLS } = this.pusherConfig.GLOBAL;
if (APP_ID && KEY && SECRET && CLUSTER) {
this.globalPusherClient = new Pusher({
appId: APP_ID,
key: KEY,
secret: SECRET,
cluster: CLUSTER,
useTLS: USE_TLS,
});
this.logger.info('Pusher global client initialized');
}
}
const instances = await this.prismaRepository.instance.findMany({
where: {
Pusher: {
isNot: null,
},
},
include: {
Pusher: true,
},
});
instances.forEach((instance) => {
if (
instance.Pusher.enabled &&
instance.Pusher.appId &&
instance.Pusher.key &&
instance.Pusher.secret &&
instance.Pusher.cluster
) {
this.pusherClients[instance.name] = new Pusher({
appId: instance.Pusher.appId,
key: instance.Pusher.key,
secret: instance.Pusher.secret,
cluster: instance.Pusher.cluster,
useTLS: instance.Pusher.useTLS,
});
this.logger.info(`Pusher client initialized for instance ${instance.name}`);
} else {
delete this.pusherClients[instance.name];
this.logger.warn(`Pusher client disabled or misconfigured for instance ${instance.name}`);
}
});
}
override async set(instanceName: string, data: EventDto): Promise<wa.LocalPusher> {
if (!data.pusher?.enabled) {
data.pusher.events = [];
} else if (data.pusher.events.length === 0) {
data.pusher.events = EventController.events;
}
const instance = await this.prisma.pusher.upsert({
where: {
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
update: {
enabled: data.pusher.enabled,
events: data.pusher.events,
appId: data.pusher.appId,
key: data.pusher.key,
secret: data.pusher.secret,
cluster: data.pusher.cluster,
useTLS: data.pusher.useTLS,
},
create: {
enabled: data.pusher.enabled,
events: data.pusher.events,
instanceId: this.monitor.waInstances[instanceName].instanceId,
appId: data.pusher.appId,
key: data.pusher.key,
secret: data.pusher.secret,
cluster: data.pusher.cluster,
useTLS: data.pusher.useTLS,
},
});
if (instance.enabled && instance.appId && instance.key && instance.secret && instance.cluster) {
this.pusherClients[instanceName] = new Pusher({
appId: instance.appId,
key: instance.key,
secret: instance.secret,
cluster: instance.cluster,
useTLS: instance.useTLS,
});
this.logger.info(`Pusher client initialized for instance ${instanceName}`);
} else {
delete this.pusherClients[instanceName];
this.logger.warn(`Pusher client disabled or misconfigured for instance ${instanceName}`);
}
return instance;
}
public async emit({
instanceName,
origin,
event,
data,
serverUrl,
dateTime,
sender,
apiKey,
local,
}: EmitData): Promise<void> {
if (!this.status) {
return;
}
const instance = (await this.get(instanceName)) as wa.LocalPusher;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const enabledLog = configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS');
const eventName = event.replace(/_/g, '.').toLowerCase();
const pusherData = {
event,
instance: instanceName,
data,
destination: instance?.appId || this.pusherConfig.GLOBAL?.APP_ID,
date_time: dateTime,
sender,
server_url: serverUrl,
apikey: apiKey,
};
if (event == 'qrcode.updated') {
delete pusherData.data.qrcode.base64;
}
const payload = JSON.stringify(pusherData);
const payloadSize = Buffer.byteLength(payload, 'utf8');
const MAX_SIZE = 10240;
if (payloadSize > MAX_SIZE) {
this.logger.error({
local: `${origin}.sendData-Pusher`,
message: 'Payload size exceeds Pusher limit',
event,
instanceName,
payloadSize,
});
return;
}
if (local && instance && instance.enabled) {
const pusherLocalEvents = instance.events;
if (Array.isArray(pusherLocalEvents) && pusherLocalEvents.includes(we)) {
if (enabledLog) {
this.logger.log({
local: `${origin}.sendData-Pusher`,
appId: instance.appId,
...pusherData,
});
}
try {
const pusher = this.pusherClients[instanceName];
if (pusher) {
pusher.trigger(instanceName, eventName, pusherData);
} else {
this.logger.error(`Pusher client not found for instance ${instanceName}`);
}
} catch (error) {
this.logger.error({
local: `${origin}.sendData-Pusher`,
message: error?.message,
error,
});
}
}
}
if (this.pusherConfig.GLOBAL?.ENABLED) {
const globalEvents = this.pusherConfig.EVENTS;
if (globalEvents[we]) {
if (enabledLog) {
this.logger.log({
local: `${origin}.sendData-Pusher-Global`,
appId: this.pusherConfig.GLOBAL?.APP_ID,
...pusherData,
});
}
try {
if (this.globalPusherClient) {
this.globalPusherClient.trigger(instanceName, eventName, pusherData);
} else {
this.logger.error('Global Pusher client not initialized');
}
} catch (error) {
this.logger.error({
local: `${origin}.sendData-Pusher-Global`,
message: error?.message,
error,
});
}
}
}
}
}

View File

@@ -0,0 +1,32 @@
import { RouterBroker } from '@api/abstract/abstract.router';
import { InstanceDto } from '@api/dto/instance.dto';
import { EventDto } from '@api/integrations/event/event.dto';
import { HttpStatus } from '@api/routes/index.router';
import { eventManager } from '@api/server.module';
import { instanceSchema, pusherSchema } from '@validate/validate.schema';
import { RequestHandler, Router } from 'express';
export class PusherRouter extends RouterBroker {
constructor(...guards: RequestHandler[]) {
super();
this.router
.post(this.routerPath('set'), ...guards, async (req, res) => {
const response = await this.dataValidate<EventDto>({
request: req,
schema: pusherSchema,
ClassRef: EventDto,
execute: (instance, data) => eventManager.pusher.set(instance.instanceName, data),
});
res.status(HttpStatus.CREATED).json(response);
})
.get(this.routerPath('find'), ...guards, async (req, res) => {
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceSchema,
ClassRef: InstanceDto,
execute: (instance) => eventManager.pusher.get(instance.instanceName),
});
res.status(HttpStatus.OK).json(response);
});
}
public readonly router: Router = Router();
}

View File

@@ -0,0 +1,50 @@
import { JSONSchema7 } from 'json-schema';
import { v4 } from 'uuid';
import { EventController } from '../event.controller';
const isNotEmpty = (...propertyNames: string[]): JSONSchema7 => {
const properties = {};
propertyNames.forEach(
(property) =>
(properties[property] = {
minLength: 1,
description: `The "${property}" cannot be empty`,
}),
);
return {
if: {
propertyNames: {
enum: [...propertyNames],
},
},
then: { properties },
};
};
export const pusherSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
pusher: {
type: 'object',
properties: {
enabled: { type: 'boolean' },
appId: { type: 'string' },
key: { type: 'string' },
secret: { type: 'string' },
cluster: { type: 'string' },
useTLS: { type: 'boolean' },
events: {
type: 'array',
minItems: 0,
items: {
type: 'string',
enum: EventController.events,
},
},
},
required: ['enabled', 'appId', 'key', 'secret', 'cluster', 'useTLS'],
...isNotEmpty('enabled', 'appId', 'key', 'secret', 'cluster', 'useTLS'),
},
},
required: ['pusher'],
};

View File

@@ -99,6 +99,14 @@ export declare namespace wa {
webhookBase64?: boolean;
};
export type LocalPusher = LocalEvent & {
appId?: string;
key?: string;
secret?: string;
cluster?: string;
useTLS?: boolean;
};
type Session = {
remoteJid?: string;
sessionId?: string;

View File

@@ -20,8 +20,6 @@ export class CacheEngine {
logger.verbose(`LocalCache initialized for ${module}`);
this.engine = new LocalCache(configService, module);
}
}
public getEngine() {

View File

@@ -1,8 +1,8 @@
import { ICache } from '@api/abstract/abstract.cache';
import { CacheConf, CacheConfLocal, ConfigService } from '@config/env.config';
import NodeCache from 'node-cache';
import { BufferJSON } from 'baileys';
import { Logger } from '@config/logger.config';
import { BufferJSON } from 'baileys';
import NodeCache from 'node-cache';
export class LocalCache implements ICache {
private readonly logger = new Logger('LocalCache');
@@ -74,7 +74,6 @@ export class LocalCache implements ICache {
hash[field] = json;
LocalCache.localCache.set(this.buildKey(key), hash);
} catch (error) {
this.logger.error(error);
}

View File

@@ -147,6 +147,36 @@ export type EventsWebhook = {
ERRORS_WEBHOOK: string;
};
export type EventsPusher = {
APPLICATION_STARTUP: boolean;
INSTANCE_CREATE: boolean;
INSTANCE_DELETE: boolean;
QRCODE_UPDATED: boolean;
MESSAGES_SET: boolean;
MESSAGES_UPSERT: boolean;
MESSAGES_EDITED: boolean;
MESSAGES_UPDATE: boolean;
MESSAGES_DELETE: boolean;
SEND_MESSAGE: boolean;
CONTACTS_SET: boolean;
CONTACTS_UPDATE: boolean;
CONTACTS_UPSERT: boolean;
PRESENCE_UPDATE: boolean;
CHATS_SET: boolean;
CHATS_UPDATE: boolean;
CHATS_DELETE: boolean;
CHATS_UPSERT: boolean;
CONNECTION_UPDATE: boolean;
LABELS_EDIT: boolean;
LABELS_ASSOCIATION: boolean;
GROUPS_UPSERT: boolean;
GROUP_UPDATE: boolean;
GROUP_PARTICIPANTS_UPDATE: boolean;
CALL: boolean;
TYPEBOT_START: boolean;
TYPEBOT_CHANGE_STATUS: boolean;
};
export type ApiKey = { KEY: string };
export type Auth = {
@@ -163,6 +193,16 @@ export type GlobalWebhook = {
ENABLED: boolean;
WEBHOOK_BY_EVENTS: boolean;
};
export type GlobalPusher = {
ENABLED: boolean;
APP_ID: string;
KEY: string;
SECRET: string;
CLUSTER: string;
USE_TLS: boolean;
};
export type CacheConfRedis = {
ENABLED: boolean;
URI: string;
@@ -176,6 +216,7 @@ export type CacheConfLocal = {
};
export type SslConf = { PRIVKEY: string; FULLCHAIN: string };
export type Webhook = { GLOBAL?: GlobalWebhook; EVENTS: EventsWebhook };
export type Pusher = { ENABLED: boolean; GLOBAL?: GlobalPusher; EVENTS: EventsPusher };
export type ConfigSessionPhone = { CLIENT: string; NAME: string; VERSION: string };
export type QrCode = { LIMIT: number; COLOR: string };
export type Typebot = { ENABLED: boolean; API_VERSION: string; SEND_MEDIA_BASE64: boolean };
@@ -225,6 +266,7 @@ export interface Env {
DEL_TEMP_INSTANCES: boolean;
LANGUAGE: Language;
WEBHOOK: Webhook;
PUSHER: Pusher;
CONFIG_SESSION_PHONE: ConfigSessionPhone;
QRCODE: QrCode;
TYPEBOT: Typebot;
@@ -270,7 +312,9 @@ export class ConfigService {
},
CORS: {
ORIGIN: process.env.CORS_ORIGIN?.split(',') || ['*'],
METHODS: (process.env.CORS_METHODS?.split(',') as HttpMethods[]) || ['POST', 'GET', 'PUT', 'DELETE'] as HttpMethods[],
METHODS:
(process.env.CORS_METHODS?.split(',') as HttpMethods[]) ||
(['POST', 'GET', 'PUT', 'DELETE'] as HttpMethods[]),
CREDENTIALS: process.env?.CORS_CREDENTIALS === 'true',
},
SSL_CONF: {
@@ -347,6 +391,46 @@ export class ConfigService {
ENABLED: process.env?.WEBSOCKET_ENABLED === 'true',
GLOBAL_EVENTS: process.env?.WEBSOCKET_GLOBAL_EVENTS === 'true',
},
PUSHER: {
ENABLED: process.env?.PUSHER_ENABLED === 'true',
GLOBAL: {
ENABLED: process.env?.PUSHER_GLOBAL_ENABLED === 'true',
APP_ID: process.env?.PUSHER_GLOBAL_APP_ID || '',
KEY: process.env?.PUSHER_GLOBAL_KEY || '',
SECRET: process.env?.PUSHER_GLOBAL_SECRET || '',
CLUSTER: process.env?.PUSHER_GLOBAL_CLUSTER || '',
USE_TLS: process.env?.PUSHER_GLOBAL_USE_TLS === 'true',
},
EVENTS: {
APPLICATION_STARTUP: process.env?.PUSHER_EVENTS_APPLICATION_STARTUP === 'true',
INSTANCE_CREATE: process.env?.PUSHER_EVENTS_INSTANCE_CREATE === 'true',
INSTANCE_DELETE: process.env?.PUSHER_EVENTS_INSTANCE_DELETE === 'true',
QRCODE_UPDATED: process.env?.PUSHER_EVENTS_QRCODE_UPDATED === 'true',
MESSAGES_SET: process.env?.PUSHER_EVENTS_MESSAGES_SET === 'true',
MESSAGES_UPSERT: process.env?.PUSHER_EVENTS_MESSAGES_UPSERT === 'true',
MESSAGES_EDITED: process.env?.PUSHER_EVENTS_MESSAGES_EDITED === 'true',
MESSAGES_UPDATE: process.env?.PUSHER_EVENTS_MESSAGES_UPDATE === 'true',
MESSAGES_DELETE: process.env?.PUSHER_EVENTS_MESSAGES_DELETE === 'true',
SEND_MESSAGE: process.env?.PUSHER_EVENTS_SEND_MESSAGE === 'true',
CONTACTS_SET: process.env?.PUSHER_EVENTS_CONTACTS_SET === 'true',
CONTACTS_UPDATE: process.env?.PUSHER_EVENTS_CONTACTS_UPDATE === 'true',
CONTACTS_UPSERT: process.env?.PUSHER_EVENTS_CONTACTS_UPSERT === 'true',
PRESENCE_UPDATE: process.env?.PUSHER_EVENTS_PRESENCE_UPDATE === 'true',
CHATS_SET: process.env?.PUSHER_EVENTS_CHATS_SET === 'true',
CHATS_UPDATE: process.env?.PUSHER_EVENTS_CHATS_UPDATE === 'true',
CHATS_UPSERT: process.env?.PUSHER_EVENTS_CHATS_UPSERT === 'true',
CHATS_DELETE: process.env?.PUSHER_EVENTS_CHATS_DELETE === 'true',
CONNECTION_UPDATE: process.env?.PUSHER_EVENTS_CONNECTION_UPDATE === 'true',
LABELS_EDIT: process.env?.PUSHER_EVENTS_LABELS_EDIT === 'true',
LABELS_ASSOCIATION: process.env?.PUSHER_EVENTS_LABELS_ASSOCIATION === 'true',
GROUPS_UPSERT: process.env?.PUSHER_EVENTS_GROUPS_UPSERT === 'true',
GROUP_UPDATE: process.env?.PUSHER_EVENTS_GROUPS_UPDATE === 'true',
GROUP_PARTICIPANTS_UPDATE: process.env?.PUSHER_EVENTS_GROUP_PARTICIPANTS_UPDATE === 'true',
CALL: process.env?.PUSHER_EVENTS_CALL === 'true',
TYPEBOT_START: process.env?.PUSHER_EVENTS_TYPEBOT_START === 'true',
TYPEBOT_CHANGE_STATUS: process.env?.PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS === 'true',
},
},
WA_BUSINESS: {
TOKEN_WEBHOOK: process.env.WA_BUSINESS_TOKEN_WEBHOOK || 'evolution',
URL: process.env.WA_BUSINESS_URL || 'https://graph.facebook.com',
@@ -354,17 +438,9 @@ export class ConfigService {
LANGUAGE: process.env.WA_BUSINESS_LANGUAGE || 'en',
},
LOG: {
LEVEL: (process.env?.LOG_LEVEL?.split(',') as LogLevel[]) || [
'ERROR',
'WARN',
'DEBUG',
'INFO',
'LOG',
'VERBOSE',
'DARK',
'WEBHOOKS',
'WEBSOCKET',
] as LogLevel[],
LEVEL:
(process.env?.LOG_LEVEL?.split(',') as LogLevel[]) ||
(['ERROR', 'WARN', 'DEBUG', 'INFO', 'LOG', 'VERBOSE', 'DARK', 'WEBHOOKS', 'WEBSOCKET'] as LogLevel[]),
COLOR: process.env?.LOG_COLOR === 'true',
BAILEYS: (process.env?.LOG_BAILEYS as LogBaileys) || 'error',
},

View File

@@ -57,7 +57,7 @@ const getMessageContent = (types: any) => {
let result = typeKey ? types[typeKey] : undefined;
if (types.externalAdReplyBody) {
result = result ? `${result}\n${types.externalAdReplyBody}` : types.externalAdReplyBody;
result = result ? `${result}\n${types.externalAdReplyBody}` : types.externalAdReplyBody;
}
return result;