Переглянути джерело

shuffle sparingly, fix working ever faster

Mike L 3 роки тому
батько
коміт
1f295d65dd
4 змінених файлів з 172 додано та 124 видалено
  1. 80 58
      dist/twitter.js
  2. 3 2
      dist/utils.js
  3. 85 62
      src/twitter.ts
  4. 4 2
      src/utils.ts

+ 80 - 58
dist/twitter.js

@@ -177,6 +177,7 @@ class default_1 {
                 setTimeout(this.workForAll, this.workInterval * 1000);
                 setTimeout(() => {
                     this.work();
+                    setInterval(() => { this.pullOrders = utils_1.Arr.shuffle(this.pullOrders); }, 21600000);
                     setInterval(this.workForAll, this.workInterval * 10000);
                 }, this.workInterval * 1200);
             });
@@ -188,7 +189,8 @@ class default_1 {
             }
             return this.client.user.searchExact(username)
                 .then(user => {
-                this.cache[user.username] = { user, stories: {} };
+                logger.info(`initialized cache item for user ${user.full_name} (@${username})`);
+                this.cache[user.username] = { user, stories: {}, pullOrder: 0 };
                 return `${user.username}:${user.pk}`;
             });
         };
@@ -211,31 +213,46 @@ class default_1 {
         this.workForAll = () => {
             if (this.isInactiveTime)
                 return;
-            const idToUserMap = {};
-            Promise.all(Object.entries(this.lock.threads).map(entry => {
-                const id = entry[1].id;
-                const userName = parseLink(entry[0]).userName;
+            logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
+            utils_1.chainPromises(Object.entries(this.lock.threads).map(([feed, thread]) => {
+                const id = thread.id;
+                const userName = parseLink(feed).userName;
                 logger.debug(`preparing to add user @${userName} to next pull task...`);
-                if (userName in this.cache)
-                    return Promise.resolve(idToUserMap[id] = this.cache[userName].user);
-                return this.client.user.info(id).then(user => {
-                    logger.debug(`initialized cache item for user ${user.full_name} (@${userName})`);
-                    this.cache[userName] = { user, stories: {} };
-                    return idToUserMap[id] = user;
-                });
-            }))
-                .then(() => utils_1.chainPromises(utils_1.Arr.chunk(utils_1.Arr.shuffle(Object.keys(idToUserMap)), 20).map(userIds => () => {
-                logger.info(`pulling stories for users: ${userIds.map(id => idToUserMap[id].username)}`);
-                return this.client.feed.reelsMedia({ userIds }).items()
-                    .then(storyItems => storyItems.forEach(item => {
-                    if (!(item.pk in this.cache[idToUserMap[item.user.pk].username].stories)) {
-                        this.cache[idToUserMap[item.user.pk].username].stories[item.pk] = item;
+                return (map = {}) => {
+                    if (userName in this.cache) {
+                        const item = this.cache[userName];
+                        if (item.pullOrder === 0)
+                            item.pullOrder = -1;
+                        return Promise.resolve(Object.assign(map, { [id]: item.user }));
                     }
-                }));
-            }), (lp1, lp2) => () => lp1().then(() => util_1.promisify(setTimeout)(this.workInterval * 1000).then(lp2))))
+                    return util_1.promisify(setTimeout)((Math.random() * 2 + 1) * 5000).then(() => this.client.user.info(id).then(user => {
+                        logger.info(`initialized cache item for user ${user.full_name} (@${userName})`);
+                        this.cache[userName] = { user, stories: {}, pullOrder: -1 };
+                        return Object.assign(map, { [id]: user });
+                    }));
+                };
+            }))
+                .then(idToUserMap => {
+                const userIdCache = Object.values(this.cache).some(item => item.pullOrder < 0) ?
+                    this.pullOrders = utils_1.Arr.shuffle(Object.keys(idToUserMap)).map(Number) :
+                    this.pullOrders;
+                return utils_1.chainPromises(utils_1.Arr.chunk(userIdCache, 20).map(userIds => () => {
+                    logger.info(`pulling stories from users:${userIds.map(id => ` @${idToUserMap[id].username}`)}`);
+                    return this.client.feed.reelsMedia({ userIds }).items()
+                        .then(storyItems => storyItems.forEach(item => {
+                        if (!(item.pk in this.cache[idToUserMap[item.user.pk].username].stories)) {
+                            this.cache[idToUserMap[item.user.pk].username].stories[item.pk] = item;
+                        }
+                    }))
+                        .finally(() => Object.values(this.lock.threads).forEach(thread => {
+                        if (userIds.includes(thread.id))
+                            thread.updatedAt = new Date().toString();
+                    }));
+                }), (lp1, lp2) => () => lp1().then(() => util_1.promisify(setTimeout)(this.workInterval * 1000).then(lp2)));
+            })
                 .catch((error) => {
                 if (error instanceof instagram_private_api_1.IgNetworkError) {
-                    logger.warn(`error on fetching stories for all: ${JSON.stringify(error.cause)}`);
+                    logger.warn(`error while fetching stories for all: ${JSON.stringify(error.cause)}`);
                 }
                 else if (error instanceof instagram_private_api_1.IgLoginRequiredError) {
                     logger.warn('login required, logging in again...');
@@ -248,7 +265,6 @@ class default_1 {
         };
         this.work = () => {
             const lock = this.lock;
-            logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
             if (this.workInterval < 1)
                 this.workInterval = 1;
             if (this.isInactiveTime || lock.feed.length === 0) {
@@ -257,42 +273,40 @@ class default_1 {
             }
             if (lock.workon >= lock.feed.length)
                 lock.workon = 0;
-            if (!lock.threads[lock.feed[lock.workon]] ||
-                !lock.threads[lock.feed[lock.workon]].subscribers ||
-                lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
-                logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
-                delete lock.threads[lock.feed[lock.workon]];
+            const currentFeed = lock.feed[lock.workon];
+            if (!lock.threads[currentFeed] ||
+                !lock.threads[currentFeed].subscribers ||
+                lock.threads[currentFeed].subscribers.length === 0) {
+                logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
+                delete lock.threads[currentFeed];
+                this.cache[parseLink(currentFeed).userName].pullOrder = 0;
                 lock.feed.splice(lock.workon, 1);
                 fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
                 this.work();
                 return;
             }
-            const currentFeed = lock.feed[lock.workon];
             logger.debug(`searching for new items from ${currentFeed} in cache`);
-            const promise = new Promise(resolve => {
-                const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
-                if (!match) {
-                    logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
-                    return resolve([]);
-                }
-                const cachedFeed = this.cache[match[1]];
-                if (!cachedFeed) {
-                    setTimeout(this.work, this.workInterval * 1000);
-                    return resolve([]);
-                }
-                const newer = (item) => utils_1.BigNumOps.compare(item.pk, lock.threads[currentFeed].offset) > 0;
-                resolve(Object.values(cachedFeed.stories)
-                    .filter(newer)
-                    .map(story => (Object.assign(Object.assign({}, story), { user: cachedFeed.user })))
-                    .sort((i1, i2) => utils_1.BigNumOps.compare(i2.pk, i1.pk)));
-            });
+            const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
+            if (!match) {
+                logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
+                lock.workon++;
+                setTimeout(this.work, this.workInterval * 1000);
+                return;
+            }
+            const cachedFeed = this.cache[match[1]];
+            if (!cachedFeed) {
+                setTimeout(this.work, this.workInterval * 1000);
+                return;
+            }
+            const newer = (item) => utils_1.BigNumOps.compare(item.pk, lock.threads[currentFeed].offset) > 0;
+            const promise = Promise.resolve(Object.values(cachedFeed.stories)
+                .filter(newer)
+                .map(story => (Object.assign(Object.assign({}, story), { user: cachedFeed.user })))
+                .sort((i1, i2) => utils_1.BigNumOps.compare(i2.pk, i1.pk)));
             promise.then((mediaItems) => {
                 const currentThread = lock.threads[currentFeed];
-                const updateDate = () => currentThread.updatedAt = new Date().toString();
-                if (!mediaItems || mediaItems.length === 0) {
-                    updateDate();
+                if (!mediaItems || mediaItems.length === 0)
                     return;
-                }
                 const topOfFeed = mediaItems[0].pk;
                 const updateOffset = () => currentThread.offset = topOfFeed;
                 if (currentThread.offset === '-1') {
@@ -302,7 +316,7 @@ class default_1 {
                 if (currentThread.offset === '0')
                     mediaItems.splice(1);
                 return this.workOnMedia(mediaItems, this.sendStories(`thread ${currentFeed}`, ...currentThread.subscribers))
-                    .then(updateDate).then(updateOffset);
+                    .then(updateOffset);
             })
                 .then(() => {
                 lock.workon++;
@@ -310,9 +324,7 @@ class default_1 {
                 if (timeout < 1000)
                     timeout = 1000;
                 fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
-                setTimeout(() => {
-                    this.work();
-                }, timeout);
+                setTimeout(this.work, timeout);
             });
         };
         this.client = new instagram_private_api_1.IgApiClient();
@@ -370,20 +382,30 @@ class default_1 {
                 .then(storyItems => this.workOnMedia(storyItems, sender))
                 .catch((error) => {
                 if (error instanceof instagram_private_api_1.IgNetworkError) {
-                    logger.warn(`error on fetching stories for ${rawUserName}: ${JSON.stringify(error.cause)}`);
-                    this.bot.sendTo(receiver, `获取 Fleets 时出现错误:原因: ${error.cause}`);
+                    logger.warn(`error while fetching stories for ${rawUserName}: ${JSON.stringify(error.cause)}`);
+                    this.bot.sendTo(receiver, `获取 Stories 时出现错误:原因: ${error.cause}`);
                 }
                 else if (error instanceof instagram_private_api_1.IgLoginRequiredError) {
                     logger.warn('login required, logging in again...');
                     this.session.login().then(() => exports.sendAllStories(rawUserName, receiver));
                 }
                 else {
-                    logger.error(`unhandled error on fetching media for ${rawUserName}: ${error}`);
-                    this.bot.sendTo(receiver, `获取 Fleets 时发生未知错误: ${error}`);
+                    logger.error(`unhandled error while fetching stories for ${rawUserName}: ${error}`);
+                    this.bot.sendTo(receiver, `获取 Stories 时发生未知错误: ${error}`);
                 }
             });
         };
     }
+    get pullOrders() {
+        const arr = [];
+        Object.values(this.cache).forEach(item => { if (item.pullOrder > 0)
+            arr[item.pullOrder - 1] = item.user.pk; });
+        return arr;
+    }
+    ;
+    set pullOrders(arr) {
+        Object.values(this.cache).forEach(item => { item.pullOrder = arr.indexOf(item.user.pk) + 1; });
+    }
     get isInactiveTime() {
         const timeToEpoch = (h = 0, m = 0) => new Date().setHours(h, m, 0, 0);
         return this.inactiveHours

+ 3 - 2
dist/utils.js

@@ -1,12 +1,13 @@
 "use strict";
 Object.defineProperty(exports, "__esModule", { value: true });
 exports.Arr = exports.BigNumOps = exports.chainPromises = void 0;
+const crypto = require("crypto");
 const chainPromises = (lazyPromises, reducer = (lp1, lp2) => (p) => lp1(p).then(lp2), initialValue) => lazyPromises.reduce(reducer, p => Promise.resolve(p))(initialValue);
 exports.chainPromises = chainPromises;
 const shuffleArray = (arr) => {
     const res = arr.slice(0);
-    for (let i = arr.length - 1; i > 0; i--) {
-        const j = Math.floor(Math.random() * (i + 1));
+    for (let i = res.length - 1; i > 0; i--) {
+        const j = crypto.randomInt(0, i + 1);
         [res[i], res[j]] = [res[j], res[i]];
     }
     return res;

+ 85 - 62
src/twitter.ts

@@ -269,14 +269,14 @@ export default class {
         .then(storyItems => this.workOnMedia(storyItems, sender))
         .catch((error: IgClientError & Partial<RequestError>) => {
           if (error instanceof IgNetworkError) {
-            logger.warn(`error on fetching stories for ${rawUserName}: ${JSON.stringify(error.cause)}`);
-            this.bot.sendTo(receiver, `获取 Fleets 时出现错误:原因: ${error.cause}`);
+            logger.warn(`error while fetching stories for ${rawUserName}: ${JSON.stringify(error.cause)}`);
+            this.bot.sendTo(receiver, `获取 Stories 时出现错误:原因: ${error.cause}`);
           } else if (error instanceof IgLoginRequiredError) {
             logger.warn('login required, logging in again...');
             this.session.login().then(() => sendAllStories(rawUserName, receiver));
           } else {
-            logger.error(`unhandled error on fetching media for ${rawUserName}: ${error}`);
-            this.bot.sendTo(receiver, `获取 Fleets 时发生未知错误: ${error}`); 
+            logger.error(`unhandled error while fetching stories for ${rawUserName}: ${error}`);
+            this.bot.sendTo(receiver, `获取 Stories 时发生未知错误: ${error}`); 
           }
         });
     };
@@ -290,6 +290,7 @@ export default class {
         setTimeout(this.workForAll, this.workInterval * 1000);
         setTimeout(() => {
           this.work();
+          setInterval(() => { this.pullOrders = Arr.shuffle(this.pullOrders); }, 21600000);
           setInterval(this.workForAll, this.workInterval * 10000);
         }, this.workInterval * 1200);
       }
@@ -303,7 +304,8 @@ export default class {
     }
     return this.client.user.searchExact(username)
       .then(user => {
-        this.cache[user.username] = {user, stories: {}};
+        logger.info(`initialized cache item for user ${user.full_name} (@${username})`);
+        this.cache[user.username] = {user, stories: {}, pullOrder: 0};
         return `${user.username}:${user.pk}`;
       });
   };
@@ -333,38 +335,65 @@ export default class {
     [userName: string]: {
       user: UserFeedResponseUser & ReelsMediaFeedResponseItem['user'],
       stories: {[storyId: string]: MediaItem},
+      pullOrder: number, // one-based; -1: subscribed, awaiting shuffle; 0: not subscribed
     },
   } = {};
 
+  private get pullOrders() {
+    const arr: number[] = [];
+    Object.values(this.cache).forEach(item => { if (item.pullOrder > 0) arr[item.pullOrder - 1] = item.user.pk; });
+    return arr;
+  };
+
+  private set pullOrders(arr: number[]) {
+    Object.values(this.cache).forEach(item => { item.pullOrder = arr.indexOf(item.user.pk) + 1; });
+  }
+
   private workForAll = () => {
     if (this.isInactiveTime) return;
-    const idToUserMap: {[id: number]: UserFeedResponseUser} = {};
-    Promise.all(Object.entries(this.lock.threads).map(entry => {
-      const id = entry[1].id;
-      const userName = parseLink(entry[0]).userName;
+    logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
+    chainPromises(Object.entries(this.lock.threads).map(([feed, thread]) => {
+      const id = thread.id;
+      const userName = parseLink(feed).userName;
       logger.debug(`preparing to add user @${userName} to next pull task...`);
-      if (userName in this.cache) return Promise.resolve(idToUserMap[id] = this.cache[userName].user);
-      return this.client.user.info(id).then(user => {
-        logger.debug(`initialized cache item for user ${user.full_name} (@${userName})`);
-        this.cache[userName] = {user, stories: {}};
-        return idToUserMap[id] = user as UserFeedResponseUser;
-      });
+      return (map: {[key: number]: UserFeedResponseUser} = {}) => {
+        if (userName in this.cache) {
+          const item = this.cache[userName];
+          if (item.pullOrder === 0) item.pullOrder = -1;
+          return Promise.resolve(Object.assign(map, {[id]: item.user}));
+        }
+        return promisify(setTimeout)((Math.random() * 2 + 1) * 5000).then(() =>
+          this.client.user.info(id).then(user => {
+            logger.info(`initialized cache item for user ${user.full_name} (@${userName})`);
+            this.cache[userName] = {user, stories: {}, pullOrder: -1};
+            return Object.assign(map, {[id]: user});
+          })
+        );
+      };
     }))
-      .then(() => chainPromises(
-        Arr.chunk(Arr.shuffle(Object.keys(idToUserMap)), 20).map(userIds => () => {
-          logger.info(`pulling stories for users: ${userIds.map(id => idToUserMap[id as unknown as number].username)}`);
-          return this.client.feed.reelsMedia({userIds}).items()
-            .then(storyItems => storyItems.forEach(item => {
-              if (!(item.pk in this.cache[idToUserMap[item.user.pk].username].stories)) {
-                this.cache[idToUserMap[item.user.pk].username].stories[item.pk] = item;
-              }
-            }));
-        }),
-        (lp1, lp2) => () => lp1().then(() => promisify(setTimeout)(this.workInterval * 1000).then(lp2))
-      ))
+      .then(idToUserMap => {
+        const userIdCache = Object.values(this.cache).some(item => item.pullOrder < 0) ?
+          this.pullOrders = Arr.shuffle(Object.keys(idToUserMap)).map(Number) :
+          this.pullOrders;
+        return chainPromises(
+          Arr.chunk(userIdCache, 20).map(userIds => () => {
+            logger.info(`pulling stories from users:${userIds.map(id => ` @${idToUserMap[id].username}`)}`);
+            return this.client.feed.reelsMedia({userIds}).items()
+              .then(storyItems => storyItems.forEach(item => {
+                if (!(item.pk in this.cache[idToUserMap[item.user.pk].username].stories)) {
+                  this.cache[idToUserMap[item.user.pk].username].stories[item.pk] = item;
+                }
+              }))
+              .finally(() => Object.values(this.lock.threads).forEach(thread => {
+                if (userIds.includes(thread.id)) thread.updatedAt = new Date().toString();
+              }));
+          }),
+          (lp1, lp2) => () => lp1().then(() => promisify(setTimeout)(this.workInterval * 1000).then(lp2))
+        );
+      })
       .catch((error: IgClientError & Partial<RequestError>) => {
         if (error instanceof IgNetworkError) {
-          logger.warn(`error on fetching stories for all: ${JSON.stringify(error.cause)}`);
+          logger.warn(`error while fetching stories for all: ${JSON.stringify(error.cause)}`);
         } else if (error instanceof IgLoginRequiredError) {
           logger.warn('login required, logging in again...');
           this.session.login().then(this.workForAll);
@@ -385,51 +414,47 @@ export default class {
 
   public work = () => {
     const lock = this.lock;
-    logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
     if (this.workInterval < 1) this.workInterval = 1;
     if (this.isInactiveTime || lock.feed.length === 0) {
-      setTimeout(this.work, this.workInterval * 1000);
-      return;
+      setTimeout(this.work, this.workInterval * 1000); return;
     }
     if (lock.workon >= lock.feed.length) lock.workon = 0;
-    if (!lock.threads[lock.feed[lock.workon]] ||
-      !lock.threads[lock.feed[lock.workon]].subscribers ||
-      lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
-      logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
-      delete lock.threads[lock.feed[lock.workon]];
+
+    const currentFeed = lock.feed[lock.workon];    
+    if (!lock.threads[currentFeed] ||
+      !lock.threads[currentFeed].subscribers ||
+      lock.threads[currentFeed].subscribers.length === 0) {
+      logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
+      delete lock.threads[currentFeed];
+      this.cache[parseLink(currentFeed).userName].pullOrder = 0;
       lock.feed.splice(lock.workon, 1);
       fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
       this.work();
       return;
     }
 
-    const currentFeed = lock.feed[lock.workon];
     logger.debug(`searching for new items from ${currentFeed} in cache`);
-
-    const promise = new Promise<MediaItem[]>(resolve => {
-      const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
-      if (!match) {
-        logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
-        return resolve([]);
-      }
-      const cachedFeed = this.cache[match[1]];
-      if (!cachedFeed) {
-        setTimeout(this.work, this.workInterval * 1000);
-        return resolve([]);
-      }
-      const newer = (item: MediaItem) => BigNumOps.compare(item.pk, lock.threads[currentFeed].offset) > 0;
-      resolve(Object.values(cachedFeed.stories)
-        .filter(newer)
-        .map(story => ({...story, user: cachedFeed.user}))
-        .sort((i1, i2) => BigNumOps.compare(i2.pk, i1.pk))
-      );
-    });
+  
+    const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
+    if (!match) {
+      logger.error(`current feed "${currentFeed}" is invalid, please remove this feed manually`);
+      lock.workon++; setTimeout(this.work, this.workInterval * 1000); return;
+    }
+    const cachedFeed = this.cache[match[1]];
+    if (!cachedFeed) {
+      setTimeout(this.work, this.workInterval * 1000); return;
+    }
+    const newer = (item: MediaItem) => BigNumOps.compare(item.pk, lock.threads[currentFeed].offset) > 0;
+    const promise = Promise.resolve(Object.values(cachedFeed.stories)
+      .filter(newer)
+      .map(story => ({...story, user: cachedFeed.user}))
+      .sort((i1, i2) => BigNumOps.compare(i2.pk, i1.pk))
+    );
 
     promise.then((mediaItems: MediaItem[]) => {
       const currentThread = lock.threads[currentFeed];
 
-      const updateDate = () => currentThread.updatedAt = new Date().toString();
-      if (!mediaItems || mediaItems.length === 0) { updateDate(); return; }
+      if (!mediaItems || mediaItems.length === 0) return;
 
       const topOfFeed = mediaItems[0].pk;
       const updateOffset = () => currentThread.offset = topOfFeed;
@@ -438,16 +463,14 @@ export default class {
       if (currentThread.offset === '0') mediaItems.splice(1);
 
       return this.workOnMedia(mediaItems, this.sendStories(`thread ${currentFeed}`, ...currentThread.subscribers))
-        .then(updateDate).then(updateOffset);
+        .then(updateOffset);
     })
       .then(() => {
         lock.workon++;
         let timeout = this.workInterval * 1000 / lock.feed.length;
         if (timeout < 1000) timeout = 1000;
         fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
-        setTimeout(() => {
-          this.work();
-        }, timeout);
+        setTimeout(this.work, timeout);
       });
   };
 }

+ 4 - 2
src/utils.ts

@@ -1,3 +1,5 @@
+import * as crypto from 'crypto';
+
 export const chainPromises = <T>(
   lazyPromises: ((p: T) => Promise<T>)[],
   reducer = (lp1: (p: T) => Promise<T>, lp2: (p: T) => Promise<T>) => (p: T) => lp1(p).then(lp2),
@@ -6,8 +8,8 @@ export const chainPromises = <T>(
 
 const shuffleArray = <T>(arr: T[]) => {
   const res = arr.slice(0);
-  for (let i = arr.length - 1; i > 0; i--) {
-    const j = Math.floor(Math.random() * (i + 1));
+  for (let i = res.length - 1; i > 0; i--) {
+    const j = crypto.randomInt(0, i + 1);
     [res[i], res[j]] = [res[j], res[i]];
   }
   return res;