diff --git a/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts b/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts index c8d709b7..b970a558 100644 --- a/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts +++ b/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts @@ -154,78 +154,81 @@ class ChatwootImport { } } - public async importHistoryContacts(instance: InstanceDto, provider: ChatwootRaw) { + + public async importHistoryContacts(instance: InstanceDto, provider: ChatwootRaw) { try { - if (this.getHistoryMessagesLenght(instance) > 0) { - return; - } - - const pgClient = postgresClient.getChatwootConnection(); - - let totalContactsImported = 0; - - const contacts = this.historyContacts.get(instance.instanceName) || []; - if (contacts.length === 0) { - return 0; - } - - let contactsChunk: ContactRaw[] = this.sliceIntoChunks(contacts, 3000); - // Inserindo o label uma única vez - await this.insertLabel(instance.instanceName, Number(provider.account_id)); - const tagId = await this.insertTag(instance.instanceName, contacts.length); - - const contactIds: number[] = []; - - while (contactsChunk.length > 0) { - // Inserindo contatos no banco de dados Chatwoot - let sqlInsert = `INSERT INTO contacts - (name, phone_number, account_id, identifier, created_at, updated_at) VALUES `; - const bindInsert = [provider.account_id]; - - for (const contact of contactsChunk) { - bindInsert.push(contact.pushName); - const bindName = `$${bindInsert.length}`; - - bindInsert.push(`+${contact.id.split('@')[0]}`); - const bindPhoneNumber = `$${bindInsert.length}`; - - bindInsert.push(contact.id); - const bindIdentifier = `$${bindInsert.length}`; - - sqlInsert += `(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW()),`; + if (this.getHistoryMessagesLenght(instance) > 0) { + return; } - if (sqlInsert.slice(-1) === ',') { - sqlInsert = sqlInsert.slice(0, -1); - } - sqlInsert += ` ON CONFLICT (identifier, account_id) - DO UPDATE SET - name = EXCLUDED.name, - phone_number = EXCLUDED.phone_number, - identifier = EXCLUDED.identifier - RETURNING id`; + const pgClient = postgresClient.getChatwootConnection(); - const result = await pgClient.query(sqlInsert, bindInsert); - totalContactsImported += result?.rowCount ?? 0; + let totalContactsImported = 0; - // Coletando os IDs dos contatos inseridos ou atualizados - for (const row of result.rows) { - contactIds.push(row.id); + const contacts = this.historyContacts.get(instance.instanceName) || []; + if (contacts.length === 0) { + return 0; } - contactsChunk = this.sliceIntoChunks(contacts, 3000); - } + let contactsChunk: ContactRaw[] = this.sliceIntoChunks(contacts, 3000); + // Inserindo o label uma única vez + await this.insertLabel(instance.instanceName, Number(provider.account_id)); + const tagId = await this.insertTag(instance.instanceName, contacts.length); + + const contactIds: number[] = []; - // Após inserir todos os contatos, inserir dados na tabela taggings - await this.insertTaggings(instance.instanceName, tagId, contactIds); + while (contactsChunk.length > 0) { + // Inserindo contatos no banco de dados Chatwoot + let sqlInsert = `INSERT INTO contacts + (name, phone_number, account_id, identifier, created_at, updated_at) VALUES `; + const bindInsert = [provider.account_id]; - this.deleteHistoryContacts(instance); + for (const contact of contactsChunk) { + bindInsert.push(contact.pushName); + const bindName = `$${bindInsert.length}`; - return totalContactsImported; + bindInsert.push(`+${contact.id.split('@')[0]}`); + const bindPhoneNumber = `$${bindInsert.length}`; + + bindInsert.push(contact.id); + const bindIdentifier = `$${bindInsert.length}`; + + sqlInsert += `(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW()),`; + } + + if (sqlInsert.slice(-1) === ',') { + sqlInsert = sqlInsert.slice(0, -1); + } + sqlInsert += ` ON CONFLICT (identifier, account_id) + DO UPDATE SET + name = EXCLUDED.name, + phone_number = EXCLUDED.phone_number, + identifier = EXCLUDED.identifier + RETURNING id`; + + const result = await pgClient.query(sqlInsert, bindInsert); + totalContactsImported += result?.rowCount ?? 0; + + // Coletando os IDs dos contatos inseridos ou atualizados + for (const row of result.rows) { + contactIds.push(row.id); + } + + contactsChunk = this.sliceIntoChunks(contacts, 3000); + } + + // Após inserir todos os contatos, inserir dados na tabela taggings + await this.insertTaggings(instance.instanceName, tagId, contactIds); + + this.deleteHistoryContacts(instance); + + return totalContactsImported; } catch (error) { - this.logger.error(`Error on import history contacts: ${error.toString()}`); + this.logger.error(`Error on import history contacts: ${error.toString()}`); } - } +} + + public async importHistoryMessages( instance: InstanceDto, @@ -250,95 +253,316 @@ class ChatwootImport { // ordering messages by number and timestamp asc messagesOrdered.sort((a, b) => { - const phoneComparison = a.key.remoteJid.localeCompare(b.key.remoteJid); - return phoneComparison !== 0 ? phoneComparison : a.messageTimestamp - b.messageTimestamp; + return ( + parseInt(a.key.remoteJid) - parseInt(b.key.remoteJid) || + (a.messageTimestamp as number) - (b.messageTimestamp as number) + ); }); - const firstLastMessageTimestampsByNumber: Map = new Map(); + const allMessagesMappedByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesOrdered); + // Map structure: +552199999999 => { first message timestamp from number, last message timestamp from number} + const phoneNumbersWithTimestamp = new Map(); + allMessagesMappedByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { + phoneNumbersWithTimestamp.set(phoneNumber, { + first: messages[0]?.messageTimestamp as number, + last: messages[messages.length - 1]?.messageTimestamp as number, + }); + }); - for (const message of messagesOrdered) { - const remoteJid = message.key.remoteJid; - const timestamp = message.messageTimestamp; + // processing messages in batch + const batchSize = 4000; + let messagesChunk: MessageRaw[] = this.sliceIntoChunks(messagesOrdered, batchSize); + while (messagesChunk.length > 0) { + // Map structure: +552199999999 => MessageRaw[] + const messagesByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesChunk); - if (!firstLastMessageTimestampsByNumber.has(remoteJid)) { - firstLastMessageTimestampsByNumber.set(remoteJid, { first: timestamp, last: timestamp }); - } else { - const entry = firstLastMessageTimestampsByNumber.get(remoteJid); - if (timestamp < entry.first) entry.first = timestamp; - if (timestamp > entry.last) entry.last = timestamp; - } - } - - for (const [phoneNumber, timestamps] of firstLastMessageTimestampsByNumber.entries()) { - const contact = await chatwootService.createContact( - phoneNumber, - provider.inbox_id, - instance.instanceName, - ); - - const conversation = await chatwootService.createConversation( - phoneNumber, - provider.inbox_id, - provider.account_id, - ); - - let chunkedMessages = this.sliceIntoChunks( - messagesOrdered.filter(m => m.key.remoteJid === phoneNumber), - 3000, - ); - - while (chunkedMessages.length > 0) { - const chunk = chunkedMessages.shift(); - const sqlInsert = ` - INSERT INTO messages (content, account_id, inbox_id, conversation_id, created_at, updated_at, source_id) - VALUES ${chunk - .map( - _ => - `('${_.message?.conversation || _.message?.extendedTextMessage?.text || ''}', - $1, $2, $3, to_timestamp($4), to_timestamp($5), $6)`, - ) - .join(',')} - ON CONFLICT (source_id) - DO NOTHING - `; - - const bindValues = [ - provider.account_id, - provider.inbox_id, - conversation.id, - ...chunk.flatMap(m => [m.messageTimestamp, m.messageTimestamp, m.key.id]), - ]; - - await pgClient.query(sqlInsert, bindValues); - totalMessagesImported += chunk.length; + if (messagesByPhoneNumber.size > 0) { + const fksByNumber = await this.selectOrCreateFksFromChatwoot( + provider, + inbox, + phoneNumbersWithTimestamp, + messagesByPhoneNumber, + ); + + // inserting messages in chatwoot db + let sqlInsertMsg = `INSERT INTO messages + (content, account_id, inbox_id, conversation_id, message_type, private, content_type, + sender_type, sender_id, created_at, updated_at) VALUES `; + const bindInsertMsg = [provider.account_id, inbox.id]; + + messagesByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { + const fksChatwoot = fksByNumber.get(phoneNumber); + + messages.forEach((message) => { + if (!message.message) { + return; + } + + if (!fksChatwoot?.conversation_id || !fksChatwoot?.contact_id) { + return; + } + + const contentMessage = this.getContentMessage(chatwootService, message); + if (!contentMessage) { + return; + } + + bindInsertMsg.push(contentMessage); + const bindContent = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(fksChatwoot.conversation_id); + const bindConversationId = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.key.fromMe ? '1' : '0'); + const bindMessageType = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_type : 'Contact'); + const bindSenderType = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id); + const bindSenderId = `$${bindInsertMsg.length}`; + + bindInsertMsg.push(message.messageTimestamp as number); + const bindmessageTimestamp = `$${bindInsertMsg.length}`; + + sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0, + ${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`; + }); + }); + if (bindInsertMsg.length > 2) { + if (sqlInsertMsg.slice(-1) === ',') { + sqlInsertMsg = sqlInsertMsg.slice(0, -1); + } + totalMessagesImported += (await pgClient.query(sqlInsertMsg, bindInsertMsg))?.rowCount ?? 0; + } } + messagesChunk = this.sliceIntoChunks(messagesOrdered, batchSize); } this.deleteHistoryMessages(instance); + this.deleteRepositoryMessagesCache(instance); + + this.importHistoryContacts(instance, provider); return totalMessagesImported; } catch (error) { this.logger.error(`Error on import history messages: ${error.toString()}`); + + this.deleteHistoryMessages(instance); + this.deleteRepositoryMessagesCache(instance); } } - public sliceIntoChunks(arr: T[], chunkSize: number): T[] { - const res = []; - for (let i = 0; i < arr.length; i += chunkSize) { - const chunk = arr.slice(i, i + chunkSize); - res.push(chunk); - } - return res; - } - - private async getChatwootUser(provider: ChatwootRaw): Promise { + public async selectOrCreateFksFromChatwoot( + provider: ChatwootRaw, + inbox: inbox, + phoneNumbersWithTimestamp: Map, + messagesByPhoneNumber: Map, + ): Promise> { const pgClient = postgresClient.getChatwootConnection(); - const sql = ` - SELECT user_id, user_type FROM users - WHERE email = $1 - `; - const result = await pgClient.query(sql, [provider.email]); - return result.rows[0]; + + const bindValues = [provider.account_id, inbox.id]; + const phoneNumberBind = Array.from(messagesByPhoneNumber.keys()) + .map((phoneNumber) => { + const phoneNumberTimestamp = phoneNumbersWithTimestamp.get(phoneNumber); + + if (phoneNumberTimestamp) { + bindValues.push(phoneNumber); + let bindStr = `($${bindValues.length},`; + + bindValues.push(phoneNumberTimestamp.first); + bindStr += `$${bindValues.length},`; + + bindValues.push(phoneNumberTimestamp.last); + return `${bindStr}$${bindValues.length})`; + } + }) + .join(','); + + // select (or insert when necessary) data from tables contacts, contact_inboxes, conversations from chatwoot db + const sqlFromChatwoot = `WITH + phone_number AS ( + SELECT phone_number, created_at::INTEGER, last_activity_at::INTEGER FROM ( + VALUES + ${phoneNumberBind} + ) as t (phone_number, created_at, last_activity_at) + ), + + only_new_phone_number AS ( + SELECT * FROM phone_number + WHERE phone_number NOT IN ( + SELECT phone_number + FROM contacts + JOIN contact_inboxes ci ON ci.contact_id = contacts.id AND ci.inbox_id = $2 + JOIN conversations con ON con.contact_inbox_id = ci.id + AND con.account_id = $1 + AND con.inbox_id = $2 + AND con.contact_id = contacts.id + WHERE contacts.account_id = $1 + ) + ), + + new_contact AS ( + INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at) + SELECT REPLACE(p.phone_number, '+', ''), p.phone_number, $1, CONCAT(REPLACE(p.phone_number, '+', ''), + '@s.whatsapp.net'), to_timestamp(p.created_at), to_timestamp(p.last_activity_at) + FROM only_new_phone_number AS p + ON CONFLICT(identifier, account_id) DO UPDATE SET updated_at = EXCLUDED.updated_at + RETURNING id, phone_number, created_at, updated_at + ), + + new_contact_inbox AS ( + INSERT INTO contact_inboxes (contact_id, inbox_id, source_id, created_at, updated_at) + SELECT new_contact.id, $2, gen_random_uuid(), new_contact.created_at, new_contact.updated_at + FROM new_contact + RETURNING id, contact_id, created_at, updated_at + ), + + new_conversation AS ( + INSERT INTO conversations (account_id, inbox_id, status, contact_id, + contact_inbox_id, uuid, last_activity_at, created_at, updated_at) + SELECT $1, $2, 0, new_contact_inbox.contact_id, new_contact_inbox.id, gen_random_uuid(), + new_contact_inbox.updated_at, new_contact_inbox.created_at, new_contact_inbox.updated_at + FROM new_contact_inbox + RETURNING id, contact_id + ) + + SELECT new_contact.phone_number, new_conversation.contact_id, new_conversation.id AS conversation_id + FROM new_conversation + JOIN new_contact ON new_conversation.contact_id = new_contact.id + + UNION + + SELECT p.phone_number, c.id contact_id, con.id conversation_id + FROM phone_number p + JOIN contacts c ON c.phone_number = p.phone_number + JOIN contact_inboxes ci ON ci.contact_id = c.id AND ci.inbox_id = $2 + JOIN conversations con ON con.contact_inbox_id = ci.id AND con.account_id = $1 + AND con.inbox_id = $2 AND con.contact_id = c.id`; + + const fksFromChatwoot = await pgClient.query(sqlFromChatwoot, bindValues); + + return new Map(fksFromChatwoot.rows.map((item: FksChatwoot) => [item.phone_number, item])); + } + + public async getChatwootUser(provider: ChatwootRaw): Promise { + try { + const pgClient = postgresClient.getChatwootConnection(); + + const sqlUser = `SELECT owner_type AS user_type, owner_id AS user_id + FROM access_tokens + WHERE token = $1`; + + return (await pgClient.query(sqlUser, [provider.token]))?.rows[0] || false; + } catch (error) { + this.logger.error(`Error on getChatwootUser: ${error.toString()}`); + } + } + + public createMessagesMapByPhoneNumber(messages: MessageRaw[]): Map { + return messages.reduce((acc: Map, message: MessageRaw) => { + if (!this.isIgnorePhoneNumber(message?.key?.remoteJid)) { + const phoneNumber = message?.key?.remoteJid?.split('@')[0]; + if (phoneNumber) { + const phoneNumberPlus = `+${phoneNumber}`; + const messages = acc.has(phoneNumberPlus) ? acc.get(phoneNumberPlus) : []; + messages.push(message); + acc.set(phoneNumberPlus, messages); + } + } + + return acc; + }, new Map()); + } + + public async getContactsOrderByRecentConversations( + inbox: inbox, + provider: ChatwootRaw, + limit = 50, + ): Promise<{ id: number; phone_number: string; identifier: string }[]> { + try { + const pgClient = postgresClient.getChatwootConnection(); + + const sql = `SELECT contacts.id, contacts.identifier, contacts.phone_number + FROM conversations + JOIN contacts ON contacts.id = conversations.contact_id + WHERE conversations.account_id = $1 + AND inbox_id = $2 + ORDER BY conversations.last_activity_at DESC + LIMIT $3`; + + return (await pgClient.query(sql, [provider.account_id, inbox.id, limit]))?.rows; + } catch (error) { + this.logger.error(`Error on get recent conversations: ${error.toString()}`); + } + } + + public getContentMessage(chatwootService: ChatwootService, msg: IWebMessageInfo) { + const contentMessage = chatwootService.getConversationMessage(msg.message); + if (contentMessage) { + return contentMessage; + } + + if (!configService.get('CHATWOOT').IMPORT.PLACEHOLDER_MEDIA_MESSAGE) { + return ''; + } + + const types = { + documentMessage: msg.message.documentMessage, + documentWithCaptionMessage: msg.message.documentWithCaptionMessage?.message?.documentMessage, + imageMessage: msg.message.imageMessage, + videoMessage: msg.message.videoMessage, + audioMessage: msg.message.audioMessage, + stickerMessage: msg.message.stickerMessage, + templateMessage: msg.message.templateMessage?.hydratedTemplate?.hydratedContentText, + }; + const typeKey = Object.keys(types).find((key) => types[key] !== undefined); + + switch (typeKey) { + case 'documentMessage': + return `__`; + + case 'documentWithCaptionMessage': + return `__`; + + case 'templateMessage': + return msg.message.templateMessage.hydratedTemplate.hydratedTitleText + ? `*${msg.message.templateMessage.hydratedTemplate.hydratedTitleText}*\\n` + : '' + msg.message.templateMessage.hydratedTemplate.hydratedContentText; + + case 'imageMessage': + return '__'; + + case 'videoMessage': + return '_