mirror of
https://github.com/EvolutionAPI/evolution-api.git
synced 2026-01-08 13:00:24 -06:00
feat(chatwoot): import history messages to chatwoot on whatsapp connection
Messages are imported direct to chatwoot database. Media and group messages are ignored. New env.yml variables: CHATWOOT_IMPORT_DATABASE_CONNECTION_URI: URI to connect direct on chatwoot database. CHATWOOT_IMPORT_PLACEHOLDER_MEDIA_MESSAGE: Indicates to use a text placeholder on media messages. New instance setting: sync_full_history: Indicates to request a full history sync to baileys. New chatwoot options: import_contacts: Indicates to import contacts. import_messages: Indicates to import messages. days_limit_import_messages: Number of days to limit history messages to import.
This commit is contained in:
@@ -73,6 +73,7 @@ import { dbserver } from '../../libs/db.connect';
|
||||
import { RedisCache } from '../../libs/redis.client';
|
||||
import { getIO } from '../../libs/socket.server';
|
||||
import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server';
|
||||
import { chatwootImport } from '../../utils/chatwoot-import-helper';
|
||||
import { makeProxyAgent } from '../../utils/makeProxyAgent';
|
||||
import { useMultiFileAuthStateDb } from '../../utils/use-multi-file-auth-state-db';
|
||||
import { useMultiFileAuthStateRedisDb } from '../../utils/use-multi-file-auth-state-redis-db';
|
||||
@@ -103,6 +104,7 @@ import {
|
||||
GroupUpdateParticipantDto,
|
||||
GroupUpdateSettingDto,
|
||||
} from '../dto/group.dto';
|
||||
import { InstanceDto } from '../dto/instance.dto';
|
||||
import {
|
||||
ContactMessage,
|
||||
MediaMessage,
|
||||
@@ -355,6 +357,15 @@ export class WAStartupService {
|
||||
this.localChatwoot.conversation_pending = data?.conversation_pending;
|
||||
this.logger.verbose(`Chatwoot conversation pending: ${this.localChatwoot.conversation_pending}`);
|
||||
|
||||
this.localChatwoot.import_contacts = data?.import_contacts;
|
||||
this.logger.verbose(`Chatwoot import contacts: ${this.localChatwoot.import_contacts}`);
|
||||
|
||||
this.localChatwoot.import_messages = data?.import_messages;
|
||||
this.logger.verbose(`Chatwoot import messages: ${this.localChatwoot.import_messages}`);
|
||||
|
||||
this.localChatwoot.days_limit_import_messages = data?.days_limit_import_messages;
|
||||
this.logger.verbose(`Chatwoot days limit import messages: ${this.localChatwoot.days_limit_import_messages}`);
|
||||
|
||||
this.logger.verbose('Chatwoot loaded');
|
||||
}
|
||||
|
||||
@@ -369,6 +380,9 @@ export class WAStartupService {
|
||||
this.logger.verbose(`Chatwoot sign delimiter: ${data.sign_delimiter}`);
|
||||
this.logger.verbose(`Chatwoot reopen conversation: ${data.reopen_conversation}`);
|
||||
this.logger.verbose(`Chatwoot conversation pending: ${data.conversation_pending}`);
|
||||
this.logger.verbose(`Chatwoot import contacts: ${data.import_contacts}`);
|
||||
this.logger.verbose(`Chatwoot import messages: ${data.import_messages}`);
|
||||
this.logger.verbose(`Chatwoot days limit import messages: ${data.days_limit_import_messages}`);
|
||||
|
||||
Object.assign(this.localChatwoot, { ...data, sign_delimiter: data.sign_msg ? data.sign_delimiter : null });
|
||||
|
||||
@@ -394,6 +408,9 @@ export class WAStartupService {
|
||||
this.logger.verbose(`Chatwoot sign delimiter: ${data.sign_delimiter}`);
|
||||
this.logger.verbose(`Chatwoot reopen conversation: ${data.reopen_conversation}`);
|
||||
this.logger.verbose(`Chatwoot conversation pending: ${data.conversation_pending}`);
|
||||
this.logger.verbose(`Chatwoot import contacts: ${data.import_contacts}`);
|
||||
this.logger.verbose(`Chatwoot import messages: ${data.import_messages}`);
|
||||
this.logger.verbose(`Chatwoot days limit import messages: ${data.days_limit_import_messages}`);
|
||||
|
||||
return {
|
||||
enabled: data.enabled,
|
||||
@@ -405,6 +422,9 @@ export class WAStartupService {
|
||||
sign_delimiter: data.sign_delimiter || null,
|
||||
reopen_conversation: data.reopen_conversation,
|
||||
conversation_pending: data.conversation_pending,
|
||||
import_contacts: data.import_contacts,
|
||||
import_messages: data.import_messages,
|
||||
days_limit_import_messages: data.days_limit_import_messages,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -437,6 +457,9 @@ export class WAStartupService {
|
||||
this.localSettings.read_status = data?.read_status;
|
||||
this.logger.verbose(`Settings read_status: ${this.localSettings.read_status}`);
|
||||
|
||||
this.localSettings.sync_full_history = data?.sync_full_history;
|
||||
this.logger.verbose(`Settings sync_full_history: ${this.localSettings.sync_full_history}`);
|
||||
|
||||
this.logger.verbose('Settings loaded');
|
||||
}
|
||||
|
||||
@@ -449,6 +472,7 @@ export class WAStartupService {
|
||||
this.logger.verbose(`Settings always_online: ${data.always_online}`);
|
||||
this.logger.verbose(`Settings read_messages: ${data.read_messages}`);
|
||||
this.logger.verbose(`Settings read_status: ${data.read_status}`);
|
||||
this.logger.verbose(`Settings sync_full_history: ${data.sync_full_history}`);
|
||||
Object.assign(this.localSettings, data);
|
||||
this.logger.verbose('Settings set');
|
||||
|
||||
@@ -470,6 +494,7 @@ export class WAStartupService {
|
||||
this.logger.verbose(`Settings always_online: ${data.always_online}`);
|
||||
this.logger.verbose(`Settings read_messages: ${data.read_messages}`);
|
||||
this.logger.verbose(`Settings read_status: ${data.read_status}`);
|
||||
this.logger.verbose(`Settings sync_full_history: ${data.sync_full_history}`);
|
||||
return {
|
||||
reject_call: data.reject_call,
|
||||
msg_call: data.msg_call,
|
||||
@@ -477,6 +502,7 @@ export class WAStartupService {
|
||||
always_online: data.always_online,
|
||||
read_messages: data.read_messages,
|
||||
read_status: data.read_status,
|
||||
sync_full_history: data.sync_full_history,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1430,7 +1456,10 @@ export class WAStartupService {
|
||||
msgRetryCounterCache: this.msgRetryCounterCache,
|
||||
getMessage: async (key) => (await this.getMessage(key)) as Promise<proto.IMessage>,
|
||||
generateHighQualityLinkPreview: true,
|
||||
syncFullHistory: false,
|
||||
syncFullHistory: this.localSettings.sync_full_history,
|
||||
shouldSyncHistoryMessage: (msg: proto.Message.IHistorySyncNotification) => {
|
||||
return this.historySyncNotification(msg);
|
||||
},
|
||||
userDevicesCache: this.userDevicesCache,
|
||||
transactionOpts: { maxCommitRetries: 10, delayBetweenTriesMs: 10 },
|
||||
patchMessageBeforeSending(message) {
|
||||
@@ -1517,7 +1546,10 @@ export class WAStartupService {
|
||||
msgRetryCounterCache: this.msgRetryCounterCache,
|
||||
getMessage: async (key) => (await this.getMessage(key)) as Promise<proto.IMessage>,
|
||||
generateHighQualityLinkPreview: true,
|
||||
syncFullHistory: false,
|
||||
syncFullHistory: this.localSettings.sync_full_history,
|
||||
shouldSyncHistoryMessage: (msg: proto.Message.IHistorySyncNotification) => {
|
||||
return this.historySyncNotification(msg);
|
||||
},
|
||||
userDevicesCache: this.userDevicesCache,
|
||||
transactionOpts: { maxCommitRetries: 10, delayBetweenTriesMs: 10 },
|
||||
patchMessageBeforeSending(message) {
|
||||
@@ -1611,33 +1643,48 @@ export class WAStartupService {
|
||||
|
||||
private readonly contactHandle = {
|
||||
'contacts.upsert': async (contacts: Contact[], database: Database) => {
|
||||
this.logger.verbose('Event received: contacts.upsert');
|
||||
try {
|
||||
this.logger.verbose('Event received: contacts.upsert');
|
||||
|
||||
this.logger.verbose('Finding contacts in database');
|
||||
const contactsRepository = await this.repository.contact.find({
|
||||
where: { owner: this.instance.name },
|
||||
});
|
||||
this.logger.verbose('Finding contacts in database');
|
||||
const contactsRepository = new Set(
|
||||
(
|
||||
await this.repository.contact.find({
|
||||
select: { id: 1, _id: 0 },
|
||||
where: { owner: this.instance.name },
|
||||
})
|
||||
).map((contact) => contact.id),
|
||||
);
|
||||
|
||||
this.logger.verbose('Verifying if contacts exists in database to insert');
|
||||
const contactsRaw: ContactRaw[] = [];
|
||||
for await (const contact of contacts) {
|
||||
if (contactsRepository.find((cr) => cr.id === contact.id)) {
|
||||
continue;
|
||||
this.logger.verbose('Verifying if contacts exists in database to insert');
|
||||
const contactsRaw: ContactRaw[] = [];
|
||||
|
||||
for (const contact of contacts) {
|
||||
if (contactsRepository.has(contact.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
contactsRaw.push({
|
||||
id: contact.id,
|
||||
pushName: contact?.name || contact?.verifiedName || contact.id.split('@')[0],
|
||||
profilePictureUrl: (await this.profilePicture(contact.id)).profilePictureUrl,
|
||||
owner: this.instance.name,
|
||||
});
|
||||
}
|
||||
|
||||
contactsRaw.push({
|
||||
id: contact.id,
|
||||
pushName: contact?.name || contact?.verifiedName,
|
||||
profilePictureUrl: (await this.profilePicture(contact.id)).profilePictureUrl,
|
||||
owner: this.instance.name,
|
||||
});
|
||||
this.logger.verbose('Sending data to webhook in event CONTACTS_UPSERT');
|
||||
this.sendDataWebhook(Events.CONTACTS_UPSERT, contactsRaw);
|
||||
|
||||
this.logger.verbose('Inserting contacts in database');
|
||||
this.repository.contact.insert(contactsRaw, this.instance.name, database.SAVE_DATA.CONTACTS);
|
||||
|
||||
if (this.localChatwoot.enabled && this.localChatwoot.import_contacts && contactsRaw.length) {
|
||||
this.chatwootService.addHistoryContacts({ instanceName: this.instance.name }, contactsRaw);
|
||||
chatwootImport.importHistoryContacts({ instanceName: this.instance.name }, this.localChatwoot);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(error);
|
||||
}
|
||||
|
||||
this.logger.verbose('Sending data to webhook in event CONTACTS_UPSERT');
|
||||
this.sendDataWebhook(Events.CONTACTS_UPSERT, contactsRaw);
|
||||
|
||||
this.logger.verbose('Inserting contacts in database');
|
||||
this.repository.contact.insert(contactsRaw, this.instance.name, database.SAVE_DATA.CONTACTS);
|
||||
},
|
||||
|
||||
'contacts.update': async (contacts: Partial<Contact>[], database: Database) => {
|
||||
@@ -1667,7 +1714,6 @@ export class WAStartupService {
|
||||
{
|
||||
messages,
|
||||
chats,
|
||||
isLatest,
|
||||
}: {
|
||||
chats: Chat[];
|
||||
contacts: Contact[];
|
||||
@@ -1676,55 +1722,115 @@ export class WAStartupService {
|
||||
},
|
||||
database: Database,
|
||||
) => {
|
||||
this.logger.verbose('Event received: messaging-history.set');
|
||||
if (isLatest) {
|
||||
this.logger.verbose('isLatest defined as true');
|
||||
const chatsRaw: ChatRaw[] = chats.map((chat) => {
|
||||
return {
|
||||
try {
|
||||
this.logger.verbose('Event received: messaging-history.set');
|
||||
|
||||
const instance: InstanceDto = { instanceName: this.instance.name };
|
||||
|
||||
const daysLimitToImport = this.localChatwoot.enabled ? this.localChatwoot.days_limit_import_messages : 1000;
|
||||
this.logger.verbose(`Param days limit import messages is: ${daysLimitToImport}`);
|
||||
|
||||
const date = new Date();
|
||||
const timestampLimitToImport = new Date(date.setDate(date.getDate() - daysLimitToImport)).getTime() / 1000;
|
||||
|
||||
const maxBatchTimestamp = Math.max(...messages.map((message) => message.messageTimestamp as number));
|
||||
|
||||
const processBatch = maxBatchTimestamp >= timestampLimitToImport;
|
||||
|
||||
if (!processBatch) {
|
||||
this.logger.verbose('Batch ignored by maxTimestamp in this batch');
|
||||
return;
|
||||
}
|
||||
|
||||
const chatsRaw: ChatRaw[] = [];
|
||||
const chatsRepository = new Set(
|
||||
(
|
||||
await this.repository.chat.find({
|
||||
select: { id: 1, _id: 0 },
|
||||
where: { owner: this.instance.name },
|
||||
})
|
||||
).map((chat) => chat.id),
|
||||
);
|
||||
|
||||
for (const chat of chats) {
|
||||
if (chatsRepository.has(chat.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
chatsRaw.push({
|
||||
id: chat.id,
|
||||
owner: this.instance.name,
|
||||
lastMsgTimestamp: chat.lastMessageRecvTimestamp,
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.verbose('Sending data to webhook in event CHATS_SET');
|
||||
this.sendDataWebhook(Events.CHATS_SET, chatsRaw);
|
||||
|
||||
this.logger.verbose('Inserting chats in database');
|
||||
this.repository.chat.insert(chatsRaw, this.instance.name, database.SAVE_DATA.CHATS);
|
||||
|
||||
const messagesRaw: MessageRaw[] = [];
|
||||
const messagesRepository = new Set(
|
||||
chatwootImport.getRepositoryMessagesCache(instance) ??
|
||||
(
|
||||
await this.repository.message.find({
|
||||
select: { key: { id: 1 }, _id: 0 },
|
||||
where: { owner: this.instance.name },
|
||||
})
|
||||
).map((message) => message.key.id),
|
||||
);
|
||||
|
||||
if (chatwootImport.getRepositoryMessagesCache(instance) === null) {
|
||||
chatwootImport.setRepositoryMessagesCache(instance, messagesRepository);
|
||||
}
|
||||
|
||||
for (const m of messages) {
|
||||
if (!m.message || !m.key || !m.messageTimestamp) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Long.isLong(m?.messageTimestamp)) {
|
||||
m.messageTimestamp = m.messageTimestamp?.toNumber();
|
||||
}
|
||||
|
||||
if (m.messageTimestamp <= timestampLimitToImport) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (messagesRepository.has(m.key.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
messagesRaw.push({
|
||||
key: m.key,
|
||||
pushName: m.pushName || m.key.remoteJid.split('@')[0],
|
||||
participant: m.participant,
|
||||
message: { ...m.message },
|
||||
messageType: getContentType(m.message),
|
||||
messageTimestamp: m.messageTimestamp as number,
|
||||
owner: this.instance.name,
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.verbose('Sending data to webhook in event MESSAGES_SET');
|
||||
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw]);
|
||||
|
||||
this.logger.verbose('Inserting messages in database');
|
||||
await this.repository.message.insert(messagesRaw, this.instance.name, database.SAVE_DATA.NEW_MESSAGE);
|
||||
|
||||
if (this.localChatwoot.enabled && this.localChatwoot.import_messages && messagesRaw.length > 0) {
|
||||
this.chatwootService.addHistoryMessages(
|
||||
instance,
|
||||
messagesRaw.filter((msg) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid)),
|
||||
);
|
||||
}
|
||||
|
||||
messages = undefined;
|
||||
chats = undefined;
|
||||
} catch (error) {
|
||||
this.logger.error(error);
|
||||
}
|
||||
|
||||
const messagesRaw: MessageRaw[] = [];
|
||||
const messagesRepository = await this.repository.message.find({
|
||||
where: { owner: this.instance.name },
|
||||
});
|
||||
for await (const [, m] of Object.entries(messages)) {
|
||||
if (!m.message) {
|
||||
continue;
|
||||
}
|
||||
if (messagesRepository.find((mr) => mr.owner === this.instance.name && mr.key.id === m.key.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Long.isLong(m?.messageTimestamp)) {
|
||||
m.messageTimestamp = m.messageTimestamp?.toNumber();
|
||||
}
|
||||
|
||||
messagesRaw.push({
|
||||
key: m.key,
|
||||
pushName: m.pushName,
|
||||
participant: m.participant,
|
||||
message: { ...m.message },
|
||||
messageType: getContentType(m.message),
|
||||
messageTimestamp: m.messageTimestamp as number,
|
||||
owner: this.instance.name,
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.verbose('Sending data to webhook in event MESSAGES_SET');
|
||||
this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw]);
|
||||
|
||||
messages = undefined;
|
||||
},
|
||||
|
||||
'messages.upsert': async (
|
||||
@@ -2215,6 +2321,35 @@ export class WAStartupService {
|
||||
}
|
||||
}
|
||||
|
||||
private historySyncNotification(msg: proto.Message.IHistorySyncNotification) {
|
||||
const instance: InstanceDto = { instanceName: this.instance.name };
|
||||
|
||||
if (
|
||||
this.localChatwoot.enabled &&
|
||||
this.localChatwoot.import_messages &&
|
||||
this.isSyncNotificationFromUsedSyncType(msg)
|
||||
) {
|
||||
if (msg.chunkOrder === 1) {
|
||||
this.chatwootService.startImportHistoryMessages(instance);
|
||||
}
|
||||
|
||||
if (msg.progress === 100) {
|
||||
setTimeout(() => {
|
||||
this.chatwootService.importHistoryMessages(instance);
|
||||
}, 10000);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private isSyncNotificationFromUsedSyncType(msg: proto.Message.IHistorySyncNotification) {
|
||||
return (
|
||||
(this.localSettings.sync_full_history && msg?.syncType === 2) ||
|
||||
(!this.localSettings.sync_full_history && msg?.syncType === 3)
|
||||
);
|
||||
}
|
||||
|
||||
private createJid(number: string): string {
|
||||
this.logger.verbose('Creating jid with number: ' + number);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user