redis.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const redis_1 = require("redis");
  4. const loggers_1 = require("./loggers");
  5. const logger = (0, loggers_1.getLogger)('redis');
  6. class default_1 {
  7. constructor(opt) {
  8. this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`;
  9. this.cacheContent = (contentId, content) => new Promise((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  10. logger.debug(`cached content ${contentId}, result: ${res}`);
  11. }).catch((err) => {
  12. logger.error(`failed to cache content ${contentId}, error: ${err}`);
  13. });
  14. this.cacheForChat = (postId, target) => {
  15. const targetStr = this.chatAsString(target);
  16. return new Promise((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  17. logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
  18. }).catch((err) => {
  19. logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
  20. });
  21. };
  22. this.getContent = (contentId) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  23. logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
  24. return res;
  25. }).catch((err) => {
  26. logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
  27. throw err;
  28. });
  29. this.startProcess = (processId) => {
  30. this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
  31. if (err) {
  32. return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
  33. }
  34. logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
  35. });
  36. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  37. if (err)
  38. logger.error(`failed to subscribe to own process lock, error: ${err}`);
  39. });
  40. };
  41. this.finishProcess = (processId) => {
  42. this.client.publish(`twitter:lock/${processId}`, 'DONE');
  43. };
  44. this.waitForProcess = (processId, timeout) => {
  45. if (!(processId in this.subscriptions)) {
  46. logger.debug(`creating new waiting function for ${processId}...`);
  47. let timeoutHandle;
  48. this.subscriptions[processId] = new Promise((resolve, reject) => {
  49. this.subscriber.on('message', (channel, message) => {
  50. logger.debug(`received status notification from channel ${processId}, status: ${message}`);
  51. if (channel === `twitter:lock/${processId}`) {
  52. if (message === 'DONE')
  53. return resolve();
  54. if (message === 'BREAK')
  55. return reject();
  56. }
  57. if (channel === `twitter:ping/${processId}`) {
  58. this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => {
  59. if (err) {
  60. return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
  61. }
  62. logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
  63. });
  64. if (message === 'WIP') {
  65. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  66. if (err)
  67. logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
  68. logger.debug(`successfully subscribed to status channel of process ${processId}`);
  69. });
  70. }
  71. if (message === 'NONE')
  72. return resolve();
  73. }
  74. });
  75. timeoutHandle = setTimeout(() => {
  76. this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => {
  77. if (err) {
  78. logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
  79. }
  80. else {
  81. logger.warn(`timed out waiting for process ${processId}, give up waiting`);
  82. }
  83. });
  84. }, timeout);
  85. this.subscriber.subscribe(`twitter:ping/${processId}`, err => {
  86. if (err) {
  87. logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
  88. reject();
  89. }
  90. });
  91. this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
  92. if (err) {
  93. return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`);
  94. }
  95. logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
  96. });
  97. }).finally(() => {
  98. clearTimeout(timeoutHandle);
  99. logger.debug(`deleting waiting function for ${processId}...`);
  100. delete this.subscriptions[processId];
  101. });
  102. }
  103. return this.subscriptions[processId];
  104. };
  105. this.isCachedForChat = (postId, target) => {
  106. const targetStr = this.chatAsString(target);
  107. return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  108. logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
  109. return Boolean(res);
  110. }).catch((err) => {
  111. logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
  112. return false;
  113. });
  114. };
  115. this.client = (0, redis_1.createClient)({
  116. host: opt.redisHost,
  117. port: opt.redisPort,
  118. });
  119. this.subscriber = this.client.duplicate();
  120. this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
  121. if (err) {
  122. logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
  123. process.exit(1);
  124. }
  125. logger.debug(`subscribers of global lock registry: ${reply[1]}`);
  126. if (reply[1] > 0)
  127. return;
  128. this.subscriber.subscribe('twitter:locks', err => {
  129. if (err) {
  130. logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
  131. process.exit(1);
  132. }
  133. logger.info(`nobody monitoring global lock registry, taken over now`);
  134. });
  135. this.subscriber.psubscribe(`twitter:lock/*`, err => {
  136. if (err)
  137. return logger.error(`failed to subscribe to active process locks, error: ${err}`);
  138. logger.debug(`monitoring all active locks`);
  139. });
  140. this.subscriber.on('message', (channel, message) => {
  141. if (channel === 'twitter:locks') {
  142. const match = /^(WIP|PING):(.+)$/.exec(message);
  143. if (!match)
  144. return;
  145. const processId = match[2];
  146. if (match[1] === 'WIP') {
  147. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  148. if (err)
  149. return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
  150. logger.info(`received notification from process ${processId}, accepting messages on channel...`);
  151. });
  152. }
  153. if (match[1] === 'PING') {
  154. logger.debug(`received ping request to process ${processId}, checking channel activity...`);
  155. this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => {
  156. if (err) {
  157. logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
  158. }
  159. const count = reply[1] || 0;
  160. const statusMsg = count > 0 ? 'WIP' : 'NONE';
  161. logger.debug(`status of channel ${processId}: ${statusMsg}`);
  162. this.client.publish(`twitter:ping/${processId}`, statusMsg, err => {
  163. if (err) {
  164. return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`);
  165. }
  166. logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
  167. });
  168. });
  169. }
  170. }
  171. else {
  172. const match = /^twitter:lock\/(.+)$/.exec(channel);
  173. if (!match)
  174. return;
  175. const processId = match[1];
  176. if (message === 'DONE')
  177. logger.info(`received notification that process ${processId} finished successfully`);
  178. if (message === 'BREAK')
  179. logger.warn(`received notification that process ${processId} was terminated prematurely`);
  180. this.subscriber.unsubscribe(channel, err => {
  181. if (err)
  182. return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
  183. logger.info(`successfully unsubscribed from process lock ${processId}`);
  184. });
  185. }
  186. });
  187. });
  188. this.subscriptions = {};
  189. this.expireAfter = opt.redisExpireTime;
  190. logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
  191. }
  192. }
  193. exports.default = default_1;