feat: Added websocket with lib socket.io

This commit is contained in:
Davidson Gomes 2023-08-02 16:11:19 -03:00
parent b3b4ee7a28
commit 24c880343b
16 changed files with 416 additions and 25 deletions

View File

@ -824,13 +824,11 @@ export const updateGroupDescriptionSchema: JSONSchema7 = {
...isNotEmpty('groupJid', 'description'),
};
// Webhook Schema
export const webhookSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
url: { type: 'string' },
enabled: { type: 'boolean', enum: [true, false] },
events: {
type: 'array',
minItems: 0,
@ -862,7 +860,7 @@ export const webhookSchema: JSONSchema7 = {
},
},
},
required: ['url', 'enabled'],
required: ['url'],
...isNotEmpty('url'),
};
@ -896,3 +894,43 @@ export const settingsSchema: JSONSchema7 = {
required: ['reject_call', 'groups_ignore', 'always_online', 'read_messages', 'read_status'],
...isNotEmpty('reject_call', 'groups_ignore', 'always_online', 'read_messages', 'read_status'),
};
export const websocketSchema: JSONSchema7 = {
$id: v4(),
type: 'object',
properties: {
enabled: { type: 'boolean', enum: [true, false] },
events: {
type: 'array',
minItems: 0,
items: {
type: 'string',
enum: [
'APPLICATION_STARTUP',
'QRCODE_UPDATED',
'MESSAGES_SET',
'MESSAGES_UPSERT',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'SEND_MESSAGE',
'CONTACTS_SET',
'CONTACTS_UPSERT',
'CONTACTS_UPDATE',
'PRESENCE_UPDATE',
'CHATS_SET',
'CHATS_UPSERT',
'CHATS_UPDATE',
'CHATS_DELETE',
'GROUPS_UPSERT',
'GROUP_UPDATE',
'GROUP_PARTICIPANTS_UPDATE',
'CONNECTION_UPDATE',
'CALL',
'NEW_JWT_TOKEN',
],
},
},
},
required: ['enabled'],
...isNotEmpty('enabled'),
};

View File

@ -13,6 +13,7 @@ import { ChatwootService } from '../services/chatwoot.service';
import { WAMonitoringService } from '../services/monitor.service';
import { SettingsService } from '../services/settings.service';
import { WebhookService } from '../services/webhook.service';
import { WebsocketService } from '../services/websocket.service';
import { WAStartupService } from '../services/whatsapp.service';
import { wa } from '../types/wa.types';
@ -26,6 +27,7 @@ export class InstanceController {
private readonly webhookService: WebhookService,
private readonly chatwootService: ChatwootService,
private readonly settingsService: SettingsService,
private readonly websocketService: WebsocketService,
private readonly cache: RedisCache,
) {}
@ -51,6 +53,8 @@ export class InstanceController {
always_online,
read_messages,
read_status,
websocket_enabled,
websocket_events,
}: InstanceDto) {
try {
this.logger.verbose('requested createInstance from ' + instanceName + ' instance');
@ -77,7 +81,7 @@ export class InstanceController {
this.logger.verbose('hash: ' + hash + ' generated');
let getEvents: string[];
let webhookEvents: string[];
if (webhook) {
if (!isURL(webhook, { require_tld: false })) {
@ -121,7 +125,51 @@ export class InstanceController {
webhook_by_events,
});
getEvents = (await this.webhookService.find(instance)).events;
webhookEvents = (await this.webhookService.find(instance)).events;
} catch (error) {
this.logger.log(error);
}
}
let websocketEvents: string[];
if (websocket_enabled) {
this.logger.verbose('creating websocket');
try {
let newEvents: string[] = [];
if (websocket_events.length === 0) {
newEvents = [
'APPLICATION_STARTUP',
'QRCODE_UPDATED',
'MESSAGES_SET',
'MESSAGES_UPSERT',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'SEND_MESSAGE',
'CONTACTS_SET',
'CONTACTS_UPSERT',
'CONTACTS_UPDATE',
'PRESENCE_UPDATE',
'CHATS_SET',
'CHATS_UPSERT',
'CHATS_UPDATE',
'CHATS_DELETE',
'GROUPS_UPSERT',
'GROUP_UPDATE',
'GROUP_PARTICIPANTS_UPDATE',
'CONNECTION_UPDATE',
'CALL',
'NEW_JWT_TOKEN',
];
} else {
newEvents = events;
}
this.websocketService.create(instance, {
enabled: true,
events: newEvents,
});
websocketEvents = (await this.websocketService.find(instance)).events;
} catch (error) {
this.logger.log(error);
}
@ -157,9 +205,15 @@ export class InstanceController {
status: 'created',
},
hash,
webhook,
webhook_by_events,
events: getEvents,
webhook: {
webhook,
webhook_by_events,
events: webhookEvents,
},
websocker: {
enabled: websocket_enabled,
events: websocketEvents,
},
settings,
qrcode: getQrcode,
};
@ -230,9 +284,15 @@ export class InstanceController {
status: 'created',
},
hash,
webhook,
webhook_by_events,
events: getEvents,
webhook: {
webhook,
webhook_by_events,
events: webhookEvents,
},
websocker: {
enabled: websocket_enabled,
events: websocketEvents,
},
settings,
chatwoot: {
enabled: true,

View File

@ -14,17 +14,17 @@ export class WebhookController {
public async createWebhook(instance: InstanceDto, data: WebhookDto) {
logger.verbose('requested createWebhook from ' + instance.instanceName + ' instance');
if (data.enabled && !isURL(data.url, { require_tld: false })) {
if (!isURL(data.url, { require_tld: false })) {
throw new BadRequestException('Invalid "url" property');
}
data.enabled = data.enabled ?? true;
if (!data.enabled) {
logger.verbose('webhook disabled');
data.url = '';
data.events = [];
}
if (data.events.length === 0) {
} else if (data.events.length === 0) {
logger.verbose('webhook events empty');
data.events = [
'APPLICATION_STARTUP',

View File

@ -0,0 +1,53 @@
import { Logger } from '../../config/logger.config';
import { InstanceDto } from '../dto/instance.dto';
import { WebsocketDto } from '../dto/websocket.dto';
import { WebsocketService } from '../services/websocket.service';
const logger = new Logger('WebsocketController');
export class WebsocketController {
constructor(private readonly websocketService: WebsocketService) {}
public async createWebsocket(instance: InstanceDto, data: WebsocketDto) {
logger.verbose('requested createWebsocket from ' + instance.instanceName + ' instance');
if (!data.enabled) {
logger.verbose('websocket disabled');
data.events = [];
}
if (data.events.length === 0) {
logger.verbose('websocket events empty');
data.events = [
'APPLICATION_STARTUP',
'QRCODE_UPDATED',
'MESSAGES_SET',
'MESSAGES_UPSERT',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'SEND_MESSAGE',
'CONTACTS_SET',
'CONTACTS_UPSERT',
'CONTACTS_UPDATE',
'PRESENCE_UPDATE',
'CHATS_SET',
'CHATS_UPSERT',
'CHATS_UPDATE',
'CHATS_DELETE',
'GROUPS_UPSERT',
'GROUP_UPDATE',
'GROUP_PARTICIPANTS_UPDATE',
'CONNECTION_UPDATE',
'CALL',
'NEW_JWT_TOKEN',
];
}
return this.websocketService.create(instance, data);
}
public async findWebsocket(instance: InstanceDto) {
logger.verbose('requested findWebsocket from ' + instance.instanceName + ' instance');
return this.websocketService.find(instance);
}
}

View File

@ -18,4 +18,6 @@ export class InstanceDto {
chatwoot_sign_msg?: boolean;
chatwoot_reopen_conversation?: boolean;
chatwoot_conversation_pending?: boolean;
websocket_enabled?: boolean;
websocket_events?: string[];
}

View File

@ -0,0 +1,4 @@
export class WebsocketDto {
enabled: boolean;
events?: string[];
}

View File

@ -5,3 +5,4 @@ export * from './contact.model';
export * from './message.model';
export * from './settings.model';
export * from './webhook.model';
export * from './websocket.model';

View File

@ -0,0 +1,18 @@
import { Schema } from 'mongoose';
import { dbserver } from '../../libs/db.connect';
export class WebsocketRaw {
_id?: string;
enabled?: boolean;
events?: string[];
}
const websocketSchema = new Schema<WebsocketRaw>({
_id: { type: String, _id: true },
enabled: { type: Boolean, required: true },
events: { type: [String], required: true },
});
export const WebsocketModel = dbserver?.model(WebsocketRaw.name, websocketSchema, 'websocket');
export type IWebsocketModel = typeof WebsocketModel;

View File

@ -12,6 +12,7 @@ import { MessageRepository } from './message.repository';
import { MessageUpRepository } from './messageUp.repository';
import { SettingsRepository } from './settings.repository';
import { WebhookRepository } from './webhook.repository';
import { WebsocketRepository } from './websocket.repository';
export class RepositoryBroker {
constructor(
public readonly message: MessageRepository,
@ -21,6 +22,7 @@ export class RepositoryBroker {
public readonly webhook: WebhookRepository,
public readonly chatwoot: ChatwootRepository,
public readonly settings: SettingsRepository,
public readonly websocket: WebsocketRepository,
public readonly auth: AuthRepository,
private configService: ConfigService,
dbServer?: MongoClient,
@ -51,6 +53,7 @@ export class RepositoryBroker {
const webhookDir = join(storePath, 'webhook');
const chatwootDir = join(storePath, 'chatwoot');
const settingsDir = join(storePath, 'settings');
const websocketDir = join(storePath, 'websocket');
const tempDir = join(storePath, 'temp');
if (!fs.existsSync(authDir)) {
@ -85,6 +88,10 @@ export class RepositoryBroker {
this.logger.verbose('creating settings dir: ' + settingsDir);
fs.mkdirSync(settingsDir, { recursive: true });
}
if (!fs.existsSync(websocketDir)) {
this.logger.verbose('creating websocket dir: ' + websocketDir);
fs.mkdirSync(websocketDir, { recursive: true });
}
if (!fs.existsSync(tempDir)) {
this.logger.verbose('creating temp dir: ' + tempDir);
fs.mkdirSync(tempDir, { recursive: true });

View File

@ -0,0 +1,62 @@
import { readFileSync } from 'fs';
import { join } from 'path';
import { ConfigService } from '../../config/env.config';
import { Logger } from '../../config/logger.config';
import { IInsert, Repository } from '../abstract/abstract.repository';
import { IWebsocketModel, WebsocketRaw } from '../models';
export class WebsocketRepository extends Repository {
constructor(private readonly websocketModel: IWebsocketModel, private readonly configService: ConfigService) {
super(configService);
}
private readonly logger = new Logger('WebsocketRepository');
public async create(data: WebsocketRaw, instance: string): Promise<IInsert> {
try {
this.logger.verbose('creating websocket');
if (this.dbSettings.ENABLED) {
this.logger.verbose('saving websocket to db');
const insert = await this.websocketModel.replaceOne({ _id: instance }, { ...data }, { upsert: true });
this.logger.verbose('websocket saved to db: ' + insert.modifiedCount + ' websocket');
return { insertCount: insert.modifiedCount };
}
this.logger.verbose('saving websocket to store');
this.writeStore<WebsocketRaw>({
path: join(this.storePath, 'websocket'),
fileName: instance,
data,
});
this.logger.verbose('websocket saved to store in path: ' + join(this.storePath, 'websocket') + '/' + instance);
this.logger.verbose('websocket created');
return { insertCount: 1 };
} catch (error) {
return error;
}
}
public async find(instance: string): Promise<WebsocketRaw> {
try {
this.logger.verbose('finding websocket');
if (this.dbSettings.ENABLED) {
this.logger.verbose('finding websocket in db');
return await this.websocketModel.findOne({ _id: instance });
}
this.logger.verbose('finding websocket in store');
return JSON.parse(
readFileSync(join(this.storePath, 'websocket', instance + '.json'), {
encoding: 'utf-8',
}),
) as WebsocketRaw;
} catch (error) {
return {};
}
}
}

View File

@ -12,6 +12,7 @@ import { MessageRouter } from './sendMessage.router';
import { SettingsRouter } from './settings.router';
import { ViewsRouter } from './view.router';
import { WebhookRouter } from './webhook.router';
import { WebsocketRouter } from './websocket.router';
enum HttpStatus {
OK = 200,
@ -44,6 +45,7 @@ router
.use('/group', new GroupRouter(...guards).router)
.use('/webhook', new WebhookRouter(...guards).router)
.use('/chatwoot', new ChatwootRouter(...guards).router)
.use('/settings', new SettingsRouter(...guards).router);
.use('/settings', new SettingsRouter(...guards).router)
.use('/websocket', new WebsocketRouter(...guards).router);
export { HttpStatus, router };

View File

@ -0,0 +1,52 @@
import { RequestHandler, Router } from 'express';
import { Logger } from '../../config/logger.config';
import { instanceNameSchema, websocketSchema } from '../../validate/validate.schema';
import { RouterBroker } from '../abstract/abstract.router';
import { InstanceDto } from '../dto/instance.dto';
import { WebsocketDto } from '../dto/websocket.dto';
import { websocketController } from '../whatsapp.module';
import { HttpStatus } from './index.router';
const logger = new Logger('WebsocketRouter');
export class WebsocketRouter extends RouterBroker {
constructor(...guards: RequestHandler[]) {
super();
this.router
.post(this.routerPath('set'), ...guards, async (req, res) => {
logger.verbose('request received in setWebsocket');
logger.verbose('request body: ');
logger.verbose(req.body);
logger.verbose('request query: ');
logger.verbose(req.query);
const response = await this.dataValidate<WebsocketDto>({
request: req,
schema: websocketSchema,
ClassRef: WebsocketDto,
execute: (instance, data) => websocketController.createWebsocket(instance, data),
});
res.status(HttpStatus.CREATED).json(response);
})
.get(this.routerPath('find'), ...guards, async (req, res) => {
logger.verbose('request received in findWebsocket');
logger.verbose('request body: ');
logger.verbose(req.body);
logger.verbose('request query: ');
logger.verbose(req.query);
const response = await this.dataValidate<InstanceDto>({
request: req,
schema: instanceNameSchema,
ClassRef: InstanceDto,
execute: (instance) => websocketController.findWebsocket(instance),
});
res.status(HttpStatus.OK).json(response);
});
}
public readonly router = Router();
}

View File

@ -0,0 +1,33 @@
import { Logger } from '../../config/logger.config';
import { InstanceDto } from '../dto/instance.dto';
import { WebsocketDto } from '../dto/websocket.dto';
import { WebsocketRaw } from '../models';
import { WAMonitoringService } from './monitor.service';
export class WebsocketService {
constructor(private readonly waMonitor: WAMonitoringService) {}
private readonly logger = new Logger(WebsocketService.name);
public create(instance: InstanceDto, data: WebsocketDto) {
this.logger.verbose('create websocket: ' + instance.instanceName);
this.waMonitor.waInstances[instance.instanceName].setWebsocket(data);
return { websocket: { ...instance, websocket: data } };
}
public async find(instance: InstanceDto): Promise<WebsocketRaw> {
try {
this.logger.verbose('find websocket: ' + instance.instanceName);
const result = await this.waMonitor.waInstances[instance.instanceName].findWebsocket();
if (Object.keys(result).length === 0) {
throw new Error('Websocket not found');
}
return result;
} catch (error) {
return { enabled: false, events: [] };
}
}
}

View File

@ -116,6 +116,7 @@ import { ChatwootRaw } from '../models/chatwoot.model';
import { ContactRaw } from '../models/contact.model';
import { MessageRaw, MessageUpdateRaw } from '../models/message.model';
import { WebhookRaw } from '../models/webhook.model';
import { WebsocketRaw } from '../models/websocket.model';
import { ContactQuery } from '../repository/contact.repository';
import { MessageQuery } from '../repository/message.repository';
import { MessageUpQuery } from '../repository/messageUp.repository';
@ -142,6 +143,7 @@ export class WAStartupService {
private readonly localWebhook: wa.LocalWebHook = {};
private readonly localChatwoot: wa.LocalChatwoot = {};
private readonly localSettings: wa.LocalSettings = {};
private readonly localWebsocket: wa.LocalWebsocket = {};
public stateConnection: wa.StateConnection = { state: 'close' };
public readonly storePath = join(ROOT_DIR, 'store');
private readonly msgRetryCounterCache: CacheStore = new NodeCache();
@ -411,9 +413,44 @@ export class WAStartupService {
return data;
}
private async loadWebsocket() {
this.logger.verbose('Loading websocket');
const data = await this.repository.websocket.find(this.instanceName);
this.localWebsocket.enabled = data?.enabled;
this.logger.verbose(`Websocket enabled: ${this.localWebsocket.enabled}`);
this.localWebsocket.events = data?.events;
this.logger.verbose(`Websocket events: ${this.localWebsocket.events}`);
this.logger.verbose('Websocket loaded');
}
public async setWebsocket(data: WebsocketRaw) {
this.logger.verbose('Setting websocket');
await this.repository.websocket.create(data, this.instanceName);
this.logger.verbose(`Websocket events: ${data.events}`);
Object.assign(this.localWebsocket, data);
this.logger.verbose('Websocket set');
}
public async findWebsocket() {
this.logger.verbose('Finding websocket');
const data = await this.repository.websocket.find(this.instanceName);
if (!data) {
this.logger.verbose('Websocket not found');
throw new NotFoundException('Websocket not found');
}
this.logger.verbose(`Websocket events: ${data.events}`);
return data;
}
public async sendDataWebhook<T = any>(event: Events, data: T, local = true) {
const webhookGlobal = this.configService.get<Webhook>('WEBHOOK');
const webhookLocal = this.localWebhook.events;
const websocketLocal = this.localWebsocket.events;
const serverUrl = this.configService.get<HttpServer>('SERVER').URL;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const transformedWe = we.replace(/_/gm, '-').toLowerCase();
@ -421,14 +458,21 @@ export class WAStartupService {
const expose = this.configService.get<Auth>('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES;
const tokenStore = await this.repository.auth.find(this.instanceName);
const instanceApikey = tokenStore?.apikey || 'Apikey not found';
const io = getIO();
this.logger.verbose('Sending data to socket.io in channel: ' + this.instance.name);
io.of(`/${this.instance.name}`).emit(event, {
event,
instance: this.instance.name,
data,
});
if (this.localWebsocket.enabled) {
this.logger.verbose('Sending data to websocket on channel: ' + this.instance.name);
if (Array.isArray(websocketLocal) && websocketLocal.includes(we)) {
this.logger.verbose('Sending data to websocket on event: ' + event);
const io = getIO();
this.logger.verbose('Sending data to socket.io in channel: ' + this.instance.name);
io.of(`/${this.instance.name}`).emit(event, {
event,
instance: this.instance.name,
data,
});
}
}
const globalApiKey = this.configService.get<Auth>('AUTHENTICATION').API_KEY.KEY;
@ -568,7 +612,6 @@ export class WAStartupService {
this.logger.verbose('Connection update');
if (qr) {
this.logger.verbose('QR code found');
console.log('this.instance.qrcode', this.instance.qrcode);
if (this.instance.qrcode.count === this.configService.get<QrCode>('QRCODE').LIMIT) {
this.logger.verbose('QR code limit reached');
@ -823,6 +866,7 @@ export class WAStartupService {
this.loadWebhook();
this.loadChatwoot();
this.loadSettings();
this.loadWebsocket();
this.instance.authState = await this.defineAuthState();
@ -2380,7 +2424,6 @@ export class WAStartupService {
if (!last_message || Object.keys(last_message).length === 0) {
throw new NotFoundException('Last message not found');
}
console.log(last_message);
await this.client.chatModify(
{

View File

@ -70,6 +70,11 @@ export declare namespace wa {
read_status?: boolean;
};
export type LocalWebsocket = {
enabled?: boolean;
events?: string[];
};
export type StateConnection = {
instance?: string;
state?: WAConnectionState | 'refused';

View File

@ -11,6 +11,7 @@ import { SendMessageController } from './controllers/sendMessage.controller';
import { SettingsController } from './controllers/settings.controller';
import { ViewsController } from './controllers/views.controller';
import { WebhookController } from './controllers/webhook.controller';
import { WebsocketController } from './controllers/websocket.controller';
import {
AuthModel,
ChatModel,
@ -20,6 +21,7 @@ import {
MessageUpModel,
SettingsModel,
WebhookModel,
WebsocketModel,
} from './models';
import { AuthRepository } from './repository/auth.repository';
import { ChatRepository } from './repository/chat.repository';
@ -30,11 +32,13 @@ import { MessageUpRepository } from './repository/messageUp.repository';
import { RepositoryBroker } from './repository/repository.manager';
import { SettingsRepository } from './repository/settings.repository';
import { WebhookRepository } from './repository/webhook.repository';
import { WebsocketRepository } from './repository/websocket.repository';
import { AuthService } from './services/auth.service';
import { ChatwootService } from './services/chatwoot.service';
import { WAMonitoringService } from './services/monitor.service';
import { SettingsService } from './services/settings.service';
import { WebhookService } from './services/webhook.service';
import { WebsocketService } from './services/websocket.service';
const logger = new Logger('WA MODULE');
@ -43,6 +47,7 @@ const chatRepository = new ChatRepository(ChatModel, configService);
const contactRepository = new ContactRepository(ContactModel, configService);
const messageUpdateRepository = new MessageUpRepository(MessageUpModel, configService);
const webhookRepository = new WebhookRepository(WebhookModel, configService);
const websocketRepository = new WebsocketRepository(WebsocketModel, configService);
const chatwootRepository = new ChatwootRepository(ChatwootModel, configService);
const settingsRepository = new SettingsRepository(SettingsModel, configService);
const authRepository = new AuthRepository(AuthModel, configService);
@ -55,6 +60,7 @@ export const repository = new RepositoryBroker(
webhookRepository,
chatwootRepository,
settingsRepository,
websocketRepository,
authRepository,
configService,
dbserver?.getClient(),
@ -70,6 +76,10 @@ const webhookService = new WebhookService(waMonitor);
export const webhookController = new WebhookController(webhookService);
const websocketService = new WebsocketService(waMonitor);
export const websocketController = new WebsocketController(websocketService);
const chatwootService = new ChatwootService(waMonitor, configService);
export const chatwootController = new ChatwootController(chatwootService, configService);
@ -87,6 +97,7 @@ export const instanceController = new InstanceController(
webhookService,
chatwootService,
settingsService,
websocketService,
cache,
);
export const viewsController = new ViewsController(waMonitor, configService);