浏览代码

make work intervals more predictable

Mike L 3 年之前
父节点
当前提交
72160f38e6
共有 4 个文件被更改,包括 112 次插入和 85 次删除
  1. + 47 - 45
      dist/twitter.js
  2. + 10 - 1
      dist/utils.js
  3. + 44 - 39
      src/twitter.ts
  4. + 11 - 0
      src/utils.ts

+ 47 - 45
dist/twitter.js

@@ -266,7 +266,7 @@ class default_1 {
                         }).then(itemIds => util_1.promisify(setTimeout)(getTimeout()).then(() => itemIds.map(id => this.lazyGetMediaById(id))));
                     }).finally(() => { page.close(); });
                 });
-                setTimeout(this.work, this.workInterval * 1000);
+                setTimeout(this.work, this.workInterval * 1000 / this.lock.feed.length);
             });
         };
         this.queryUser = (username) => this.client.user.searchExact(username)
@@ -304,53 +304,55 @@ class default_1 {
                 setTimeout(this.work, this.workInterval * 1000);
                 return;
             }
-            if (lock.workon >= lock.feed.length)
-                lock.workon = 0;
-            if (!lock.threads[lock.feed[lock.workon]] ||
-                !lock.threads[lock.feed[lock.workon]].subscribers ||
-                lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
-                logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
-                delete lock.threads[lock.feed[lock.workon]];
-                lock.feed.splice(lock.workon, 1);
-                fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
-                this.work();
-                return;
-            }
-            const currentFeed = lock.feed[lock.workon];
-            const promise = new Promise(resolve => {
-                const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
-                if (!match) {
-                    logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
-                    return resolve([]);
+            lock.feed.forEach((feed, index) => {
+                if (!lock.threads[feed] ||
+                    !lock.threads[feed].subscribers ||
+                    lock.threads[feed].subscribers.length === 0) {
+                    logger.warn(`nobody subscribes thread ${feed}, removing from feed`);
+                    delete lock.threads[index];
+                    lock.feed.splice(index, 1);
+                    fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
                 }
-                this.queryUserMedia(match[1], this.lock.threads[currentFeed].offset)
-                    .then(resolve)
-                    .catch((error) => {
-                    logger.error(`error scraping media off profile page of ${match[1]}, error: ${error}`);
-                    resolve([]);
-                });
             });
-            promise.then((mediaItems) => {
-                const currentThread = lock.threads[currentFeed];
-                const updateDate = () => currentThread.updatedAt = new Date().toString();
-                if (!mediaItems || mediaItems.length === 0) {
-                    updateDate();
-                    return;
-                }
-                const topOfFeed = mediaItems[0].pk;
-                const updateOffset = () => currentThread.offset = topOfFeed;
-                if (currentThread.offset === '-1') {
-                    updateOffset();
-                    return;
-                }
-                if (currentThread.offset === '0')
-                    mediaItems.splice(1);
-                return this.workOnMedia(mediaItems, this.sendMedia(`thread ${currentFeed}`, ...currentThread.subscribers))
-                    .then(updateDate).then(updateOffset);
-            })
+            utils_1.chainPromises(utils_1.Arr.chunk(lock.feed, 5).map((arr, i) => () => Promise.all(arr.map((currentFeed, j) => {
+                lock.workon = i * 5 + j;
+                fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
+                const promiseDelay = this.workInterval * (Math.random() + j) * 500 / lock.feed.length;
+                logger.debug(`timeout for this pull job: ${promiseDelay * 2}`);
+                const promise = util_1.promisify(setTimeout)(promiseDelay).then(() => {
+                    const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
+                    if (!match) {
+                        logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
+                        return [];
+                    }
+                    return this.queryUserMedia(match[1], this.lock.threads[currentFeed].offset)
+                        .catch((error) => {
+                        logger.error(`error scraping media off profile page of ${match[1]}, error: ${error}`);
+                        return [];
+                    });
+                });
+                promise.then((mediaItems) => {
+                    const currentThread = lock.threads[currentFeed];
+                    const updateDate = () => currentThread.updatedAt = new Date().toString();
+                    if (!mediaItems || mediaItems.length === 0) {
+                        updateDate();
+                        return;
+                    }
+                    const topOfFeed = mediaItems[0].pk;
+                    const updateOffset = () => currentThread.offset = topOfFeed;
+                    if (currentThread.offset === '-1') {
+                        updateOffset();
+                        return;
+                    }
+                    if (currentThread.offset === '0')
+                        mediaItems.splice(1);
+                    return this.workOnMedia(mediaItems, this.sendMedia(`thread ${currentFeed}`, ...currentThread.subscribers))
+                        .then(updateDate).then(updateOffset);
+                }).then(() => fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock)));
+                return util_1.promisify(setTimeout)(promiseDelay * 3);
+            }))))
                 .then(() => {
-                lock.workon++;
-                let timeout = this.workInterval * 1000 / lock.feed.length;
+                let timeout = this.workInterval * 500;
                 if (timeout < 1000)
                     timeout = 1000;
                 fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));

+ 10 - 1
dist/utils.js

@@ -1,8 +1,16 @@
 "use strict";
 Object.defineProperty(exports, "__esModule", { value: true });
-exports.BigNumOps = exports.chainPromises = void 0;
+exports.Arr = exports.BigNumOps = exports.chainPromises = void 0;
 const chainPromises = (lazyPromises, reducer = (lp1, lp2) => (p) => lp1(p).then(lp2), initialValue) => lazyPromises.reduce(reducer, p => Promise.resolve(p))(initialValue);
 exports.chainPromises = chainPromises;
+const chunkArray = (arr, size) => {
+    const noOfChunks = Math.ceil(size && arr.length / size);
+    const res = Array(noOfChunks);
+    for (let [i, j] = [0, 0]; i < noOfChunks; i++) {
+        res[i] = arr.slice(j, j += size);
+    }
+    return res;
+};
 const splitBigNumAt = (num, at) => num.replace(RegExp(String.raw `^([+-]?)(\d+)(\d{${at}})$`), '$1$2,$1$3')
     .replace(/^([^,]*)$/, '0,$1').split(',')
     .map(Number);
@@ -49,3 +57,4 @@ exports.BigNumOps = {
     lShift: bigNumLShift,
     parse: parseBigNum,
 };
+exports.Arr = { chunk: chunkArray };

+ 44 - 39
src/twitter.ts

@@ -18,7 +18,7 @@ import { SocksProxyAgent } from 'socks-proxy-agent';
 
 import { getLogger } from './loggers';
 import QQBot from './koishi';
-import { BigNumOps } from './utils';
+import { Arr, BigNumOps, chainPromises } from './utils';
 import Webshot, { Cookies, Page } from './webshot';
 
 const parseLink = (link: string): { userName?: string, postUrlSegment?: string } => {
@@ -418,7 +418,7 @@ export default class {
               ));
           }).finally(() => { page.close(); });
         });
-        setTimeout(this.work, this.workInterval * 1000);
+        setTimeout(this.work, this.workInterval * 1000 / this.lock.feed.length);
       }
     );
   };
@@ -479,52 +479,57 @@ export default class {
       setTimeout(this.work, this.workInterval * 1000);
       return;
     }
-    if (lock.workon >= lock.feed.length) lock.workon = 0;
-    if (!lock.threads[lock.feed[lock.workon]] ||
-      !lock.threads[lock.feed[lock.workon]].subscribers ||
-      lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
-      logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
-      delete lock.threads[lock.feed[lock.workon]];
-      lock.feed.splice(lock.workon, 1);
+    lock.feed.forEach((feed, index) => {
+      if (!lock.threads[feed] ||
+        !lock.threads[feed].subscribers ||
+        lock.threads[feed].subscribers.length === 0) {
+        logger.warn(`nobody subscribes thread ${feed}, removing from feed`);
+        delete lock.threads[index];
+        lock.feed.splice(index, 1);
+        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
+      }
+    });
+    
+    chainPromises(Arr.chunk(lock.feed, 5).map((arr, i) => () => Promise.all(arr.map((currentFeed, j) => {
+      lock.workon = i * 5 + j;
       fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
-      this.work();
-      return;
-    }
 
-    const currentFeed = lock.feed[lock.workon];
+      const promiseDelay = this.workInterval * (Math.random() + j) * 500 / lock.feed.length;
+      logger.debug(`timeout for this pull job: ${promiseDelay * 2}`);
 
-    const promise = new Promise<LazyMediaItem[]>(resolve => {
-      const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
-      if (!match) {
-        logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
-        return resolve([]);
-      }
-      this.queryUserMedia(match[1], this.lock.threads[currentFeed].offset)
-        .then(resolve)
-        .catch((error: Error) => {
-          logger.error(`error scraping media off profile page of ${match[1]}, error: ${error}`);
-          resolve([]);
-        });
-    });
+      const promise = promisify(setTimeout)(promiseDelay).then(() => {
+        const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
+        if (!match) {
+          logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
+          return [] as LazyMediaItem[];
+        }
+        return this.queryUserMedia(match[1], this.lock.threads[currentFeed].offset)
+          .catch((error: Error) => {
+            logger.error(`error scraping media off profile page of ${match[1]}, error: ${error}`);
+            return [] as LazyMediaItem[];
+          });
+      });
+
+      promise.then((mediaItems: LazyMediaItem[]) => {
+        const currentThread = lock.threads[currentFeed];
 
-    promise.then((mediaItems: LazyMediaItem[]) => {
-      const currentThread = lock.threads[currentFeed];
+        const updateDate = () => currentThread.updatedAt = new Date().toString();
+        if (!mediaItems || mediaItems.length === 0) { updateDate(); return; }
 
-      const updateDate = () => currentThread.updatedAt = new Date().toString();
-      if (!mediaItems || mediaItems.length === 0) { updateDate(); return; }
+        const topOfFeed = mediaItems[0].pk;
+        const updateOffset = () => currentThread.offset = topOfFeed;
 
-      const topOfFeed = mediaItems[0].pk;
-      const updateOffset = () => currentThread.offset = topOfFeed;
+        if (currentThread.offset === '-1') { updateOffset(); return; }
+        if (currentThread.offset === '0') mediaItems.splice(1);
 
-      if (currentThread.offset === '-1') { updateOffset(); return; }
-      if (currentThread.offset === '0') mediaItems.splice(1);
+        return this.workOnMedia(mediaItems, this.sendMedia(`thread ${currentFeed}`, ...currentThread.subscribers))
+          .then(updateDate).then(updateOffset);
+      }).then(() => fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock)));
 
-      return this.workOnMedia(mediaItems, this.sendMedia(`thread ${currentFeed}`, ...currentThread.subscribers))
-        .then(updateDate).then(updateOffset);
-    })
+      return promisify(setTimeout)(promiseDelay * 3);
+    }))))
       .then(() => {
-        lock.workon++;
-        let timeout = this.workInterval * 1000 / lock.feed.length;
+        let timeout = this.workInterval * 500;
         if (timeout < 1000) timeout = 1000;
         fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
         setTimeout(() => {

+ 11 - 0
src/utils.ts

@@ -4,6 +4,15 @@ export const chainPromises = <T>(
   initialValue?: T
 ) => lazyPromises.reduce(reducer, p => Promise.resolve(p))(initialValue);
 
+const chunkArray = <T>(arr: T[], size: number) => {
+  const noOfChunks = Math.ceil(size && arr.length / size);
+  const res = Array<T[]>(noOfChunks);
+  for (let [i, j] = [0, 0]; i < noOfChunks; i++) {
+    res[i] = arr.slice(j, j += size);
+  }
+  return res;
+};
+
 const splitBigNumAt = (num: string, at: number) => num.replace(RegExp(String.raw`^([+-]?)(\d+)(\d{${at}})$`), '$1$2,$1$3')
   .replace(/^([^,]*)$/, '0,$1').split(',')
   .map(Number);
@@ -53,3 +62,5 @@ export const BigNumOps = {
   lShift: bigNumLShift,
   parse: parseBigNum,
 };
+
+export const Arr = {chunk: chunkArray};