diff --git a/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts b/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts index dd0bb23a..c8d709b7 100644 --- a/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts +++ b/src/api/integrations/chatwoot/utils/chatwoot-import-helper.ts @@ -78,6 +78,82 @@ class ChatwootImport { return this.historyMessages.get(instance.instanceName)?.length ?? 0; } + public async insertLabel(instanceName: string, accountId: number) { + const pgClient = postgresClient.getChatwootConnection(); + const sqlCheckLabel = ` + SELECT 1 FROM labels WHERE title = $1 AND account_id = $2 + `; + const sqlInsertLabel = ` + INSERT INTO labels (title, description, color, show_on_sidebar, account_id, created_at, updated_at) + VALUES ($1, 'fonte origem do contato', '#2BB32F', TRUE, $2, NOW(), NOW()) + RETURNING * + `; + + try { + const checkResult = await pgClient.query(sqlCheckLabel, [instanceName, accountId]); + if (checkResult.rowCount === 0) { + const result = await pgClient.query(sqlInsertLabel, [instanceName, accountId]); + return result.rows[0]; + } else { + this.logger.info(`Label with title ${instanceName} already exists for account_id ${accountId}`); + return null; + } + } catch (error) { + this.logger.error(`Error on insert label: ${error.toString()}`); + } + } + + public async insertTag(instanceName: string, totalContacts: number) { + const pgClient = postgresClient.getChatwootConnection(); + const sqlCheckTag = ` + SELECT id FROM tags WHERE name = $1 + `; + const sqlInsertTag = ` + INSERT INTO tags (name, taggings_count) + VALUES ($1, $2) + RETURNING id + `; + + try { + const checkResult = await pgClient.query(sqlCheckTag, [instanceName]); + if (checkResult.rowCount === 0) { + const result = await pgClient.query(sqlInsertTag, [instanceName, totalContacts]); + return result.rows[0].id; + } else { + this.logger.info(`Tag with name ${instanceName} already exists`); + // Update taggings_count here + const updateSql = ` + UPDATE tags + SET taggings_count = taggings_count + $1 + WHERE name = $2 + `; + await pgClient.query(updateSql, [totalContacts, instanceName]); + return checkResult.rows[0].id; + } + } catch (error) { + this.logger.error(`Error on insert tag: ${error.toString()}`); + } + } + + public async insertTaggings(instanceName: string, tagId: number, contactIds: number[]) { + const pgClient = postgresClient.getChatwootConnection(); + const sqlInsertTaggings = ` + INSERT INTO taggings (tag_id, taggable_type, taggable_id, tagger_type, tagger_id, context, created_at) + VALUES ($1, 'Contact', $2, NULL, NULL, 'labels', NOW()) + `; + + try { + const bindValues = [tagId]; + for (const contactId of contactIds) { + bindValues.push(contactId); + await pgClient.query(sqlInsertTaggings, bindValues); + bindValues.pop(); // Remove the last added contactId + } + } catch (error) { + this.logger.error(`Error on insert taggings: ${error.toString()}`); + } + } + public async importHistoryContacts(instance: InstanceDto, provider: ChatwootRaw) { try { if (this.getHistoryMessagesLenght(instance) > 0) { @@ -94,8 +170,14 @@ class ChatwootImport { } let contactsChunk: ContactRaw[] = this.sliceIntoChunks(contacts, 3000); + // Inserindo o label uma única vez + await this.insertLabel(instance.instanceName, Number(provider.account_id)); + const tagId = await this.insertTag(instance.instanceName, contacts.length); + + const contactIds: number[] = []; + while (contactsChunk.length > 0) { - // inserting contacts in chatwoot db + // Inserindo contatos no banco de dados Chatwoot let sqlInsert = `INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at) VALUES `; const bindInsert = [provider.account_id]; @@ -112,6 +194,7 @@ class ChatwootImport { sqlInsert += `(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW()),`; } + if (sqlInsert.slice(-1) === ',') { sqlInsert = sqlInsert.slice(0, -1); } @@ -119,12 +202,23 @@ class ChatwootImport { DO UPDATE SET name = EXCLUDED.name, phone_number = EXCLUDED.phone_number, - identifier = EXCLUDED.identifier`; + identifier = EXCLUDED.identifier + RETURNING id`; + + const result = await pgClient.query(sqlInsert, bindInsert); + totalContactsImported += result?.rowCount ?? 0; + + // Coletando os IDs dos contatos inseridos ou atualizados + for (const row of result.rows) { + contactIds.push(row.id); + } - totalContactsImported += (await pgClient.query(sqlInsert, bindInsert))?.rowCount ?? 0; contactsChunk = this.sliceIntoChunks(contacts, 3000); } + // Após inserir todos os contatos, inserir dados na tabela taggings + await this.insertTaggings(instance.instanceName, tagId, contactIds); + this.deleteHistoryContacts(instance); return totalContactsImported; @@ -156,316 +250,95 @@ class ChatwootImport { // ordering messages by number and timestamp asc messagesOrdered.sort((a, b) => { - return ( - parseInt(a.key.remoteJid) - parseInt(b.key.remoteJid) || - (a.messageTimestamp as number) - (b.messageTimestamp as number) - ); + const phoneComparison = a.key.remoteJid.localeCompare(b.key.remoteJid); + return phoneComparison !== 0 ? phoneComparison : a.messageTimestamp - b.messageTimestamp; }); - const allMessagesMappedByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesOrdered); - // Map structure: +552199999999 => { first message timestamp from number, last message timestamp from number} - const phoneNumbersWithTimestamp = new Map(); - allMessagesMappedByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { - phoneNumbersWithTimestamp.set(phoneNumber, { - first: messages[0]?.messageTimestamp as number, - last: messages[messages.length - 1]?.messageTimestamp as number, - }); - }); + const firstLastMessageTimestampsByNumber: Map = new Map(); - // processing messages in batch - const batchSize = 4000; - let messagesChunk: MessageRaw[] = this.sliceIntoChunks(messagesOrdered, batchSize); - while (messagesChunk.length > 0) { - // Map structure: +552199999999 => MessageRaw[] - const messagesByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesChunk); + for (const message of messagesOrdered) { + const remoteJid = message.key.remoteJid; + const timestamp = message.messageTimestamp; - if (messagesByPhoneNumber.size > 0) { - const fksByNumber = await this.selectOrCreateFksFromChatwoot( - provider, - inbox, - phoneNumbersWithTimestamp, - messagesByPhoneNumber, - ); - - // inserting messages in chatwoot db - let sqlInsertMsg = `INSERT INTO messages - (content, account_id, inbox_id, conversation_id, message_type, private, content_type, - sender_type, sender_id, created_at, updated_at) VALUES `; - const bindInsertMsg = [provider.account_id, inbox.id]; - - messagesByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { - const fksChatwoot = fksByNumber.get(phoneNumber); - - messages.forEach((message) => { - if (!message.message) { - return; - } - - if (!fksChatwoot?.conversation_id || !fksChatwoot?.contact_id) { - return; - } - - const contentMessage = this.getContentMessage(chatwootService, message); - if (!contentMessage) { - return; - } - - bindInsertMsg.push(contentMessage); - const bindContent = `$${bindInsertMsg.length}`; - - bindInsertMsg.push(fksChatwoot.conversation_id); - const bindConversationId = `$${bindInsertMsg.length}`; - - bindInsertMsg.push(message.key.fromMe ? '1' : '0'); - const bindMessageType = `$${bindInsertMsg.length}`; - - bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_type : 'Contact'); - const bindSenderType = `$${bindInsertMsg.length}`; - - bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id); - const bindSenderId = `$${bindInsertMsg.length}`; - - bindInsertMsg.push(message.messageTimestamp as number); - const bindmessageTimestamp = `$${bindInsertMsg.length}`; - - sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0, - ${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`; - }); - }); - if (bindInsertMsg.length > 2) { - if (sqlInsertMsg.slice(-1) === ',') { - sqlInsertMsg = sqlInsertMsg.slice(0, -1); - } - totalMessagesImported += (await pgClient.query(sqlInsertMsg, bindInsertMsg))?.rowCount ?? 0; - } + if (!firstLastMessageTimestampsByNumber.has(remoteJid)) { + firstLastMessageTimestampsByNumber.set(remoteJid, { first: timestamp, last: timestamp }); + } else { + const entry = firstLastMessageTimestampsByNumber.get(remoteJid); + if (timestamp < entry.first) entry.first = timestamp; + if (timestamp > entry.last) entry.last = timestamp; + } + } + + for (const [phoneNumber, timestamps] of firstLastMessageTimestampsByNumber.entries()) { + const contact = await chatwootService.createContact( + phoneNumber, + provider.inbox_id, + instance.instanceName, + ); + + const conversation = await chatwootService.createConversation( + phoneNumber, + provider.inbox_id, + provider.account_id, + ); + + let chunkedMessages = this.sliceIntoChunks( + messagesOrdered.filter(m => m.key.remoteJid === phoneNumber), + 3000, + ); + + while (chunkedMessages.length > 0) { + const chunk = chunkedMessages.shift(); + const sqlInsert = ` + INSERT INTO messages (content, account_id, inbox_id, conversation_id, created_at, updated_at, source_id) + VALUES ${chunk + .map( + _ => + `('${_.message?.conversation || _.message?.extendedTextMessage?.text || ''}', + $1, $2, $3, to_timestamp($4), to_timestamp($5), $6)`, + ) + .join(',')} + ON CONFLICT (source_id) + DO NOTHING + `; + + const bindValues = [ + provider.account_id, + provider.inbox_id, + conversation.id, + ...chunk.flatMap(m => [m.messageTimestamp, m.messageTimestamp, m.key.id]), + ]; + + await pgClient.query(sqlInsert, bindValues); + totalMessagesImported += chunk.length; } - messagesChunk = this.sliceIntoChunks(messagesOrdered, batchSize); } this.deleteHistoryMessages(instance); - this.deleteRepositoryMessagesCache(instance); - - this.importHistoryContacts(instance, provider); return totalMessagesImported; } catch (error) { this.logger.error(`Error on import history messages: ${error.toString()}`); - - this.deleteHistoryMessages(instance); - this.deleteRepositoryMessagesCache(instance); } } - public async selectOrCreateFksFromChatwoot( - provider: ChatwootRaw, - inbox: inbox, - phoneNumbersWithTimestamp: Map, - messagesByPhoneNumber: Map, - ): Promise> { + public sliceIntoChunks(arr: T[], chunkSize: number): T[] { + const res = []; + for (let i = 0; i < arr.length; i += chunkSize) { + const chunk = arr.slice(i, i + chunkSize); + res.push(chunk); + } + return res; + } + + private async getChatwootUser(provider: ChatwootRaw): Promise { const pgClient = postgresClient.getChatwootConnection(); - - const bindValues = [provider.account_id, inbox.id]; - const phoneNumberBind = Array.from(messagesByPhoneNumber.keys()) - .map((phoneNumber) => { - const phoneNumberTimestamp = phoneNumbersWithTimestamp.get(phoneNumber); - - if (phoneNumberTimestamp) { - bindValues.push(phoneNumber); - let bindStr = `($${bindValues.length},`; - - bindValues.push(phoneNumberTimestamp.first); - bindStr += `$${bindValues.length},`; - - bindValues.push(phoneNumberTimestamp.last); - return `${bindStr}$${bindValues.length})`; - } - }) - .join(','); - - // select (or insert when necessary) data from tables contacts, contact_inboxes, conversations from chatwoot db - const sqlFromChatwoot = `WITH - phone_number AS ( - SELECT phone_number, created_at::INTEGER, last_activity_at::INTEGER FROM ( - VALUES - ${phoneNumberBind} - ) as t (phone_number, created_at, last_activity_at) - ), - - only_new_phone_number AS ( - SELECT * FROM phone_number - WHERE phone_number NOT IN ( - SELECT phone_number - FROM contacts - JOIN contact_inboxes ci ON ci.contact_id = contacts.id AND ci.inbox_id = $2 - JOIN conversations con ON con.contact_inbox_id = ci.id - AND con.account_id = $1 - AND con.inbox_id = $2 - AND con.contact_id = contacts.id - WHERE contacts.account_id = $1 - ) - ), - - new_contact AS ( - INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at) - SELECT REPLACE(p.phone_number, '+', ''), p.phone_number, $1, CONCAT(REPLACE(p.phone_number, '+', ''), - '@s.whatsapp.net'), to_timestamp(p.created_at), to_timestamp(p.last_activity_at) - FROM only_new_phone_number AS p - ON CONFLICT(identifier, account_id) DO UPDATE SET updated_at = EXCLUDED.updated_at - RETURNING id, phone_number, created_at, updated_at - ), - - new_contact_inbox AS ( - INSERT INTO contact_inboxes (contact_id, inbox_id, source_id, created_at, updated_at) - SELECT new_contact.id, $2, gen_random_uuid(), new_contact.created_at, new_contact.updated_at - FROM new_contact - RETURNING id, contact_id, created_at, updated_at - ), - - new_conversation AS ( - INSERT INTO conversations (account_id, inbox_id, status, contact_id, - contact_inbox_id, uuid, last_activity_at, created_at, updated_at) - SELECT $1, $2, 0, new_contact_inbox.contact_id, new_contact_inbox.id, gen_random_uuid(), - new_contact_inbox.updated_at, new_contact_inbox.created_at, new_contact_inbox.updated_at - FROM new_contact_inbox - RETURNING id, contact_id - ) - - SELECT new_contact.phone_number, new_conversation.contact_id, new_conversation.id AS conversation_id - FROM new_conversation - JOIN new_contact ON new_conversation.contact_id = new_contact.id - - UNION - - SELECT p.phone_number, c.id contact_id, con.id conversation_id - FROM phone_number p - JOIN contacts c ON c.phone_number = p.phone_number - JOIN contact_inboxes ci ON ci.contact_id = c.id AND ci.inbox_id = $2 - JOIN conversations con ON con.contact_inbox_id = ci.id AND con.account_id = $1 - AND con.inbox_id = $2 AND con.contact_id = c.id`; - - const fksFromChatwoot = await pgClient.query(sqlFromChatwoot, bindValues); - - return new Map(fksFromChatwoot.rows.map((item: FksChatwoot) => [item.phone_number, item])); - } - - public async getChatwootUser(provider: ChatwootRaw): Promise { - try { - const pgClient = postgresClient.getChatwootConnection(); - - const sqlUser = `SELECT owner_type AS user_type, owner_id AS user_id - FROM access_tokens - WHERE token = $1`; - - return (await pgClient.query(sqlUser, [provider.token]))?.rows[0] || false; - } catch (error) { - this.logger.error(`Error on getChatwootUser: ${error.toString()}`); - } - } - - public createMessagesMapByPhoneNumber(messages: MessageRaw[]): Map { - return messages.reduce((acc: Map, message: MessageRaw) => { - if (!this.isIgnorePhoneNumber(message?.key?.remoteJid)) { - const phoneNumber = message?.key?.remoteJid?.split('@')[0]; - if (phoneNumber) { - const phoneNumberPlus = `+${phoneNumber}`; - const messages = acc.has(phoneNumberPlus) ? acc.get(phoneNumberPlus) : []; - messages.push(message); - acc.set(phoneNumberPlus, messages); - } - } - - return acc; - }, new Map()); - } - - public async getContactsOrderByRecentConversations( - inbox: inbox, - provider: ChatwootRaw, - limit = 50, - ): Promise<{ id: number; phone_number: string; identifier: string }[]> { - try { - const pgClient = postgresClient.getChatwootConnection(); - - const sql = `SELECT contacts.id, contacts.identifier, contacts.phone_number - FROM conversations - JOIN contacts ON contacts.id = conversations.contact_id - WHERE conversations.account_id = $1 - AND inbox_id = $2 - ORDER BY conversations.last_activity_at DESC - LIMIT $3`; - - return (await pgClient.query(sql, [provider.account_id, inbox.id, limit]))?.rows; - } catch (error) { - this.logger.error(`Error on get recent conversations: ${error.toString()}`); - } - } - - public getContentMessage(chatwootService: ChatwootService, msg: IWebMessageInfo) { - const contentMessage = chatwootService.getConversationMessage(msg.message); - if (contentMessage) { - return contentMessage; - } - - if (!configService.get('CHATWOOT').IMPORT.PLACEHOLDER_MEDIA_MESSAGE) { - return ''; - } - - const types = { - documentMessage: msg.message.documentMessage, - documentWithCaptionMessage: msg.message.documentWithCaptionMessage?.message?.documentMessage, - imageMessage: msg.message.imageMessage, - videoMessage: msg.message.videoMessage, - audioMessage: msg.message.audioMessage, - stickerMessage: msg.message.stickerMessage, - templateMessage: msg.message.templateMessage?.hydratedTemplate?.hydratedContentText, - }; - const typeKey = Object.keys(types).find((key) => types[key] !== undefined); - - switch (typeKey) { - case 'documentMessage': - return `__`; - - case 'documentWithCaptionMessage': - return `__`; - - case 'templateMessage': - return msg.message.templateMessage.hydratedTemplate.hydratedTitleText - ? `*${msg.message.templateMessage.hydratedTemplate.hydratedTitleText}*\\n` - : '' + msg.message.templateMessage.hydratedTemplate.hydratedContentText; - - case 'imageMessage': - return '__'; - - case 'videoMessage': - return '_