Browse Source

stream I/O for cache, misc. improvements

Mike L 1 year ago
parent
commit
5e1da69479
9 changed files with 291 additions and 193 deletions
  1. 1 1
      dist/command.js
  2. 34 0
      dist/json.js
  3. 75 63
      dist/main.js
  4. 36 32
      dist/twitter.js
  5. 3 5
      package.json
  6. 1 1
      src/command.ts
  7. 36 0
      src/json.ts
  8. 71 64
      src/main.ts
  9. 34 27
      src/twitter.ts

+ 1 - 1
dist/command.js

@@ -169,7 +169,7 @@ function view(chat, args, reply) {
         if (!optMatch)
             return reply(`未定义的查看参数:${arg}。`);
         const optKey = optMatch[1];
-        if (optMatch.length === 1 || !/^\d*$/.test(optMatch[2]))
+        if (optMatch.length === 1 || !/^-?\d*$/.test(optMatch[2]))
             return reply(`${confZH[optKey]}参数应为数值。`);
         if (optMatch[2] === '')
             return reply(`${confZH[optKey]}参数值不可为空。`);

+ 34 - 0
dist/json.js

@@ -0,0 +1,34 @@
+"use strict";
+Object.defineProperty(exports, "__esModule", { value: true });
+exports.readFile = exports.writeFile = void 0;
+const fs = require("fs");
+const json = require("@discoveryjs/json-ext");
+const loggers_1 = require("./loggers");
+const logger = (0, loggers_1.getLogger)('json');
+const writeFile = (path, obj) => new Promise((resolve, reject) => {
+    const renameSync = (oldPath, newPath) => {
+        try {
+            fs.renameSync(oldPath, newPath);
+            return newPath;
+        }
+        catch (err) {
+            reject(err);
+        }
+    };
+    let backupPath;
+    if (fs.statSync(path).size > 0) {
+        backupPath = renameSync(path, `${path}.bak`);
+    }
+    json.stringifyStream(obj)
+        .on('error', err => { logger.error(err); if (backupPath)
+        renameSync(backupPath, path); resolve(false); })
+        .pipe(fs.createWriteStream(path))
+        .on('error', err => { reject(err); })
+        .on('finish', () => { fs.unlinkSync(`${path}.bak`); resolve(true); });
+});
+exports.writeFile = writeFile;
+const readFile = (path) => new Promise((resolve, reject) => {
+    json.parseChunked(fs.createReadStream(path)
+        .on('error', err => { reject(err); })).then(resolve);
+});
+exports.readFile = readFile;

+ 75 - 63
dist/main.js

@@ -1,25 +1,35 @@
 #!/usr/bin/env node
 "use strict";
+var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
+    function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
+    return new (P || (P = Promise))(function (resolve, reject) {
+        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
+        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
+        function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
+        step((generator = generator.apply(thisArg, _arguments || [])).next());
+    });
+};
 Object.defineProperty(exports, "__esModule", { value: true });
 const fs = require("fs");
 const path = require("path");
 const commandLineUsage = require("command-line-usage");
 const exampleConfig = require("../config.example.json");
 const command_1 = require("./command");
+const json = require("./json");
 const loggers_1 = require("./loggers");
 const koishi_1 = require("./koishi");
 const twitter_1 = require("./twitter");
 const logger = (0, loggers_1.getLogger)();
 const sections = [
     {
-        header: 'GoCQHTTP Instagram Bot',
-        content: 'The QQ Bot that forwards Instagram.',
+        header: 'GoCQHTTP Instagram Stories Bot',
+        content: 'The QQ Bot that forwards Instagram stories.',
     },
     {
         header: 'Synopsis',
         content: [
-            '$ cq-instagram-bot {underline config.json}',
-            '$ cq-instagram-bot {bold --help}',
+            '$ cq-igstory-bot {underline config.json}',
+            '$ cq-igstory-bot {bold --help}',
         ],
     },
     {
@@ -79,70 +89,72 @@ if (!config[k] || config[k] < 2048 || config[k] > 65536) {
     config[k] = exampleConfig[k];
 }
 (0, loggers_1.setLogLevels)(config.loglevel);
-const deserialized = {
-    lockfile: {
-        workon: 0,
-        feed: [],
-        threads: {},
-    },
-    cachefile: {},
-};
-for (const file in deserialized)
-    if (fs.existsSync(path.resolve(config[file]))) {
-        try {
-            deserialized[file] = JSON.parse(fs.readFileSync(path.resolve(config[file]), 'utf8'));
-        }
-        catch (err) {
-            logger.error(`Failed to parse ${file} ${config[file]}: `, err);
-        }
-        fs.access(path.resolve(config[file]), fs.constants.W_OK, err => {
-            if (err) {
-                logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
-                process.exit(1);
+(() => __awaiter(void 0, void 0, void 0, function* () {
+    const deserialized = {
+        lockfile: {
+            workon: 0,
+            feed: [],
+            threads: {},
+        },
+        cachefile: {},
+    };
+    const fileEntries = Object.keys(deserialized).map(file => [file, path.resolve(config[file])]);
+    for (const [file, filePath] of fileEntries) {
+        if (fs.existsSync(filePath))
+            try {
+                deserialized[file] = yield json.readFile(filePath);
+                fs.access(filePath, fs.constants.W_OK, err => {
+                    if (err) {
+                        logger.fatal(`cannot write ${file} ${filePath}, permission denied`);
+                        process.exit(1);
+                    }
+                });
+                continue;
+            }
+            catch (err) {
+                logger.error(`Failed to parse ${file} ${config[file]}: `, err);
             }
-        });
-    }
-    else {
         try {
-            fs.writeFileSync(path.resolve(config[file]), JSON.stringify(deserialized[file]));
+            yield json.writeFile(filePath, deserialized[file]);
         }
         catch (err) {
-            logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
+            logger.fatal(`cannot write ${file} ${filePath}, permission denied`);
             process.exit(1);
         }
     }
-const { lockfile: lock, cachefile: cache } = deserialized;
-if (!config.resume_on_start) {
-    Object.keys(lock.threads).forEach(key => {
-        lock.threads[key].offset = '-1';
+    const { lockfile: lock, cachefile: cache } = deserialized;
+    if (!config.resume_on_start) {
+        Object.keys(lock.threads).forEach(key => {
+            lock.threads[key].offset = '-1';
+        });
+    }
+    const qq = new koishi_1.default({
+        access_token: config.cq_access_token,
+        host: config.cq_ws_host,
+        port: config.cq_ws_port,
+        bot_id: config.cq_bot_qq,
+        list: (c, a, cb) => (0, command_1.list)(c, a, cb, lock),
+        sub: (c, a, cb) => (0, command_1.sub)(c, a, cb, lock, config.lockfile),
+        unsub: (c, a, cb) => (0, command_1.unsub)(c, a, cb, lock, config.lockfile),
+        unsubAll: (c, a, cb) => (0, command_1.unsubAll)(c, a, cb, lock, config.lockfile),
     });
-}
-const qq = new koishi_1.default({
-    access_token: config.cq_access_token,
-    host: config.cq_ws_host,
-    port: config.cq_ws_port,
-    bot_id: config.cq_bot_qq,
-    list: (c, a, cb) => (0, command_1.list)(c, a, cb, lock),
-    sub: (c, a, cb) => (0, command_1.sub)(c, a, cb, lock, config.lockfile),
-    unsub: (c, a, cb) => (0, command_1.unsub)(c, a, cb, lock, config.lockfile),
-    unsubAll: (c, a, cb) => (0, command_1.unsubAll)(c, a, cb, lock, config.lockfile),
-});
-const worker = new twitter_1.default({
-    sessionLockfile: config.ig_session_lockfile,
-    credentials: [config.ig_username, config.ig_password],
-    codeServicePort: config.ig_2fa_code_receiver_port,
-    proxyUrl: config.ig_socks_proxy,
-    lock,
-    lockfile: config.lockfile,
-    cache,
-    cachefile: config.cachefile,
-    inactiveHours: config.inactive_hours,
-    workInterval: config.work_interval,
-    bot: qq,
-    webshotDelay: config.webshot_delay,
-    webshotCookiesLockfile: config.webshot_cookies_lockfile,
-    mode: config.mode,
-    wsUrl: config.playwright_ws_spec_endpoint,
-});
-worker.session.init().then(worker.launch);
-qq.connect();
+    const worker = new twitter_1.default({
+        sessionLockfile: config.ig_session_lockfile,
+        credentials: [config.ig_username, config.ig_password],
+        codeServicePort: config.ig_2fa_code_receiver_port,
+        proxyUrl: config.ig_socks_proxy,
+        lock,
+        lockfile: config.lockfile,
+        cache,
+        cachefile: config.cachefile,
+        inactiveHours: config.inactive_hours,
+        workInterval: config.work_interval,
+        bot: qq,
+        webshotDelay: config.webshot_delay,
+        webshotCookiesLockfile: config.webshot_cookies_lockfile,
+        mode: config.mode,
+        wsUrl: config.playwright_ws_spec_endpoint,
+    });
+    worker.session.init().then(worker.launch);
+    qq.connect();
+}))();

+ 36 - 32
dist/twitter.js

@@ -20,6 +20,7 @@ const instagram_private_api_1 = require("instagram-private-api");
 const socks_proxy_agent_1 = require("socks-proxy-agent");
 const datetime_1 = require("./datetime");
 const loggers_1 = require("./loggers");
+const json = require("./json");
 const utils_1 = require("./utils");
 const webshot_1 = require("./webshot");
 const parseLink = (link) => {
@@ -42,7 +43,7 @@ const linkBuilder = (config) => {
     return `https://www.instagram.com/stories/${config.userName}/${config.storyId}/`;
 };
 exports.linkBuilder = linkBuilder;
-const igErrorIsAuthError = (error) => / 401/.test(error.message) || error instanceof instagram_private_api_1.IgLoginRequiredError || error instanceof instagram_private_api_1.IgCookieNotFoundError;
+const igErrorIsAuthError = (error) => / 40[1-3]/.test(error.message) || error instanceof instagram_private_api_1.IgLoginRequiredError || error instanceof instagram_private_api_1.IgCookieNotFoundError;
 class SessionManager {
     constructor(client, file, credentials, codeServicePort) {
         this.init = () => {
@@ -92,6 +93,7 @@ class SessionManager {
             server.listen(this.codeServicePort);
         });
         this.login = () => this.ig.simulate.preLoginFlow()
+            .catch((err) => logger.error(err))
             .then(() => this.ig.account.login(this.username, this.password))
             .catch((err) => {
             if (err instanceof instagram_private_api_1.IgLoginTwoFactorRequiredError) {
@@ -105,7 +107,7 @@ class SessionManager {
                     verificationMethod: totp_two_factor_on ? '0' : '1',
                 }));
             }
-            throw err;
+            logger.error(err);
         })
             .then(user => {
             logger.info(`successfully logged in as ${this.username}`);
@@ -192,14 +194,18 @@ class default_1 {
                     }
                 }
                 const userIdCache = this.pullOrders;
-                if (Object.values(userIdCache).length !== userIdCache.length) {
-                    this.pullOrders = utils_1.Arr.shuffle(userIdCache);
-                    fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-                }
-                const timeout = Math.max(1000, this.workInterval * 1000 / this.lock.feed.length);
-                setInterval(() => { this.pullOrders = utils_1.Arr.shuffle(this.pullOrders); }, 21600000);
-                setTimeout(this.workForAll, timeout);
-                this.work();
+                return (() => {
+                    if (Object.values(userIdCache).length !== userIdCache.length) {
+                        this.pullOrders = utils_1.Arr.shuffle(userIdCache);
+                        return json.writeFile(path.resolve(this.cachefile), this.cache);
+                    }
+                    return Promise.resolve(true);
+                })().then(() => {
+                    const timeout = Math.max(1000, this.workInterval * 1000 / this.lock.feed.length);
+                    setInterval(() => { this.pullOrders = utils_1.Arr.shuffle(this.pullOrders); }, 21600000);
+                    setTimeout(this.workForAll, timeout);
+                    this.work();
+                });
             });
         };
         this.queryUserObject = (userName) => this.client.user.searchExact(userName)
@@ -220,9 +226,11 @@ class default_1 {
             return this.queryUserObject(username)
                 .then(({ pk, username, full_name }) => {
                 this.cache[pk] = { user: { pk, username, full_name }, stories: {}, pullOrder: 0 };
-                fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-                logger.info(`initialized cache item for user ${full_name} (@${username})`);
-                return `${username}:${pk}`;
+                return json.writeFile(path.resolve(this.cachefile), this.cache).then(res => {
+                    if (res)
+                        logger.info(`initialized cache item for user ${full_name} (@${username})`);
+                    return `${username}:${pk}`;
+                });
             });
         };
         this.workOnMedia = (mediaItems, sendMedia) => (0, utils_1.chainPromises)(mediaItems.map(({ msgs, text, author, original }) => {
@@ -255,7 +263,6 @@ class default_1 {
                 setTimeout(this.workForAll, timeout);
                 return;
             }
-            logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
             (0, utils_1.chainPromises)(Object.entries(this.lock.threads).map(([feed, thread]) => () => {
                 const id = thread.id;
                 const userName = parseLink(feed).userName;
@@ -268,15 +275,14 @@ class default_1 {
                 }
                 return (0, util_1.promisify)(setTimeout)((Math.random() * 2 + 1) * 5000).then(() => this.client.user.info(id).then(({ pk, username, full_name }) => {
                     this.cache[id] = { user: { pk, username, full_name }, stories: {}, pullOrder: -1 };
-                    fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-                    logger.info(`initialized cache item for user ${full_name} (@${username})`);
+                    return json.writeFile(path.resolve(this.cachefile), this.cache).then(() => logger.info(`initialized cache item for user ${full_name} (@${username})`));
                 }));
             }))
                 .then(() => {
                 const userIdCache = Object.values(this.cache).some(item => item.pullOrder < 0) ?
                     this.pullOrders = utils_1.Arr.shuffle(Object.keys(this.cache)).map(Number) :
                     this.pullOrders;
-                return (0, utils_1.chainPromises)(utils_1.Arr.chunk(userIdCache, 20).map(userIds => () => {
+                return (0, utils_1.chainPromises)(utils_1.Arr.chunk(userIdCache.filter(v => Date.now() - new Date(this.cache[v].updated).getTime() > 3600000), 30).map(userIds => () => {
                     logger.info(`pulling stories from users:${userIds.map(id => ` @${this.cache[id].user.username}`)}`);
                     return this.client.feed.reelsMedia({ userIds }).request()
                         .then(({ reels }) => (0, utils_1.chainPromises)(Object.keys(reels).map(userId => this.cache[userId]).map(({ user, stories }) => () => this.queryUserObject(user.username)
@@ -291,14 +297,11 @@ class default_1 {
                     }).finally(() => Promise.all(reels[user.pk].items
                         .filter(item => !(item.pk in stories))
                         .map(item => this.webshot([Object.assign(Object.assign({}, item), { user })], (msgs, text, author) => stories[item.pk] = { pk: item.pk, msgs, text, author, original: item }, this.webshotDelay)))))))
-                        .finally(() => {
-                        fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-                        Object.values(this.lock.threads).forEach(thread => {
-                            if (userIds.includes(thread.id)) {
-                                thread.updatedAt = this.cache[thread.id].updated = Date();
-                            }
-                        });
-                    });
+                        .finally(() => json.writeFile(path.resolve(this.cachefile), this.cache).then(() => Object.values(this.lock.threads).forEach(thread => {
+                        if (userIds.includes(thread.id)) {
+                            thread.updatedAt = this.cache[thread.id].updated = Date();
+                        }
+                    })));
                 }), (lp1, lp2) => () => lp1().then(() => (0, util_1.promisify)(setTimeout)(timeout).then(lp2)));
             })
                 .catch((error) => {
@@ -336,10 +339,11 @@ class default_1 {
                 logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
                 delete lock.threads[currentFeed];
                 (this.cache[parseLink(currentFeed).userName] || {}).pullOrder = 0;
-                fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-                lock.feed.splice(lock.workon, 1);
-                fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
-                return this.work();
+                return json.writeFile(path.resolve(this.cachefile), this.cache).then(() => {
+                    lock.feed.splice(lock.workon, 1);
+                    fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
+                    return this.work();
+                });
             }
             logger.debug(`searching for new items from ${currentFeed} in cache`);
             const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
@@ -492,8 +496,6 @@ class default_1 {
         };
         exports.sendAllStories = (rawUserName, receiver, startIndex = 0, count = 10) => {
             const reply = msg => this.bot.sendTo(receiver, msg);
-            if (startIndex < 0)
-                return reply('跳过数量参数值应为非负整数。');
             if (count < 1)
                 return reply('最大查看数量参数值应为正整数。');
             const sender = this.sendStories(`instagram stories for ${rawUserName}`, receiver);
@@ -505,7 +507,9 @@ class default_1 {
                         .sort((i1, i2) => -utils_1.BigNumOps.compare(i2.pk, i1.pk));
                     if (storyItems.length === 0)
                         return reply(`当前用户 (@${userName}) 没有可用的 Instagram 限时动态。`);
-                    if (startIndex + 1 > storyItems.length)
+                    if (startIndex < 0)
+                        startIndex += storyItems.length;
+                    if (startIndex < 0 || startIndex + 1 > storyItems.length)
                         return reply('跳过数量到达或超过当前用户可用的限时动态数量。');
                     const endIndex = Math.min(storyItems.length, startIndex + count);
                     const sendRangeText = `${startIndex + 1}${endIndex - startIndex > 1 ? `-${endIndex}` : ''}`;

+ 3 - 5
package.json

@@ -1,18 +1,15 @@
 {
-  "name": "@CL-Jeremy/mirai-twitter-bot",
+  "name": "gocqhttp-twitter-bot",
   "version": "0.4.0",
   "description": "Mirai Twitter Bot",
   "main": "./dist/main.js",
   "bin": {
-    "mirai-twitter-bot": "./dist/main.js"
+    "cq-igstory-bot": "./dist/main.js"
   },
   "repository": {
     "type": "git",
     "url": "git+https://github.com/CL-Jeremy/mirai-twitter-bot.git"
   },
-  "publishConfig": {
-    "registry": "https://npm.pkg.github.com/"
-  },
   "keywords": [
     "qq",
     "qqbot",
@@ -30,6 +27,7 @@
     "lint": "npx eslint --fix --ext .ts ./"
   },
   "dependencies": {
+    "@discoveryjs/json-ext": "^0.5.7",
     "axios": "^0.21.4",
     "callable-instance": "^2.0.0",
     "command-line-usage": "^5.0.5",

+ 1 - 1
src/command.ts

@@ -179,7 +179,7 @@ function view(chat: IChat, args: string[], reply: (msg: string) => any): void {
     const optMatch = /^(count|skip)=(.*)/.exec(arg);
     if (!optMatch) return reply(`未定义的查看参数:${arg}。`);
     const optKey = optMatch[1] as keyof typeof confZH;
-    if (optMatch.length === 1 || !/^\d*$/.test(optMatch[2])) return reply(`${confZH[optKey]}参数应为数值。`);
+    if (optMatch.length === 1 || !/^-?\d*$/.test(optMatch[2])) return reply(`${confZH[optKey]}参数应为数值。`);
     if (optMatch[2] === '') return reply(`${confZH[optKey]}参数值不可为空。`);
     conf[optKey] = Number(optMatch[2]);
   }

+ 36 - 0
src/json.ts

@@ -0,0 +1,36 @@
+import * as fs from 'fs';
+
+import * as json from '@discoveryjs/json-ext';
+
+import { getLogger } from './loggers';
+
+const logger = getLogger('json');
+
+const writeFile = (path: fs.PathLike, obj: object) => new Promise<boolean>((resolve, reject) => {
+  const renameSync = (oldPath: fs.PathLike, newPath: fs.PathLike) => {
+    try {
+      fs.renameSync(oldPath, newPath);
+      return newPath;
+    } catch (err) {
+      reject(err);
+    }
+  }
+  let backupPath: fs.PathLike;
+  if (fs.statSync(path).size > 0) {
+    backupPath = renameSync(path, `${path}.bak`);
+  }
+  json.stringifyStream(obj)
+    .on('error', err => { logger.error(err); if (backupPath) renameSync(backupPath, path); resolve(false); })
+    .pipe(fs.createWriteStream(path))
+    .on('error', err => { reject(err); })
+    .on('finish', () => { fs.unlinkSync(`${path}.bak`); resolve(true); });
+});
+
+const readFile = (path: fs.PathLike) => new Promise<any>((resolve, reject) => {
+  json.parseChunked(
+    fs.createReadStream(path)
+      .on('error', err => { reject(err); })
+  ).then(resolve);
+});
+
+export {writeFile, readFile};

+ 71 - 64
src/main.ts

@@ -7,6 +7,7 @@ import * as commandLineUsage from 'command-line-usage';
 
 import * as exampleConfig from '../config.example.json';
 import { list, sub, unsub, unsubAll } from './command';
+import * as json from './json';
 import { getLogger, setLogLevels } from './loggers';
 import QQBot from './koishi';
 import Worker, { ICache } from './twitter';
@@ -15,14 +16,14 @@ const logger = getLogger();
 
 const sections: commandLineUsage.Section[] = [
   {
-    header: 'GoCQHTTP Instagram Bot',
-    content: 'The QQ Bot that forwards Instagram.',
+    header: 'GoCQHTTP Instagram Stories Bot',
+    content: 'The QQ Bot that forwards Instagram stories.',
   },
   {
     header: 'Synopsis',
     content: [
-      '$ cq-instagram-bot {underline config.json}',
-      '$ cq-instagram-bot {bold --help}',
+      '$ cq-igstory-bot {underline config.json}',
+      '$ cq-igstory-bot {bold --help}',
     ],
   },
   {
@@ -95,71 +96,77 @@ if (!config[k] || config[k] < 2048 || config[k] > 65536) {
 
 setLogLevels(config.loglevel);
 
-const deserialized = {
-  lockfile: {
-    workon: 0,
-    feed: [],
-    threads: {},
-  } as ILock,
-  cachefile: {} as ICache,
-};
-for (const file in deserialized) if (fs.existsSync(path.resolve(config[file]))) {
-  try {
-    deserialized[file] = JSON.parse(fs.readFileSync(path.resolve(config[file]), 'utf8'));
-  } catch (err) {
-    logger.error(`Failed to parse ${file} ${config[file]}: `, err);
-  }
-  fs.access(path.resolve(config[file]), fs.constants.W_OK, err => {
-    if (err) {
-      logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
+(async () => {
+  const deserialized = {
+    lockfile: {
+      workon: 0,
+      feed: [],
+      threads: {},
+    } as ILock,
+    cachefile: {} as ICache,
+  };
+
+  const fileEntries = Object.keys(deserialized).map(file => [file, path.resolve(config[file])]);
+
+  for (const [file, filePath] of fileEntries) {
+    if (fs.existsSync(filePath)) try {
+      deserialized[file] = await json.readFile(filePath);
+      fs.access(filePath, fs.constants.W_OK, err => {
+        if (err) {
+          logger.fatal(`cannot write ${file} ${filePath}, permission denied`);
+          process.exit(1);
+        }
+      });
+      continue;
+    } catch (err) {
+      logger.error(`Failed to parse ${file} ${config[file]}: `, err);
+    }
+  
+    try {
+      await json.writeFile(filePath, deserialized[file]);
+    } catch (err) {
+      logger.fatal(`cannot write ${file} ${filePath}, permission denied`);
       process.exit(1);
     }
-  });
-} else {
-  try {
-    fs.writeFileSync(path.resolve(config[file]), JSON.stringify(deserialized[file]));
-  } catch (err) {
-    logger.fatal(`cannot write ${file} ${path.resolve(config[file])}, permission denied`);
-    process.exit(1);
   }
-}
 
-const {lockfile: lock, cachefile: cache} = deserialized;
+  const {lockfile: lock, cachefile: cache} = deserialized;
 
-if (!config.resume_on_start) {
-  Object.keys(lock.threads).forEach(key => {
-    lock.threads[key].offset = '-1';
-  });
-}
+  if (!config.resume_on_start) {
+    Object.keys(lock.threads).forEach(key => {
+      lock.threads[key].offset = '-1';
+    });
+  }
 
-const qq = new QQBot({
-  access_token: config.cq_access_token,
-  host: config.cq_ws_host,
-  port: config.cq_ws_port,
-  bot_id: config.cq_bot_qq,
-  list: (c, a, cb) => list(c, a, cb, lock),
-  sub: (c, a, cb) => sub(c, a, cb, lock, config.lockfile),
-  unsub: (c, a, cb) => unsub(c, a, cb, lock, config.lockfile),
-  unsubAll: (c, a, cb) => unsubAll(c, a, cb, lock, config.lockfile),
-});
+  const qq = new QQBot({
+    access_token: config.cq_access_token,
+    host: config.cq_ws_host,
+    port: config.cq_ws_port,
+    bot_id: config.cq_bot_qq,
+    list: (c, a, cb) => list(c, a, cb, lock),
+    sub: (c, a, cb) => sub(c, a, cb, lock, config.lockfile),
+    unsub: (c, a, cb) => unsub(c, a, cb, lock, config.lockfile),
+    unsubAll: (c, a, cb) => unsubAll(c, a, cb, lock, config.lockfile),
+  });
 
-const worker = new Worker({
-  sessionLockfile: config.ig_session_lockfile,
-  credentials: [config.ig_username, config.ig_password],
-  codeServicePort: config.ig_2fa_code_receiver_port,
-  proxyUrl: config.ig_socks_proxy,
-  lock,
-  lockfile: config.lockfile,
-  cache,
-  cachefile: config.cachefile,
-  inactiveHours: config.inactive_hours,
-  workInterval: config.work_interval,
-  bot: qq,
-  webshotDelay: config.webshot_delay,
-  webshotCookiesLockfile: config.webshot_cookies_lockfile,
-  mode: config.mode,
-  wsUrl: config.playwright_ws_spec_endpoint,
-});
-worker.session.init().then(worker.launch);
+  const worker = new Worker({
+    sessionLockfile: config.ig_session_lockfile,
+    credentials: [config.ig_username, config.ig_password],
+    codeServicePort: config.ig_2fa_code_receiver_port,
+    proxyUrl: config.ig_socks_proxy,
+    lock,
+    lockfile: config.lockfile,
+    cache,
+    cachefile: config.cachefile,
+    inactiveHours: config.inactive_hours,
+    workInterval: config.work_interval,
+    bot: qq,
+    webshotDelay: config.webshot_delay,
+    webshotCookiesLockfile: config.webshot_cookies_lockfile,
+    mode: config.mode,
+    wsUrl: config.playwright_ws_spec_endpoint,
+  });
+  worker.session.init().then(worker.launch);
 
-qq.connect();
+  qq.connect();
+})();

+ 34 - 27
src/twitter.ts

@@ -15,6 +15,7 @@ import { SocksProxyAgent } from 'socks-proxy-agent';
 
 import { relativeDate } from './datetime';
 import { getLogger } from './loggers';
+import * as json from './json';
 import QQBot from './koishi';
 import { Arr, BigNumOps, chainPromises } from './utils';
 
@@ -40,7 +41,7 @@ const linkBuilder = (config: ReturnType<typeof parseLink>): string => {
 export {linkBuilder, parseLink};
 
 const igErrorIsAuthError = (error: IgClientError) =>
-  / 401/.test(error.message) || error instanceof IgLoginRequiredError || error instanceof IgCookieNotFoundError;
+  / 40[1-3]/.test(error.message) || error instanceof IgLoginRequiredError || error instanceof IgCookieNotFoundError;
 
 interface IWorkerOption {
   sessionLockfile: string;
@@ -121,6 +122,7 @@ export class SessionManager {
 
   public login = () =>
     this.ig.simulate.preLoginFlow()
+      .catch((err) => logger.error(err))
       .then(() => this.ig.account.login(this.username, this.password))
       .catch((err: IgClientError) => {
         if (err instanceof IgLoginTwoFactorRequiredError) {
@@ -134,7 +136,7 @@ export class SessionManager {
             verificationMethod: totp_two_factor_on ? '0' : '1',
           }));
         }
-        throw err;
+        logger.error(err);
       })
       .then(user => {
         logger.info(`successfully logged in as ${this.username}`);
@@ -352,7 +354,6 @@ export default class {
     }
     sendAllStories = (rawUserName, receiver, startIndex = 0, count = 10) => {
       const reply = msg => this.bot.sendTo(receiver, msg);
-      if (startIndex < 0) return reply('跳过数量参数值应为非负整数。');
       if (count < 1) return reply('最大查看数量参数值应为正整数。');
       const sender = this.sendStories(`instagram stories for ${rawUserName}`, receiver);
       workNow({
@@ -362,7 +363,8 @@ export default class {
           const storyItems = Object.values(this.cache[userId].stories)
             .sort((i1, i2) => -BigNumOps.compare(i2.pk, i1.pk)); // ascending!
           if (storyItems.length === 0) return reply(`当前用户 (@${userName}) 没有可用的 Instagram 限时动态。`);
-          if (startIndex + 1 > storyItems.length) return reply('跳过数量到达或超过当前用户可用的限时动态数量。');
+          if (startIndex < 0) startIndex += storyItems.length;
+          if (startIndex < 0 || startIndex + 1 > storyItems.length) return reply('跳过数量到达或超过当前用户可用的限时动态数量。');
           const endIndex = Math.min(storyItems.length, startIndex + count);
           const sendRangeText = `${startIndex + 1}${endIndex - startIndex > 1 ? `-${endIndex}` : ''}`;
           return this.workOnMedia(storyItems.slice(startIndex, endIndex), sender)
@@ -387,14 +389,18 @@ export default class {
           }
         }
         const userIdCache = this.pullOrders;
-        if (Object.values(userIdCache).length !== userIdCache.length) {
-          this.pullOrders = Arr.shuffle(userIdCache);
-          fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-        }
-        const timeout = Math.max(1000, this.workInterval * 1000 / this.lock.feed.length);
-        setInterval(() => { this.pullOrders = Arr.shuffle(this.pullOrders); }, 21600000);
-        setTimeout(this.workForAll, timeout);
-        this.work();
+        return (() => {
+          if (Object.values(userIdCache).length !== userIdCache.length) {
+            this.pullOrders = Arr.shuffle(userIdCache);
+            return json.writeFile(path.resolve(this.cachefile), this.cache);
+          }
+          return Promise.resolve(true);
+        })().then(() => {
+          const timeout = Math.max(1000, this.workInterval * 1000 / this.lock.feed.length);
+          setInterval(() => { this.pullOrders = Arr.shuffle(this.pullOrders); }, 21600000);
+          setTimeout(this.workForAll, timeout);
+          this.work();
+        });
       }
     );
   };
@@ -416,9 +422,10 @@ export default class {
     return this.queryUserObject(username)
       .then(({pk, username, full_name}) => {
         this.cache[pk] = {user: {pk, username, full_name}, stories: {}, pullOrder: 0};
-        fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-        logger.info(`initialized cache item for user ${full_name} (@${username})`);
-        return `${username}:${pk}`;
+        return json.writeFile(path.resolve(this.cachefile), this.cache).then(res => {
+          if (res) logger.info(`initialized cache item for user ${full_name} (@${username})`);
+          return `${username}:${pk}`;
+        });
       });
   };
 
@@ -471,7 +478,6 @@ export default class {
   private workForAll = () => {
     const timeout = Math.max(1000, this.workInterval * 1000 / this.lock.feed.length);
     if (this.isInactiveTime) { setTimeout(this.workForAll, timeout); return; }
-    logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
     chainPromises(Object.entries(this.lock.threads).map(([feed, thread]) => () => {
       const id = thread.id;
       const userName = parseLink(feed).userName;
@@ -484,8 +490,9 @@ export default class {
       return promisify(setTimeout)((Math.random() * 2 + 1) * 5000).then(() =>
         this.client.user.info(id).then(({pk, username, full_name}) => {
           this.cache[id] = {user: {pk, username, full_name}, stories: {}, pullOrder: -1};
-          fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-          logger.info(`initialized cache item for user ${full_name} (@${username})`);
+          return json.writeFile(path.resolve(this.cachefile), this.cache).then(() =>
+            logger.info(`initialized cache item for user ${full_name} (@${username})`)
+          );
         })
       );
     }))
@@ -494,7 +501,7 @@ export default class {
           this.pullOrders = Arr.shuffle(Object.keys(this.cache)).map(Number) :
           this.pullOrders;
         return chainPromises(
-          Arr.chunk(userIdCache, 20).map(userIds => () => {
+          Arr.chunk(userIdCache.filter(v => Date.now() - new Date(this.cache[v].updated).getTime() > 3600000), 30).map(userIds => () => {
             logger.info(`pulling stories from users:${userIds.map(id => ` @${this.cache[id].user.username}`)}`);
             return this.client.feed.reelsMedia({userIds}).request()
               .then(({reels}) => chainPromises(
@@ -520,14 +527,13 @@ export default class {
                   )
                 )
               ))
-              .finally(() => {
-                fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
+              .finally(() => json.writeFile(path.resolve(this.cachefile), this.cache).then(() =>
                 Object.values(this.lock.threads).forEach(thread => {
                   if (userIds.includes(thread.id)) {
                     thread.updatedAt = this.cache[thread.id].updated = Date();
                   }
-                });
-              }) as unknown as Promise<void>;
+                })
+              )) as unknown as Promise<void>;
           }),
           (lp1, lp2) => () =>
             lp1().then(() => promisify(setTimeout)(timeout).then(lp2))
@@ -581,10 +587,11 @@ export default class {
       logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
       delete lock.threads[currentFeed];
       (this.cache[parseLink(currentFeed).userName] || {} as {pullOrder: number}).pullOrder = 0;
-      fs.writeFileSync(path.resolve(this.cachefile), JSON.stringify(this.cache));
-      lock.feed.splice(lock.workon, 1);
-      fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
-      return this.work();
+      return json.writeFile(path.resolve(this.cachefile), this.cache).then(() => {
+        lock.feed.splice(lock.workon, 1);
+        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
+        return this.work();
+      });
     }
 
     logger.debug(`searching for new items from ${currentFeed} in cache`);