fix: retry messages

This commit is contained in:
Davidson Gomes
2024-04-17 18:10:43 -03:00
parent d190d8b1af
commit 5ae5d8546e
12 changed files with 71 additions and 33 deletions

View File

@@ -43,6 +43,7 @@ export class InstanceController {
private readonly proxyService: ProxyController,
private readonly cache: RedisCache,
private readonly chatwootCache: CacheService,
private readonly messagesLostCache: CacheService,
) {}
private readonly logger = new Logger(InstanceController.name);
@@ -108,6 +109,7 @@ export class InstanceController {
this.repository,
this.cache,
this.chatwootCache,
this.messagesLostCache,
);
} else {
instance = new BaileysStartupService(
@@ -116,6 +118,7 @@ export class InstanceController {
this.repository,
this.cache,
this.chatwootCache,
this.messagesLostCache,
);
}

View File

@@ -1,22 +0,0 @@
import { CacheConf, ConfigService } from '../../../../config/env.config';
import { ICache } from '../../../abstract/abstract.cache';
import { LocalCache } from './localcache';
import { RedisCache } from './rediscache';
export class CacheEngine {
private engine: ICache;
constructor(private readonly configService: ConfigService, module: string) {
const cacheConf = configService.get<CacheConf>('CACHE');
if (cacheConf?.REDIS?.ENABLED && cacheConf?.REDIS?.URI !== '') {
this.engine = new RedisCache(configService, module);
} else if (cacheConf?.LOCAL?.ENABLED) {
this.engine = new LocalCache(configService, module);
}
}
public getEngine() {
return this.engine;
}
}

View File

@@ -1,48 +0,0 @@
import NodeCache from 'node-cache';
import { CacheConf, CacheConfLocal, ConfigService } from '../../../../config/env.config';
import { ICache } from '../../../abstract/abstract.cache';
export class LocalCache implements ICache {
private conf: CacheConfLocal;
static localCache = new NodeCache();
constructor(private readonly configService: ConfigService, private readonly module: string) {
this.conf = this.configService.get<CacheConf>('CACHE')?.LOCAL;
}
async get(key: string): Promise<any> {
return LocalCache.localCache.get(this.buildKey(key));
}
async set(key: string, value: any, ttl?: number) {
return LocalCache.localCache.set(this.buildKey(key), value, ttl || this.conf.TTL);
}
async has(key: string) {
return LocalCache.localCache.has(this.buildKey(key));
}
async delete(key: string) {
return LocalCache.localCache.del(this.buildKey(key));
}
async deleteAll(appendCriteria?: string) {
const keys = await this.keys(appendCriteria);
if (!keys?.length) {
return 0;
}
return LocalCache.localCache.del(keys);
}
async keys(appendCriteria?: string) {
const filter = `${this.buildKey('')}${appendCriteria ? `${appendCriteria}:` : ''}`;
return LocalCache.localCache.keys().filter((key) => key.substring(0, filter.length) === filter);
}
buildKey(key: string) {
return `${this.module}:${key}`;
}
}

View File

@@ -1,59 +0,0 @@
import { createClient, RedisClientType } from 'redis';
import { CacheConf, CacheConfRedis, configService } from '../../../../config/env.config';
import { Logger } from '../../../../config/logger.config';
class Redis {
private logger = new Logger(Redis.name);
private client: RedisClientType = null;
private conf: CacheConfRedis;
private connected = false;
constructor() {
this.conf = configService.get<CacheConf>('CACHE')?.REDIS;
}
getConnection(): RedisClientType {
if (this.connected) {
return this.client;
} else {
this.client = createClient({
url: this.conf.URI,
});
this.client.on('connect', () => {
this.logger.verbose('redis connecting');
});
this.client.on('ready', () => {
this.logger.verbose('redis ready');
this.connected = true;
});
this.client.on('error', () => {
this.logger.error('redis disconnected');
this.connected = false;
});
this.client.on('end', () => {
this.logger.verbose('redis connection ended');
this.connected = false;
});
try {
this.logger.verbose('connecting new redis client');
this.client.connect();
this.connected = true;
this.logger.verbose('connected to new redis client');
} catch (e) {
this.connected = false;
this.logger.error('redis connect exception caught: ' + e);
return null;
}
return this.client;
}
}
}
export const redisClient = new Redis();

View File

@@ -1,83 +0,0 @@
import { RedisClientType } from 'redis';
import { CacheConf, CacheConfRedis, ConfigService } from '../../../../config/env.config';
import { Logger } from '../../../../config/logger.config';
import { ICache } from '../../../abstract/abstract.cache';
import { redisClient } from './rediscache.client';
export class RedisCache implements ICache {
private readonly logger = new Logger(RedisCache.name);
private client: RedisClientType;
private conf: CacheConfRedis;
constructor(private readonly configService: ConfigService, private readonly module: string) {
this.conf = this.configService.get<CacheConf>('CACHE')?.REDIS;
this.client = redisClient.getConnection();
}
async get(key: string): Promise<any> {
try {
return JSON.parse(await this.client.get(this.buildKey(key)));
} catch (error) {
this.logger.error(error);
}
}
async set(key: string, value: any, ttl?: number) {
try {
await this.client.setEx(this.buildKey(key), ttl || this.conf?.TTL, JSON.stringify(value));
} catch (error) {
this.logger.error(error);
}
}
async has(key: string) {
try {
return (await this.client.exists(this.buildKey(key))) > 0;
} catch (error) {
this.logger.error(error);
}
}
async delete(key: string) {
try {
return await this.client.del(this.buildKey(key));
} catch (error) {
this.logger.error(error);
}
}
async deleteAll(appendCriteria?: string) {
try {
const keys = await this.keys(appendCriteria);
if (!keys?.length) {
return 0;
}
return await this.client.del(keys);
} catch (error) {
this.logger.error(error);
}
}
async keys(appendCriteria?: string) {
try {
const match = `${this.buildKey('')}${appendCriteria ? `${appendCriteria}:` : ''}*`;
const keys = [];
for await (const key of this.client.scanIterator({
MATCH: match,
COUNT: 100,
})) {
keys.push(key);
}
return [...new Set(keys)];
} catch (error) {
this.logger.error(error);
}
}
buildKey(key: string) {
return `${this.conf?.PREFIX_KEY}:${this.module}:${key}`;
}
}

View File

@@ -1,5 +1,6 @@
import { isURL } from 'class-validator';
import { CacheEngine } from '../../../../cache/cacheengine';
import { ConfigService, HttpServer } from '../../../../config/env.config';
import { Logger } from '../../../../config/logger.config';
import { BadRequestException } from '../../../../exceptions';
@@ -7,7 +8,6 @@ import { InstanceDto } from '../../../dto/instance.dto';
import { RepositoryBroker } from '../../../repository/repository.manager';
import { waMonitor } from '../../../server.module';
import { CacheService } from '../../../services/cache.service';
import { CacheEngine } from '../cache/cacheengine';
import { ChatwootDto } from '../dto/chatwoot.dto';
import { ChatwootService } from '../services/chatwoot.service';

View File

@@ -1,3 +1,4 @@
import { CacheEngine } from '../cache/cacheengine';
import { configService } from '../config/env.config';
import { eventEmitter } from '../config/event.config';
import { Logger } from '../config/logger.config';
@@ -14,7 +15,6 @@ import { WebhookController } from './controllers/webhook.controller';
import { ChamaaiController } from './integrations/chamaai/controllers/chamaai.controller';
import { ChamaaiRepository } from './integrations/chamaai/repository/chamaai.repository';
import { ChamaaiService } from './integrations/chamaai/services/chamaai.service';
import { CacheEngine } from './integrations/chatwoot/cache/cacheengine';
import { ChatwootController } from './integrations/chatwoot/controllers/chatwoot.controller';
import { ChatwootRepository } from './integrations/chatwoot/repository/chatwoot.repository';
import { ChatwootService } from './integrations/chatwoot/services/chatwoot.service';
@@ -109,8 +109,16 @@ export const repository = new RepositoryBroker(
export const cache = new RedisCache();
const chatwootCache = new CacheService(new CacheEngine(configService, ChatwootService.name).getEngine());
const messagesLostCache = new CacheService(new CacheEngine(configService, 'baileys').getEngine());
export const waMonitor = new WAMonitoringService(eventEmitter, configService, repository, cache, chatwootCache);
export const waMonitor = new WAMonitoringService(
eventEmitter,
configService,
repository,
cache,
chatwootCache,
messagesLostCache,
);
const authService = new AuthService(configService, waMonitor, repository);
@@ -160,6 +168,7 @@ export const instanceController = new InstanceController(
proxyController,
cache,
chatwootCache,
messagesLostCache,
);
export const sendMessageController = new SendMessageController(waMonitor);
export const chatController = new ChatController(waMonitor);

View File

@@ -36,6 +36,7 @@ export class WAMonitoringService {
private readonly repository: RepositoryBroker,
private readonly cache: RedisCache,
private readonly chatwootCache: CacheService,
private readonly messagesLostCache: CacheService,
) {
this.logger.verbose('instance created');
@@ -346,6 +347,7 @@ export class WAMonitoringService {
this.repository,
this.cache,
this.chatwootCache,
this.messagesLostCache,
);
instance.instanceName = name;
@@ -356,6 +358,7 @@ export class WAMonitoringService {
this.repository,
this.cache,
this.chatwootCache,
this.messagesLostCache,
);
instance.instanceName = name;

View File

@@ -71,6 +71,8 @@ export class WAStartupService {
public chamaaiService = new ChamaaiService(waMonitor, this.configService);
public set instanceName(name: string) {
this.logger.setInstance(name);
this.logger.verbose(`Initializing instance '${name}'`);
if (!name) {
this.logger.verbose('Instance name not found, generating random name with uuid');

View File

@@ -121,8 +121,6 @@ import { Events, MessageSubtype, TypeMediaMessage, wa } from '../../types/wa.typ
import { CacheService } from './../cache.service';
import { WAStartupService } from './../whatsapp.service';
// const retryCache = {};
export class BaileysStartupService extends WAStartupService {
constructor(
public readonly configService: ConfigService,
@@ -130,12 +128,17 @@ export class BaileysStartupService extends WAStartupService {
public readonly repository: RepositoryBroker,
public readonly cache: RedisCache,
public readonly chatwootCache: CacheService,
public readonly messagesLostCache: CacheService,
) {
super(configService, eventEmitter, repository, chatwootCache);
this.logger.verbose('BaileysStartupService initialized');
this.cleanStore();
this.instance.qrcode = { count: 0 };
this.mobile = false;
setTimeout(() => {
this.recoveringMessages();
}, 30000);
}
private readonly msgRetryCounterCache: CacheStore = new NodeCache();
@@ -148,6 +151,18 @@ export class BaileysStartupService extends WAStartupService {
public phoneNumber: string;
public mobile: boolean;
private async recoveringMessages() {
this.logger.info('Recovering messages');
this.messagesLostCache.keys().then((keys) => {
keys.forEach(async (key) => {
const message = await this.messagesLostCache.get(key.split(':')[2]);
if (message.messageStubParameters && message.messageStubParameters[0] === 'Message absent from node')
await this.client.sendMessageAck(JSON.parse(message.messageStubParameters[1], BufferJSON.reviver));
});
});
}
public get connectionStatus() {
this.logger.verbose('Getting connection status');
return this.stateConnection;
@@ -378,10 +393,12 @@ export class BaileysStartupService extends WAStartupService {
│ CONNECTED TO WHATSAPP │
└──────────────────────────────┘`.replace(/^ +/gm, ' '),
);
this.logger.info(`
this.logger.info(
`
wuid: ${formattedWuid}
name: ${formattedName}
`);
`,
);
if (this.localChatwoot.enabled) {
this.chatwootService.eventWhatsapp(
@@ -1044,12 +1061,22 @@ export class BaileysStartupService extends WAStartupService {
}
}
await this.messagesLostCache.set(received.key.id, received);
if (received.messageStubParameters && received.messageStubParameters[0] === 'Message absent from node') {
this.logger.info('Recovering message lost');
await this.client.sendMessageAck(JSON.parse(received.messageStubParameters[1], BufferJSON.reviver));
await this.messagesLostCache.set(received.key.id, received);
continue;
}
// const retryCache = (await this.messagesLostCache.get(received.key.id)) || null;
// if (retryCache) {
// this.logger.info('Recovered message lost');
// await this.messagesLostCache.delete(received.key.id);
// }
if (
(type !== 'notify' && type !== 'append') ||
received.message?.protocolMessage ||
@@ -1248,7 +1275,6 @@ export class BaileysStartupService extends WAStartupService {
}
}
// if (key.remoteJid !== 'status@broadcast' && !key?.remoteJid?.match(/(:\d+)/)) {
if (key.remoteJid !== 'status@broadcast') {
this.logger.verbose('Message update is valid');
@@ -1479,27 +1505,12 @@ export class BaileysStartupService extends WAStartupService {
if (events['messages.upsert']) {
this.logger.verbose('Listening event: messages.upsert');
const payload = events['messages.upsert'];
// if (payload.messages.find((a) => a?.messageStubType === 2)) {
// const msg = payload.messages[0];
// retryCache[msg.key.id] = msg;
// return;
// }
this.messageHandle['messages.upsert'](payload, database, settings);
}
if (events['messages.update']) {
this.logger.verbose('Listening event: messages.update');
const payload = events['messages.update'];
// payload.forEach((message) => {
// if (retryCache[message.key.id]) {
// this.client.ev.emit('messages.upsert', {
// messages: [message],
// type: 'notify',
// });
// delete retryCache[message.key.id];
// return;
// }
// });
this.messageHandle['messages.update'](payload, database, settings);
}

View File

@@ -36,6 +36,7 @@ export class BusinessStartupService extends WAStartupService {
public readonly repository: RepositoryBroker,
public readonly cache: RedisCache,
public readonly chatwootCache: CacheService,
public readonly messagesLostCache: CacheService,
) {
super(configService, eventEmitter, repository, chatwootCache);
this.logger.verbose('BusinessStartupService initialized');