import { InstanceDto } from '@api/dto/instance.dto';
import { ChatwootDto } from '@api/integrations/chatbot/chatwoot/dto/chatwoot.dto';
import { postgresClient } from '@api/integrations/chatbot/chatwoot/libs/postgres.client';
import { ChatwootService } from '@api/integrations/chatbot/chatwoot/services/chatwoot.service';
import { Chatwoot, configService } from '@config/env.config';
import { Logger } from '@config/logger.config';
import { inbox } from '@figuro/chatwoot-sdk';
import { Chatwoot as ChatwootModel, Contact, Message } from '@prisma/client';
import { proto } from 'baileys';

/** Row shape selected from `access_tokens` by `getChatwootUser`. */
type ChatwootUser = {
  user_type: string;
  user_id: number;
};

/** Row shape returned by the contact/conversation lookup in `selectOrCreateFksFromChatwoot`. */
type FksChatwoot = {
  phone_number: string;
  contact_id: string;
  conversation_id: string;
};

/** First and last message timestamps recorded for one phone number. */
type firstLastTimestamp = {
  first: number;
  last: number;
};

/** A Baileys web message whose `key` may be absent. */
type IWebMessageInfo = Omit<proto.IWebMessageInfo, 'key'> & Partial<Pick<proto.IWebMessageInfo, 'key'>>;

/**
 * Buffers WhatsApp history (messages and contacts) per instance name and writes it
 * into the Chatwoot Postgres database through `postgresClient`.
 */
class ChatwootImport {
  private logger = new Logger('ChatwootImport');
  private repositoryMessagesCache = new Map<string, Set<string>>();
  private historyMessages = new Map<string, Message[]>();
  private historyContacts = new Map<string, Contact[]>();

  /** Returns the cached set stored for the instance, or `null` when none is stored. */
  public getRepositoryMessagesCache(instance: InstanceDto) {
    return this.repositoryMessagesCache.has(instance.instanceName)
      ? this.repositoryMessagesCache.get(instance.instanceName)
      : null;
  }

  /** Stores (or replaces) the cached set for the instance. */
  public setRepositoryMessagesCache(instance: InstanceDto, repositoryMessagesCache: Set<string>) {
    this.repositoryMessagesCache.set(instance.instanceName, repositoryMessagesCache);
  }

  /** Drops the cached set for the instance. */
  public deleteRepositoryMessagesCache(instance: InstanceDto) {
    this.repositoryMessagesCache.delete(instance.instanceName);
  }

  /** Appends messages to the instance's pending history buffer. */
  public addHistoryMessages(instance: InstanceDto, messagesRaw: Message[]) {
    const actualValue = this.historyMessages.has(instance.instanceName)
      ? this.historyMessages.get(instance.instanceName)
      : [];
    this.historyMessages.set(instance.instanceName, [...actualValue, ...messagesRaw]);
  }

  /** Appends contacts to the instance's pending history buffer. */
  public addHistoryContacts(instance: InstanceDto, contactsRaw: Contact[]) {
    const actualValue = this.historyContacts.has(instance.instanceName)
      ? this.historyContacts.get(instance.instanceName)
      : [];
    this.historyContacts.set(instance.instanceName, actualValue.concat(contactsRaw));
  }

  /** Discards the instance's pending history messages. */
  public deleteHistoryMessages(instance: InstanceDto) {
    this.historyMessages.delete(instance.instanceName);
  }

  /** Discards the instance's pending history contacts. */
  public deleteHistoryContacts(instance: InstanceDto) {
    this.historyContacts.delete(instance.instanceName);
  }

  /** Discards every buffer held for the instance. */
  public clearAll(instance: InstanceDto) {
    this.deleteRepositoryMessagesCache(instance);
    this.deleteHistoryMessages(instance);
    this.deleteHistoryContacts(instance);
  }

  /** Number of pending history messages for the instance (0 when none are buffered). */
  public getHistoryMessagesLenght(instance: InstanceDto) {
    return this.historyMessages.get(instance.instanceName)?.length ?? 0;
  }

  /**
   * Upserts the buffered contacts into Chatwoot and tags them with the inbox name.
   *
   * Does nothing (returns `undefined`) while history messages are still pending for
   * the instance. Errors are logged, not rethrown.
   *
   * @returns the summed `rowCount` of the contact upserts, `0` when no contacts are
   * buffered, or `undefined` when skipped or on error.
   */
  public async importHistoryContacts(instance: InstanceDto, provider: ChatwootDto) {
    try {
      if (this.getHistoryMessagesLenght(instance) > 0) {
        return;
      }

      const pgClient = postgresClient.getChatwootConnection();

      let totalContactsImported = 0;

      const contacts = this.historyContacts.get(instance.instanceName) || [];
      if (contacts.length === 0) {
        return 0;
      }

      // Make sure a label named after the inbox exists. The lookup does not depend on
      // the chunk, so it runs once; values are bound instead of interpolated.
      const labelBind = [provider.nameInbox, provider.accountId];
      const labelId = (
        await pgClient.query('SELECT id FROM labels WHERE title = $1 AND account_id = $2 LIMIT 1', labelBind)
      )?.rows[0]?.id;
      if (!labelId) {
        await pgClient.query(
          `INSERT INTO labels (title, color, show_on_sidebar, account_id, created_at, updated_at)
           VALUES ($1, '#34039B', true, $2, NOW(), NOW())`,
          labelBind,
        );
      }

      let contactsChunk: Contact[] = this.sliceIntoChunks(contacts, 3000);
      while (contactsChunk.length > 0) {
        // inserting contacts in chatwoot db ($1 is the account id shared by every row)
        const bindInsert = [provider.accountId];
        const contactRows: string[] = [];
        for (const contact of contactsChunk) {
          bindInsert.push(contact.pushName);
          const bindName = `$${bindInsert.length}`;

          bindInsert.push(`+${contact.remoteJid.split('@')[0]}`);
          const bindPhoneNumber = `$${bindInsert.length}`;

          bindInsert.push(contact.remoteJid);
          const bindIdentifier = `$${bindInsert.length}`;

          contactRows.push(`(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW())`);
        }

        const sqlInsert =
          `INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at) VALUES ` +
          contactRows.join(',') +
          ` ON CONFLICT (identifier, account_id) DO UPDATE SET name = EXCLUDED.name, phone_number = EXCLUDED.phone_number, identifier = EXCLUDED.identifier`;

        const chunkImported = (await pgClient.query(sqlInsert, bindInsert))?.rowCount ?? 0;
        totalContactsImported += chunkImported;

        // Upsert the tag exactly once per chunk and grow its counter by this chunk's
        // rows only (not by the running total).
        const sqlTag = `INSERT INTO tags (name, taggings_count) VALUES ($1, $2)
          ON CONFLICT (name) DO UPDATE SET taggings_count = tags.taggings_count + EXCLUDED.taggings_count
          RETURNING id`;
        const tagId = (await pgClient.query(sqlTag, [provider.nameInbox, chunkImported]))?.rows[0]?.id;

        // $1 tag id, $2 taggable type, $3 context, $4 account id; identifiers follow.
        const bindTaggings = [tagId, 'Contact', 'labels', provider.accountId];
        const taggingRows = contactsChunk.map((contact) => {
          bindTaggings.push(contact.remoteJid);
          const bindTaggableId = `(SELECT id FROM contacts WHERE identifier = $${bindTaggings.length} AND account_id = $4)`;
          return `($1, $2, ${bindTaggableId}, $3, NOW())`;
        });

        const sqlInsertLabel =
          `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at) VALUES ` +
          taggingRows.join(',');
        await pgClient.query(sqlInsertLabel, bindTaggings);

        contactsChunk = this.sliceIntoChunks(contacts, 3000);
      }

      this.deleteHistoryContacts(instance);

      return totalContactsImported;
    } catch (error) {
      this.logger.error(`Error on import history contacts: ${error.toString()}`);
    }
  }

  /**
   * Looks up which of the given source ids already exist in Chatwoot's `messages` table.
   *
   * Ids are normalised to the `WAID:<id>` form before querying, and the returned set
   * holds the `source_id` values exactly as stored (i.e. with the `WAID:` prefix).
   *
   * @param conversationId when truthy, restricts the lookup to that conversation.
   * @returns the set of existing source ids; an empty set on error.
   */
  public async getExistingSourceIds(sourceIds: string[], conversationId?: number): Promise<Set<string>> {
    try {
      const existingSourceIdsSet = new Set<string>();

      if (sourceIds.length === 0) {
        return existingSourceIdsSet;
      }

      const formattedSourceIds = sourceIds.map((sourceId) => `WAID:${sourceId.replace('WAID:', '')}`);
      const pgClient = postgresClient.getChatwootConnection();

      const params = conversationId ? [formattedSourceIds, conversationId] : [formattedSourceIds];

      const query = conversationId
        ? 'SELECT source_id FROM messages WHERE source_id = ANY($1) AND conversation_id = $2'
        : 'SELECT source_id FROM messages WHERE source_id = ANY($1)';

      const result = await pgClient.query(query, params);
      for (const row of result.rows) {
        existingSourceIdsSet.add(row.source_id);
      }

      return existingSourceIdsSet;
    } catch (error) {
      this.logger.error(`Error on getExistingSourceIds: ${error.toString()}`);
      return new Set<string>();
    }
  }

  /**
   * Inserts the buffered history messages into Chatwoot in batches, then clears the
   * instance's message buffer and cache and starts the contact import.
   *
   * Errors are logged, not rethrown; the message buffer and cache are cleared on
   * failure as well.
   *
   * @returns the summed `rowCount` of the message inserts, `0` when nothing is
   * buffered, or `undefined` on error.
   */
  public async importHistoryMessages(
    instance: InstanceDto,
    chatwootService: ChatwootService,
    inbox: inbox,
    provider: ChatwootModel,
  ) {
    try {
      const pgClient = postgresClient.getChatwootConnection();

      const chatwootUser = await this.getChatwootUser(provider);
      if (!chatwootUser) {
        throw new Error('User not found to import messages.');
      }

      let totalMessagesImported = 0;

      let messagesOrdered = this.historyMessages.get(instance.instanceName) || [];
      if (messagesOrdered.length === 0) {
        return 0;
      }

      // ordering messages by number and timestamp asc
      messagesOrdered.sort((a, b) => {
        const aKey = a.key as {
          remoteJid: string;
        };

        const bKey = b.key as {
          remoteJid: string;
        };

        const aMessageTimestamp = a.messageTimestamp as any as number;
        const bMessageTimestamp = b.messageTimestamp as any as number;

        return parseInt(aKey.remoteJid) - parseInt(bKey.remoteJid) || aMessageTimestamp - bMessageTimestamp;
      });

      const allMessagesMappedByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesOrdered);
      // Map structure: +552199999999 => { first message timestamp from number, last message timestamp from number}
      const phoneNumbersWithTimestamp = new Map<string, firstLastTimestamp>();
      allMessagesMappedByPhoneNumber.forEach((messages: Message[], phoneNumber: string) => {
        phoneNumbersWithTimestamp.set(phoneNumber, {
          first: messages[0]?.messageTimestamp as any as number,
          last: messages[messages.length - 1]?.messageTimestamp as any as number,
        });
      });

      // getExistingSourceIds returns ids in their stored `WAID:<id>` form, so the
      // membership test must use the same form or nothing is ever filtered out.
      const existingSourceIds = await this.getExistingSourceIds(messagesOrdered.map((message: any) => message.key.id));
      messagesOrdered = messagesOrdered.filter(
        (message: any) => !existingSourceIds.has(`WAID:${String(message.key.id).replace('WAID:', '')}`),
      );

      // processing messages in batch
      const batchSize = 4000;
      let messagesChunk: Message[] = this.sliceIntoChunks(messagesOrdered, batchSize);
      while (messagesChunk.length > 0) {
        // Map structure: +552199999999 => Message[]
        const messagesByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesChunk);

        if (messagesByPhoneNumber.size > 0) {
          const fksByNumber = await this.selectOrCreateFksFromChatwoot(
            provider,
            inbox,
            phoneNumbersWithTimestamp,
            messagesByPhoneNumber,
          );

          // inserting messages in chatwoot db ($1 account id, $2 inbox id)
          const bindInsertMsg = [provider.accountId, inbox.id];
          const messageRows: string[] = [];

          messagesByPhoneNumber.forEach((messages: any[], phoneNumber: string) => {
            const fksChatwoot = fksByNumber.get(phoneNumber);

            messages.forEach((message) => {
              if (!message.message) {
                return;
              }

              if (!fksChatwoot?.conversation_id || !fksChatwoot?.contact_id) {
                return;
              }

              const contentMessage = this.getContentMessage(chatwootService, message);
              if (!contentMessage) {
                return;
              }

              bindInsertMsg.push(contentMessage);
              const bindContent = `$${bindInsertMsg.length}`;

              bindInsertMsg.push(fksChatwoot.conversation_id);
              const bindConversationId = `$${bindInsertMsg.length}`;

              bindInsertMsg.push(message.key.fromMe ? '1' : '0');
              const bindMessageType = `$${bindInsertMsg.length}`;

              bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_type : 'Contact');
              const bindSenderType = `$${bindInsertMsg.length}`;

              bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id);
              const bindSenderId = `$${bindInsertMsg.length}`;

              bindInsertMsg.push('WAID:' + message.key.id);
              const bindSourceId = `$${bindInsertMsg.length}`;

              bindInsertMsg.push(message.messageTimestamp as number);
              const bindmessageTimestamp = `$${bindInsertMsg.length}`;

              messageRows.push(
                `(${bindContent}, ${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0, ${bindSenderType},${bindSenderId},${bindSourceId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp}))`,
              );
            });
          });

          // Skip the round trip when every message of the chunk was filtered out.
          if (messageRows.length > 0) {
            const sqlInsertMsg =
              `INSERT INTO messages (content, processed_message_content, account_id, inbox_id, conversation_id, message_type, private, content_type, sender_type, sender_id, source_id, created_at, updated_at) VALUES ` +
              messageRows.join(',');
            totalMessagesImported += (await pgClient.query(sqlInsertMsg, bindInsertMsg))?.rowCount ?? 0;
          }
        }

        messagesChunk = this.sliceIntoChunks(messagesOrdered, batchSize);
      }

      this.deleteHistoryMessages(instance);
      this.deleteRepositoryMessagesCache(instance);

      const providerData: ChatwootDto = {
        ...provider,
        ignoreJids: Array.isArray(provider.ignoreJids) ? provider.ignoreJids.map((event) => String(event)) : [],
      };

      // Not awaited: the contact import logs its own errors and never rejects.
      this.importHistoryContacts(instance, providerData);

      return totalMessagesImported;
    } catch (error) {
      this.logger.error(`Error on import history messages: ${error.toString()}`);

      this.deleteHistoryMessages(instance);
      this.deleteRepositoryMessagesCache(instance);
    }
  }

  /**
   * Resolves, creating rows where needed, the Chatwoot contact and conversation ids
   * for every phone number in `messagesByPhoneNumber`.
   *
   * Phone numbers that have no entry in `phoneNumbersWithTimestamp` are skipped.
   *
   * @returns a map from phone number to the ids selected for it; empty when no phone
   * number is eligible.
   */
  public async selectOrCreateFksFromChatwoot(
    provider: ChatwootModel,
    inbox: inbox,
    phoneNumbersWithTimestamp: Map<string, firstLastTimestamp>,
    messagesByPhoneNumber: Map<string, Message[]>,
  ): Promise<Map<string, FksChatwoot>> {
    const pgClient = postgresClient.getChatwootConnection();

    const bindValues = [provider.accountId, inbox.id];
    const valueTuples: string[] = [];
    Array.from(messagesByPhoneNumber.keys()).forEach((phoneNumber) => {
      const phoneNumberTimestamp = phoneNumbersWithTimestamp.get(phoneNumber);
      if (!phoneNumberTimestamp) {
        // An empty slot here would leave a dangling comma in the VALUES list.
        return;
      }

      bindValues.push(phoneNumber);
      let bindStr = `($${bindValues.length},`;

      bindValues.push(phoneNumberTimestamp.first);
      bindStr += `$${bindValues.length},`;

      bindValues.push(phoneNumberTimestamp.last);
      valueTuples.push(`${bindStr}$${bindValues.length})`);
    });

    // `VALUES` with no rows is a syntax error, so there is nothing to query.
    if (valueTuples.length === 0) {
      return new Map<string, FksChatwoot>();
    }

    const phoneNumberBind = valueTuples.join(',');

    // select (or insert when necessary) data from tables contacts, contact_inboxes, conversations from chatwoot db
    const sqlFromChatwoot = `WITH
              phone_number AS (
                SELECT phone_number,
                created_at::INTEGER, last_activity_at::INTEGER FROM (
                  VALUES
                   ${phoneNumberBind}
                 ) as t (phone_number, created_at, last_activity_at)
              ),

              only_new_phone_number AS (
                SELECT * FROM phone_number
                WHERE phone_number NOT IN (
                  SELECT phone_number
                  FROM contacts
                    JOIN contact_inboxes ci ON ci.contact_id = contacts.id AND ci.inbox_id = $2
                    JOIN conversations con ON con.contact_inbox_id = ci.id
                      AND con.account_id = $1
                      AND con.inbox_id = $2
                      AND con.contact_id = contacts.id
                  WHERE contacts.account_id = $1
                )
              ),

              new_contact AS (
                INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at)
                SELECT REPLACE(p.phone_number, '+', ''), p.phone_number, $1, CONCAT(REPLACE(p.phone_number, '+', ''),
                  '@s.whatsapp.net'), to_timestamp(p.created_at), to_timestamp(p.last_activity_at)
                FROM only_new_phone_number AS p
                ON CONFLICT(identifier, account_id) DO UPDATE SET updated_at = EXCLUDED.updated_at
                RETURNING id, phone_number, created_at, updated_at
              ),

              new_contact_inbox AS (
                INSERT INTO contact_inboxes (contact_id, inbox_id, source_id, created_at, updated_at)
                SELECT new_contact.id, $2, gen_random_uuid(), new_contact.created_at, new_contact.updated_at
                FROM new_contact
                RETURNING id, contact_id, created_at, updated_at
              ),

              new_conversation AS (
                INSERT INTO conversations (account_id, inbox_id, status, contact_id,
                  contact_inbox_id, uuid, last_activity_at, created_at, updated_at)
                SELECT $1, $2, 0, new_contact_inbox.contact_id, new_contact_inbox.id, gen_random_uuid(),
                  new_contact_inbox.updated_at, new_contact_inbox.created_at, new_contact_inbox.updated_at
                FROM new_contact_inbox
                RETURNING id, contact_id
              )

              SELECT new_contact.phone_number, new_conversation.contact_id, new_conversation.id AS conversation_id
              FROM new_conversation
              JOIN new_contact ON new_conversation.contact_id = new_contact.id

              UNION

              SELECT p.phone_number, c.id contact_id, con.id conversation_id
              FROM phone_number p
              JOIN contacts c ON c.phone_number = p.phone_number
              JOIN contact_inboxes ci ON ci.contact_id = c.id AND ci.inbox_id = $2
              JOIN conversations con ON con.contact_inbox_id = ci.id AND con.account_id = $1
                AND con.inbox_id = $2 AND con.contact_id = c.id`;

    const fksFromChatwoot = await pgClient.query(sqlFromChatwoot, bindValues);

    return new Map<string, FksChatwoot>(
      fksFromChatwoot.rows.map((item: FksChatwoot): [string, FksChatwoot] => [item.phone_number, item]),
    );
  }

  /**
   * Finds the owner of the provider's access token in Chatwoot's `access_tokens` table.
   *
   * @returns the owner row, `false` when no row matches the token, or `undefined`
   * when the query fails (the error is logged).
   */
  public async getChatwootUser(provider: ChatwootModel): Promise<ChatwootUser> {
    try {
      const pgClient = postgresClient.getChatwootConnection();

      const sqlUser = `SELECT owner_type AS user_type, owner_id AS user_id
                         FROM access_tokens
                       WHERE token = $1`;

      return (await pgClient.query(sqlUser, [provider.token]))?.rows[0] || false;
    } catch (error) {
      this.logger.error(`Error on getChatwootUser: ${error.toString()}`);
    }
  }

  /**
   * Groups messages by `+<number>`, where `<number>` is the part of `key.remoteJid`
   * before the `@`. Messages whose JID is rejected by `isIgnorePhoneNumber`, or that
   * yield no number, are left out. Input order is preserved within each group.
   */
  public createMessagesMapByPhoneNumber(messages: Message[]): Map<string, Message[]> {
    return messages.reduce((acc: Map<string, Message[]>, message: Message) => {
      const key = message?.key as {
        remoteJid: string;
      };
      if (!this.isIgnorePhoneNumber(key?.remoteJid)) {
        const phoneNumber = key?.remoteJid?.split('@')[0];
        if (phoneNumber) {
          const phoneNumberPlus = `+${phoneNumber}`;
          const grouped = acc.has(phoneNumberPlus) ? acc.get(phoneNumberPlus) : [];
          grouped.push(message);
          acc.set(phoneNumberPlus, grouped);
        }
      }

      return acc;
    }, new Map());
  }

  /**
   * Lists the contacts of the inbox's conversations, most recently active first.
   *
   * @param limit maximum number of rows to return (default 50).
   * @returns the selected rows, or `undefined` when the query fails (the error is logged).
   */
  public async getContactsOrderByRecentConversations(
    inbox: inbox,
    provider: ChatwootModel,
    limit = 50,
  ): Promise<{ id: number; phone_number: string; identifier: string }[]> {
    try {
      const pgClient = postgresClient.getChatwootConnection();

      const sql = `SELECT contacts.id, contacts.identifier, contacts.phone_number
                     FROM conversations
                   JOIN contacts ON contacts.id = conversations.contact_id
                   WHERE conversations.account_id = $1
                     AND inbox_id = $2
                   ORDER BY conversations.last_activity_at DESC
                   LIMIT $3`;

      return (await pgClient.query(sql, [provider.accountId, inbox.id, limit]))?.rows;
    } catch (error) {
      this.logger.error(`Error on get recent conversations: ${error.toString()}`);
    }
  }

  public getContentMessage(chatwootService: ChatwootService, msg: IWebMessageInfo) {
    const contentMessage = chatwootService.getConversationMessage(msg.message);
    if (contentMessage) {
      return contentMessage;
    }

    if (!configService.get('CHATWOOT').IMPORT.PLACEHOLDER_MEDIA_MESSAGE) {
      return '';
    }

    const types = {
      documentMessage: msg.message.documentMessage,
      documentWithCaptionMessage: msg.message.documentWithCaptionMessage?.message?.documentMessage,
      imageMessage: msg.message.imageMessage,
      videoMessage: msg.message.videoMessage,
      audioMessage: msg.message.audioMessage,
      stickerMessage: msg.message.stickerMessage,
      templateMessage: msg.message.templateMessage?.hydratedTemplate?.hydratedContentText,
    };

    const typeKey = Object.keys(types).find((key) => types[key] !== undefined && types[key] !== null);
    switch (typeKey) {
      case
'documentMessage': { const doc = msg.message.documentMessage; const fileName = doc?.fileName || 'document'; const caption = doc?.caption ? ` ${doc.caption}` : ''; return `__`; } case 'documentWithCaptionMessage': { const doc = msg.message.documentWithCaptionMessage?.message?.documentMessage; const fileName = doc?.fileName || 'document'; const caption = doc?.caption ? ` ${doc.caption}` : ''; return `__`; } case 'templateMessage': { const template = msg.message.templateMessage?.hydratedTemplate; return ( (template?.hydratedTitleText ? `*${template.hydratedTitleText}*\n` : '') + (template?.hydratedContentText || '') ); } case 'imageMessage': return '__'; case 'videoMessage': return '_