redis.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const redis_1 = require("redis");
  4. const loggers_1 = require("./loggers");
  5. const logger = loggers_1.getLogger('redis');
  6. class default_1 {
  7. constructor(opt) {
  8. this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`;
  9. this.cacheContent = (contentId, content) => new Promise((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  10. logger.debug(`cached content ${contentId}, result: ${res}`);
  11. }).catch((err) => {
  12. logger.error(`failed to cache content ${contentId}, error: ${err}`);
  13. });
  14. this.cacheForChat = (postId, target) => {
  15. const targetStr = this.chatAsString(target);
  16. return new Promise((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  17. logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
  18. }).catch((err) => {
  19. logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
  20. });
  21. };
  22. this.getContent = (contentId) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  23. logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
  24. return res;
  25. }).catch((err) => {
  26. logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
  27. throw err;
  28. });
  29. this.startProcess = (processId) => {
  30. this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
  31. if (err) {
  32. return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
  33. }
  34. logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
  35. });
  36. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  37. if (err)
  38. logger.error(`failed to subscribe to own process lock, error: ${err}`);
  39. });
  40. };
  41. this.finishProcess = (processId) => {
  42. this.client.publish(`twitter:lock/${processId}`, 'DONE');
  43. this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
  44. if (err)
  45. return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
  46. logger.info(`successfully unsubscribed from process lock ${processId}`);
  47. });
  48. };
  49. this.waitForProcess = (processId, timeout) => {
  50. if (!(processId in this.subscriptions)) {
  51. logger.debug(`creating new waiting function for ${processId}...`);
  52. let timeoutHandle;
  53. this.subscriptions[processId] = new Promise((resolve, reject) => {
  54. this.subscriber.on('message', (channel, message) => {
  55. logger.debug(`received status notification from channel ${processId}, status: ${message}`);
  56. if (channel === `twitter:lock/${processId}`) {
  57. if (message === 'DONE')
  58. return resolve();
  59. if (message === 'BREAK')
  60. return reject();
  61. }
  62. if (channel === `twitter:ping/${processId}`) {
  63. this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => {
  64. if (err) {
  65. return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
  66. }
  67. logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
  68. });
  69. if (message === 'WIP') {
  70. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  71. if (err)
  72. logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
  73. logger.debug(`successfully subscribed to status channel of process ${processId}`);
  74. });
  75. }
  76. if (message === 'NONE')
  77. return resolve();
  78. }
  79. });
  80. timeoutHandle = setTimeout(() => {
  81. this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => {
  82. if (err) {
  83. logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
  84. }
  85. else {
  86. logger.warn(`timed out waiting for process ${processId}, give up waiting`);
  87. }
  88. });
  89. }, timeout);
  90. this.subscriber.subscribe(`twitter:ping/${processId}`, err => {
  91. if (err) {
  92. logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
  93. reject();
  94. }
  95. });
  96. this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
  97. if (err) {
  98. return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`);
  99. }
  100. logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
  101. });
  102. }).finally(() => {
  103. clearTimeout(timeoutHandle);
  104. logger.debug(`deleting waiting function for ${processId}...`);
  105. delete this.subscriptions[processId];
  106. });
  107. }
  108. return this.subscriptions[processId];
  109. };
  110. this.isCachedForChat = (postId, target) => {
  111. const targetStr = this.chatAsString(target);
  112. return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
  113. logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
  114. return Boolean(res);
  115. }).catch((err) => {
  116. logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
  117. return false;
  118. });
  119. };
  120. this.client = redis_1.createClient({
  121. host: opt.redisHost,
  122. port: opt.redisPort,
  123. });
  124. this.subscriber = this.client.duplicate();
  125. this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
  126. if (err) {
  127. logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
  128. process.exit(1);
  129. }
  130. logger.debug(`subscribers of global lock registry: ${reply[1]}`);
  131. if (reply[1] > 0)
  132. return;
  133. this.subscriber.subscribe('twitter:locks', err => {
  134. if (err) {
  135. logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
  136. process.exit(1);
  137. }
  138. logger.info(`nobody monitoring global lock registry, taken over now`);
  139. });
  140. this.subscriber.psubscribe(`twitter:lock/*`, err => {
  141. if (err)
  142. return logger.error(`failed to subscribe to active process locks, error: ${err}`);
  143. logger.debug(`monitoring all active locks`);
  144. });
  145. this.subscriber.on('message', (channel, message) => {
  146. if (channel === 'twitter:locks') {
  147. const match = /^(WIP|PING):(.+)$/.exec(message);
  148. if (!match)
  149. return;
  150. const processId = match[2];
  151. if (match[1] === 'WIP') {
  152. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  153. if (err)
  154. return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
  155. logger.info(`received notification from process ${processId}, accepting messages on channel...`);
  156. });
  157. }
  158. if (match[1] === 'PING') {
  159. logger.debug(`received ping request to process ${processId}, checking channel activity...`);
  160. this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => {
  161. if (err) {
  162. logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
  163. }
  164. const count = reply[1] || 0;
  165. const statusMsg = count > 0 ? 'WIP' : 'NONE';
  166. logger.debug(`status of channel ${processId}: ${statusMsg}`);
  167. this.client.publish(`twitter:ping/${processId}`, statusMsg, err => {
  168. if (err) {
  169. return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`);
  170. }
  171. logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
  172. });
  173. });
  174. }
  175. }
  176. });
  177. });
  178. this.subscriptions = {};
  179. this.expireAfter = opt.redisExpireTime;
  180. logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
  181. }
  182. }
  183. exports.default = default_1;