ソースを参照

actually fix last issue, redis subcat loggers

Mike L 3 年 前
コミット
4989c70f15
2 ファイル変更63 行追加37 行削除
  1. 32 19
      dist/redis.js
  2. 31 18
      src/redis.ts

+ 32 - 19
dist/redis.js

@@ -2,31 +2,38 @@
 Object.defineProperty(exports, "__esModule", { value: true });
 const redis_1 = require("redis");
 const loggers_1 = require("./loggers");
-const logger = (0, loggers_1.getLogger)('redis');
+const loggers = {
+    main: (0, loggers_1.getLogger)('redis'),
+    cacher: (0, loggers_1.getLogger)('redis/cache'),
+    controller: (0, loggers_1.getLogger)('redis/controller'),
+    replica: (0, loggers_1.getLogger)('redis/replica'),
+    agent: (0, loggers_1.getLogger)('redis/agent'),
+};
 class default_1 {
     constructor(opt) {
         this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`;
         this.cacheContent = (contentId, content) => new Promise((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res))).then(res => {
-            logger.debug(`cached content ${contentId}, result: ${res}`);
+            loggers.cacher.debug(`cached content ${contentId}, result: ${res}`);
         }).catch((err) => {
-            logger.error(`failed to cache content ${contentId}, error: ${err}`);
+            loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`);
         });
         this.cacheForChat = (postId, target) => {
             const targetStr = this.chatAsString(target);
             return new Promise((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res))).then(res => {
-                logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
+                loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
             }).catch((err) => {
-                logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
+                loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
             });
         };
         this.getContent = (contentId) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
-            logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
+            loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`);
             return res;
         }).catch((err) => {
-            logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
+            loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
             throw err;
         });
         this.startProcess = (processId) => {
+            const logger = loggers.agent;
             this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
                 if (err) {
                     return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
@@ -42,6 +49,7 @@ class default_1 {
             this.client.publish(`twitter:lock/${processId}`, 'DONE');
         };
         this.waitForProcess = (processId, timeout) => {
+            const logger = loggers.agent;
             if (!(processId in this.subscriptions)) {
                 logger.debug(`creating new waiting function for ${processId}...`);
                 let timeoutHandle;
@@ -105,10 +113,10 @@ class default_1 {
         this.isCachedForChat = (postId, target) => {
             const targetStr = this.chatAsString(target);
             return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
-                logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
+                loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
                 return Boolean(res);
             }).catch((err) => {
-                logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
+                loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
                 return false;
             });
         };
@@ -117,21 +125,26 @@ class default_1 {
             port: opt.redisPort,
         });
         this.subscriber = this.client.duplicate();
+        let logger = loggers.main;
         this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
             if (err) {
                 logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
                 process.exit(1);
             }
             logger.debug(`subscribers of global lock registry: ${reply[1]}`);
-            if (reply[1] > 0)
-                return;
-            this.subscriber.subscribe('twitter:locks', err => {
-                if (err) {
-                    logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
-                    process.exit(1);
-                }
-                logger.info(`nobody monitoring global lock registry, taken over now`);
-            });
+            if (reply[1] === 0) {
+                logger = loggers.controller;
+                this.subscriber.subscribe('twitter:locks', err => {
+                    if (err) {
+                        logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
+                        process.exit(1);
+                    }
+                    logger.info(`nobody monitoring global lock registry, taken over now`);
+                });
+            }
+            else {
+                logger = loggers.replica;
+            }
             this.subscriber.psubscribe(`twitter:lock/*`, err => {
                 if (err)
                     return logger.error(`failed to subscribe to active process locks, error: ${err}`);
@@ -187,7 +200,7 @@ class default_1 {
         });
         this.subscriptions = {};
         this.expireAfter = opt.redisExpireTime;
-        logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
+        loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
     }
 }
 exports.default = default_1;

+ 31 - 18
src/redis.ts

@@ -1,7 +1,13 @@
 import { createClient, Callback, ClientOpts, OverloadedCommand, RedisClient } from 'redis';
 import { getLogger } from './loggers';
 
-const logger = getLogger('redis');
+const loggers = {
+  main: getLogger('redis'),
+  cacher: getLogger('redis/cache'),
+  controller: getLogger('redis/controller'),
+  replica: getLogger('redis/replica'),
+  agent: getLogger('redis/agent'),
+};
 
 interface Client extends Omit<Omit<RedisClient, 'pubsub'>, 'duplicate'> {
   pubsub: OverloadedCommand<string | string[], (string | number)[], boolean>;
@@ -21,20 +27,25 @@ export default class {
       port: opt.redisPort,
     }) as unknown as Client;
     this.subscriber = this.client.duplicate();
+    let logger = loggers.main;
     this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
       if (err) {
         logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
         process.exit(1);
       }
       logger.debug(`subscribers of global lock registry: ${reply[1]}`);
-      if (reply[1] > 0) return;
-      this.subscriber.subscribe('twitter:locks', err => {
-        if (err) {
-          logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
-          process.exit(1);
-        }
-        logger.info(`nobody monitoring global lock registry, taken over now`);
-      });
+      if (reply[1] === 0) { 
+        logger = loggers.controller;
+        this.subscriber.subscribe('twitter:locks', err => {
+          if (err) {
+            logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
+            process.exit(1);
+          }
+          logger.info(`nobody monitoring global lock registry, taken over now`);
+        });
+      } else {
+        logger = loggers.replica;
+      }
       this.subscriber.psubscribe(`twitter:lock/*`, err => {
         if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`);
         logger.debug(`monitoring all active locks`);
@@ -82,7 +93,7 @@ export default class {
     });
     this.subscriptions = {};
     this.expireAfter = opt.redisExpireTime;
-    logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
+    loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
   }
 
   private chatAsString = (chat: IChat) => `${chat.chatType}:${chat.chatID.toString()}`;
@@ -93,9 +104,9 @@ export default class {
         err ? reject(err) : resolve(res)
       )
     ).then(res => {
-      logger.debug(`cached content ${contentId}, result: ${res}`);
+      loggers.cacher.debug(`cached content ${contentId}, result: ${res}`);
     }).catch((err: Error) => {
-      logger.error(`failed to cache content ${contentId}, error: ${err}`);
+      loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`);
     });
 
   public cacheForChat = (postId: string, target: IChat) => {
@@ -105,9 +116,9 @@ export default class {
         err ? reject(err) : resolve(res)
       )
     ).then(res => {
-      logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
+      loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
     }).catch((err: Error) => {
-      logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
+      loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
     });
   };
 
@@ -115,14 +126,15 @@ export default class {
     new Promise<string>((resolve, reject) =>
       this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))
     ).then(res => {
-      logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
+      loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`);
       return res;
     }).catch((err: Error) => {
-      logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
+      loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
       throw err;
     });
 
   public startProcess = (processId: string) => {
+    const logger = loggers.agent;
     this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
       if (err) {
         return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
@@ -139,6 +151,7 @@ export default class {
   }
 
   public waitForProcess = (processId: string, timeout: number) => {
+    const logger = loggers.agent;
     if (!(processId in this.subscriptions)) {
       logger.debug(`creating new waiting function for ${processId}...`);
       let timeoutHandle: ReturnType<typeof setTimeout>;
@@ -200,10 +213,10 @@ export default class {
     return new Promise<number>((resolve, reject) =>
       this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))
     ).then(res => {
-      logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
+      loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
       return Boolean(res);
     }).catch((err: Error) => {
-      logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
+      loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
       return false;
     });
   };