Browse Source

Merge branch 'koishi-redis-waiting' into mediaonly-koishi-redis-waiting

Mike L 3 năm trước cách đây
mục cha
commit
b7f31f572b
4 tập tin đã thay đổi với 39 bổ sung20 xóa
  1. 15 5
      dist/redis.js
  2. 8 6
      dist/twitter.js
  3. 10 4
      src/redis.ts
  4. 6 5
      src/twitter.ts

+ 15 - 5
dist/redis.js

@@ -40,11 +40,6 @@ class default_1 {
         };
         this.finishProcess = (processId) => {
             this.client.publish(`twitter:lock/${processId}`, 'DONE');
-            this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
-                if (err)
-                    return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
-                logger.info(`successfully unsubscribed from process lock ${processId}`);
-            });
         };
         this.waitForProcess = (processId, timeout) => {
             if (!(processId in this.subscriptions)) {
@@ -173,6 +168,21 @@ class default_1 {
                         });
                     }
                 }
+                else {
+                    const match = /^twitter:lock\/(.+)$/.exec(channel);
+                    if (!match)
+                        return;
+                    const processId = match[1];
+                    if (message === 'DONE')
+                        logger.info(`received notification that process ${processId} finished successfully`);
+                    if (message === 'BREAK')
+                        logger.warn(`received notification that process ${processId} was terminated prematurely`);
+                    this.subscriber.unsubscribe(channel, err => {
+                        if (err)
+                            return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
+                        logger.info(`successfully unsubscribed from process lock ${processId}`);
+                    });
+                }
             });
         });
         this.subscriptions = {};

+ 8 - 6
dist/twitter.js

@@ -279,7 +279,7 @@ class default_1 {
                         config.since_id = offset;
                     if (offset < -1)
                         config.max_id = offset.slice(1);
-                    const getMore = (gotTweets = []) => this.client.get(endpoint, config, (error, tweets) => {
+                    const getMore = (lastTweets = []) => this.client.get(endpoint, config, (error, tweets) => {
                         if (error) {
                             if (error instanceof Array && error.length > 0 && error[0].code === 34) {
                                 logger.warn(`error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
@@ -292,10 +292,12 @@ class default_1 {
                                 logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
                             }
                         }
-                        if (!tweets || tweets.length <= 1)
-                            return resolve(gotTweets);
-                        config.max_id = tweets.slice(-1)[0].id_str;
-                        getMore(gotTweets.concat(tweets));
+                        if (!(tweets instanceof Array) || tweets.length === 0)
+                            return resolve(lastTweets);
+                        if (offset <= 0)
+                            return resolve(lastTweets.concat(tweets));
+                        config.max_id = utils_1.BigNumOps.plus(tweets.slice(-1)[0].id_str, '-1');
+                        getMore(lastTweets.concat(tweets));
                     });
                     getMore();
                 }
@@ -305,7 +307,7 @@ class default_1 {
                 const currentThread = lock.threads[currentFeed];
                 const setOffset = (offset) => currentThread.offset = offset;
                 const updateDate = () => currentThread.updatedAt = new Date().toString();
-                if (!tweets || tweets.length === 0) {
+                if (tweets.length === 0) {
                     if (currentThread.offset < -1) {
                         setOffset(utils_1.BigNumOps.plus('1', currentThread.offset));
                     }

+ 10 - 4
src/redis.ts

@@ -67,6 +67,16 @@ export default class {
               });
             });
           }
+        } else {
+          const match = /^twitter:lock\/(.+)$/.exec(channel);
+          if (!match) return;
+          const processId = match[1];
+          if (message === 'DONE') logger.info(`received notification that process ${processId} finished successfully`);
+          if (message === 'BREAK') logger.warn(`received notification that process ${processId} was terminated prematurely`);
+          this.subscriber.unsubscribe(channel, err => {
+            if (err) return logger.error(`failed to unsubscribe from process lock ${processId}, error: ${err}`);
+            logger.info(`successfully unsubscribed from process lock ${processId}`);
+          });
         }
       });
     });
@@ -126,10 +136,6 @@ export default class {
 
   public finishProcess = (processId: string) => {
     this.client.publish(`twitter:lock/${processId}`, 'DONE');
-    this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
-      if (err) return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
-      logger.info(`successfully unsubscribed from process lock ${processId}`);
-    });
   }
 
   public waitForProcess = (processId: string, timeout: number) => {

+ 6 - 5
src/twitter.ts

@@ -441,7 +441,7 @@ export default class {
         config.include_rts = false;
         if (offset as unknown as number > 0) config.since_id = offset;
         if (offset as unknown as number < -1) config.max_id = offset.slice(1);
-        const getMore = (gotTweets: Tweet[] = []) => this.client.get(
+        const getMore = (lastTweets: Tweet[] = []) => this.client.get(
           endpoint, config, (error: {[key: string]: any}[], tweets: Tweet[]
         ) => {
           if (error) {
@@ -455,9 +455,10 @@ export default class {
               logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
             }
           }
-          if (!tweets || tweets.length <= 1) return resolve(gotTweets);
-          config.max_id = tweets.slice(-1)[0].id_str;
-          getMore(gotTweets.concat(tweets));
+          if (!(tweets instanceof Array) || tweets.length === 0) return resolve(lastTweets);
+          if (offset as unknown as number <= 0) return resolve(lastTweets.concat(tweets));
+          config.max_id = BigNumOps.plus(tweets.slice(-1)[0].id_str, '-1');
+          getMore(lastTweets.concat(tweets));
         });
         getMore();
       }
@@ -469,7 +470,7 @@ export default class {
 
       const setOffset = (offset: string) => currentThread.offset = offset;
       const updateDate = () => currentThread.updatedAt = new Date().toString();
-      if (!tweets || tweets.length === 0) {
+      if (tweets.length === 0) {
         if (currentThread.offset as unknown as number < -1) {
           setOffset(BigNumOps.plus('1', currentThread.offset));
         }