"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); const redis_1 = require("redis"); const loggers_1 = require("./loggers"); const loggers = { main: (0, loggers_1.getLogger)('redis'), cacher: (0, loggers_1.getLogger)('redis/cache'), controller: (0, loggers_1.getLogger)('redis/controller'), replica: (0, loggers_1.getLogger)('redis/replica'), agent: (0, loggers_1.getLogger)('redis/agent'), }; class default_1 { constructor(opt) { this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`; this.cacheContent = (contentId, content) => new Promise((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res))).then(res => { loggers.cacher.debug(`cached content ${contentId}, result: ${res}`); }).catch((err) => { loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`); }); this.cacheForChat = (postId, target) => { const targetStr = this.chatAsString(target); return new Promise((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res))).then(res => { loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`); }).catch((err) => { loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`); }); }; this.getContent = (contentId) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => { loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`); return res; }).catch((err) => { loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`); throw err; }); this.startProcess = (processId) => { const logger = loggers.agent; this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => { if (err) { return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`); } logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`); }); this.subscriber.subscribe(`twitter:lock/${processId}`, err => { if (err) logger.error(`failed to subscribe to own process lock, error: ${err}`); }); }; this.finishProcess = (processId) => { this.client.publish(`twitter:lock/${processId}`, 'DONE'); }; this.waitForProcess = (processId, timeout) => { const logger = loggers.agent; if (!(processId in this.subscriptions)) { logger.debug(`creating new waiting function for ${processId}...`); let timeoutHandle; this.subscriptions[processId] = new Promise((resolve, reject) => { this.subscriber.on('message', (channel, message) => { logger.debug(`received status notification from channel ${processId}, status: ${message}`); if (channel === `twitter:lock/${processId}`) { if (message === 'DONE') return resolve(); if (message === 'BREAK') return reject(); } if (channel === `twitter:ping/${processId}`) { this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => { if (err) { return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`); } logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`); }); if (message === 'WIP') { this.subscriber.subscribe(`twitter:lock/${processId}`, err => { if (err) logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`); logger.debug(`successfully subscribed to status channel of process ${processId}`); }); } if (message === 'NONE') return resolve(); } }); timeoutHandle = setTimeout(() => { this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => { if (err) { logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`); } else { logger.warn(`timed out waiting for process ${processId}, give up waiting`); } }); }, timeout); this.subscriber.subscribe(`twitter:ping/${processId}`, err => { if (err) { logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`); reject(); } }); this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => { if (err) { return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`); } logger.debug(`pinged process ${processId} via subscription client, result: ${res}`); }); }).finally(() => { clearTimeout(timeoutHandle); logger.debug(`deleting waiting function for ${processId}...`); delete this.subscriptions[processId]; }); } return this.subscriptions[processId]; }; this.isCachedForChat = (postId, target) => { const targetStr = this.chatAsString(target); return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => { loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`); return Boolean(res); }).catch((err) => { loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`); return false; }); }; this.client = (0, redis_1.createClient)({ host: opt.redisHost, port: opt.redisPort, }); this.subscriber = this.client.duplicate(); let logger = loggers.main; this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => { if (err) { logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`); process.exit(1); } logger.debug(`subscribers of global lock registry: ${reply[1]}`); if (reply[1] === 0) { logger = loggers.controller; this.subscriber.subscribe('twitter:locks', err => { if (err) { logger.fatal(`failed to subscribe to global lock registry, error: ${err}`); process.exit(1); } logger.info(`nobody monitoring global lock registry, taken over now`); }); } else { logger = loggers.replica; } this.subscriber.psubscribe(`twitter:lock/*`, err => { if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`); logger.debug(`monitoring all active locks`); }); this.subscriber.on('message', (channel, message) => { if (channel === 'twitter:locks') { const match = /^(WIP|PING):(.+)$/.exec(message); if (!match) return; const processId = match[2]; if (match[1] === 'WIP') { this.subscriber.subscribe(`twitter:lock/${processId}`, err => { if (err) return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`); logger.info(`received notification from process ${processId}, accepting messages on channel...`); }); } if (match[1] === 'PING') { logger.debug(`received ping request to process ${processId}, checking channel activity...`); this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => { if (err) { logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`); } const count = reply[1] || 0; const statusMsg = count > 0 ? 'WIP' : 'NONE'; logger.debug(`status of channel ${processId}: ${statusMsg}`); this.client.publish(`twitter:ping/${processId}`, statusMsg, err => { if (err) { return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`); } logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`); }); }); } } else { const match = /^twitter:lock\/(.+)$/.exec(channel); if (!match) return; const processId = match[1]; if (message === 'DONE') logger.info(`received notification that process ${processId} finished successfully`); if (message === 'BREAK') logger.warn(`received notification that process ${processId} was terminated prematurely`); this.subscriber.unsubscribe(channel, err => { if (err) return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`); logger.info(`successfully unsubscribed from process lock ${processId}`); }); } }); }); this.subscriptions = {}; this.expireAfter = opt.redisExpireTime; loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`); } } exports.default = default_1;