Browse Source

save cache to disk, refresh user info on pull

Mike L 3 years ago
parent
commit
3973449301
5 changed files with 136 additions and 103 deletions
  1. + 1 - 0
      config.example.json
  2. + 31 - 30
      dist/main.js
  3. + 29 - 19
      dist/twitter.js
  4. + 21 - 20
      src/main.ts
  5. + 54 - 34
      src/twitter.ts

+ 1 - 0
config.example.json

@@ -15,6 +15,7 @@
   "work_interval": 60,
   "webshot_delay": 20000,
   "webshot_cookies_lockfile": "",
+  "cachefile": "stories.cache",
   "lockfile": "subscriber.lock",
   "loglevel": "info"
 }

+ 31 - 30
dist/main.js

@@ -54,7 +54,7 @@ const warningFields = [
     'cq_ws_host', 'cq_ws_port', 'cq_access_token',
 ];
 const optionalFields = [
-    'lockfile', 'inactive_hours', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start', 'ig_socks_proxy',
+    'lockfile', 'cachefile', 'inactive_hours', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start', 'ig_socks_proxy',
 ].concat(warningFields);
 if (requiredFields.some((value) => config[value] === undefined)) {
     console.log(`${requiredFields.join(', ')} are required`);
@@ -79,40 +79,39 @@ if (!config[k] || config[k] < 2048 || config[k] > 65536) {
     config[k] = exampleConfig[k];
 }
 loggers_1.setLogLevels(config.loglevel);
-let lock;
-if (fs.existsSync(path.resolve(config.lockfile))) {
-    try {
-        lock = JSON.parse(fs.readFileSync(path.resolve(config.lockfile), 'utf8'));
-    }
-    catch (err) {
-        logger.error(`Failed to parse lockfile ${config.lockfile}: `, err);
-        lock = {
-            workon: 0,
-            feed: [],
-            threads: {},
-        };
-    }
-    fs.access(path.resolve(config.lockfile), fs.constants.W_OK, err => {
-        if (err) {
-            logger.fatal(`cannot write lockfile ${path.resolve(config.lockfile)}, permission denied`);
-            process.exit(1);
-        }
-    });
-}
-else {
-    lock = {
+const deserialized = {
+    lockfile: {
         workon: 0,
         feed: [],
         threads: {},
-    };
-    try {
-        fs.writeFileSync(path.resolve(config.lockfile), JSON.stringify(lock));
+    },
+    cachefile: {},
+};
+for (const file in deserialized)
+    if (fs.existsSync(path.resolve(config[file]))) {
+        try {
+            deserialized[file] = JSON.parse(fs.readFileSync(path.resolve(config[file]), 'utf8'));
+        }
+        catch (err) {
+            logger.error(`Failed to parse ${file} ${config[file]}: `, err);
+        }
+        fs.access(path.resolve(config[file]), fs.constants.W_OK, err => {
+            if (err) {
+                logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
+                process.exit(1);
+            }
+        });
     }
-    catch (err) {
-        logger.fatal(`cannot write lockfile ${path.resolve(config.lockfile)}, permission denied`);
-        process.exit(1);
+    else {
+        try {
+            fs.writeFileSync(path.resolve(config[file]), JSON.stringify(deserialized[file]));
+        }
+        catch (err) {
+            logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
+            process.exit(1);
+        }
     }
-}
+const { lockfile: lock, cachefile: cache } = deserialized;
 if (!config.resume_on_start) {
     Object.keys(lock.threads).forEach(key => {
         lock.threads[key].offset = '-1';
@@ -135,6 +134,8 @@ const worker = new twitter_1.default({
     proxyUrl: config.ig_socks_proxy,
     lock,
     lockfile: config.lockfile,
+    cache,
+    cachefile: config.cachefile,
     inactiveHours: config.inactive_hours,
     workInterval: config.work_interval,
     bot: qq,

+ 29 - 19
dist/twitter.js

@@ -206,10 +206,11 @@ class default_1 {
                 else
                     throw error;
             })
-                .then(user => {
-                logger.info(`initialized cache item for user ${user.full_name} (@${username})`);
-                this.cache[user.pk] = { user, stories: {}, pullOrder: 0 };
-                return `${user.username}:${user.pk}`;
+                .then(({ pk, username, full_name }) => {
+                this.cache[pk] = { user: { pk, username, full_name }, stories: {}, pullOrder: 0 };
+                fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+                logger.info(`initialized cache item for user ${full_name} (@${username})`);
+                return `${username}:${pk}`;
             });
         };
         this.workOnMedia = (mediaItems, sendMedia) => Promise.resolve(mediaItems.forEach(({ msgs, text, author }) => sendMedia(msgs, text, author)));
@@ -227,7 +228,6 @@ class default_1 {
                 });
             });
         };
-        this.cache = {};
         this.workForAll = () => {
             if (this.isInactiveTime)
                 return;
@@ -242,9 +242,10 @@ class default_1 {
                         item.pullOrder = -1;
                     return Promise.resolve();
                 }
-                return util_1.promisify(setTimeout)((Math.random() * 2 + 1) * 5000).then(() => this.client.user.info(id).then(user => {
-                    logger.info(`initialized cache item for user ${user.full_name} (@${user.username})`);
-                    this.cache[id] = { user, stories: {}, pullOrder: -1 };
+                return util_1.promisify(setTimeout)((Math.random() * 2 + 1) * 5000).then(() => this.client.user.info(id).then(({ pk, username, full_name }) => {
+                    this.cache[id] = { user: { pk, username, full_name }, stories: {}, pullOrder: -1 };
+                    fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+                    logger.info(`initialized cache item for user ${full_name} (@${username})`);
                 }));
             }))
                 .then(() => {
@@ -253,15 +254,22 @@ class default_1 {
                     this.pullOrders;
                 return utils_1.chainPromises(utils_1.Arr.chunk(userIdCache, 20).map(userIds => () => {
                     logger.info(`pulling stories from users:${userIds.map(id => ` @${this.cache[id].user.username}`)}`);
-                    return this.client.feed.reelsMedia({ userIds }).items()
-                        .then(storyItems => Promise.all(storyItems
-                        .filter(item => !(item.pk in this.cache[item.user.pk].stories))
-                        .map(item => this.webshot([Object.assign(Object.assign({}, item), { user: this.cache[item.user.pk].user })], (msgs, text, author) => this.cache[item.user.pk].stories[item.pk] = { pk: item.pk, msgs, text, author, original: item }, this.webshotDelay))))
-                        .finally(() => Object.values(this.lock.threads).forEach(thread => {
-                        if (userIds.includes(thread.id)) {
-                            thread.updatedAt = (this.cache[thread.id].updated = new Date()).toString();
-                        }
-                    }));
+                    return this.client.feed.reelsMedia({ userIds }).request()
+                        .then(({ reels }) => utils_1.chainPromises(Object.keys(reels).map(userId => () => this.client.user.info(userId).then(({ pk, username, full_name }) => {
+                        const cacheItem = this.cache[pk];
+                        cacheItem.user = { pk, username, full_name };
+                        return Promise.all(reels[userId].items
+                            .filter(item => !(item.pk in cacheItem.stories))
+                            .map(item => this.webshot([Object.assign(Object.assign({}, item), { user: cacheItem.user })], (msgs, text, author) => cacheItem.stories[item.pk] = { pk: item.pk, msgs, text, author, original: item }, this.webshotDelay)));
+                    }))))
+                        .finally(() => {
+                        fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+                        Object.values(this.lock.threads).forEach(thread => {
+                            if (userIds.includes(thread.id)) {
+                                thread.updatedAt = this.cache[thread.id].updated = Date();
+                            }
+                        });
+                    });
                 }), (lp1, lp2) => () => lp1().then(() => util_1.promisify(setTimeout)(this.workInterval * 1000 / this.lock.feed.length).then(lp2)));
             })
                 .catch((error) => {
@@ -381,6 +389,8 @@ class default_1 {
         this.session = new SessionManager(this.client, opt.sessionLockfile, opt.credentials, opt.codeServicePort);
         this.lockfile = opt.lockfile;
         this.lock = opt.lock;
+        this.cachefile = opt.cachefile;
+        this.cache = opt.cache;
         this.inactiveHours = opt.inactiveHours;
         this.workInterval = opt.workInterval;
         this.bot = opt.bot;
@@ -393,14 +403,14 @@ class default_1 {
                 .then(userNameId => {
                 var _a, _b;
                 const userId = userNameId.split(':')[1];
-                if (Date.now() - ((_b = (_a = this.cache[userId]) === null || _a === void 0 ? void 0 : _a.updated) === null || _b === void 0 ? void 0 : _b.getTime()) > this.workInterval * 1000 &&
+                if (Date.now() - new Date((_b = (_a = this.cache[userId]) === null || _a === void 0 ? void 0 : _a.updated) !== null && _b !== void 0 ? _b : Date()).getTime() > this.workInterval * 1000 &&
                     Object.keys(this.cache[userId].stories).length > 0) {
                     return userId;
                 }
                 return this.client.feed.reelsMedia({ userIds: [userId] }).items()
                     .then(storyItems => Promise.all(storyItems
                     .filter(item => !(item.pk in this.cache[userId].stories))
-                    .map(item => this.webshot([Object.assign(Object.assign({}, item), { user: this.cache[userId].user })], (msgs, text, author) => this.cache[userId].stories[item.pk] = { pk: item.pk, msgs, text, author, original: item }, this.webshotDelay))).then(() => userId).finally(() => this.cache[userId].updated = new Date()));
+                    .map(item => this.webshot([Object.assign(Object.assign({}, item), { user: this.cache[userId].user })], (msgs, text, author) => this.cache[userId].stories[item.pk] = { pk: item.pk, msgs, text, author, original: item }, this.webshotDelay))).then(() => userId).finally(() => this.cache[userId].updated = Date()));
             })
                 .then(action)
                 .catch((error) => {

+ 21 - 20
src/main.ts

@@ -9,7 +9,7 @@ import * as exampleConfig from '../config.example.json';
 import { list, sub, unsub, unsubAll } from './command';
 import { getLogger, setLogLevels } from './loggers';
 import QQBot from './koishi';
-import Worker from './twitter';
+import Worker, { ICache } from './twitter';
 
 const logger = getLogger();
 
@@ -65,7 +65,7 @@ const warningFields = [
 ];
 
 const optionalFields = [
-  'lockfile', 'inactive_hours', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start', 'ig_socks_proxy',
+  'lockfile', 'cachefile', 'inactive_hours', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start', 'ig_socks_proxy',
 ].concat(warningFields);
 
 if (requiredFields.some((value) => config[value] === undefined)) {
@@ -95,38 +95,37 @@ if (!config[k] || config[k] < 2048 || config[k] > 65536) {
 
 setLogLevels(config.loglevel);
 
-let lock: ILock;
-if (fs.existsSync(path.resolve(config.lockfile))) {
+const deserialized = {
+  lockfile: {
+    workon: 0,
+    feed: [],
+    threads: {},
+  } as ILock,
+  cachefile: {} as ICache,
+};
+for (const file in deserialized) if (fs.existsSync(path.resolve(config[file]))) {
   try {
-    lock = JSON.parse(fs.readFileSync(path.resolve(config.lockfile), 'utf8')) as ILock;
+    deserialized[file] = JSON.parse(fs.readFileSync(path.resolve(config[file]), 'utf8'));
   } catch (err) {
-    logger.error(`Failed to parse lockfile ${config.lockfile}: `, err);
-    lock = {
-      workon: 0,
-      feed: [],
-      threads: {},
-    };
+    logger.error(`Failed to parse ${file} ${config[file]}: `, err);
   }
-  fs.access(path.resolve(config.lockfile), fs.constants.W_OK, err => {
+  fs.access(path.resolve(config[file]), fs.constants.W_OK, err => {
     if (err) {
-      logger.fatal(`cannot write lockfile ${path.resolve(config.lockfile)}, permission denied`);
+      logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
       process.exit(1);
     }
   });
 } else {
-  lock = {
-    workon: 0,
-    feed: [],
-    threads: {},
-  };
   try {
-    fs.writeFileSync(path.resolve(config.lockfile), JSON.stringify(lock));
+    fs.writeFileSync(path.resolve(config[file]), JSON.stringify(deserialized[file]));
   } catch (err) {
-    logger.fatal(`cannot write lockfile ${path.resolve(config.lockfile)}, permission denied`);
+    logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
     process.exit(1);
   }
 }
 
+const {lockfile: lock, cachefile: cache} = deserialized;
+
 if (!config.resume_on_start) {
   Object.keys(lock.threads).forEach(key => {
     lock.threads[key].offset = '-1';
@@ -151,6 +150,8 @@ const worker = new Worker({
   proxyUrl: config.ig_socks_proxy,
   lock,
   lockfile: config.lockfile,
+  cache,
+  cachefile: config.cachefile,
   inactiveHours: config.inactive_hours,
   workInterval: config.work_interval,
   bot: qq,

+ 54 - 34
src/twitter.ts

@@ -8,7 +8,7 @@ import { promisify } from 'util';
 import {
   IgApiClient,
   IgClientError, IgCookieNotFoundError, IgExactUserNotFoundError, IgLoginTwoFactorRequiredError, IgLoginRequiredError, IgNetworkError,
-  ReelsMediaFeedResponseItem, UserFeedResponseUser
+  ReelsMediaFeedResponseItem
 } from 'instagram-private-api';
 import { RequestError } from 'request-promise/errors';
 import { SocksProxyAgent } from 'socks-proxy-agent';
@@ -46,6 +46,8 @@ interface IWorkerOption {
   proxyUrl: string;
   lock: ILock;
   lockfile: string;
+  cache: ICache;
+  cachefile: string;
   webshotCookiesLockfile: string;
   bot: QQBot;
   inactiveHours: string[];
@@ -210,11 +212,23 @@ const retryOnError = <T, U>(
   doWork().then(resolve).catch(error => retry(error, 1));
 });
 
+export interface ICache {
+  [userId: string]: {
+    user: ReelsMediaFeedResponseItem['user'];
+    stories: {
+      [storyId: string]: CachedMediaItem;
+    };
+    pullOrder: number; // one-based; -1: subscribed, awaiting shuffle; 0: not subscribed
+    updated?: string; // Date.toString()
+  };
+}
 export default class {
 
   private client: IgApiClient;
   private lock: ILock;
   private lockfile: string;
+  private cache: ICache;
+  private cachefile: string;
   private inactiveHours: string[];
   private workInterval: number;
   private bot: QQBot;
@@ -245,6 +259,8 @@ export default class {
     this.session = new SessionManager(this.client, opt.sessionLockfile, opt.credentials, opt.codeServicePort);
     this.lockfile = opt.lockfile;
     this.lock = opt.lock;
+    this.cachefile = opt.cachefile;
+    this.cache = opt.cache;
     this.inactiveHours = opt.inactiveHours;
     this.workInterval = opt.workInterval;
     this.bot = opt.bot;
@@ -262,7 +278,7 @@ export default class {
       return this.queryUser(rawUserName)
         .then(userNameId => {
           const userId = userNameId.split(':')[1];
-          if (Date.now() - this.cache[userId]?.updated?.getTime() > this.workInterval * 1000 &&
+          if (Date.now() - new Date(this.cache[userId]?.updated ?? Date()).getTime() > this.workInterval * 1000 &&
             Object.keys(this.cache[userId].stories).length > 0) {
             return userId;
           }
@@ -270,12 +286,12 @@ export default class {
             .then(storyItems => Promise.all(storyItems
               .filter(item => !(item.pk in this.cache[userId].stories))
               .map(item => this.webshot(
-                [{...item, user: this.cache[userId].user}],
+                [{...item, user: this.cache[userId].user}], // guaranteed fresh cache entry
                 (msgs: string, text: string, author: string) =>
                   this.cache[userId].stories[item.pk] = {pk: item.pk, msgs, text, author, original: item},
                 this.webshotDelay
               ))
-            ).then(() => userId).finally(() => this.cache[userId].updated = new Date()));
+            ).then(() => userId).finally(() => this.cache[userId].updated = Date()));
         })
         .then(action)
         .catch((error: IgClientError & Partial<RequestError>) => {
@@ -383,10 +399,11 @@ export default class {
           return this.session.login().then(() => this.client.user.searchExact(username));
         } else throw error;
       })
-      .then(user => {
-        logger.info(`initialized cache item for user ${user.full_name} (@${username})`);
-        this.cache[user.pk] = {user, stories: {}, pullOrder: 0};
-        return `${user.username}:${user.pk}`;
+      .then(({pk, username, full_name}) => {
+        this.cache[pk] = {user: {pk, username, full_name}, stories: {}, pullOrder: 0};
+        fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+        logger.info(`initialized cache item for user ${full_name} (@${username})`);
+        return `${username}:${pk}`;
       });
   };
 
@@ -411,15 +428,6 @@ export default class {
     });
   };
 
-  private cache: {
-    [userId: string]: {
-      user: UserFeedResponseUser & ReelsMediaFeedResponseItem['user'],
-      stories: {[storyId: string]: CachedMediaItem},
-      pullOrder: number, // one-based; -1: subscribed, awaiting shuffle; 0: not subscribed
-      updated?: Date,
-    },
-  } = {};
-
   private get pullOrders() {
     const arr: number[] = [];
     Object.values(this.cache).forEach(item => { if (item.pullOrder > 0) arr[item.pullOrder - 1] = item.user.pk; });
@@ -443,9 +451,10 @@ export default class {
         return Promise.resolve();
       }
       return promisify(setTimeout)((Math.random() * 2 + 1) * 5000).then(() =>
-        this.client.user.info(id).then(user => {
-          logger.info(`initialized cache item for user ${user.full_name} (@${user.username})`);
-          this.cache[id] = {user, stories: {}, pullOrder: -1};
+        this.client.user.info(id).then(({pk, username, full_name}) => {
+          this.cache[id] = {user: {pk, username, full_name}, stories: {}, pullOrder: -1};
+          fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+          logger.info(`initialized cache item for user ${full_name} (@${username})`);
         })
       );
     }))
@@ -456,21 +465,32 @@ export default class {
         return chainPromises(
           Arr.chunk(userIdCache, 20).map(userIds => () => {
             logger.info(`pulling stories from users:${userIds.map(id => ` @${this.cache[id].user.username}`)}`);
-            return this.client.feed.reelsMedia({userIds}).items()
-              .then(storyItems => Promise.all(storyItems
-                .filter(item => !(item.pk in this.cache[item.user.pk].stories))
-                .map(item => this.webshot(
-                  [{...item, user: this.cache[item.user.pk].user}],
-                  (msgs: string, text: string, author: string) =>
-                    this.cache[item.user.pk].stories[item.pk] = {pk: item.pk, msgs, text, author, original: item},
-                  this.webshotDelay
-                ))
+            return this.client.feed.reelsMedia({userIds}).request()
+              .then(({reels}) => chainPromises(
+                Object.keys(reels).map(userId => () =>
+                  this.client.user.info(userId).then(({pk, username, full_name}) => {
+                    const cacheItem = this.cache[pk];
+                    cacheItem.user = {pk, username, full_name};
+                    return Promise.all(reels[userId].items
+                      .filter(item => !(item.pk in cacheItem.stories))
+                      .map(item => this.webshot(
+                        [{...item, user: cacheItem.user}],
+                        (msgs: string, text: string, author: string) =>
+                          cacheItem.stories[item.pk] = {pk: item.pk, msgs, text, author, original: item},
+                        this.webshotDelay
+                      ))
+                    );
+                  })
+                )
               ))
-              .finally(() => Object.values(this.lock.threads).forEach(thread => {
-                if (userIds.includes(thread.id)) {
-                  thread.updatedAt = (this.cache[thread.id].updated = new Date()).toString();
-                }
-              })) as unknown as Promise<void>;
+              .finally(() => {
+                fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+                Object.values(this.lock.threads).forEach(thread => {
+                  if (userIds.includes(thread.id)) {
+                    thread.updatedAt = this.cache[thread.id].updated = Date();
+                  }
+                });
+              }) as unknown as Promise<void>;
           }),
           (lp1, lp2) => () => lp1().then(() => promisify(setTimeout)(this.workInterval * 1000 / this.lock.feed.length).then(lp2))
         );