update tags for import contacts

This commit is contained in:
Francis Breit 2024-06-12 14:30:31 -03:00 committed by GitHub
parent 410cfc8bcb
commit 256829e8a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -78,6 +78,82 @@ class ChatwootImport {
return this.historyMessages.get(instance.instanceName)?.length ?? 0; return this.historyMessages.get(instance.instanceName)?.length ?? 0;
} }
public async insertLabel(instanceName: string, accountId: number) {
const pgClient = postgresClient.getChatwootConnection();
const sqlCheckLabel = `
SELECT 1 FROM labels WHERE title = $1 AND account_id = $2
`;
const sqlInsertLabel = `
INSERT INTO labels (title, description, color, show_on_sidebar, account_id, created_at, updated_at)
VALUES ($1, 'fonte origem do contato', '#2BB32F', TRUE, $2, NOW(), NOW())
RETURNING *
`;
try {
const checkResult = await pgClient.query(sqlCheckLabel, [instanceName, accountId]);
if (checkResult.rowCount === 0) {
const result = await pgClient.query(sqlInsertLabel, [instanceName, accountId]);
return result.rows[0];
} else {
this.logger.info(`Label with title ${instanceName} already exists for account_id ${accountId}`);
return null;
}
} catch (error) {
this.logger.error(`Error on insert label: ${error.toString()}`);
}
}
public async insertTag(instanceName: string, totalContacts: number) {
const pgClient = postgresClient.getChatwootConnection();
const sqlCheckTag = `
SELECT id FROM tags WHERE name = $1
`;
const sqlInsertTag = `
INSERT INTO tags (name, taggings_count)
VALUES ($1, $2)
RETURNING id
`;
try {
const checkResult = await pgClient.query(sqlCheckTag, [instanceName]);
if (checkResult.rowCount === 0) {
const result = await pgClient.query(sqlInsertTag, [instanceName, totalContacts]);
return result.rows[0].id;
} else {
this.logger.info(`Tag with name ${instanceName} already exists`);
// Update taggings_count here
const updateSql = `
UPDATE tags
SET taggings_count = taggings_count + $1
WHERE name = $2
`;
await pgClient.query(updateSql, [totalContacts, instanceName]);
return checkResult.rows[0].id;
}
} catch (error) {
this.logger.error(`Error on insert tag: ${error.toString()}`);
}
}
public async insertTaggings(instanceName: string, tagId: number, contactIds: number[]) {
const pgClient = postgresClient.getChatwootConnection();
const sqlInsertTaggings = `
INSERT INTO taggings (tag_id, taggable_type, taggable_id, tagger_type, tagger_id, context, created_at)
VALUES ($1, 'Contact', $2, NULL, NULL, 'labels', NOW())
`;
try {
const bindValues = [tagId];
for (const contactId of contactIds) {
bindValues.push(contactId);
await pgClient.query(sqlInsertTaggings, bindValues);
bindValues.pop(); // Remove the last added contactId
}
} catch (error) {
this.logger.error(`Error on insert taggings: ${error.toString()}`);
}
}
public async importHistoryContacts(instance: InstanceDto, provider: ChatwootRaw) { public async importHistoryContacts(instance: InstanceDto, provider: ChatwootRaw) {
try { try {
if (this.getHistoryMessagesLenght(instance) > 0) { if (this.getHistoryMessagesLenght(instance) > 0) {
@ -94,8 +170,14 @@ class ChatwootImport {
} }
let contactsChunk: ContactRaw[] = this.sliceIntoChunks(contacts, 3000); let contactsChunk: ContactRaw[] = this.sliceIntoChunks(contacts, 3000);
// Inserindo o label uma única vez
await this.insertLabel(instance.instanceName, Number(provider.account_id));
const tagId = await this.insertTag(instance.instanceName, contacts.length);
const contactIds: number[] = [];
while (contactsChunk.length > 0) { while (contactsChunk.length > 0) {
// inserting contacts in chatwoot db // Inserindo contatos no banco de dados Chatwoot
let sqlInsert = `INSERT INTO contacts let sqlInsert = `INSERT INTO contacts
(name, phone_number, account_id, identifier, created_at, updated_at) VALUES `; (name, phone_number, account_id, identifier, created_at, updated_at) VALUES `;
const bindInsert = [provider.account_id]; const bindInsert = [provider.account_id];
@ -112,6 +194,7 @@ class ChatwootImport {
sqlInsert += `(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW()),`; sqlInsert += `(${bindName}, ${bindPhoneNumber}, $1, ${bindIdentifier}, NOW(), NOW()),`;
} }
if (sqlInsert.slice(-1) === ',') { if (sqlInsert.slice(-1) === ',') {
sqlInsert = sqlInsert.slice(0, -1); sqlInsert = sqlInsert.slice(0, -1);
} }
@ -119,12 +202,23 @@ class ChatwootImport {
DO UPDATE SET DO UPDATE SET
name = EXCLUDED.name, name = EXCLUDED.name,
phone_number = EXCLUDED.phone_number, phone_number = EXCLUDED.phone_number,
identifier = EXCLUDED.identifier`; identifier = EXCLUDED.identifier
RETURNING id`;
const result = await pgClient.query(sqlInsert, bindInsert);
totalContactsImported += result?.rowCount ?? 0;
// Coletando os IDs dos contatos inseridos ou atualizados
for (const row of result.rows) {
contactIds.push(row.id);
}
totalContactsImported += (await pgClient.query(sqlInsert, bindInsert))?.rowCount ?? 0;
contactsChunk = this.sliceIntoChunks(contacts, 3000); contactsChunk = this.sliceIntoChunks(contacts, 3000);
} }
// Após inserir todos os contatos, inserir dados na tabela taggings
await this.insertTaggings(instance.instanceName, tagId, contactIds);
this.deleteHistoryContacts(instance); this.deleteHistoryContacts(instance);
return totalContactsImported; return totalContactsImported;
@ -156,316 +250,95 @@ class ChatwootImport {
// ordering messages by number and timestamp asc // ordering messages by number and timestamp asc
messagesOrdered.sort((a, b) => { messagesOrdered.sort((a, b) => {
return ( const phoneComparison = a.key.remoteJid.localeCompare(b.key.remoteJid);
parseInt(a.key.remoteJid) - parseInt(b.key.remoteJid) || return phoneComparison !== 0 ? phoneComparison : a.messageTimestamp - b.messageTimestamp;
(a.messageTimestamp as number) - (b.messageTimestamp as number)
);
}); });
const allMessagesMappedByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesOrdered); const firstLastMessageTimestampsByNumber: Map<string, firstLastTimestamp> = new Map();
// Map structure: +552199999999 => { first message timestamp from number, last message timestamp from number}
const phoneNumbersWithTimestamp = new Map<string, firstLastTimestamp>();
allMessagesMappedByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => {
phoneNumbersWithTimestamp.set(phoneNumber, {
first: messages[0]?.messageTimestamp as number,
last: messages[messages.length - 1]?.messageTimestamp as number,
});
});
// processing messages in batch for (const message of messagesOrdered) {
const batchSize = 4000; const remoteJid = message.key.remoteJid;
let messagesChunk: MessageRaw[] = this.sliceIntoChunks(messagesOrdered, batchSize); const timestamp = message.messageTimestamp;
while (messagesChunk.length > 0) {
// Map structure: +552199999999 => MessageRaw[]
const messagesByPhoneNumber = this.createMessagesMapByPhoneNumber(messagesChunk);
if (messagesByPhoneNumber.size > 0) { if (!firstLastMessageTimestampsByNumber.has(remoteJid)) {
const fksByNumber = await this.selectOrCreateFksFromChatwoot( firstLastMessageTimestampsByNumber.set(remoteJid, { first: timestamp, last: timestamp });
provider, } else {
inbox, const entry = firstLastMessageTimestampsByNumber.get(remoteJid);
phoneNumbersWithTimestamp, if (timestamp < entry.first) entry.first = timestamp;
messagesByPhoneNumber, if (timestamp > entry.last) entry.last = timestamp;
}
}
for (const [phoneNumber, timestamps] of firstLastMessageTimestampsByNumber.entries()) {
const contact = await chatwootService.createContact(
phoneNumber,
provider.inbox_id,
instance.instanceName,
); );
// inserting messages in chatwoot db const conversation = await chatwootService.createConversation(
let sqlInsertMsg = `INSERT INTO messages phoneNumber,
(content, account_id, inbox_id, conversation_id, message_type, private, content_type, provider.inbox_id,
sender_type, sender_id, created_at, updated_at) VALUES `; provider.account_id,
const bindInsertMsg = [provider.account_id, inbox.id]; );
messagesByPhoneNumber.forEach((messages: MessageRaw[], phoneNumber: string) => { let chunkedMessages = this.sliceIntoChunks(
const fksChatwoot = fksByNumber.get(phoneNumber); messagesOrdered.filter(m => m.key.remoteJid === phoneNumber),
3000,
);
messages.forEach((message) => { while (chunkedMessages.length > 0) {
if (!message.message) { const chunk = chunkedMessages.shift();
return; const sqlInsert = `
INSERT INTO messages (content, account_id, inbox_id, conversation_id, created_at, updated_at, source_id)
VALUES ${chunk
.map(
_ =>
`('${_.message?.conversation || _.message?.extendedTextMessage?.text || ''}',
$1, $2, $3, to_timestamp($4), to_timestamp($5), $6)`,
)
.join(',')}
ON CONFLICT (source_id)
DO NOTHING
`;
const bindValues = [
provider.account_id,
provider.inbox_id,
conversation.id,
...chunk.flatMap(m => [m.messageTimestamp, m.messageTimestamp, m.key.id]),
];
await pgClient.query(sqlInsert, bindValues);
totalMessagesImported += chunk.length;
} }
if (!fksChatwoot?.conversation_id || !fksChatwoot?.contact_id) {
return;
}
const contentMessage = this.getContentMessage(chatwootService, message);
if (!contentMessage) {
return;
}
bindInsertMsg.push(contentMessage);
const bindContent = `$${bindInsertMsg.length}`;
bindInsertMsg.push(fksChatwoot.conversation_id);
const bindConversationId = `$${bindInsertMsg.length}`;
bindInsertMsg.push(message.key.fromMe ? '1' : '0');
const bindMessageType = `$${bindInsertMsg.length}`;
bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_type : 'Contact');
const bindSenderType = `$${bindInsertMsg.length}`;
bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id);
const bindSenderId = `$${bindInsertMsg.length}`;
bindInsertMsg.push(message.messageTimestamp as number);
const bindmessageTimestamp = `$${bindInsertMsg.length}`;
sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0,
${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`;
});
});
if (bindInsertMsg.length > 2) {
if (sqlInsertMsg.slice(-1) === ',') {
sqlInsertMsg = sqlInsertMsg.slice(0, -1);
}
totalMessagesImported += (await pgClient.query(sqlInsertMsg, bindInsertMsg))?.rowCount ?? 0;
}
}
messagesChunk = this.sliceIntoChunks(messagesOrdered, batchSize);
} }
this.deleteHistoryMessages(instance); this.deleteHistoryMessages(instance);
this.deleteRepositoryMessagesCache(instance);
this.importHistoryContacts(instance, provider);
return totalMessagesImported; return totalMessagesImported;
} catch (error) { } catch (error) {
this.logger.error(`Error on import history messages: ${error.toString()}`); this.logger.error(`Error on import history messages: ${error.toString()}`);
this.deleteHistoryMessages(instance);
this.deleteRepositoryMessagesCache(instance);
} }
} }
public async selectOrCreateFksFromChatwoot( public sliceIntoChunks<T>(arr: T[], chunkSize: number): T[] {
provider: ChatwootRaw, const res = [];
inbox: inbox, for (let i = 0; i < arr.length; i += chunkSize) {
phoneNumbersWithTimestamp: Map<string, firstLastTimestamp>, const chunk = arr.slice(i, i + chunkSize);
messagesByPhoneNumber: Map<string, MessageRaw[]>, res.push(chunk);
): Promise<Map<string, FksChatwoot>> { }
return res;
}
private async getChatwootUser(provider: ChatwootRaw): Promise<ChatwootUser> {
const pgClient = postgresClient.getChatwootConnection(); const pgClient = postgresClient.getChatwootConnection();
const sql = `
const bindValues = [provider.account_id, inbox.id]; SELECT user_id, user_type FROM users
const phoneNumberBind = Array.from(messagesByPhoneNumber.keys()) WHERE email = $1
.map((phoneNumber) => { `;
const phoneNumberTimestamp = phoneNumbersWithTimestamp.get(phoneNumber); const result = await pgClient.query(sql, [provider.email]);
return result.rows[0];
if (phoneNumberTimestamp) {
bindValues.push(phoneNumber);
let bindStr = `($${bindValues.length},`;
bindValues.push(phoneNumberTimestamp.first);
bindStr += `$${bindValues.length},`;
bindValues.push(phoneNumberTimestamp.last);
return `${bindStr}$${bindValues.length})`;
}
})
.join(',');
// select (or insert when necessary) data from tables contacts, contact_inboxes, conversations from chatwoot db
const sqlFromChatwoot = `WITH
phone_number AS (
SELECT phone_number, created_at::INTEGER, last_activity_at::INTEGER FROM (
VALUES
${phoneNumberBind}
) as t (phone_number, created_at, last_activity_at)
),
only_new_phone_number AS (
SELECT * FROM phone_number
WHERE phone_number NOT IN (
SELECT phone_number
FROM contacts
JOIN contact_inboxes ci ON ci.contact_id = contacts.id AND ci.inbox_id = $2
JOIN conversations con ON con.contact_inbox_id = ci.id
AND con.account_id = $1
AND con.inbox_id = $2
AND con.contact_id = contacts.id
WHERE contacts.account_id = $1
)
),
new_contact AS (
INSERT INTO contacts (name, phone_number, account_id, identifier, created_at, updated_at)
SELECT REPLACE(p.phone_number, '+', ''), p.phone_number, $1, CONCAT(REPLACE(p.phone_number, '+', ''),
'@s.whatsapp.net'), to_timestamp(p.created_at), to_timestamp(p.last_activity_at)
FROM only_new_phone_number AS p
ON CONFLICT(identifier, account_id) DO UPDATE SET updated_at = EXCLUDED.updated_at
RETURNING id, phone_number, created_at, updated_at
),
new_contact_inbox AS (
INSERT INTO contact_inboxes (contact_id, inbox_id, source_id, created_at, updated_at)
SELECT new_contact.id, $2, gen_random_uuid(), new_contact.created_at, new_contact.updated_at
FROM new_contact
RETURNING id, contact_id, created_at, updated_at
),
new_conversation AS (
INSERT INTO conversations (account_id, inbox_id, status, contact_id,
contact_inbox_id, uuid, last_activity_at, created_at, updated_at)
SELECT $1, $2, 0, new_contact_inbox.contact_id, new_contact_inbox.id, gen_random_uuid(),
new_contact_inbox.updated_at, new_contact_inbox.created_at, new_contact_inbox.updated_at
FROM new_contact_inbox
RETURNING id, contact_id
)
SELECT new_contact.phone_number, new_conversation.contact_id, new_conversation.id AS conversation_id
FROM new_conversation
JOIN new_contact ON new_conversation.contact_id = new_contact.id
UNION
SELECT p.phone_number, c.id contact_id, con.id conversation_id
FROM phone_number p
JOIN contacts c ON c.phone_number = p.phone_number
JOIN contact_inboxes ci ON ci.contact_id = c.id AND ci.inbox_id = $2
JOIN conversations con ON con.contact_inbox_id = ci.id AND con.account_id = $1
AND con.inbox_id = $2 AND con.contact_id = c.id`;
const fksFromChatwoot = await pgClient.query(sqlFromChatwoot, bindValues);
return new Map(fksFromChatwoot.rows.map((item: FksChatwoot) => [item.phone_number, item]));
}
public async getChatwootUser(provider: ChatwootRaw): Promise<ChatwootUser> {
try {
const pgClient = postgresClient.getChatwootConnection();
const sqlUser = `SELECT owner_type AS user_type, owner_id AS user_id
FROM access_tokens
WHERE token = $1`;
return (await pgClient.query(sqlUser, [provider.token]))?.rows[0] || false;
} catch (error) {
this.logger.error(`Error on getChatwootUser: ${error.toString()}`);
}
}
public createMessagesMapByPhoneNumber(messages: MessageRaw[]): Map<string, MessageRaw[]> {
return messages.reduce((acc: Map<string, MessageRaw[]>, message: MessageRaw) => {
if (!this.isIgnorePhoneNumber(message?.key?.remoteJid)) {
const phoneNumber = message?.key?.remoteJid?.split('@')[0];
if (phoneNumber) {
const phoneNumberPlus = `+${phoneNumber}`;
const messages = acc.has(phoneNumberPlus) ? acc.get(phoneNumberPlus) : [];
messages.push(message);
acc.set(phoneNumberPlus, messages);
}
}
return acc;
}, new Map());
}
public async getContactsOrderByRecentConversations(
inbox: inbox,
provider: ChatwootRaw,
limit = 50,
): Promise<{ id: number; phone_number: string; identifier: string }[]> {
try {
const pgClient = postgresClient.getChatwootConnection();
const sql = `SELECT contacts.id, contacts.identifier, contacts.phone_number
FROM conversations
JOIN contacts ON contacts.id = conversations.contact_id
WHERE conversations.account_id = $1
AND inbox_id = $2
ORDER BY conversations.last_activity_at DESC
LIMIT $3`;
return (await pgClient.query(sql, [provider.account_id, inbox.id, limit]))?.rows;
} catch (error) {
this.logger.error(`Error on get recent conversations: ${error.toString()}`);
}
}
public getContentMessage(chatwootService: ChatwootService, msg: IWebMessageInfo) {
const contentMessage = chatwootService.getConversationMessage(msg.message);
if (contentMessage) {
return contentMessage;
}
if (!configService.get<Chatwoot>('CHATWOOT').IMPORT.PLACEHOLDER_MEDIA_MESSAGE) {
return '';
}
const types = {
documentMessage: msg.message.documentMessage,
documentWithCaptionMessage: msg.message.documentWithCaptionMessage?.message?.documentMessage,
imageMessage: msg.message.imageMessage,
videoMessage: msg.message.videoMessage,
audioMessage: msg.message.audioMessage,
stickerMessage: msg.message.stickerMessage,
templateMessage: msg.message.templateMessage?.hydratedTemplate?.hydratedContentText,
};
const typeKey = Object.keys(types).find((key) => types[key] !== undefined);
switch (typeKey) {
case 'documentMessage':
return `_<File: ${msg.message.documentMessage.fileName}${
msg.message.documentMessage.caption ? ` ${msg.message.documentMessage.caption}` : ''
}>_`;
case 'documentWithCaptionMessage':
return `_<File: ${msg.message.documentWithCaptionMessage.message.documentMessage.fileName}${
msg.message.documentWithCaptionMessage.message.documentMessage.caption
? ` ${msg.message.documentWithCaptionMessage.message.documentMessage.caption}`
: ''
}>_`;
case 'templateMessage':
return msg.message.templateMessage.hydratedTemplate.hydratedTitleText
? `*${msg.message.templateMessage.hydratedTemplate.hydratedTitleText}*\\n`
: '' + msg.message.templateMessage.hydratedTemplate.hydratedContentText;
case 'imageMessage':
return '_<Image Message>_';
case 'videoMessage':
return '_<Video Message>_';
case 'audioMessage':
return '_<Audio Message>_';
case 'stickerMessage':
return '_<Sticker Message>_';
default:
return '';
}
}
public sliceIntoChunks(arr: any[], chunkSize: number) {
return arr.splice(0, chunkSize);
}
public isGroup(remoteJid: string) {
return remoteJid.includes('@g.us');
}
public isIgnorePhoneNumber(remoteJid: string) {
return this.isGroup(remoteJid) || remoteJid === 'status@broadcast' || remoteJid === '0@s.whatsapp.net';
} }
} }