diff --git a/package.json b/package.json index 6f976f43..3c26f4de 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,7 @@ "node-mime-types": "^1.1.0", "node-windows": "^1.0.0-beta.8", "parse-bmfont-xml": "^1.1.4", + "pg": "^8.11.3", "pino": "^8.11.0", "qrcode": "^1.5.1", "qrcode-terminal": "^0.12.0", @@ -112,4 +113,4 @@ "ts-node-dev": "^2.0.0", "typescript": "^4.9.5" } -} +} \ No newline at end of file diff --git a/src/config/env.config.ts b/src/config/env.config.ts index fd6187ab..22ea9b21 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -149,7 +149,18 @@ export type Webhook = { GLOBAL?: GlobalWebhook; EVENTS: EventsWebhook }; export type ConfigSessionPhone = { CLIENT: string; NAME: string }; export type QrCode = { LIMIT: number; COLOR: string }; export type Typebot = { API_VERSION: string; KEEP_OPEN: boolean }; -export type ChatWoot = { MESSAGE_DELETE: boolean }; +export type Chatwoot = { + MESSAGE_DELETE: boolean; + IMPORT: { + DATABASE: { + CONNECTION: { + URI: string; + }; + }; + PLACEHOLDER_MEDIA_MESSAGE: boolean; + }; +}; + export type CacheConf = { REDIS: CacheConfRedis; LOCAL: CacheConfLocal }; export type Production = boolean; @@ -171,7 +182,7 @@ export interface Env { CONFIG_SESSION_PHONE: ConfigSessionPhone; QRCODE: QrCode; TYPEBOT: Typebot; - CHATWOOT: ChatWoot; + CHATWOOT: Chatwoot; CACHE: CacheConf; AUTHENTICATION: Auth; PRODUCTION?: Production; @@ -338,6 +349,14 @@ export class ConfigService { }, CHATWOOT: { MESSAGE_DELETE: process.env.CHATWOOT_MESSAGE_DELETE === 'false', + IMPORT: { + DATABASE: { + CONNECTION: { + URI: process.env.CHATWOOT_DATABASE_CONNECTION_URI || '', + }, + }, + PLACEHOLDER_MEDIA_MESSAGE: process.env?.CHATWOOT_IMPORT_PLACEHOLDER_MEDIA_MESSAGE === 'true', + }, }, CACHE: { REDIS: { diff --git a/src/dev-env.yml b/src/dev-env.yml index 6f09f6ca..07e71453 100644 --- a/src/dev-env.yml +++ b/src/dev-env.yml @@ -153,10 +153,16 @@ TYPEBOT: API_VERSION: 'old' # old | latest KEEP_OPEN: false -# If you leave this option as false, when deleting the message for everyone on WhatsApp, it will not be deleted on Chatwoot. CHATWOOT: + # If you leave this option as false, when deleting the message for everyone on WhatsApp, it will not be deleted on Chatwoot. MESSAGE_DELETE: true # false | true - + IMPORT: + # This db connection is used to import messages from whatsapp to chatwoot database + DATABASE: + CONNECTION: + URI: "postgres://user:password@hostname:port/dbname" + PLACEHOLDER_MEDIA_MESSAGE: true + # Cache to optimize application performance CACHE: REDIS: diff --git a/src/docs/swagger.yaml b/src/docs/swagger.yaml index d3301cae..b5b27c38 100644 --- a/src/docs/swagger.yaml +++ b/src/docs/swagger.yaml @@ -2076,6 +2076,9 @@ paths: read_status: type: boolean description: "Indicates whether to mark status messages as read." + sync_full_history: + type: boolean + description: "Indicates whether to request a full history messages sync on connect." parameters: - name: instanceName in: path @@ -2141,6 +2144,15 @@ paths: conversation_pending: type: boolean description: "Indicates whether to mark conversations as pending." + import_contacts: + type: boolean + description: "Indicates whether to import contacts from phone to Chatwoot when connecting." + import_messages: + type: boolean + description: "Indicates whether to import messages from phone to Chatwoot when connecting." + days_limit_import_messages: + type: number + description: "Indicates number of days to limit messages imported to Chatwoot." parameters: - name: instanceName in: path diff --git a/src/libs/postgres.client.ts b/src/libs/postgres.client.ts new file mode 100644 index 00000000..d1a68cdf --- /dev/null +++ b/src/libs/postgres.client.ts @@ -0,0 +1,49 @@ +import postgresql from 'pg'; + +import { Chatwoot, configService } from '../config/env.config'; +import { Logger } from '../config/logger.config'; + +const { Pool } = postgresql; + +class Postgres { + private logger = new Logger(Postgres.name); + private pool; + private connected = false; + + getConnection(connectionString: string) { + if (this.connected) { + return this.pool; + } else { + this.pool = new Pool({ + connectionString, + ssl: { + rejectUnauthorized: false, + }, + }); + + this.pool.on('error', () => { + this.logger.error('postgres disconnected'); + this.connected = false; + }); + + try { + this.logger.verbose('connecting new postgres'); + this.connected = true; + } catch (e) { + this.connected = false; + this.logger.error('postgres connect exception caught: ' + e); + return null; + } + + return this.pool; + } + } + + getChatwootConnection() { + const uri = configService.get('CHATWOOT').IMPORT.DATABASE.CONNECTION.URI; + + return this.getConnection(uri); + } +} + +export const postgresClient = new Postgres(); diff --git a/src/utils/chatwoot-import-helper.ts b/src/utils/chatwoot-import-helper.ts new file mode 100644 index 00000000..3283683f --- /dev/null +++ b/src/utils/chatwoot-import-helper.ts @@ -0,0 +1,472 @@ +import { inbox } from '@figuro/chatwoot-sdk'; +import { proto } from '@whiskeysockets/baileys'; + +import { Chatwoot, configService } from '../config/env.config'; +import { Logger } from '../config/logger.config'; +import { postgresClient } from '../libs/postgres.client'; +import { InstanceDto } from '../whatsapp/dto/instance.dto'; +import { ChatwootRaw, ContactRaw, MessageRaw } from '../whatsapp/models'; +import { ChatwootService } from '../whatsapp/services/chatwoot.service'; + +type ChatwootUser = { + user_type: string; + user_id: number; +}; + +type FksChatwoot = { + phone_number: string; + contact_id: string; + conversation_id: string; +}; + +type firstLastTimestamp = { + first: number; + last: number; +}; + +type IWebMessageInfo = Omit & Partial>; + +class ChatwootImport { + private logger = new Logger(ChatwootImport.name); + private repositoryMessagesCache = new Map>(); + private historyMessages = new Map(); + private historyContacts = new Map(); + + public getRepositoryMessagesCache(instance: InstanceDto) { + return this.repositoryMessagesCache.has(instance.instanceName) + ? this.repositoryMessagesCache.get(instance.instanceName) + : null; + } + + public setRepositoryMessagesCache(instance: InstanceDto, repositoryMessagesCache: Set) { + this.repositoryMessagesCache.set(instance.instanceName, repositoryMessagesCache); + } + + public deleteRepositoryMessagesCache(instance: InstanceDto) { + this.repositoryMessagesCache.delete(instance.instanceName); + } + + public addHistoryMessages(instance: InstanceDto, messagesRaw: MessageRaw[]) { + const actualValue = this.historyMessages.has(instance.instanceName) + ? this.historyMessages.get(instance.instanceName) + : []; + this.historyMessages.set(instance.instanceName, actualValue.concat(messagesRaw)); + } + + public addHistoryContacts(instance: InstanceDto, contactsRaw: ContactRaw[]) { + const actualValue = this.historyContacts.has(instance.instanceName) + ? this.historyContacts.get(instance.instanceName) + : []; + this.historyContacts.set(instance.instanceName, actualValue.concat(contactsRaw)); + } + + public deleteHistoryMessages(instance: InstanceDto) { + this.historyMessages.delete(instance.instanceName); + } + + public deleteHistoryContacts(instance: InstanceDto) { + this.historyContacts.delete(instance.instanceName); + } + + public clearAll(instance: InstanceDto) { + this.deleteRepositoryMessagesCache(instance); + this.deleteHistoryMessages(instance); + this.deleteHistoryContacts(instance); + } + + public getHistoryMessagesLenght(instance: InstanceDto) { + return this.historyMessages.get(instance.instanceName)?.length ?? 0; + } + + public async importHistoryContacts(instance: InstanceDto, provider: ChatwootRaw) { + try { + if (this.getHistoryMessagesLenght(instance) > 0) { + return; + } + + const pgClient = postgresClient.getChatwootConnection(); + + let totalContactsImported = 0; + + const contacts = this.historyContacts.get(instance.instanceName) || []; + if (contacts.length === 0) { + return 0; + } + + let contactsChunk: ContactRaw[] = this.sliceIntoChunks(contacts, 3000); + while (contactsChunk.length > 0) { + // inserting contacts in chatwoot db + let sqlInsert = `INSERT INTO contacts + (name, phone_number, account_id, identifier, created_at, updated_at) VALUES `; + const bindInsert = [provider.account_id]; + + for (const contact of contactsChunk) { + bindInsert.push(contact.pushName); + const bindName = `$${bindInsert.length}`; + + bindInsert.push(`+${contact.id.split('@')[0]}`); + const bindPhoneNumber = `$${bindInsert.length}`; + + bindInsert.push(contact.id); + const bindIdentifier = `$${bindInsert.length}`; + + sqlInsert += `(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW()),`; + } + if (sqlInsert.slice(-1) === ',') { + sqlInsert = sqlInsert.slice(0, -1); + } + sqlInsert += ` ON CONFLICT (identifier, account_id) + DO UPDATE SET + name = EXCLUDED.name, + phone_number = EXCLUDED.phone_number, + identifier = EXCLUDED.identifier`; + + totalContactsImported += (await pgClient.query(sqlInsert, bindInsert))?.rowCount ?? 0; + contactsChunk = this.sliceIntoChunks(contacts, 3000); + } + + this.deleteHistoryContacts(instance); + + return totalContactsImported; + } catch (error) { + this.logger.error(`Error on import history contacts: ${error.toString()}`); + } + } + + public async importHistoryMessages( + instance: InstanceDto, + chatwootService: ChatwootService, + inbox: inbox, + provider: ChatwootRaw, + ) { + try { + const pgClient = postgresClient.getChatwootConnection(); + + const chatwootUser = await this.getChatwootUser(provider); + if (!chatwootUser) { + throw new Error('User not found to import messages.'); + } + + let totalMessagesImported = 0; + + const messagesOrdered = this.historyMessages.get(instance.instanceName) || []; + if (messagesOrdered.length === 0) { + return 0; + } + + // ordering messages by number and timestamp asc + messagesOrdered.sort((a, b) => { + return ( + parseInt(a.key.remoteJid) - parseInt(b.key.remoteJid) || + (a.messageTimestamp as number) - (b.messageTimestamp as number) + ); + }); + + const allMessagesMappedByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesOrdered); + // Map structure: +552199999999 => { first message timestamp from number, last message timestamp from number} + const phoneNumbersWithTimestamp = new Map(); + allMessagesMappedByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { + phoneNumbersWithTimestamp.set(phoneNumber, { + first: messages[0]?.messageTimestamp as number, + last: messages[messages.length - 1]?.messageTimestamp as number, + }); + }); + + // processing messages in batch + const batchSize = 4000; + let messagesChunk: MessageRaw[] = this.sliceIntoChunks(messagesOrdered, batchSize); + while (messagesChunk.length > 0) { + // Map structure: +552199999999 => MessageRaw[] + const messagesByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesChunk); + + if (messagesByPhoneNumber.size > 0) { + const fksByNumber = await this.selectOrCreateFksFromChatwoot( + provider, + inbox, + phoneNumbersWithTimestamp, + messagesByPhoneNumber, + ); + + // inserting messages in chatwoot db + let sqlInsertMsg = `INSERT INTO messages + (content, account_id, inbox_id, conversation_id, message_type, private, content_type, + sender_type, sender_id, created_at, updated_at) VALUES `; + const bindInsertMsg = [provider.account_id, inbox.id]; + + messagesByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { + const fksChatwoot = fksByNumber.get(phoneNumber); + + messages.forEach((message) => { + if (!message.message) { + return; + } + + if (!fksChatwoot?.conversation_id || !fksChatwoot?.contact_id) { + return; + } + + const contentMessage = this.getContentMessage(chatwootService, message); + if (!contentMessage) { + return; + } + + bindInsertMsg.push(contentMessage); + const bindContent = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(fksChatwoot.conversation_id); + const bindConversationId = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.key.fromMe ? '1' : '0'); + const bindMessageType = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_type : 'Contact'); + const bindSenderType = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id); + const bindSenderId = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.messageTimestamp as number); + const bindmessageTimestamp = `$${bindInsertMsg.length}`; + + sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0, + ${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`; + }); + }); + if (bindInsertMsg.length > 2) { + if (sqlInsertMsg.slice(-1) === ',') { + sqlInsertMsg = sqlInsertMsg.slice(0, -1); + } + totalMessagesImported += (await pgClient.query(sqlInsertMsg, bindInsertMsg))?.rowCount ?? 0; + } + } + messagesChunk = this.sliceIntoChunks(messagesOrdered, batchSize); + } + + this.deleteHistoryMessages(instance); + this.deleteRepositoryMessagesCache(instance); + + this.importHistoryContacts(instance, provider); + + return totalMessagesImported; + } catch (error) { + this.logger.error(`Error on import history messages: ${error.toString()}`); + + this.deleteHistoryMessages(instance); + this.deleteRepositoryMessagesCache(instance); + } + } + + public async selectOrCreateFksFromChatwoot( + provider: ChatwootRaw, + inbox: inbox, + phoneNumbersWithTimestamp: Map, + messagesByPhoneNumber: Map, + ): Promise> { + const pgClient = postgresClient.getChatwootConnection(); + + const bindValues = [provider.account_id, inbox.id]; + const phoneNumberBind = Array.from(messagesByPhoneNumber.keys()) + .map((phoneNumber) => { + const phoneNumberTimestamp = phoneNumbersWithTimestamp.get(phoneNumber); + + if (phoneNumberTimestamp) { + bindValues.push(phoneNumber); + let bindStr = `($${bindValues.length},`; + + bindValues.push(phoneNumberTimestamp.first); + bindStr += `$${bindValues.length},`; + + bindValues.push(phoneNumberTimestamp.last); + return `${bindStr}$${bindValues.length})`; + } + }) + .join(','); + + // select (or insert when necessary) data from tables contacts, contact_inboxes, conversations from chatwoot db + const sqlFromChatwoot = `WITH + phone_number AS ( + SELECT phone_number, created_at::INTEGER, last_activity_at::INTEGER FROM ( + VALUES + ${phoneNumberBind} + ) as t (phone_number, created_at, last_activity_at) + ), + + only_new_phone_number AS ( + SELECT * FROM phone_number + WHERE phone_number NOT IN ( + SELECT phone_number + FROM contacts + JOIN contact_inboxes ci ON ci.contact_id = contacts.id AND ci.inbox_id = $2 + JOIN conversations con ON con.contact_inbox_id = ci.id + AND con.account_id = $1 + AND con.inbox_id = $2 + AND con.contact_id = contacts.id + WHERE contacts.account_id = $1 + ) + ), + + new_contact AS ( + INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at) + SELECT REPLACE(p.phone_number, '+', ''), p.phone_number, $1, CONCAT(REPLACE(p.phone_number, '+', ''), + '@s.whatsapp.net'), to_timestamp(p.created_at), to_timestamp(p.last_activity_at) + FROM only_new_phone_number AS p + ON CONFLICT(identifier, account_id) DO UPDATE SET updated_at = EXCLUDED.updated_at + RETURNING id, phone_number, created_at, updated_at + ), + + new_contact_inbox AS ( + INSERT INTO contact_inboxes (contact_id, inbox_id, source_id, created_at, updated_at) + SELECT new_contact.id, $2, gen_random_uuid(), new_contact.created_at, new_contact.updated_at + FROM new_contact + RETURNING id, contact_id, created_at, updated_at + ), + + new_conversation AS ( + INSERT INTO conversations (account_id, inbox_id, status, contact_id, + contact_inbox_id, uuid, last_activity_at, created_at, updated_at) + SELECT $1, $2, 0, new_contact_inbox.contact_id, new_contact_inbox.id, gen_random_uuid(), + new_contact_inbox.updated_at, new_contact_inbox.created_at, new_contact_inbox.updated_at + FROM new_contact_inbox + RETURNING id, contact_id + ) + + SELECT new_contact.phone_number, new_conversation.contact_id, new_conversation.id AS conversation_id + FROM new_conversation + JOIN new_contact ON new_conversation.contact_id = new_contact.id + + UNION + + SELECT p.phone_number, c.id contact_id, con.id conversation_id + FROM phone_number p + JOIN contacts c ON c.phone_number = p.phone_number + JOIN contact_inboxes ci ON ci.contact_id = c.id AND ci.inbox_id = $2 + JOIN conversations con ON con.contact_inbox_id = ci.id AND con.account_id = $1 + AND con.inbox_id = $2 AND con.contact_id = c.id`; + + const fksFromChatwoot = await pgClient.query(sqlFromChatwoot, bindValues); + + return new Map(fksFromChatwoot.rows.map((item: FksChatwoot) => [item.phone_number, item])); + } + + public async getChatwootUser(provider: ChatwootRaw): Promise { + try { + const pgClient = postgresClient.getChatwootConnection(); + + const sqlUser = `SELECT owner_type AS user_type, owner_id AS user_id + FROM access_tokens + WHERE token = $1`; + + return (await pgClient.query(sqlUser, [provider.token]))?.rows[0] || false; + } catch (error) { + this.logger.error(`Error on getChatwootUser: ${error.toString()}`); + } + } + + public createMessagesMapByPhoneNumber(messages: MessageRaw[]): Map { + return messages.reduce((acc: Map, message: MessageRaw) => { + if (!this.isIgnorePhoneNumber(message?.key?.remoteJid)) { + const phoneNumber = message?.key?.remoteJid?.split('@')[0]; + if (phoneNumber) { + const phoneNumberPlus = `+${phoneNumber}`; + const messages = acc.has(phoneNumberPlus) ? acc.get(phoneNumberPlus) : []; + messages.push(message); + acc.set(phoneNumberPlus, messages); + } + } + + return acc; + }, new Map()); + } + + public async getContactsOrderByRecentConversations( + inbox: inbox, + provider: ChatwootRaw, + limit = 50, + ): Promise<{ id: number; phone_number: string; identifier: string }[]> { + try { + const pgClient = postgresClient.getChatwootConnection(); + + const sql = `SELECT contacts.id, contacts.identifier, contacts.phone_number + FROM conversations + JOIN contacts ON contacts.id = conversations.contact_id + WHERE conversations.account_id = $1 + AND inbox_id = $2 + ORDER BY conversations.last_activity_at DESC + LIMIT $3`; + + return (await pgClient.query(sql, [provider.account_id, inbox.id, limit]))?.rows; + } catch (error) { + this.logger.error(`Error on get recent conversations: ${error.toString()}`); + } + } + + public getContentMessage(chatwootService: ChatwootService, msg: IWebMessageInfo) { + const contentMessage = chatwootService.getConversationMessage(msg.message); + if (contentMessage) { + return contentMessage; + } + + if (!configService.get('CHATWOOT').IMPORT.PLACEHOLDER_MEDIA_MESSAGE) { + return ''; + } + + const types = { + documentMessage: msg.message.documentMessage, + documentWithCaptionMessage: msg.message.documentWithCaptionMessage?.message?.documentMessage, + imageMessage: msg.message.imageMessage, + videoMessage: msg.message.videoMessage, + audioMessage: msg.message.audioMessage, + stickerMessage: msg.message.stickerMessage, + templateMessage: msg.message.templateMessage?.hydratedTemplate?.hydratedContentText, + }; + const typeKey = Object.keys(types).find((key) => types[key] !== undefined); + + switch (typeKey) { + case 'documentMessage': + return `__`; + + case 'documentWithCaptionMessage': + return `__`; + + case 'templateMessage': + return msg.message.templateMessage.hydratedTemplate.hydratedTitleText + ? `*${msg.message.templateMessage.hydratedTemplate.hydratedTitleText}*\\n` + : '' + msg.message.templateMessage.hydratedTemplate.hydratedContentText; + + case 'imageMessage': + return '__'; + + case 'videoMessage': + return '_