123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
"use strict";
// Mark the module as transpiled ES-module output so interop helpers read `.default`.
Object.defineProperty(exports, "__esModule", { value: true });
const redis_1 = require("redis");
const loggers_1 = require("./loggers");
/**
 * Loggers used by the Redis service, one per concern:
 *  - main:       start-up messages and the initial lock-registry query
 *  - cacher:     content / "sent" marker cache reads and writes
 *  - controller: used when this instance subscribes to `twitter:locks` itself
 *  - replica:    used when another subscriber already monitors `twitter:locks`
 *  - agent:      startProcess / waitForProcess coordination messages
 */
const loggers = {
    main: (0, loggers_1.getLogger)('redis'),
    cacher: (0, loggers_1.getLogger)('redis/cache'),
    controller: (0, loggers_1.getLogger)('redis/controller'),
    replica: (0, loggers_1.getLogger)('redis/replica'),
    agent: (0, loggers_1.getLogger)('redis/agent'),
};
- class default_1 {
- constructor(opt) {
- this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`;
- this.cacheContent = (contentId, content) => new Promise((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res))).then(res => {
- loggers.cacher.debug(`cached content ${contentId}, result: ${res}`);
- }).catch((err) => {
- loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`);
- });
- this.cacheForChat = (postId, target) => {
- const targetStr = this.chatAsString(target);
- return new Promise((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res))).then(res => {
- loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
- }).catch((err) => {
- loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
- });
- };
- this.getContent = (contentId) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
- loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`);
- return res;
- }).catch((err) => {
- loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
- throw err;
- });
- this.startProcess = (processId) => {
- const logger = loggers.agent;
- this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
- if (err) {
- return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
- }
- logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
- });
- this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
- if (err)
- logger.error(`failed to subscribe to own process lock, error: ${err}`);
- });
- };
- this.finishProcess = (processId) => {
- this.client.publish(`twitter:lock/${processId}`, 'DONE');
- };
- this.waitForProcess = (processId, timeout) => {
- const logger = loggers.agent;
- if (!(processId in this.subscriptions)) {
- logger.debug(`creating new waiting function for ${processId}...`);
- let timeoutHandle;
- this.subscriptions[processId] = new Promise((resolve, reject) => {
- this.subscriber.on('message', (channel, message) => {
- logger.debug(`received status notification from channel ${processId}, status: ${message}`);
- if (channel === `twitter:lock/${processId}`) {
- if (message === 'DONE')
- return resolve();
- if (message === 'BREAK')
- return reject();
- }
- if (channel === `twitter:ping/${processId}`) {
- this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => {
- if (err) {
- return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
- }
- logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
- });
- if (message === 'WIP') {
- this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
- if (err)
- logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
- logger.debug(`successfully subscribed to status channel of process ${processId}`);
- });
- }
- if (message === 'NONE')
- return resolve();
- }
- });
- timeoutHandle = setTimeout(() => {
- this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => {
- if (err) {
- logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
- }
- else {
- logger.warn(`timed out waiting for process ${processId}, give up waiting`);
- }
- });
- }, timeout);
- this.subscriber.subscribe(`twitter:ping/${processId}`, err => {
- if (err) {
- logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
- reject();
- }
- });
- this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
- if (err) {
- return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`);
- }
- logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
- });
- }).finally(() => {
- clearTimeout(timeoutHandle);
- logger.debug(`deleting waiting function for ${processId}...`);
- delete this.subscriptions[processId];
- });
- }
- return this.subscriptions[processId];
- };
- this.isCachedForChat = (postId, target) => {
- const targetStr = this.chatAsString(target);
- return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
- loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
- return Boolean(res);
- }).catch((err) => {
- loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
- return false;
- });
- };
- this.client = (0, redis_1.createClient)({
- host: opt.redisHost,
- port: opt.redisPort,
- });
- this.subscriber = this.client.duplicate();
- let logger = loggers.main;
- this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
- if (err) {
- logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
- process.exit(1);
- }
- logger.debug(`subscribers of global lock registry: ${reply[1]}`);
- if (reply[1] === 0) {
- logger = loggers.controller;
- this.subscriber.subscribe('twitter:locks', err => {
- if (err) {
- logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
- process.exit(1);
- }
- logger.info(`nobody monitoring global lock registry, taken over now`);
- });
- }
- else {
- logger = loggers.replica;
- }
- this.subscriber.psubscribe(`twitter:lock/*`, err => {
- if (err)
- return logger.error(`failed to subscribe to active process locks, error: ${err}`);
- logger.debug(`monitoring all active locks`);
- });
- this.subscriber.on('message', (channel, message) => {
- if (channel === 'twitter:locks') {
- const match = /^(WIP|PING):(.+)$/.exec(message);
- if (!match)
- return;
- const processId = match[2];
- if (match[1] === 'WIP') {
- this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
- if (err)
- return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
- logger.info(`received notification from process ${processId}, accepting messages on channel...`);
- });
- }
- if (match[1] === 'PING') {
- logger.debug(`received ping request to process ${processId}, checking channel activity...`);
- this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => {
- if (err) {
- logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
- }
- const count = reply[1] || 0;
- const statusMsg = count > 0 ? 'WIP' : 'NONE';
- logger.debug(`status of channel ${processId}: ${statusMsg}`);
- this.client.publish(`twitter:ping/${processId}`, statusMsg, err => {
- if (err) {
- return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`);
- }
- logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
- });
- });
- }
- }
- else {
- const match = /^twitter:lock\/(.+)$/.exec(channel);
- if (!match)
- return;
- const processId = match[1];
- if (message === 'DONE')
- logger.info(`received notification that process ${processId} finished successfully`);
- if (message === 'BREAK')
- logger.warn(`received notification that process ${processId} was terminated prematurely`);
- this.subscriber.unsubscribe(channel, err => {
- if (err)
- return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
- logger.info(`successfully unsubscribed from process lock ${processId}`);
- });
- }
- });
- });
- this.subscriptions = {};
- this.expireAfter = opt.redisExpireTime;
- loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
- }
- }
// Expose the Redis service class as the module's default export (CommonJS interop form).
exports.default = default_1;
|