Merge pull request #395 from jaison-x/import-messages-chatwoot

feat(chatwoot): import history messages to chatwoot on whatsapp connection
This commit is contained in:
Davidson Gomes
2024-02-05 15:26:21 -03:00
committed by GitHub
23 changed files with 994 additions and 112 deletions

View File

@@ -73,6 +73,7 @@ import { dbserver } from '../../libs/db.connect';
import { RedisCache } from '../../libs/redis.client';
import { getIO } from '../../libs/socket.server';
import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server';
import { chatwootImport } from '../../utils/chatwoot-import-helper';
import { makeProxyAgent } from '../../utils/makeProxyAgent';
import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db';
import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db';
@@ -103,6 +104,7 @@ import {
GroupUpdateParticipantDto,
GroupUpdateSettingDto,
} from '../dto/group.dto';
import { InstanceDto } from '../dto/instance.dto';
import {
ContactMessage,
MediaMessage,
@@ -355,6 +357,15 @@ export class WAStartupService {
this.localChatwoot.conversation_pending = data?.conversation_pending;
this.logger.verbose(`Chatwoot conversation pending: ${this.localChatwoot.conversation_pending}`);
this.localChatwoot.import_contacts = data?.import_contacts;
this.logger.verbose(`Chatwoot import contacts: ${this.localChatwoot.import_contacts}`);
this.localChatwoot.import_messages = data?.import_messages;
this.logger.verbose(`Chatwoot import messages: ${this.localChatwoot.import_messages}`);
this.localChatwoot.days_limit_import_messages = data?.days_limit_import_messages;
this.logger.verbose(`Chatwoot days limit import messages: ${this.localChatwoot.days_limit_import_messages}`);
this.logger.verbose('Chatwoot loaded');
}
@@ -369,6 +380,9 @@ export class WAStartupService {
this.logger.verbose(`Chatwoot sign delimiter: ${data.sign_delimiter}`);
this.logger.verbose(`Chatwoot reopen conversation: ${data.reopen_conversation}`);
this.logger.verbose(`Chatwoot conversation pending: ${data.conversation_pending}`);
this.logger.verbose(`Chatwoot import contacts: ${data.import_contacts}`);
this.logger.verbose(`Chatwoot import messages: ${data.import_messages}`);
this.logger.verbose(`Chatwoot days limit import messages: ${data.days_limit_import_messages}`);
Object.assign(this.localChatwoot, { ...data, sign_delimiter: data.sign_msg ? data.sign_delimiter : null });
@@ -394,6 +408,9 @@ export class WAStartupService {
this.logger.verbose(`Chatwoot sign delimiter: ${data.sign_delimiter}`);
this.logger.verbose(`Chatwoot reopen conversation: ${data.reopen_conversation}`);
this.logger.verbose(`Chatwoot conversation pending: ${data.conversation_pending}`);
this.logger.verbose(`Chatwoot import contacts: ${data.import_contacts}`);
this.logger.verbose(`Chatwoot import messages: ${data.import_messages}`);
this.logger.verbose(`Chatwoot days limit import messages: ${data.days_limit_import_messages}`);
return {
enabled: data.enabled,
@@ -405,6 +422,9 @@ export class WAStartupService {
sign_delimiter: data.sign_delimiter || null,
reopen_conversation: data.reopen_conversation,
conversation_pending: data.conversation_pending,
import_contacts: data.import_contacts,
import_messages: data.import_messages,
days_limit_import_messages: data.days_limit_import_messages,
};
}
@@ -437,6 +457,9 @@ export class WAStartupService {
this.localSettings.read_status = data?.read_status;
this.logger.verbose(`Settings read_status: ${this.localSettings.read_status}`);
this.localSettings.sync_full_history = data?.sync_full_history;
this.logger.verbose(`Settings sync_full_history: ${this.localSettings.sync_full_history}`);
this.logger.verbose('Settings loaded');
}
@@ -449,6 +472,7 @@ export class WAStartupService {
this.logger.verbose(`Settings always_online: ${data.always_online}`);
this.logger.verbose(`Settings read_messages: ${data.read_messages}`);
this.logger.verbose(`Settings read_status: ${data.read_status}`);
this.logger.verbose(`Settings sync_full_history: ${data.sync_full_history}`);
Object.assign(this.localSettings, data);
this.logger.verbose('Settings set');
@@ -470,6 +494,7 @@ export class WAStartupService {
this.logger.verbose(`Settings always_online: ${data.always_online}`);
this.logger.verbose(`Settings read_messages: ${data.read_messages}`);
this.logger.verbose(`Settings read_status: ${data.read_status}`);
this.logger.verbose(`Settings sync_full_history: ${data.sync_full_history}`);
return {
reject_call: data.reject_call,
msg_call: data.msg_call,
@@ -477,6 +502,7 @@ export class WAStartupService {
always_online: data.always_online,
read_messages: data.read_messages,
read_status: data.read_status,
sync_full_history: data.sync_full_history,
};
}
@@ -1430,7 +1456,10 @@ export class WAStartupService {
msgRetryCounterCache: this.msgRetryCounterCache,
getMessage: async (key) => (await this.getMessage(key)) as Promise<proto.IMessage>,
generateHighQualityLinkPreview: true,
syncFullHistory: false,
syncFullHistory: this.localSettings.sync_full_history,
shouldSyncHistoryMessage: (msg: proto.Message.IHistorySyncNotification) => {
return this.historySyncNotification(msg);
},
userDevicesCache: this.userDevicesCache,
transactionOpts: { maxCommitRetries: 10, delayBetweenTriesMs: 10 },
patchMessageBeforeSending(message) {
@@ -1517,7 +1546,10 @@ export class WAStartupService {
msgRetryCounterCache: this.msgRetryCounterCache,
getMessage: async (key) => (await this.getMessage(key)) as Promise<proto.IMessage>,
generateHighQualityLinkPreview: true,
syncFullHistory: false,
syncFullHistory: this.localSettings.sync_full_history,
shouldSyncHistoryMessage: (msg: proto.Message.IHistorySyncNotification) => {
return this.historySyncNotification(msg);
},
userDevicesCache: this.userDevicesCache,
transactionOpts: { maxCommitRetries: 10, delayBetweenTriesMs: 10 },
patchMessageBeforeSending(message) {
@@ -1611,33 +1643,48 @@ export class WAStartupService {
private readonly contactHandle = {
'contacts.upsert': async (contacts: Contact[], database: Database) => {
this.logger.verbose('Event received: contacts.upsert');
try {
this.logger.verbose('Event received: contacts.upsert');
this.logger.verbose('Finding contacts in database');
const contactsRepository = await this.repository.contact.find({
where: { owner: this.instance.name },
});
this.logger.verbose('Finding contacts in database');
const contactsRepository = new Set(
(
await this.repository.contact.find({
select: { id: 1, _id: 0 },
where: { owner: this.instance.name },
})
).map((contact) => contact.id),
);
this.logger.verbose('Verifying if contacts exists in database to insert');
const contactsRaw: ContactRaw[] = [];
for await (const contact of contacts) {
if (contactsRepository.find((cr) => cr.id === contact.id)) {
continue;
this.logger.verbose('Verifying if contacts exists in database to insert');
const contactsRaw: ContactRaw[] = [];
for (const contact of contacts) {
if (contactsRepository.has(contact.id)) {
continue;
}
contactsRaw.push({
id: contact.id,
pushName: contact?.name || contact?.verifiedName || contact.id.split('@')[0],
profilePictureUrl: (await this.profilePicture(contact.id)).profilePictureUrl,
owner: this.instance.name,
});
}
contactsRaw.push({
id: contact.id,
pushName: contact?.name || contact?.verifiedName,
profilePictureUrl: (await this.profilePicture(contact.id)).profilePictureUrl,
owner: this.instance.name,
});
this.logger.verbose('Sending data to webhook in event CONTACTS_UPSERT');
this.sendDataWebhook(Events.CONTACTS_UPSERT, contactsRaw);
this.logger.verbose('Inserting contacts in database');
this.repository.contact.insert(contactsRaw, this.instance.name, database.SAVE_DATA.CONTACTS);
if (this.localChatwoot.enabled && this.localChatwoot.import_contacts && contactsRaw.length) {
this.chatwootService.addHistoryContacts({ instanceName: this.instance.name }, contactsRaw);
chatwootImport.importHistoryContacts({ instanceName: this.instance.name }, this.localChatwoot);
}
} catch (error) {
this.logger.error(error);
}
this.logger.verbose('Sending data to webhook in event CONTACTS_UPSERT');
this.sendDataWebhook(Events.CONTACTS_UPSERT, contactsRaw);
this.logger.verbose('Inserting contacts in database');
this.repository.contact.insert(contactsRaw, this.instance.name, database.SAVE_DATA.CONTACTS);
},
'contacts.update': async (contacts: Partial<Contact>[], database: Database) => {
@@ -1667,7 +1714,6 @@ export class WAStartupService {
{
messages,
chats,
isLatest,
}: {
chats: Chat[];
contacts: Contact[];
@@ -1676,55 +1722,115 @@ export class WAStartupService {
},
database: Database,
) => {
this.logger.verbose('Event received: messaging-history.set');
if (isLatest) {
this.logger.verbose('isLatest defined as true');
const chatsRaw: ChatRaw[] = chats.map((chat) => {
return {
try {
this.logger.verbose('Event received: messaging-history.set');
const instance: InstanceDto = { instanceName: this.instance.name };
const daysLimitToImport = this.localChatwoot.enabled ? this.localChatwoot.days_limit_import_messages : 1000;
this.logger.verbose(`Param days limit import messages is: ${daysLimitToImport}`);
const date = new Date();
const timestampLimitToImport = new Date(date.setDate(date.getDate() - daysLimitToImport)).getTime() / 1000;
const maxBatchTimestamp = Math.max(...messages.map((message) => message.messageTimestamp as number));
const processBatch = maxBatchTimestamp >= timestampLimitToImport;
if (!processBatch) {
this.logger.verbose('Batch ignored by maxTimestamp in this batch');
return;
}
const chatsRaw: ChatRaw[] = [];
const chatsRepository = new Set(
(
await this.repository.chat.find({
select: { id: 1, _id: 0 },
where: { owner: this.instance.name },
})
).map((chat) => chat.id),
);
for (const chat of chats) {
if (chatsRepository.has(chat.id)) {
continue;
}
chatsRaw.push({
id: chat.id,
owner: this.instance.name,
lastMsgTimestamp: chat.lastMessageRecvTimestamp,
};
});
});
}
this.logger.verbose('Sending data to webhook in event CHATS_SET');
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
this.logger.verbose('Inserting chats in database');
this.repository.chat.insert(chatsRaw, this.instance.name, database.SAVE_DATA.CHATS);
const messagesRaw: MessageRaw[] = [];
const messagesRepository = new Set(
chatwootImport.getRepositoryMessagesCache(instance) ??
(
await this.repository.message.find({
select: { key: { id: 1 }, _id: 0 },
where: { owner: this.instance.name },
})
).map((message) => message.key.id),
);
if (chatwootImport.getRepositoryMessagesCache(instance) === null) {
chatwootImport.setRepositoryMessagesCache(instance, messagesRepository);
}
for (const m of messages) {
if (!m.message || !m.key || !m.messageTimestamp) {
continue;
}
if (Long.isLong(m?.messageTimestamp)) {
m.messageTimestamp = m.messageTimestamp?.toNumber();
}
if (m.messageTimestamp <= timestampLimitToImport) {
continue;
}
if (messagesRepository.has(m.key.id)) {
continue;
}
messagesRaw.push({
key: m.key,
pushName: m.pushName || m.key.remoteJid.split('@')[0],
participant: m.participant,
message: { ...m.message },
messageType: getContentType(m.message),
messageTimestamp: m.messageTimestamp as number,
owner: this.instance.name,
});
}
this.logger.verbose('Sending data to webhook in event MESSAGES_SET');
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw]);
this.logger.verbose('Inserting messages in database');
await this.repository.message.insert(messagesRaw, this.instance.name, database.SAVE_DATA.NEW_MESSAGE);
if (this.localChatwoot.enabled && this.localChatwoot.import_messages && messagesRaw.length > 0) {
this.chatwootService.addHistoryMessages(
instance,
messagesRaw.filter((msg) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid)),
);
}
messages = undefined;
chats = undefined;
} catch (error) {
this.logger.error(error);
}
const messagesRaw: MessageRaw[] = [];
const messagesRepository = await this.repository.message.find({
where: { owner: this.instance.name },
});
for await (const [, m] of Object.entries(messages)) {
if (!m.message) {
continue;
}
if (messagesRepository.find((mr) => mr.owner === this.instance.name && mr.key.id === m.key.id)) {
continue;
}
if (Long.isLong(m?.messageTimestamp)) {
m.messageTimestamp = m.messageTimestamp?.toNumber();
}
messagesRaw.push({
key: m.key,
pushName: m.pushName,
participant: m.participant,
message: { ...m.message },
messageType: getContentType(m.message),
messageTimestamp: m.messageTimestamp as number,
owner: this.instance.name,
});
}
this.logger.verbose('Sending data to webhook in event MESSAGES_SET');
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw]);
messages = undefined;
},
'messages.upsert': async (
@@ -2215,6 +2321,35 @@ export class WAStartupService {
}
}
private historySyncNotification(msg: proto.Message.IHistorySyncNotification) {
const instance: InstanceDto = { instanceName: this.instance.name };
if (
this.localChatwoot.enabled &&
this.localChatwoot.import_messages &&
this.isSyncNotificationFromUsedSyncType(msg)
) {
if (msg.chunkOrder === 1) {
this.chatwootService.startImportHistoryMessages(instance);
}
if (msg.progress === 100) {
setTimeout(() => {
this.chatwootService.importHistoryMessages(instance);
}, 10000);
}
}
return true;
}
private isSyncNotificationFromUsedSyncType(msg: proto.Message.IHistorySyncNotification) {
return (
(this.localSettings.sync_full_history && msg?.syncType === 2) ||
(!this.localSettings.sync_full_history && msg?.syncType === 3)
);
}
private createJid(number: string): string {
this.logger.verbose('Creating jid with number: ' + number);