Quellcode durchsuchen

fix missing end signal from foreign redis

Mike L vor 3 Jahren
Ursprung
Commit
7f05eb2197
2 geänderte Dateien mit 25 neuen und 9 gelöschten Zeilen
  1. 15 5
      dist/redis.js
  2. 10 4
      src/redis.ts

+ 15 - 5
dist/redis.js

@@ -40,11 +40,6 @@ class default_1 {
         };
         this.finishProcess = (processId) => {
             this.client.publish(`twitter:lock/${processId}`, 'DONE');
-            this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
-                if (err)
-                    return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
-                logger.info(`successfully unsubscribed from process lock ${processId}`);
-            });
         };
         this.waitForProcess = (processId, timeout) => {
             if (!(processId in this.subscriptions)) {
@@ -173,6 +168,21 @@ class default_1 {
                         });
                     }
                 }
+                else {
+                    const match = /^twitter:lock\/(.+)$/.exec(channel);
+                    if (!match)
+                        return;
+                    const processId = match[1];
+                    if (message === 'DONE')
+                        logger.info(`received notification that process ${processId} finished successfully`);
+                    if (message === 'BREAK')
+                        logger.warn(`received notification that process ${processId} was terminated prematurely`);
+                    this.subscriber.unsubscribe(channel, err => {
+                        if (err)
+                            return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
+                        logger.info(`successfully unsubscribed from process lock ${processId}`);
+                    });
+                }
             });
         });
         this.subscriptions = {};

+ 10 - 4
src/redis.ts

@@ -67,6 +67,16 @@ export default class {
               });
             });
           }
+        } else {
+          const match = /^twitter:lock\/(.+)$/.exec(channel);
+          if (!match) return;
+          const processId = match[1];
+          if (message === 'DONE') logger.info(`received notification that process ${processId} finished successfully`);
+          if (message === 'BREAK') logger.warn(`received notification that process ${processId} was terminated prematurely`);
+          this.subscriber.unsubscribe(channel, err => {
+            if (err) return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
+            logger.info(`successfully unsubscribed from process lock ${processId}`);
+          });
         }
       });
     });
@@ -126,10 +136,6 @@ export default class {
 
   public finishProcess = (processId: string) => {
     this.client.publish(`twitter:lock/${processId}`, 'DONE');
-    this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
-      if (err) return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
-      logger.info(`successfully unsubscribed from process lock ${processId}`);
-    });
   }
 
   public waitForProcess = (processId: string, timeout: number) => {