diff --git a/package.json b/package.json index 4d2c17cd..c6684e08 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,8 @@ "start:prod": "bash start.sh", "dev:server": "clear && tsnd --files --transpile-only --respawn --ignore-watch node_modules ./src/main.ts", "test": "clear && tsnd --files --transpile-only --respawn --ignore-watch node_modules ./test/all.test.ts", - "lint": "eslint --fix --ext .ts src" + "lint": "eslint --fix --ext .ts src", + "create:indexes": "ts-node ./src/createIndexes.ts" }, "repository": { "type": "git", diff --git a/src/api/controllers/instance.controller.ts b/src/api/controllers/instance.controller.ts index 2415fd83..5cbd4ece 100644 --- a/src/api/controllers/instance.controller.ts +++ b/src/api/controllers/instance.controller.ts @@ -402,6 +402,7 @@ export class InstanceController { read_status: read_status || false, sync_full_history: sync_full_history ?? false, ignore_list: [], + initial_connection: null, }; this.logger.verbose('settings: ' + JSON.stringify(settings)); diff --git a/src/api/dto/settings.dto.ts b/src/api/dto/settings.dto.ts index f74c625d..145a61b4 100644 --- a/src/api/dto/settings.dto.ts +++ b/src/api/dto/settings.dto.ts @@ -7,4 +7,5 @@ export class SettingsDto { read_status?: boolean; sync_full_history?: boolean; ignore_list?: string[]; + initial_connection?: number; } diff --git a/src/api/integrations/kwik/controllers/kwik.controller.ts b/src/api/integrations/kwik/controllers/kwik.controller.ts index d9ed07d9..833332c4 100644 --- a/src/api/integrations/kwik/controllers/kwik.controller.ts +++ b/src/api/integrations/kwik/controllers/kwik.controller.ts @@ -5,11 +5,17 @@ import { Logger } from '../../../../config/logger.config'; import { dbserver } from '../../../../libs/db.connect'; import { InstanceDto } from '../../../dto/instance.dto'; import { WAMonitoringService } from '../../../services/monitor.service'; +import { SettingsService } from '../../../services/settings.service'; const logger = new Logger('KwikController'); +type SearchObject = { + text_search: string; + where: string[]; +}; + export class KwikController { - constructor(private readonly waMonitor: WAMonitoringService) {} + constructor(private readonly waMonitor: WAMonitoringService, private readonly settingsService: SettingsService) {} private isTextMessage(messageType: any) { return [ @@ -20,19 +26,76 @@ export class KwikController { 'messageContextInfo', ].includes(messageType); } + + private async findOffsetByUUID(query, sortOrder, docUUID, batchSize = 1000) { + const db = configService.get('DATABASE'); + const connection = dbserver.getClient().db(db.CONNECTION.DB_PREFIX_NAME + '-whatsapp-api'); + const collection = connection.collection('messages'); + + let offset = 0; + let found = false; + + while (!found) { + // Fetch a batch of documents sorted as per the query + const batch = await collection.find(query).sort(sortOrder).skip(offset).limit(batchSize).toArray(); + const index = batch.findIndex((doc) => doc.key.id === docUUID); + + if (index !== -1) { + // If the document is found in the batch, calculate its offset + found = true; + offset += index; + } else if (batch.length < batchSize) { + // If the batch is smaller than batchSize, we have exhausted the collection + throw new Error(`Document with UUID ${docUUID} not found in the collection.`); + } else { + // Otherwise, move the offset forward by the batch size and continue searching + offset += batchSize; + } + } + + return offset; + } + + private firstMultipleBefore(X, Y) { + return Math.floor(Y / X) * X; + } + + public async messageOffset( + { instanceName }: InstanceDto, + messageTimestamp: number, + remoteJid: string, + sort: any, + limit: number, + docUUID: string, + ) { + const query = { + 'key.remoteJid': remoteJid, + messageTimestamp: { $gte: messageTimestamp }, + owner: instanceName, + }; + const offset = await this.findOffsetByUUID(query, sort, docUUID); + const multiple = this.firstMultipleBefore(limit, offset); + return multiple; + } + public async fetchChats( { instanceName }: InstanceDto, limit: number, skip: number, sort: any, messageTimestamp: number, + remoteJid?: string, ) { const db = configService.get('DATABASE'); const connection = dbserver.getClient().db(db.CONNECTION.DB_PREFIX_NAME + '-whatsapp-api'); const messages = connection.collection('messages'); + let match: { owner: string; 'key.remoteJid'?: string } = { owner: instanceName }; + if (remoteJid) { + match = { ...match, 'key.remoteJid': remoteJid }; + } const pipeline: Document[] = [ { $sort: { 'key.remoteJid': -1, messageTimestamp: -1 } }, - { $match: { owner: instanceName } }, + { $match: match }, { $group: { _id: '$key.remoteJid', @@ -184,4 +247,87 @@ export class KwikController { }; } } + public async cleanChats(instance: InstanceDto) { + const db = configService.get('DATABASE'); + const connection = dbserver.getClient().db(db.CONNECTION.DB_PREFIX_NAME + '-whatsapp-api'); + const settings = this.settingsService.find(instance); + const initialConnection = (await settings).initial_connection; + if (initialConnection) { + connection + .collection('messages') + .deleteMany({ owner: instance.instanceName, messageTimestamp: { $lt: initialConnection } }); + } + + return { status: 'ok' }; + } + + public async textSearch({ instanceName }: InstanceDto, query: SearchObject) { + logger.verbose('request received in textSearch'); + logger.verbose(instanceName); + logger.verbose(query); + + const db = configService.get('DATABASE'); + const connection = dbserver.getClient().db(db.CONNECTION.DB_PREFIX_NAME + '-whatsapp-api'); + const messages = await connection + .collection('messages') + .find({ + owner: { $in: query.where }, + $text: { $search: query.text_search }, + }) + .limit(100) + .toArray(); + + const data = []; + + const uniqueContacts = Array.from( + new Set(messages.filter((m) => !m.key.remoteJid.includes('@g.us')).map((m) => `${m.owner}#${m.key.remoteJid}`)), + ); + const contacts_promises = uniqueContacts.map((m) => { + return connection.collection('contacts').findOne({ owner: m.split('#')[0], id: m.split('#')[1] }); + }); + const uniqueGroups = Array.from( + new Set(messages.filter((m) => m.key.remoteJid.includes('@g.us')).map((m) => `${m.owner}#${m.key.remoteJid}`)), + ); + + const groups_promises = uniqueGroups.map(async (g) => { + const instanceName = g.split('#')[0]; + const groupJid = g.split('#')[1]; + const group = await this.waMonitor.waInstances[instanceName].findGroup({ groupJid }, 'inner'); + + return group ? { ...group, instanceName } : null; + }); + + const [...contacts_solved] = await Promise.all([...contacts_promises]); + const [...groups_solved] = await Promise.all([...groups_promises]); + + const contacts = Object.fromEntries(contacts_solved.filter((c) => c != null).map((c) => [`${c.owner}#${c.id}`, c])); + const groups = Object.fromEntries( + groups_solved.filter((g) => g !== null).map((g) => [`${g.instanceName}#${g.id}`, g]), + ); + + for (let i = 0; i < messages.length; i++) { + const message = messages[i]; + const info = message.key.remoteJid.split('@'); + let type; + let tinfo; + if (info[1] == 'g.us') { + tinfo = groups[`${message.owner}#${message.key.remoteJid}`]; + + type = 'GROUP'; + } else { + tinfo = contacts[`${message.owner}#${message.key.remoteJid}`]; + type = 'CONTACT'; + } + data.push({ + message: message, + + owner: message.owner, + conversation: `${message.owner}#${info}`, + type: type, + info: tinfo, + }); + } + + return { data }; + } } diff --git a/src/api/integrations/kwik/routes/kwik.router.ts b/src/api/integrations/kwik/routes/kwik.router.ts index effa2b2e..39e7a604 100644 --- a/src/api/integrations/kwik/routes/kwik.router.ts +++ b/src/api/integrations/kwik/routes/kwik.router.ts @@ -30,6 +30,7 @@ export class KwikRouter extends RouterBroker { Number(req.query.skip), req.query.sort, Number(req.query.messageTimestamp), + req.query.remoteJid ? req.query.remoteJid.toString() : null, ), }); @@ -70,6 +71,68 @@ export class KwikRouter extends RouterBroker { return res.status(HttpStatus.OK).json(response); }); + + this.router.post(this.routerPath('cleanChats'), ...guards, async (req, res) => { + logger.verbose('request received in cleanChats'); + logger.verbose('request received in cleanChats'); + logger.verbose('request body: '); + logger.verbose(req.body); + logger.verbose('request query: '); + logger.verbose(req.query); + + const response = await this.dataValidate({ + request: req, + schema: null, + ClassRef: InstanceDto, + execute: (instance) => kwikController.cleanChats(instance), + }); + + return res.status(HttpStatus.OK).json(response); + }); + + this.router.post(this.routerPath('textSearch'), ...guards, async (req, res) => { + logger.verbose('request received in textSearch'); + logger.verbose('request body: '); + logger.verbose(req.body); + + logger.verbose('request query: '); + logger.verbose(req.query); + + const response = await this.dataValidate({ + request: req, + schema: null, + ClassRef: InstanceDto, + execute: (instance) => kwikController.textSearch(instance, req.body), + }); + + return res.status(HttpStatus.OK).json(response); + }); + + this.router.get(this.routerPath('messageOffset'), ...guards, async (req, res) => { + logger.verbose('request received in messageOffset'); + logger.verbose('request body: '); + logger.verbose(req.body); + + logger.verbose('request query: '); + logger.verbose(req.query); + + const response = await this.dataValidate({ + request: req, + schema: null, + ClassRef: InstanceDto, + execute: (instance) => + kwikController.messageOffset( + instance, + req.body.message_timestamp, + req.body.remote_jid, + req.body.sort, + req.body.limit, + req.body.chat_message_id, + ), + }); + + return res.status(HttpStatus.OK).json(response); + }); } public readonly router = Router(); diff --git a/src/api/models/settings.model.ts b/src/api/models/settings.model.ts index 5e0b7e01..b27b06a8 100644 --- a/src/api/models/settings.model.ts +++ b/src/api/models/settings.model.ts @@ -12,6 +12,7 @@ export class SettingsRaw { read_status?: boolean; sync_full_history?: boolean; ignore_list?: string[]; + initial_connection?: number; } const settingsSchema = new Schema({ @@ -24,6 +25,7 @@ const settingsSchema = new Schema({ read_status: { type: Boolean, required: true }, sync_full_history: { type: Boolean, required: true }, ignore_list: { type: [String], required: false }, + initial_connection: { type: Number, required: false }, }); export const SettingsModel = dbserver?.model(SettingsRaw.name, settingsSchema, 'settings'); diff --git a/src/api/server.module.ts b/src/api/server.module.ts index d6a335f3..2959623a 100644 --- a/src/api/server.module.ts +++ b/src/api/server.module.ts @@ -183,6 +183,6 @@ export const sendMessageController = new SendMessageController(waMonitor); export const chatController = new ChatController(waMonitor); export const groupController = new GroupController(waMonitor); export const labelController = new LabelController(waMonitor); -export const kwikController = new KwikController(waMonitor); +export const kwikController = new KwikController(waMonitor, settingsService); logger.info('Module - ON'); diff --git a/src/api/services/channel.service.ts b/src/api/services/channel.service.ts index f48970e5..896d894f 100644 --- a/src/api/services/channel.service.ts +++ b/src/api/services/channel.service.ts @@ -223,6 +223,7 @@ export class ChannelStartupService { read_status: data.read_status, sync_full_history: data.sync_full_history, ignore_list: data.ignore_list, + initial_connection: data.initial_connection, }; } diff --git a/src/api/services/channels/whatsapp.baileys.service.ts b/src/api/services/channels/whatsapp.baileys.service.ts index d3843d9d..c9d4fefb 100644 --- a/src/api/services/channels/whatsapp.baileys.service.ts +++ b/src/api/services/channels/whatsapp.baileys.service.ts @@ -979,6 +979,13 @@ export class BaileysStartupService extends ChannelStartupService { continue; } + if (settings.initial_connection && (m.messageTimestamp as number) <= settings.initial_connection) { + this.logger.verbose( + `[messaging-history.set] Ignore -> ${m.messageTimestamp} <= ${settings.initial_connection}`, + ); + continue; + } + if (messagesRepository.has(m.key.id)) { continue; } @@ -1098,6 +1105,13 @@ export class BaileysStartupService extends ChannelStartupService { return; } + if (settings?.initial_connection && (received.messageTimestamp as number) <= settings.initial_connection) { + this.logger.verbose( + `[messages.upsert] Ignore -> ${received.messageTimestamp} <= ${settings.initial_connection}`, + ); + continue; + } + if (settings?.ignore_list && settings.ignore_list.includes(received.key.remoteJid)) { this.logger.verbose('contact in ignore list.'); return; diff --git a/src/api/types/wa.types.ts b/src/api/types/wa.types.ts index eb9fe99a..41d5098e 100644 --- a/src/api/types/wa.types.ts +++ b/src/api/types/wa.types.ts @@ -84,6 +84,7 @@ export declare namespace wa { read_status?: boolean; sync_full_history?: boolean; ignore_list?: string[]; + initial_connection?: number; }; export type LocalWebsocket = { diff --git a/src/createIndexes.ts b/src/createIndexes.ts new file mode 100644 index 00000000..4c7aeacd --- /dev/null +++ b/src/createIndexes.ts @@ -0,0 +1,40 @@ +import { configService, Database } from './config/env.config'; +import { dbserver } from './libs/db.connect'; + +(async () => { + const db = configService.get('DATABASE'); + const client = dbserver.getClient(); + const connection = client.db(db.CONNECTION.DB_PREFIX_NAME + '-whatsapp-api'); + const collection = connection.collection('messages'); + + await collection.createIndex({ 'key.remoteJid': -1, messageTimestamp: -1 }); + + collection.createIndex( + { + 'message.templateMessage.hydratedFourRowTemplate.hydratedContentText': 'text', + 'message.templateMessage.hydratedFourRowTemplate.hydratedFooterText': 'text', + 'message.templateMessage.hydratedFourRowTemplate.hydratedTitleText': 'text', + 'message.templateMessage.hydratedTemplate.hydratedContentText': 'text', + 'message.templateMessage.hydratedTemplate.hydratedFooterText': 'text', + 'message.templateMessage.hydratedTemplate.hydratedTitleText': 'text', + 'message.conversation': 'text', + 'message.extendedTextMessage.text': 'text', + 'message.imageMessage.caption': 'text', + 'message.videoMessage.caption': 'text', + 'message.stickerMessage.caption': 'text', + 'message.documentMessage.caption': 'text', + 'message.documentWithCaptionMessage.caption': 'text', + 'message.audioMessage.caption': 'text', + 'message.viewOnceMessage.caption': 'text', + 'message.viewOnceMessageV2.caption': 'text', + }, + { + default_language: 'none', + }, + ); + + process.exit(0); +})().catch((error) => { + console.error('An error occurred:', error); + dbserver.getClient().close(); +}); diff --git a/src/validate/validate.schema.ts b/src/validate/validate.schema.ts index 532ae2f9..9f3715d0 100644 --- a/src/validate/validate.schema.ts +++ b/src/validate/validate.schema.ts @@ -1003,6 +1003,7 @@ export const settingsSchema: JSONSchema7 = { read_status: { type: 'boolean', enum: [true, false] }, sync_full_history: { type: 'boolean', enum: [true, false] }, ignore_list: { type: 'array', items: { type: 'string' } }, + initial_connection: { type: 'integer', minLength: 1 }, }, required: ['reject_call', 'groups_ignore', 'always_online', 'read_messages', 'read_status', 'sync_full_history'], ...isNotEmpty('reject_call', 'groups_ignore', 'always_online', 'read_messages', 'read_status', 'sync_full_history'),