redis.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const redis_1 = require("redis");
  4. const loggers_1 = require("./loggers");
  // Category loggers used by the redis service, keyed by the role that emits
  // the message. Built from a [key, category] list so the mapping reads as a
  // table; keys, categories and creation order are the same as before.
  const loggers = [
    ['main', 'redis'],
    ['cacher', 'redis/cache'],
    ['controller', 'redis/controller'],
    ['replica', 'redis/replica'],
    ['agent', 'redis/agent'],
  ].reduce((table, [key, category]) => {
    table[key] = (0, loggers_1.getLogger)(category);
    return table;
  }, {});
  /**
   * Redis-backed service that (a) caches content and per-chat "sent" markers
   * and (b) coordinates process locks over pub/sub channels
   * (`twitter:locks`, `twitter:lock/<id>`, `twitter:ping/<id>`).
   *
   * All public members are assigned as instance properties in the constructor:
   * `chatAsString`, `cacheContent`, `cacheForChat`, `getContent`,
   * `isCachedForChat`, `startProcess`, `finishProcess`, `waitForProcess`,
   * plus the state fields `client`, `subscriber`, `subscriptions`, `expireAfter`.
   */
  class default_1 {
    /**
     * @param {object} opt Service options. Reads `opt.redisHost` and
     *   `opt.redisPort` (passed to `createClient` as `host` / `port`) and
     *   `opt.redisExpireTime` (used as the `EX` value of per-chat markers).
     */
    constructor(opt) {
      // Expiry, in the unit of Redis `EX`, applied to cached content.
      const CONTENT_TTL_SECONDS = 3600 * 24;
      const lockChannel = (processId) => `twitter:lock/${processId}`;
      const pingChannel = (processId) => `twitter:ping/${processId}`;

      // Adapts one node-style callback call into a promise.
      const callRedis = (invoke) => new Promise((resolve, reject) => {
        invoke((err, res) => (err ? reject(err) : resolve(res)));
      });

      /** Formats a chat as `<chatType>:<chatID>`; used inside cache keys. */
      this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`;

      /**
       * Stores `content` under `content/<contentId>` with a fixed expiry.
       * Failures are logged and swallowed; the promise resolves to undefined.
       */
      this.cacheContent = (contentId, content) => callRedis((done) => {
        this.client.set(`content/${contentId}`, content, 'EX', CONTENT_TTL_SECONDS, done);
      }).then((res) => {
        loggers.cacher.debug(`cached content ${contentId}, result: ${res}`);
      }).catch((err) => {
        loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`);
      });

      /**
       * Marks `postId` as sent to `target` (key `sent/<chat>/<postId>`),
       * expiring after `this.expireAfter`. Failures are logged and swallowed.
       */
      this.cacheForChat = (postId, target) => {
        const targetStr = this.chatAsString(target);
        return callRedis((done) => {
          this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, done);
        }).then((res) => {
          loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
        }).catch((err) => {
          loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
        });
      };

      /**
       * Reads `content/<contentId>`. Resolves with whatever the client
       * returns; rejects (after logging) with the client's error.
       */
      this.getContent = (contentId) => callRedis((done) => {
        this.client.get(`content/${contentId}`, done);
      }).then((res) => {
        loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`);
        return res;
      }).catch((err) => {
        loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
        throw err;
      });

      /**
       * Resolves to whether `sent/<chat>/<postId>` exists. A lookup failure
       * is logged and reported as `false` rather than rejecting.
       */
      this.isCachedForChat = (postId, target) => {
        const targetStr = this.chatAsString(target);
        return callRedis((done) => {
          this.client.exists(`sent/${targetStr}/${postId}`, done);
        }).then((res) => {
          loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
          return Boolean(res);
        }).catch((err) => {
          loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
          return false;
        });
      };

      /**
       * Announces `WIP:<processId>` on the lock registry and subscribes this
       * instance to the process's own lock channel.
       */
      this.startProcess = (processId) => {
        const logger = loggers.agent;
        this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
          if (err) {
            // Report the error itself; `res` carries nothing useful on failure.
            return logger.error(`error notifying subscription client about WIP:${processId}, error: ${err}`);
          }
          logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
        });
        this.subscriber.subscribe(lockChannel(processId), (err) => {
          if (err) {
            logger.error(`failed to subscribe to own process lock, error: ${err}`);
          }
        });
      };

      /** Publishes `DONE` on the process's lock channel. */
      this.finishProcess = (processId) => {
        this.client.publish(lockChannel(processId), 'DONE', (err) => {
          if (err) {
            loggers.agent.error(`failed to notify that process ${processId} finished, error: ${err}`);
          }
        });
      };

      /**
       * Waits for `processId` to finish. Concurrent callers for the same id
       * share one promise, stored in `this.subscriptions` until it settles.
       *
       * Resolves on `DONE` from the lock channel or `NONE` from the ping
       * channel. Rejects (with no reason value) on `BREAK`, on failure to
       * subscribe to the ping channel, or once `timeout` ms elapse.
       *
       * @param {string} processId
       * @param {number} timeout Milliseconds, passed to `setTimeout`.
       * @returns {Promise<void>}
       */
      this.waitForProcess = (processId, timeout) => {
        const logger = loggers.agent;
        if (processId in this.subscriptions) {
          return this.subscriptions[processId];
        }
        logger.debug(`creating new waiting function for ${processId}...`);
        const lock = lockChannel(processId);
        const ping = pingChannel(processId);
        let timeoutHandle;
        let onMessage;
        const pending = new Promise((resolve, reject) => {
          onMessage = (channel, message) => {
            // Only channels belonging to this process are of interest.
            if (channel !== lock && channel !== ping) {
              return undefined;
            }
            logger.debug(`received status notification from channel ${processId}, status: ${message}`);
            if (channel === lock) {
              if (message === 'DONE') {
                return resolve();
              }
              if (message === 'BREAK') {
                return reject();
              }
              return undefined;
            }
            // Ping reply: the reply channel is single-use, so drop it first.
            this.subscriber.unsubscribe(ping, (err) => {
              if (err) {
                return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
              }
              logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
            });
            if (message === 'WIP') {
              this.subscriber.subscribe(lock, (err) => {
                if (err) {
                  return logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
                }
                logger.debug(`successfully subscribed to status channel of process ${processId}`);
              });
            }
            if (message === 'NONE') {
              return resolve();
            }
            return undefined;
          };
          this.subscriber.on('message', onMessage);
          timeoutHandle = setTimeout(() => {
            this.client.publish(lock, 'BREAK', (err) => {
              if (err) {
                logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
              }
              else {
                logger.warn(`timed out waiting for process ${processId}, give up waiting`);
              }
              // Settle locally as well: the BREAK message only comes back if
              // this subscriber is on the lock channel, which is not the case
              // while a ping reply is still outstanding.
              reject();
            });
          }, timeout);
          this.subscriber.subscribe(ping, (err) => {
            if (err) {
              logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
              reject();
            }
          });
          this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
            if (err) {
              return logger.error(`error pinging process ${processId} via subscription client, error: ${err}`);
            }
            logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
          });
        }).finally(() => {
          clearTimeout(timeoutHandle);
          // Detach the per-wait listener so repeated waits do not pile up
          // handlers on the shared subscriber.
          this.subscriber.removeListener('message', onMessage);
          logger.debug(`deleting waiting function for ${processId}...`);
          delete this.subscriptions[processId];
        });
        this.subscriptions[processId] = pending;
        return pending;
      };

      // Handles `WIP:<id>` / `PING:<id>` announcements on the lock registry.
      const handleRegistryMessage = (logger, message) => {
        const match = /^(WIP|PING):(.+)$/.exec(message);
        if (!match) {
          return;
        }
        const kind = match[1];
        const processId = match[2];
        if (kind === 'WIP') {
          this.subscriber.subscribe(lockChannel(processId), (err) => {
            if (err) {
              return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
            }
            logger.info(`received notification from process ${processId}, accepting messages on channel...`);
          });
          return;
        }
        // kind === 'PING': answer on the ping channel with WIP or NONE,
        // depending on whether the lock channel has any subscribers.
        logger.debug(`received ping request to process ${processId}, checking channel activity...`);
        this.client.pubsub('NUMSUB', lockChannel(processId), (err, reply) => {
          if (err) {
            // No reply to inspect; answering would require reading it.
            logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
            return;
          }
          const count = reply[1] || 0;
          const statusMsg = count > 0 ? 'WIP' : 'NONE';
          logger.debug(`status of channel ${processId}: ${statusMsg}`);
          this.client.publish(pingChannel(processId), statusMsg, (pubErr) => {
            if (pubErr) {
              return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${pubErr}`);
            }
            logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
          });
        });
      };

      // Handles a message on `twitter:lock/<id>`: logs DONE/BREAK and then
      // unsubscribes from that channel whatever the message was.
      const handleLockMessage = (logger, channel, message) => {
        const match = /^twitter:lock\/(.+)$/.exec(channel);
        if (!match) {
          return;
        }
        const processId = match[1];
        if (message === 'DONE') {
          logger.info(`received notification that process ${processId} finished successfully`);
        }
        if (message === 'BREAK') {
          logger.warn(`received notification that process ${processId} was terminated prematurely`);
        }
        this.subscriber.unsubscribe(channel, (err) => {
          if (err) {
            return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
          }
          logger.info(`successfully unsubscribed from process lock ${processId}`);
        });
      };

      this.client = (0, redis_1.createClient)({
        host: opt.redisHost,
        port: opt.redisPort,
      });
      this.subscriber = this.client.duplicate();
      // Prototype-less map so ids such as "constructor" are never mistaken
      // for an existing wait by the `in` check in waitForProcess.
      this.subscriptions = Object.create(null);
      this.expireAfter = opt.redisExpireTime;

      // If nobody is subscribed to the lock registry yet, this instance
      // subscribes to it (logging as "controller"); otherwise it logs as
      // "replica" and leaves the registry to the existing subscriber.
      this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
        if (err) {
          loggers.main.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
          process.exit(1);
          return;
        }
        loggers.main.debug(`subscribers of global lock registry: ${reply[1]}`);
        const isController = reply[1] === 0;
        const logger = isController ? loggers.controller : loggers.replica;
        if (isController) {
          this.subscriber.subscribe('twitter:locks', (subErr) => {
            if (subErr) {
              logger.fatal(`failed to subscribe to global lock registry, error: ${subErr}`);
              process.exit(1);
            }
            logger.info(`nobody monitoring global lock registry, taken over now`);
          });
        }
        // NOTE(review): only the 'message' event is handled below; confirm
        // whether pattern subscriptions are delivered on that event by the
        // redis client in use.
        this.subscriber.psubscribe(`twitter:lock/*`, (subErr) => {
          if (subErr) {
            return logger.error(`failed to subscribe to active process locks, error: ${subErr}`);
          }
          logger.debug(`monitoring all active locks`);
        });
        this.subscriber.on('message', (channel, message) => {
          if (channel === 'twitter:locks') {
            handleRegistryMessage(logger, message);
          }
          else {
            handleLockMessage(logger, channel, message);
          }
        });
      });

      loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
    }
  }
  206. exports.default = default_1;