redis.ts 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import { createClient, Callback, ClientOpts, OverloadedCommand, RedisClient } from 'redis';
  2. import { getLogger } from './loggers';
  3. const loggers = {
  4. main: getLogger('redis'),
  5. cacher: getLogger('redis/cache'),
  6. controller: getLogger('redis/controller'),
  7. replica: getLogger('redis/replica'),
  8. agent: getLogger('redis/agent'),
  9. };
  10. interface Client extends Omit<Omit<RedisClient, 'pubsub'>, 'duplicate'> {
  11. pubsub: OverloadedCommand<string | string[], (string | number)[], boolean>;
  12. duplicate: (options?: ClientOpts, cb?: Callback<Client>) => Client;
  13. }
  14. export default class {
  15. private client: Client;
  16. private subscriber: Client;
  17. private subscriptions: {[key: string]: Promise<void>};
  18. private expireAfter: number;
  19. constructor(opt: IRedisConfig) {
  20. this.client = createClient({
  21. host: opt.redisHost,
  22. port: opt.redisPort,
  23. }) as unknown as Client;
  24. this.subscriber = this.client.duplicate();
  25. let logger = loggers.main;
  26. this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
  27. if (err) {
  28. logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
  29. process.exit(1);
  30. }
  31. logger.debug(`subscribers of global lock registry: ${reply[1]}`);
  32. if (reply[1] === 0) {
  33. logger = loggers.controller;
  34. this.subscriber.subscribe('twitter:locks', err => {
  35. if (err) {
  36. logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
  37. process.exit(1);
  38. }
  39. logger.info(`nobody monitoring global lock registry, taken over now`);
  40. });
  41. } else {
  42. logger = loggers.replica;
  43. }
  44. this.subscriber.psubscribe(`twitter:lock/*`, err => {
  45. if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`);
  46. logger.debug(`monitoring all active locks`);
  47. });
  48. this.subscriber.on('message', (channel, message) => {
  49. if (channel === 'twitter:locks') {
  50. const match = /^(WIP|PING):(.+)$/.exec(message);
  51. if (!match) return;
  52. const processId = match[2];
  53. if (match[1] === 'WIP') {
  54. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  55. if (err) return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
  56. logger.info(`received notification from process ${processId}, accepting messages on channel...`);
  57. });
  58. }
  59. if (match[1] === 'PING') {
  60. logger.debug(`received ping request to process ${processId}, checking channel activity...`);
  61. this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => {
  62. if (err) {
  63. logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
  64. }
  65. const count = reply[1] || 0;
  66. const statusMsg = count > 0 ? 'WIP' : 'NONE';
  67. logger.debug(`status of channel ${processId}: ${statusMsg}`);
  68. this.client.publish(`twitter:ping/${processId}`, statusMsg, err => {
  69. if (err) {
  70. return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`);
  71. }
  72. logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
  73. });
  74. });
  75. }
  76. } else {
  77. const match = /^twitter:lock\/(.+)$/.exec(channel);
  78. if (!match) return;
  79. const processId = match[1];
  80. if (message === 'DONE') logger.info(`received notification that process ${processId} finished successfully`);
  81. if (message === 'BREAK') logger.warn(`received notification that process ${processId} was terminated prematurely`);
  82. this.subscriber.unsubscribe(channel, err => {
  83. if (err) return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
  84. logger.info(`successfully unsubscribed from process lock ${processId}`);
  85. });
  86. }
  87. });
  88. });
  89. this.subscriptions = {};
  90. this.expireAfter = opt.redisExpireTime;
  91. loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
  92. }
  93. private chatAsString = (chat: IChat) => `${chat.chatType}:${chat.chatID.toString()}`;
  94. public cacheContent = (contentId: string, content: string) =>
  95. new Promise<'OK'>((resolve, reject) =>
  96. this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) =>
  97. err ? reject(err) : resolve(res)
  98. )
  99. ).then(res => {
  100. loggers.cacher.debug(`cached content ${contentId}, result: ${res}`);
  101. }).catch((err: Error) => {
  102. loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`);
  103. });
  104. public cacheForChat = (postId: string, target: IChat) => {
  105. const targetStr = this.chatAsString(target);
  106. return new Promise<'OK'>((resolve, reject) =>
  107. this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) =>
  108. err ? reject(err) : resolve(res)
  109. )
  110. ).then(res => {
  111. loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
  112. }).catch((err: Error) => {
  113. loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
  114. });
  115. };
  116. public getContent = (contentId: string) =>
  117. new Promise<string>((resolve, reject) =>
  118. this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))
  119. ).then(res => {
  120. loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`);
  121. return res;
  122. }).catch((err: Error) => {
  123. loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
  124. throw err;
  125. });
  126. public startProcess = (processId: string) => {
  127. const logger = loggers.agent;
  128. this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
  129. if (err) {
  130. return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
  131. }
  132. logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
  133. });
  134. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  135. if (err) logger.error(`failed to subscribe to own process lock, error: ${err}`);
  136. });
  137. }
  138. public finishProcess = (processId: string) => {
  139. this.client.publish(`twitter:lock/${processId}`, 'DONE');
  140. }
  141. public waitForProcess = (processId: string, timeout: number) => {
  142. const logger = loggers.agent;
  143. if (!(processId in this.subscriptions)) {
  144. logger.debug(`creating new waiting function for ${processId}...`);
  145. let timeoutHandle: ReturnType<typeof setTimeout>;
  146. this.subscriptions[processId] = new Promise<void>((resolve, reject) => {
  147. this.subscriber.on('message', (channel, message) => {
  148. logger.debug(`received status notification from channel ${processId}, status: ${message}`);
  149. if (channel === `twitter:lock/${processId}`) {
  150. if (message === 'DONE') return resolve();
  151. if (message === 'BREAK') return reject();
  152. }
  153. if (channel === `twitter:ping/${processId}`) {
  154. this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => {
  155. if (err) {
  156. return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
  157. }
  158. logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
  159. });
  160. if (message === 'WIP') {
  161. this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
  162. if (err) logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
  163. logger.debug(`successfully subscribed to status channel of process ${processId}`);
  164. });
  165. }
  166. if (message === 'NONE') return resolve();
  167. }
  168. });
  169. timeoutHandle = setTimeout(() => {
  170. this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => {
  171. if (err) {
  172. logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
  173. } else {
  174. logger.warn(`timed out waiting for process ${processId}, give up waiting`);
  175. }
  176. });
  177. }, timeout);
  178. this.subscriber.subscribe(`twitter:ping/${processId}`, err => {
  179. if (err) {
  180. logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
  181. reject();
  182. }
  183. })
  184. this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
  185. if (err) {
  186. return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`);
  187. }
  188. logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
  189. });
  190. }).finally(() => {
  191. clearTimeout(timeoutHandle);
  192. logger.debug(`deleting waiting function for ${processId}...`);
  193. delete this.subscriptions[processId];
  194. })
  195. }
  196. return this.subscriptions[processId];
  197. };
  198. public isCachedForChat = (postId: string, target: IChat) => {
  199. const targetStr = this.chatAsString(target);
  200. return new Promise<number>((resolve, reject) =>
  201. this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))
  202. ).then(res => {
  203. loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
  204. return Boolean(res);
  205. }).catch((err: Error) => {
  206. loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
  207. return false;
  208. });
  209. };
  210. }