mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2025-12-19 20:02:20 -06:00
feat: prisma and remove mongodb
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
import { execSync } from 'child_process';
|
||||
import EventEmitter2 from 'eventemitter2';
|
||||
import { existsSync, mkdirSync, opendirSync, readdirSync, rmSync, writeFileSync } from 'fs';
|
||||
import { Db } from 'mongodb';
|
||||
import { Collection } from 'mongoose';
|
||||
import { join } from 'path';
|
||||
|
||||
import {
|
||||
@@ -17,21 +15,8 @@ import {
|
||||
import { Logger } from '../../config/logger.config';
|
||||
import { INSTANCE_DIR, STORE_DIR } from '../../config/path.config';
|
||||
import { NotFoundException } from '../../exceptions';
|
||||
import {
|
||||
AuthModel,
|
||||
ChatwootModel,
|
||||
ContactModel,
|
||||
LabelModel,
|
||||
ProxyModel,
|
||||
RabbitmqModel,
|
||||
SettingsModel,
|
||||
TypebotModel,
|
||||
WebhookModel,
|
||||
WebsocketModel,
|
||||
} from '../models';
|
||||
import { ProviderFiles } from '../provider/sessions';
|
||||
import { MongodbRepository } from '../repository/mongodb/repository.manager';
|
||||
import { PrismaRepository } from '../repository/prisma/repository.service';
|
||||
import { PrismaRepository } from '../repository/repository.service';
|
||||
import { Integration } from '../types/wa.types';
|
||||
import { CacheService } from './cache.service';
|
||||
import { BaileysStartupService } from './channels/whatsapp.baileys.service';
|
||||
@@ -41,12 +26,11 @@ export class WAMonitoringService {
|
||||
constructor(
|
||||
private readonly eventEmitter: EventEmitter2,
|
||||
private readonly configService: ConfigService,
|
||||
private readonly monogodbRepository: MongodbRepository,
|
||||
private readonly primaRepository: PrismaRepository,
|
||||
private readonly prismaRepository: PrismaRepository,
|
||||
private readonly providerFiles: ProviderFiles,
|
||||
private readonly cache: CacheService,
|
||||
private readonly chatwootCache: CacheService,
|
||||
private readonly baileysCache: CacheService,
|
||||
private readonly providerFiles: ProviderFiles,
|
||||
) {
|
||||
this.logger.verbose('instance created');
|
||||
|
||||
@@ -55,17 +39,11 @@ export class WAMonitoringService {
|
||||
|
||||
Object.assign(this.db, configService.get<Database>('DATABASE'));
|
||||
Object.assign(this.redis, configService.get<CacheConf>('CACHE'));
|
||||
|
||||
this.dbInstance = this.db.ENABLED
|
||||
? this.monogodbRepository.mongodbServer?.db(this.db.CONNECTION.DB_PREFIX_NAME + '-instances')
|
||||
: undefined;
|
||||
}
|
||||
|
||||
private readonly db: Partial<Database> = {};
|
||||
private readonly redis: Partial<CacheConf> = {};
|
||||
|
||||
private dbInstance: Db;
|
||||
|
||||
private readonly logger = new Logger(WAMonitoringService.name);
|
||||
public readonly waInstances: Record<string, BaileysStartupService | BusinessStartupService> = {};
|
||||
|
||||
@@ -126,7 +104,7 @@ export class WAMonitoringService {
|
||||
if (findIntegration) {
|
||||
integration = {
|
||||
...findIntegration,
|
||||
webhook_wa_business: `${urlServer}/webhook/whatsapp/${encodeURIComponent(key)}`,
|
||||
webhookWaBusiness: `${urlServer}/webhook/whatsapp/${encodeURIComponent(key)}`,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -136,7 +114,7 @@ export class WAMonitoringService {
|
||||
const instanceData = {
|
||||
instance: {
|
||||
instanceName: key,
|
||||
instanceId: (await this.monogodbRepository.auth.find(key))?.instanceId,
|
||||
instanceId: this.waInstances[key].instanceId,
|
||||
owner: value.wuid,
|
||||
profileName: (await value.getProfileName()) || 'not loaded',
|
||||
profilePictureUrl: value.profilePictureUrl,
|
||||
@@ -148,7 +126,11 @@ export class WAMonitoringService {
|
||||
if (this.configService.get<Auth>('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES) {
|
||||
instanceData.instance['serverUrl'] = this.configService.get<HttpServer>('SERVER').URL;
|
||||
|
||||
instanceData.instance['apikey'] = (await this.monogodbRepository.auth.find(key))?.apikey;
|
||||
instanceData.instance['apikey'] = (
|
||||
await this.prismaRepository.auth.findFirst({
|
||||
where: { instanceId: this.waInstances[key].instanceId },
|
||||
})
|
||||
)?.apikey;
|
||||
|
||||
instanceData.instance['chatwoot'] = chatwoot;
|
||||
|
||||
@@ -162,7 +144,7 @@ export class WAMonitoringService {
|
||||
const instanceData = {
|
||||
instance: {
|
||||
instanceName: key,
|
||||
instanceId: (await this.monogodbRepository.auth.find(key))?.instanceId,
|
||||
instanceId: this.waInstances[key].instanceId,
|
||||
status: value.connectionStatus.state,
|
||||
},
|
||||
};
|
||||
@@ -170,7 +152,11 @@ export class WAMonitoringService {
|
||||
if (this.configService.get<Auth>('AUTHENTICATION').EXPOSE_IN_FETCH_INSTANCES) {
|
||||
instanceData.instance['serverUrl'] = this.configService.get<HttpServer>('SERVER').URL;
|
||||
|
||||
instanceData.instance['apikey'] = (await this.monogodbRepository.auth.find(key))?.apikey;
|
||||
instanceData.instance['apikey'] = (
|
||||
await this.prismaRepository.auth.findFirst({
|
||||
where: { instanceId: this.waInstances[key].instanceId },
|
||||
})
|
||||
)?.apikey;
|
||||
|
||||
instanceData.instance['chatwoot'] = chatwoot;
|
||||
|
||||
@@ -194,12 +180,14 @@ export class WAMonitoringService {
|
||||
this.logger.verbose('get instance info');
|
||||
let instanceName: string;
|
||||
if (instanceId) {
|
||||
instanceName = await this.monogodbRepository.auth.findInstanceNameById(instanceId);
|
||||
instanceName = await this.prismaRepository.instance.findFirst({ where: { id: instanceId } }).then((r) => r?.name);
|
||||
if (!instanceName) {
|
||||
throw new NotFoundException(`Instance "${instanceId}" not found`);
|
||||
}
|
||||
} else if (number) {
|
||||
instanceName = await this.monogodbRepository.auth.findInstanceNameByNumber(number);
|
||||
const id = await this.prismaRepository.integration.findFirst({ where: { number } }).then((r) => r?.instanceId);
|
||||
|
||||
instanceName = await this.prismaRepository.instance.findFirst({ where: { id } }).then((r) => r?.name);
|
||||
if (!instanceName) {
|
||||
throw new NotFoundException(`Instance "${number}" not found`);
|
||||
}
|
||||
@@ -216,50 +204,12 @@ export class WAMonitoringService {
|
||||
return this.instanceInfo(instanceName);
|
||||
}
|
||||
|
||||
private delInstanceFiles() {
|
||||
this.logger.verbose('cron to delete instance files started');
|
||||
setInterval(async () => {
|
||||
if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
|
||||
const collections = await this.dbInstance.collections();
|
||||
collections.forEach(async (collection) => {
|
||||
const name = collection.namespace.replace(/^[\w-]+./, '');
|
||||
await this.dbInstance.collection(name).deleteMany({
|
||||
$or: [{ _id: { $regex: /^app.state.*/ } }, { _id: { $regex: /^session-.*/ } }],
|
||||
});
|
||||
this.logger.verbose('instance files deleted: ' + name);
|
||||
});
|
||||
} else if (!this.redis.REDIS.ENABLED && !this.redis.REDIS.SAVE_INSTANCES) {
|
||||
const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' });
|
||||
for await (const dirent of dir) {
|
||||
if (dirent.isDirectory()) {
|
||||
const files = readdirSync(join(INSTANCE_DIR, dirent.name), {
|
||||
encoding: 'utf-8',
|
||||
});
|
||||
files.forEach(async (file) => {
|
||||
if (file.match(/^app.state.*/) || file.match(/^session-.*/)) {
|
||||
rmSync(join(INSTANCE_DIR, dirent.name, file), {
|
||||
recursive: true,
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
});
|
||||
this.logger.verbose('instance files deleted: ' + dirent.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 3600 * 1000 * 2);
|
||||
}
|
||||
|
||||
public async cleaningUp(instanceName: string) {
|
||||
this.logger.verbose('cleaning up instance: ' + instanceName);
|
||||
|
||||
if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
|
||||
this.logger.verbose('cleaning up instance in database: ' + instanceName);
|
||||
await this.monogodbRepository.mongodbServer.connect();
|
||||
const collections: any[] = await this.dbInstance.collections();
|
||||
if (collections.length > 0) {
|
||||
await this.dbInstance.dropCollection(instanceName);
|
||||
}
|
||||
// TODO: deleta instancia
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -304,20 +254,7 @@ export class WAMonitoringService {
|
||||
|
||||
this.logger.verbose('cleaning store database instance: ' + instanceName);
|
||||
|
||||
if (this.db.PROVIDER === 'mongodb') {
|
||||
await AuthModel.deleteMany({ _id: instanceName });
|
||||
await WebhookModel.deleteMany({ _id: instanceName });
|
||||
await ChatwootModel.deleteMany({ _id: instanceName });
|
||||
await ProxyModel.deleteMany({ _id: instanceName });
|
||||
await RabbitmqModel.deleteMany({ _id: instanceName });
|
||||
await TypebotModel.deleteMany({ _id: instanceName });
|
||||
await WebsocketModel.deleteMany({ _id: instanceName });
|
||||
await SettingsModel.deleteMany({ _id: instanceName });
|
||||
await LabelModel.deleteMany({ owner: instanceName });
|
||||
await ContactModel.deleteMany({ owner: instanceName });
|
||||
|
||||
return;
|
||||
}
|
||||
// TODO: deleta dados da instancia
|
||||
}
|
||||
|
||||
public async loadInstance() {
|
||||
@@ -329,8 +266,7 @@ export class WAMonitoringService {
|
||||
} else if (this.redis.REDIS.ENABLED && this.redis.REDIS.SAVE_INSTANCES) {
|
||||
await this.loadInstancesFromRedis();
|
||||
} else if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
|
||||
if (this.db.PROVIDER === 'mongodb') await this.loadInstancesFromDatabaseMongoDB();
|
||||
else if (this.db.PROVIDER === 'postgresql') await this.loadInstancesFromDatabasePostgres();
|
||||
await this.loadInstancesFromDatabasePostgres();
|
||||
} else {
|
||||
await this.loadInstancesFromFiles();
|
||||
}
|
||||
@@ -345,9 +281,22 @@ export class WAMonitoringService {
|
||||
try {
|
||||
const msgParsed = JSON.parse(JSON.stringify(data));
|
||||
if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
|
||||
await this.monogodbRepository.mongodbServer.connect();
|
||||
await this.dbInstance.collection(data.instanceName).replaceOne({ _id: 'integration' }, msgParsed, {
|
||||
upsert: true,
|
||||
await this.prismaRepository.instance.create({
|
||||
data: {
|
||||
id: data.instanceId,
|
||||
name: data.instanceName,
|
||||
connectionStatus: 'close',
|
||||
},
|
||||
});
|
||||
|
||||
console.log('saveInstance');
|
||||
await this.prismaRepository.integration.create({
|
||||
data: {
|
||||
instanceId: data.instanceId,
|
||||
integration: data.integration,
|
||||
number: data.number,
|
||||
token: data.token,
|
||||
},
|
||||
});
|
||||
} else {
|
||||
const path = join(INSTANCE_DIR, data.instanceName);
|
||||
@@ -359,16 +308,18 @@ export class WAMonitoringService {
|
||||
}
|
||||
}
|
||||
|
||||
private async setInstance(name: string) {
|
||||
const integration = await this.monogodbRepository.integration.find(name);
|
||||
private async setInstance(id: string, name: string) {
|
||||
console.log('setInstance', name);
|
||||
const integration = await this.prismaRepository.integration.findUnique({
|
||||
where: { instanceId: id },
|
||||
});
|
||||
|
||||
let instance: BaileysStartupService | BusinessStartupService;
|
||||
if (integration && integration.integration === Integration.WHATSAPP_BUSINESS) {
|
||||
instance = new BusinessStartupService(
|
||||
this.configService,
|
||||
this.eventEmitter,
|
||||
this.monogodbRepository,
|
||||
this.primaRepository,
|
||||
this.prismaRepository,
|
||||
this.cache,
|
||||
this.chatwootCache,
|
||||
this.baileysCache,
|
||||
@@ -376,12 +327,12 @@ export class WAMonitoringService {
|
||||
);
|
||||
|
||||
instance.instanceName = name;
|
||||
instance.instanceId = id;
|
||||
} else {
|
||||
instance = new BaileysStartupService(
|
||||
this.configService,
|
||||
this.eventEmitter,
|
||||
this.monogodbRepository,
|
||||
this.primaRepository,
|
||||
this.prismaRepository,
|
||||
this.cache,
|
||||
this.chatwootCache,
|
||||
this.baileysCache,
|
||||
@@ -389,9 +340,10 @@ export class WAMonitoringService {
|
||||
);
|
||||
|
||||
instance.instanceName = name;
|
||||
instance.instanceId = id;
|
||||
|
||||
if (!integration) {
|
||||
await instance.setIntegration({ integration: Integration.WHATSAPP_BAILEYS });
|
||||
await instance.setIntegration({ integration: Integration.WHATSAPP_BAILEYS, number: '', token: '' });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -403,45 +355,34 @@ export class WAMonitoringService {
|
||||
}
|
||||
|
||||
private async loadInstancesFromRedis() {
|
||||
console.log('loadInstancesFromRedis');
|
||||
this.logger.verbose('Redis enabled');
|
||||
const keys = await this.cache.keys();
|
||||
|
||||
if (keys?.length > 0) {
|
||||
this.logger.verbose('Reading instance keys and setting instances');
|
||||
await Promise.all(keys.map((k) => this.setInstance(k.split(':')[2])));
|
||||
await Promise.all(keys.map((k) => this.setInstance(k.split(':')[1], k.split(':')[2])));
|
||||
} else {
|
||||
this.logger.verbose('No instance keys found');
|
||||
}
|
||||
}
|
||||
|
||||
private async loadInstancesFromDatabaseMongoDB() {
|
||||
this.logger.verbose('Database enabled');
|
||||
await this.monogodbRepository.mongodbServer.connect();
|
||||
const collections: any[] = await this.dbInstance.collections();
|
||||
await this.deleteTempInstances(collections);
|
||||
if (collections.length > 0) {
|
||||
this.logger.verbose('Reading collections and setting instances');
|
||||
await Promise.all(collections.map((coll) => this.setInstance(coll.namespace.replace(/^[\w-]+\./, ''))));
|
||||
} else {
|
||||
this.logger.verbose('No collections found');
|
||||
}
|
||||
}
|
||||
|
||||
private async loadInstancesFromDatabasePostgres() {
|
||||
console.log('loadInstancesFromDatabasePostgres');
|
||||
this.logger.verbose('Database enabled');
|
||||
await this.primaRepository.onModuleInit();
|
||||
|
||||
const instances = await this.primaRepository.instance.findMany();
|
||||
const instances = await this.prismaRepository.instance.findMany();
|
||||
|
||||
console.log('instances', instances);
|
||||
if (instances.length === 0) {
|
||||
this.logger.verbose('No instances found');
|
||||
return;
|
||||
}
|
||||
|
||||
await Promise.all(instances.map(async (instance) => this.setInstance(instance.name)));
|
||||
await Promise.all(instances.map(async (instance) => this.setInstance(instance.id, instance.name)));
|
||||
}
|
||||
|
||||
private async loadInstancesFromProvider() {
|
||||
console.log('loadInstancesFromProvider');
|
||||
this.logger.verbose('Provider in files enabled');
|
||||
const [instances] = await this.providerFiles.allInstances();
|
||||
|
||||
@@ -450,10 +391,11 @@ export class WAMonitoringService {
|
||||
return;
|
||||
}
|
||||
|
||||
await Promise.all(instances?.data?.map(async (instanceName: string) => this.setInstance(instanceName)));
|
||||
await Promise.all(instances?.data?.map(async (instanceName: string) => this.setInstance('', instanceName)));
|
||||
}
|
||||
|
||||
private async loadInstancesFromFiles() {
|
||||
console.log('loadInstancesFromFiles');
|
||||
this.logger.verbose('Store in files enabled');
|
||||
const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' });
|
||||
const instanceDirs = [];
|
||||
@@ -474,7 +416,7 @@ export class WAMonitoringService {
|
||||
if (files.length === 0) {
|
||||
rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true });
|
||||
} else {
|
||||
await this.setInstance(instanceName);
|
||||
await this.setInstance('', instanceName);
|
||||
}
|
||||
}),
|
||||
);
|
||||
@@ -534,25 +476,42 @@ export class WAMonitoringService {
|
||||
});
|
||||
}
|
||||
|
||||
private async deleteTempInstances(collections: Collection<Document>[]) {
|
||||
private delInstanceFiles() {
|
||||
this.logger.verbose('cron to delete instance files started');
|
||||
setInterval(async () => {
|
||||
const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' });
|
||||
for await (const dirent of dir) {
|
||||
if (dirent.isDirectory()) {
|
||||
const files = readdirSync(join(INSTANCE_DIR, dirent.name), {
|
||||
encoding: 'utf-8',
|
||||
});
|
||||
files.forEach(async (file) => {
|
||||
if (file.match(/^app.state.*/) || file.match(/^session-.*/)) {
|
||||
rmSync(join(INSTANCE_DIR, dirent.name, file), {
|
||||
recursive: true,
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
});
|
||||
this.logger.verbose('instance files deleted: ' + dirent.name);
|
||||
}
|
||||
}
|
||||
}, 3600 * 1000 * 2);
|
||||
}
|
||||
|
||||
private async deleteTempInstances() {
|
||||
const shouldDelete = this.configService.get<boolean>('DEL_TEMP_INSTANCES');
|
||||
if (!shouldDelete) {
|
||||
this.logger.verbose('Temp instances deletion is disabled');
|
||||
return;
|
||||
}
|
||||
this.logger.verbose('Cleaning up temp instances');
|
||||
const auths = await this.monogodbRepository.auth.list();
|
||||
if (auths.length === 0) {
|
||||
this.logger.verbose('No temp instances found');
|
||||
return;
|
||||
}
|
||||
const instancesClosed = await this.prismaRepository.instance.findMany({ where: { connectionStatus: 'close' } });
|
||||
|
||||
let tempInstances = 0;
|
||||
auths.forEach((auth) => {
|
||||
if (collections.find((coll) => coll.namespace.replace(/^[\w-]+\./, '') === auth._id)) {
|
||||
return;
|
||||
}
|
||||
instancesClosed.forEach((instance) => {
|
||||
tempInstances++;
|
||||
this.eventEmitter.emit('remove.instance', auth._id, 'inner');
|
||||
this.eventEmitter.emit('remove.instance', instance.id, 'inner');
|
||||
});
|
||||
this.logger.verbose('Temp instances removed: ' + tempInstances);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user