import { createClient, Callback, ClientOpts, OverloadedCommand, RedisClient } from 'redis'; import { getLogger } from './loggers'; const loggers = { main: getLogger('redis'), cacher: getLogger('redis/cache'), controller: getLogger('redis/controller'), replica: getLogger('redis/replica'), agent: getLogger('redis/agent'), }; interface Client extends Omit, 'duplicate'> { pubsub: OverloadedCommand; duplicate: (options?: ClientOpts, cb?: Callback) => Client; } export default class { private client: Client; private subscriber: Client; private subscriptions: {[key: string]: Promise}; private expireAfter: number; constructor(opt: IRedisConfig) { this.client = createClient({ host: opt.redisHost, port: opt.redisPort, }) as unknown as Client; this.subscriber = this.client.duplicate(); let logger = loggers.main; this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => { if (err) { logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`); process.exit(1); } logger.debug(`subscribers of global lock registry: ${reply[1]}`); if (reply[1] === 0) { logger = loggers.controller; this.subscriber.subscribe('twitter:locks', err => { if (err) { logger.fatal(`failed to subscribe to global lock registry, error: ${err}`); process.exit(1); } logger.info(`nobody monitoring global lock registry, taken over now`); }); } else { logger = loggers.replica; } this.subscriber.psubscribe(`twitter:lock/*`, err => { if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`); logger.debug(`monitoring all active locks`); }); this.subscriber.on('message', (channel, message) => { if (channel === 'twitter:locks') { const match = /^(WIP|PING):(.+)$/.exec(message); if (!match) return; const processId = match[2]; if (match[1] === 'WIP') { this.subscriber.subscribe(`twitter:lock/${processId}`, err => { if (err) return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`); logger.info(`received notification from process ${processId}, accepting messages on channel...`); }); } if (match[1] === 'PING') { logger.debug(`received ping request to process ${processId}, checking channel activity...`); this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => { if (err) { logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`); } const count = reply[1] || 0; const statusMsg = count > 0 ? 'WIP' : 'NONE'; logger.debug(`status of channel ${processId}: ${statusMsg}`); this.client.publish(`twitter:ping/${processId}`, statusMsg, err => { if (err) { return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`); } logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`); }); }); } } else { const match = /^twitter:lock\/(.+)$/.exec(channel); if (!match) return; const processId = match[1]; if (message === 'DONE') logger.info(`received notification that process ${processId} finished successfully`); if (message === 'BREAK') logger.warn(`received notification that process ${processId} was terminated prematurely`); this.subscriber.unsubscribe(channel, err => { if (err) return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`); logger.info(`successfully unsubscribed from process lock ${processId}`); }); } }); }); this.subscriptions = {}; this.expireAfter = opt.redisExpireTime; loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`); } private chatAsString = (chat: IChat) => `${chat.chatType}:${chat.chatID.toString()}`; public cacheContent = (contentId: string, content: string) => new Promise<'OK'>((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res) ) ).then(res => { loggers.cacher.debug(`cached content ${contentId}, result: ${res}`); }).catch((err: Error) => { loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`); }); public cacheForChat = (postId: string, target: IChat) => { const targetStr = this.chatAsString(target); return new Promise<'OK'>((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res) ) ).then(res => { loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`); }).catch((err: Error) => { loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`); }); }; public getContent = (contentId: string) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res)) ).then(res => { loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`); return res; }).catch((err: Error) => { loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`); throw err; }); public startProcess = (processId: string) => { const logger = loggers.agent; this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => { if (err) { return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`); } logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`); }); this.subscriber.subscribe(`twitter:lock/${processId}`, err => { if (err) logger.error(`failed to subscribe to own process lock, error: ${err}`); }); } public finishProcess = (processId: string) => { this.client.publish(`twitter:lock/${processId}`, 'DONE'); } public waitForProcess = (processId: string, timeout: number) => { const logger = loggers.agent; if (!(processId in this.subscriptions)) { logger.debug(`creating new waiting function for ${processId}...`); let timeoutHandle: ReturnType; this.subscriptions[processId] = new Promise((resolve, reject) => { this.subscriber.on('message', (channel, message) => { logger.debug(`received status notification from channel ${processId}, status: ${message}`); if (channel === `twitter:lock/${processId}`) { if (message === 'DONE') return resolve(); if (message === 'BREAK') return reject(); } if (channel === `twitter:ping/${processId}`) { this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => { if (err) { return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`); } logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`); }); if (message === 'WIP') { this.subscriber.subscribe(`twitter:lock/${processId}`, err => { if (err) logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`); logger.debug(`successfully subscribed to status channel of process ${processId}`); }); } if (message === 'NONE') return resolve(); } }); timeoutHandle = setTimeout(() => { this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => { if (err) { logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`); } else { logger.warn(`timed out waiting for process ${processId}, give up waiting`); } }); }, timeout); this.subscriber.subscribe(`twitter:ping/${processId}`, err => { if (err) { logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`); reject(); } }) this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => { if (err) { return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`); } logger.debug(`pinged process ${processId} via subscription client, result: ${res}`); }); }).finally(() => { clearTimeout(timeoutHandle); logger.debug(`deleting waiting function for ${processId}...`); delete this.subscriptions[processId]; }) } return this.subscriptions[processId]; }; public isCachedForChat = (postId: string, target: IChat) => { const targetStr = this.chatAsString(target); return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res)) ).then(res => { loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`); return Boolean(res); }).catch((err: Error) => { loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`); return false; }); }; }